Unstract Workers Architecture
Overview
This package implements lightweight Celery workers that communicate with the Unstract Django backend via internal APIs, eliminating Django ORM dependencies and enabling independent deployment and scaling.
Architecture Decision
✅ CHOSEN: Clean Microservices Architecture
unstract/
├── workers/ # Independent worker package
│ ├── shared/ # Common utilities (API client, logging, health)
│ ├── api_deployment/ # API deployment worker
│ ├── general/ # General worker (webhooks, general workflows)
│ ├── file_processing/ # File processing worker
│ ├── callback/ # Result aggregation and finalization worker
│ ├── docker/ # Docker configurations
│ ├── scripts/ # Deployment and management scripts
│ └── pyproject.toml # Independent package definition
└── backend/ # Django backend with internal APIs
❌ REJECTED: Backend-Coupled Architecture
backend/
└── workers/ # Workers inside Django backend
    ├── shared/ # Would still have Django coupling risk
    └── ... # Tight coupling to backend deployment
Benefits of Clean Architecture
🎯 Complete Separation
- Zero Django Dependencies: Workers don't import anything from Django
- Independent Packaging: Own pyproject.toml with minimal dependencies
- Microservices Alignment: Follows existing pattern (platform-service/, prompt-service/)
🚀 Deployment Flexibility
- Independent Versioning: Workers can be versioned separately from backend
- Separate Scaling: Scale workers independently based on workload
- Different Infrastructure: Workers can run on different machines/containers
- Fault Isolation: Worker failures don't affect Django backend
📦 Resource Efficiency
- Minimal Dependencies: Only essential packages for task processing
- Smaller Images: Docker images without Django bloat
- Faster Startup: No Django initialization overhead
- Lower Memory: ~50MB vs ~200MB for Django workers
Communication Pattern
┌─────────────────┐    HTTP API    ┌──────────────────┐    ORM/DB    ┌──────────────┐
│     Workers     │───────────────→│  Django Backend  │─────────────→│  PostgreSQL  │
│  (Lightweight)  │                │ (Internal APIs)  │              │   Database   │
└─────────────────┘                └──────────────────┘              └──────────────┘
        │                                   │
        ├── Task Coordination               ├── Business Logic
        ├── Error Handling                  ├── Tool Execution
        ├── Result Aggregation              ├── Database Operations
        └── Health Monitoring               └── Complex Processing
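As a rough illustration of the HTTP leg in the diagram above, the sketch below shows how a worker might build and send a request to a backend internal API using only the standard library. The `InternalAPIClient` class, the base URL, the endpoint path, and the header names are assumptions made for this example; they are not taken from shared/api_client.py.

```python
import json
import urllib.request
from dataclasses import dataclass
from typing import Optional


@dataclass
class InternalAPIClient:
    """Hypothetical thin client; field and header names are illustrative."""

    base_url: str            # assumed, e.g. "http://backend:8000/internal"
    api_key: str             # assumed shared secret for service-to-service auth
    organization_id: str     # assumed way of scoping a request to one tenant
    timeout: float = 30.0

    def build_request(
        self, method: str, path: str, payload: Optional[dict] = None
    ) -> urllib.request.Request:
        # Join base URL and path without doubling or dropping slashes.
        url = f"{self.base_url.rstrip('/')}/{path.lstrip('/')}"
        data = json.dumps(payload).encode("utf-8") if payload is not None else None
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "X-Organization-ID": self.organization_id,
            "Content-Type": "application/json",
        }
        return urllib.request.Request(url, data=data, headers=headers, method=method)

    def send(self, method: str, path: str, payload: Optional[dict] = None) -> dict:
        # The worker never touches the database; it only sees the JSON reply.
        request = self.build_request(method, path, payload)
        with urllib.request.urlopen(request, timeout=self.timeout) as response:
            body = response.read()
        return json.loads(body) if body else {}
```

Keeping request construction separate from sending makes the client easy to unit test without a running backend.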
Worker Responsibilities
Lightweight Workers Handle:
- Task orchestration and coordination
- HTTP communication with Django backend
- Error handling and retry logic
- Result aggregation and status tracking
- Health monitoring and metrics collection
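The "error handling and retry logic" item above could look something like the following minimal sketch, which retries a transient failure with exponential backoff. The function name `call_with_retry`, its parameters, and the choice of retryable exception types are assumptions for illustration, not the contents of shared/retry_utils.py.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def call_with_retry(
    operation: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Run operation, retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the caller
            # Delays grow as base_delay, 2 * base_delay, 4 * base_delay, ...
            sleep(base_delay * 2 ** (attempt - 1))
    raise ValueError("max_attempts must be at least 1")
```

The `sleep` argument is injectable so tests can record the delays instead of actually waiting.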
Django Backend Handles:
- Complex business logic (tool execution, file processing pipeline)
- Database operations and ORM queries
- Authentication and authorization
- Multi-tenant organization scoping
- Integration with external services
Package Structure
unstract/workers/
├── __init__.py # Package interface
├── pyproject.toml # Package definition and dependencies
├── README.md # Documentation
├── ARCHITECTURE.md # This file
├── uv.lock # Dependency lock file
├── shared/ # Shared infrastructure
│ ├── __init__.py
│ ├── api_client.py # Internal API HTTP client
│ ├── config.py # Configuration management
│ ├── health.py # Health checking system
│ ├── logging_utils.py # Structured logging
│ └── retry_utils.py # Circuit breakers and retry logic
├── api_deployment/ # API deployment worker
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ └── tasks.py # async_execute_bin_api task
├── general/ # General tasks worker
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ └── tasks.py # webhooks, general async_execute_bin
├── file_processing/ # File processing worker
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ └── tasks.py # process_file_batch tasks
├── callback/ # Result aggregation worker
│ ├── __init__.py
│ ├── worker.py # Celery app configuration
│ └── tasks.py # process_batch_callback tasks
├── docker/ # Container configurations
│ ├── api_deployment.Dockerfile
│ ├── general.Dockerfile
│ ├── file_processing.Dockerfile
│ ├── callback.Dockerfile
│ └── docker-compose.workers.yml
├── scripts/ # Management scripts
│ ├── deploy.sh # Deployment automation
│ └── fix_imports.py # Import path utilities
├── monitoring/ # Monitoring and metrics
│ └── prometheus_metrics.py # Prometheus metrics collection
└── config/ # Configuration
    └── queue_routing.py # Queue routing and scaling rules
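To make the role of config/queue_routing.py more concrete, here is a hedged sketch of a routing table in the shape Celery's `task_routes` setting accepts (task name or glob pattern mapped to a route dict). The task names come from the tasks.py comments above and `celery_api_deployments` appears in the local development command; the other queue names, `DEFAULT_QUEUE`, and `resolve_queue` are placeholders invented for this example.

```python
from fnmatch import fnmatch

# Every queue name except celery_api_deployments is a placeholder.
TASK_ROUTES = {
    "async_execute_bin_api": {"queue": "celery_api_deployments"},
    "async_execute_bin": {"queue": "general"},
    "process_file_batch": {"queue": "file_processing"},
    "process_batch_callback": {"queue": "callback"},
}
DEFAULT_QUEUE = "general"  # assumed fallback for unrouted tasks


def resolve_queue(task_name: str) -> str:
    """Return the queue for a task, treating route keys as glob patterns."""
    for pattern, route in TASK_ROUTES.items():
        if fnmatch(task_name, pattern):
            return route["queue"]
    return DEFAULT_QUEUE
```

In a real worker the dictionary would typically be assigned to the Celery app's `task_routes` configuration; `resolve_queue` is only a stand-in that lets the mapping be checked without a broker.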
Development Workflow
Setup
cd unstract/workers
uv sync # Install dependencies
Local Development
# Run individual worker
cd api_deployment
python -m worker
# Run with specific queue
celery -A worker worker --loglevel=debug -Q celery_api_deployments
Testing
pytest # Run tests
pytest --cov # Run with coverage
Deployment
# Deploy all workers
./scripts/deploy.sh --environment production --action deploy
# Deploy specific worker type
./scripts/deploy.sh --workers file --action deploy
Migration Path
- ✅ Phase 1: Created lightweight workers alongside existing heavy workers
- ✅ Phase 2: Implemented file processing and callback workers
- ✅ Phase 3: Moved to clean microservices architecture
- 🔮 Future: Gradual traffic migration and deprecation of heavy workers
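One possible way to approach the gradual traffic migration mentioned above is a deterministic percentage rollout, sketched below. This is an illustration of a common technique rather than the project's plan; `use_lightweight_worker`, the `execution_id` key, and the `rollout_percent` setting are all hypothetical.

```python
import hashlib


def use_lightweight_worker(execution_id: str, rollout_percent: int) -> bool:
    """Deterministically send rollout_percent of executions to the new workers."""
    digest = hashlib.sha256(execution_id.encode("utf-8")).digest()
    # Map the ID to a stable bucket in 0..99 so retries take the same path.
    bucket = int.from_bytes(digest[:4], "big") % 100
    return bucket < rollout_percent
```

Because the decision depends only on the ID, raising `rollout_percent` moves more executions to the lightweight workers without flipping ones that were already migrated.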
Scalability Benefits
Independent Scaling
- Scale each worker type based on specific workload patterns
- Different concurrency settings per worker type
- Auto-scaling rules based on queue depth
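A queue-depth scaling rule of the kind listed above might be expressed as in the sketch below. The function name, the `tasks_per_worker` target, and the default bounds are assumptions chosen for illustration; real values would depend on each worker type's workload.

```python
import math


def desired_worker_count(
    queue_depth: int,
    tasks_per_worker: int,
    min_workers: int = 1,
    max_workers: int = 10,
) -> int:
    """Size a worker pool so each worker has at most tasks_per_worker queued."""
    if tasks_per_worker <= 0:
        raise ValueError("tasks_per_worker must be positive")
    needed = math.ceil(queue_depth / tasks_per_worker)
    # Clamp so the pool never scales to zero or beyond the allowed ceiling.
    return max(min_workers, min(max_workers, needed))
```

For example, with a target of 20 queued tasks per worker, a depth of 95 yields 5 workers, while an empty queue stays at the configured minimum.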
Resource Optimization
- Deploy file processing workers on high-memory nodes
- Deploy callback workers on standard nodes
- Deploy API workers with high network bandwidth
Fault Tolerance
- Worker failures isolated from Django backend
- Circuit breaker patterns prevent cascade failures
- Independent health monitoring and recovery
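The circuit breaker pattern mentioned above can be sketched as follows: after a number of consecutive failures the breaker rejects calls for a cooling-off period, then lets one trial call through. The class names, thresholds, and injectable clock are assumptions for this example and do not describe the actual code in shared/retry_utils.py.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def call(self, operation: Callable[[], T]) -> T:
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("backend calls are temporarily suspended")
            # Timeout elapsed: fall through and allow one trial call (half-open).
        try:
            result = operation()
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()  # open, or re-open after a failed trial
            raise
        # Success closes the circuit and clears the failure count.
        self._failures = 0
        self._opened_at = None
        return result
```

Wrapping backend calls this way means a struggling backend sees fewer requests while it recovers, instead of a growing pile of retries.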
This architecture gives the Unstract platform a task processing layer whose workers can be deployed, scaled, and recovered independently of the Django backend, while the backend remains the single owner of business logic and database access.