unstract/workers/README.md
ali 0c5997f9a9 UN-2470 [FEAT] Remove Django dependency from Celery workers with internal APIs (#1494)
2025-10-03 11:24:07 +05:30


Unstract Workers

Lightweight Celery workers for distributed task processing in the Unstract platform.

Overview

The workers are independent microservices that communicate with the Unstract backend through internal APIs. This provides:

  • 75% lower memory usage than the Django-based workers
  • Independent scaling and deployment
  • Better fault isolation and resilience
  • Simpler dependencies, with no Django ORM

Workers

Worker           Queue                     Purpose
api-deployment   celery_api_deployments    API workflow deployments and executions
general          celery                    General tasks, webhooks, standard workflows
file-processing  file_processing           File processing through workflow tools
callback         file_processing_callback  Result aggregation and workflow finalization
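To show how the table maps onto Celery, the hypothetical helper below builds a `celery worker` command line for one worker, using standard Celery CLI options (`--queues`, `--pool`, `--concurrency`, `--hostname`). It assumes each worker's Celery app is importable as `worker` from inside its directory; this is a sketch, not how run-worker.sh is actually written.

```python
# Worker -> queue mapping, copied from the table above.
WORKER_QUEUES = {
    "api-deployment": "celery_api_deployments",
    "general": "celery",
    "file-processing": "file_processing",
    "callback": "file_processing_callback",
}


def build_worker_command(
    worker: str,
    loglevel: str = "INFO",
    pool: str = "prefork",
    concurrency: int = 4,
) -> list[str]:
    """Return a `celery worker` argv for one worker (hypothetical helper).

    Assumption: the Celery app lives in worker.py, so `-A worker` resolves
    when the command runs inside that worker's directory.
    """
    try:
        queue = WORKER_QUEUES[worker]
    except KeyError:
        raise ValueError(f"unknown worker: {worker!r}") from None
    return [
        "celery", "-A", "worker", "worker",
        "--loglevel", loglevel,
        "--pool", pool,                  # e.g. "gevent" for I/O-heavy workers
        "--concurrency", str(concurrency),
        "--queues", queue,               # consume only this worker's queue
        "--hostname", f"{worker}@%h",    # %h expands to the machine's hostname
    ]
```

`shlex.join(build_worker_command("file-processing"))` gives a string that can be pasted into a shell.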

Quick Start

1. Install Dependencies

cd workers  # from the repository root
uv sync

# Install all workers (the subshell keeps the loop in workers/ even if one sync fails)
for dir in api-deployment general file-processing callback; do
    (cd "$dir" && uv sync)
done

2. Configuration

Quick Setup (Recommended):

# Copy environment file
cp sample.env .env

# Automatic configuration for your development setup
./setup-dev-env.sh
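The actual logic of setup-dev-env.sh is not shown here. As a hypothetical illustration of how the three layouts listed under Manual Setup could be told apart, the sketch below maps a layout to a `DJANGO_APP_BACKEND_URL` value. It assumes `/.dockerenv` marks a Docker container (a common but not guaranteed heuristic) and that the caller knows whether the backend runs in Docker.

```python
import platform
from pathlib import Path


def running_in_docker() -> bool:
    """Heuristic only: Docker usually creates /.dockerenv inside containers."""
    return Path("/.dockerenv").exists()


def pick_backend_url(worker_in_docker: bool, backend_in_docker: bool, system: str) -> str:
    """Map a deployment layout to one of the URLs listed in this README (hypothetical)."""
    if worker_in_docker and backend_in_docker:
        # 1. Full Docker: reach the backend by its Compose service name.
        return "http://unstract-backend:8000"
    if worker_in_docker:
        # 2. Backend on host: Linux uses the default bridge gateway,
        #    Docker Desktop (macOS/Windows) provides host.docker.internal.
        host = "172.17.0.1" if system == "Linux" else "host.docker.internal"
        return f"http://{host}:8000"
    # 3. Local dev: worker on the host; assumes the backend answers on localhost:8000.
    return "http://localhost:8000"


def env_line(backend_in_docker: bool) -> str:
    """Build the .env line for the current machine (hypothetical helper)."""
    url = pick_backend_url(running_in_docker(), backend_in_docker, platform.system())
    return f"DJANGO_APP_BACKEND_URL={url}"
```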

Manual Setup:

# Edit .env based on your setup:
# 1. Full Docker:      DJANGO_APP_BACKEND_URL=http://unstract-backend:8000
# 2. Backend on host:  DJANGO_APP_BACKEND_URL=http://172.17.0.1:8000 (Linux)
#                      DJANGO_APP_BACKEND_URL=http://host.docker.internal:8000 (Mac/Win)
# 3. Local dev:        DJANGO_APP_BACKEND_URL=http://localhost:8000

# Or use environment variables
export INTERNAL_API_BASE_URL="http://localhost:8000/internal"
export INTERNAL_SERVICE_API_KEY="internal-celery-worker-key-123"
export CELERY_BROKER_URL="redis://localhost:6379/0"
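A worker can read these variables once at startup and fail fast when one is missing. The sketch below is a hypothetical stand-in, not the real shared/config.py; the class name and the choice to strip a trailing slash from the base URL are assumptions.

```python
import os
from dataclasses import dataclass, field
from typing import Mapping, Optional

REQUIRED_SETTINGS = ("INTERNAL_API_BASE_URL", "INTERNAL_SERVICE_API_KEY", "CELERY_BROKER_URL")


@dataclass(frozen=True)
class WorkerConfig:
    """Hypothetical worker settings loaded from the environment."""

    internal_api_base_url: str
    celery_broker_url: str
    # Excluded from repr() so the key does not leak into logs.
    internal_service_api_key: str = field(repr=False)

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "WorkerConfig":
        """Read settings from `env` (default: os.environ); raise if any are missing."""
        env = os.environ if env is None else env
        missing = [name for name in REQUIRED_SETTINGS if not env.get(name)]
        if missing:
            raise RuntimeError("Missing required settings: " + ", ".join(missing))
        return cls(
            internal_api_base_url=env["INTERNAL_API_BASE_URL"].rstrip("/"),
            celery_broker_url=env["CELERY_BROKER_URL"],
            internal_service_api_key=env["INTERNAL_SERVICE_API_KEY"],
        )
```

Passing an explicit mapping instead of relying on `os.environ` keeps the loader easy to unit-test.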

Test Configuration:

python test_backend_connection.py  # Verify backend connectivity
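test_backend_connection.py is the supported check. For illustration only, a minimal standard-library probe of the internal API might look like the sketch below. It assumes the API key is sent as a Bearer token, which may not match the real authentication scheme, and treats any HTTP answer (even 401) as "reachable".

```python
import urllib.error
import urllib.request


def check_backend(base_url: str, api_key: str, timeout: float = 5.0) -> tuple[bool, str]:
    """Return (reachable, detail) for the internal API base URL (hypothetical probe)."""
    request = urllib.request.Request(
        base_url,
        headers={"Authorization": f"Bearer {api_key}"},  # assumed auth scheme
    )
    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return True, f"HTTP {response.status}"
    except urllib.error.HTTPError as exc:
        # The server answered, so the network path works even if the request was rejected.
        return True, f"HTTP {exc.code}"
    except OSError as exc:
        # URLError (refused, DNS failure) and socket timeouts are OSError subclasses.
        return False, f"unreachable: {exc}"
```

A 401 or 403 here usually points at the API key rather than the network.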

3. Run Workers

# Quick start - run all workers
./run-worker.sh all

# Or run individual workers
./run-worker.sh api           # API deployment worker
./run-worker.sh general       # General worker
./run-worker.sh file          # File processing worker
./run-worker.sh callback      # Callback worker

# With options
./run-worker.sh -l DEBUG api  # Debug logging
./run-worker.sh -d general    # Background mode
./run-worker.sh -s            # Show status
./run-worker.sh -k            # Kill all
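The short names above expand to worker directories, and `-d` runs workers in the background. The sketch below reproduces that idea in Python under stated assumptions: the alias table is taken from the commands above, while `start_in_background`, the `<worker>.log` file name and the injected `make_argv` command factory are hypothetical and do not describe run-worker.sh internals.

```python
import subprocess
from pathlib import Path
from typing import Callable

# Short names accepted by run-worker.sh, mapped to worker directories.
WORKER_ALIASES = {
    "api": "api-deployment",
    "general": "general",
    "file": "file-processing",
    "callback": "callback",
}


def resolve_workers(name: str) -> list[str]:
    """Expand "all" or a short name into worker directory names."""
    if name == "all":
        return list(WORKER_ALIASES.values())
    try:
        return [WORKER_ALIASES[name]]
    except KeyError:
        raise ValueError(f"unknown worker: {name!r}") from None


def start_in_background(
    name: str,
    workers_root: Path,
    make_argv: Callable[[str], list[str]],
) -> dict[str, subprocess.Popen]:
    """Start each resolved worker as a detached process (hypothetical `-d`)."""
    processes = {}
    for worker in resolve_workers(name):
        cwd = workers_root / worker
        with open(cwd / f"{worker}.log", "ab") as log:
            # The child gets its own copy of the log descriptor, so the
            # parent can close the file as soon as Popen returns.
            processes[worker] = subprocess.Popen(
                make_argv(worker),
                cwd=cwd,
                stdout=log,
                stderr=subprocess.STDOUT,
                start_new_session=True,  # POSIX: detach from the terminal session
            )
    return processes
```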

Health Monitoring

# Check worker health
curl http://localhost:8080/health  # API deployment
curl http://localhost:8081/health  # General
curl http://localhost:8082/health  # File processing
curl http://localhost:8083/health  # Callback
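The four curl calls can be folded into one check, for example in a readiness script. The sketch below assumes the ports listed above and counts a worker as healthy only when `/health` answers with HTTP 200. It does not parse the response body, because that format is not documented here.

```python
import urllib.request
from typing import Mapping

# Health ports per worker, as listed above.
HEALTH_PORTS = {
    "api-deployment": 8080,
    "general": 8081,
    "file-processing": 8082,
    "callback": 8083,
}


def check_workers_health(
    host: str = "localhost",
    ports: Mapping[str, int] = HEALTH_PORTS,
    timeout: float = 3.0,
) -> dict[str, bool]:
    """Return {worker: healthy}; only an HTTP 200 from /health counts as healthy."""
    status = {}
    for worker, port in ports.items():
        url = f"http://{host}:{port}/health"
        try:
            with urllib.request.urlopen(url, timeout=timeout) as response:
                status[worker] = response.status == 200
        except OSError:
            # URLError, HTTPError (non-2xx) and socket timeouts all subclass OSError.
            status[worker] = False
    return status
```

A script could call `check_workers_health()` and exit non-zero when any value is `False`.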

Architecture

See ARCHITECTURE.md for detailed architecture decisions and design patterns.

Operations

For deployment, monitoring, and troubleshooting, see OPERATIONS.md.

Development

Project Structure

workers/
├── shared/             # Common utilities and API clients
│   ├── api_client.py   # Main internal API client
│   ├── clients/        # Modular API clients
│   ├── config.py       # Configuration management
│   └── utils/          # Helper utilities
├── api-deployment/     # API workflow deployment worker
├── general/            # General-purpose worker
├── file-processing/    # File processing worker
└── callback/           # Callback aggregation worker
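shared/ pairs a main internal API client with modular clients. The sketch below shows one way specialized clients can reuse a single configured HTTP object behind a facade. All class names and endpoint paths are hypothetical, as is the Bearer auth header. With urllib each request still opens its own connection. With `requests.Session` the same structure would also reuse pooled connections.

```python
import json
import urllib.request
from typing import Any


class InternalAPISession:
    """One opener plus auth headers, shared by every modular client (hypothetical)."""

    def __init__(self, base_url: str, api_key: str, timeout: float = 10.0) -> None:
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.opener = urllib.request.build_opener()
        # Assumed auth scheme; replace with whatever the internal APIs expect.
        self.opener.addheaders = [
            ("Authorization", f"Bearer {api_key}"),
            ("Accept", "application/json"),
        ]

    def get_json(self, path: str) -> Any:
        url = f"{self.base_url}/{path.lstrip('/')}"
        with self.opener.open(url, timeout=self.timeout) as response:
            return json.load(response)


class WorkflowClient:
    def __init__(self, session: InternalAPISession) -> None:
        self.session = session

    def get_workflow(self, workflow_id: str) -> Any:
        return self.session.get_json(f"workflows/{workflow_id}/")  # hypothetical path


class ExecutionClient:
    def __init__(self, session: InternalAPISession) -> None:
        self.session = session

    def get_execution(self, execution_id: str) -> Any:
        return self.session.get_json(f"executions/{execution_id}/")  # hypothetical path


class InternalAPIClient:
    """Facade: builds the session once and hands it to each specialized client."""

    def __init__(self, base_url: str, api_key: str) -> None:
        self.session = InternalAPISession(base_url, api_key)
        self.workflows = WorkflowClient(self.session)
        self.executions = ExecutionClient(self.session)
```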

Adding New Workers

  1. Create a worker directory with its own pyproject.toml
  2. Implement worker.py and tasks.py
  3. Add the worker to the run-worker.sh script
  4. Create its deployment configuration (for example, a Docker Compose service)
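Steps 1 and 2 can be scaffolded with a small script. The sketch below writes starter files for a new worker and refuses to overwrite an existing one. The file contents are hypothetical placeholders: the `unstract-worker-` package prefix, the `ping` task and the Celery `include`/`task_default_queue` settings are illustrative choices, not the project's templates.

```python
from pathlib import Path
from string import Template

PYPROJECT = Template('''\
[project]
name = "unstract-worker-$name"
version = "0.1.0"
dependencies = ["celery"]
''')

WORKER_PY = Template('''\
import os

from celery import Celery

# Same CELERY_BROKER_URL setting as the other workers.
app = Celery("$name", broker=os.environ["CELERY_BROKER_URL"], include=["tasks"])
app.conf.task_default_queue = "$queue"
''')

TASKS_PY = Template('''\
from worker import app


@app.task(name="$name.ping")
def ping() -> str:
    return "pong"
''')


def scaffold_worker(workers_root: Path, name: str, queue: str) -> Path:
    """Create <workers_root>/<name>/ with starter files (hypothetical helper)."""
    worker_dir = workers_root / name
    worker_dir.mkdir()  # raises FileExistsError if the worker already exists
    (worker_dir / "tests").mkdir()
    values = {"name": name, "queue": queue}
    (worker_dir / "pyproject.toml").write_text(PYPROJECT.substitute(values))
    (worker_dir / "worker.py").write_text(WORKER_PY.substitute(values))
    (worker_dir / "tasks.py").write_text(TASKS_PY.substitute(values))
    return worker_dir
```

Steps 3 and 4 (run-worker.sh and deployment configuration) still have to be done by hand.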

Testing

# Run tests
cd workers  # from the repository root
uv run pytest

# Test individual worker
cd api-deployment
uv run pytest tests/
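The workers reach the backend through API clients rather than the ORM. Task logic that takes the client as a parameter can therefore be unit-tested with `unittest.mock`, with no broker or running backend. In this sketch `finalize_execution`, `update_execution_status` and the status strings are hypothetical names. If placed under tests/, pytest would collect the two `test_` functions.

```python
from unittest import mock


def finalize_execution(api_client, execution_id: str, file_results: list[dict]) -> str:
    """Hypothetical callback-style task body: aggregate file results, report once."""
    failed = sum(1 for result in file_results if result.get("error"))
    status = "ERROR" if failed else "COMPLETED"
    api_client.update_execution_status(execution_id, status=status, failed_files=failed)
    return status


def test_reports_error_when_any_file_failed():
    client = mock.Mock()  # stands in for the internal API client
    status = finalize_execution(client, "exec-1", [{"error": None}, {"error": "timeout"}])
    assert status == "ERROR"
    client.update_execution_status.assert_called_once_with(
        "exec-1", status="ERROR", failed_files=1
    )


def test_reports_completed_when_all_files_succeeded():
    client = mock.Mock()
    assert finalize_execution(client, "exec-2", [{}, {"error": ""}]) == "COMPLETED"
    client.update_execution_status.assert_called_once_with(
        "exec-2", status="COMPLETED", failed_files=0
    )
```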

Docker Deployment

# Build all workers
VERSION=local docker compose -f docker-compose.build.yaml build \
    worker-api-deployment worker-callback worker-file-processing worker-general

# Run workers
VERSION=local docker compose --profile workers-new up -d

# Check status
docker compose --profile workers-new ps

# View logs
docker compose --profile workers-new logs -f

Contributing

  1. Follow the architecture principles in ARCHITECTURE_PRINCIPLES.md
  2. Ensure backward compatibility with existing workers
  3. Add tests for new functionality
  4. Update documentation as needed

License

AGPL-3.0. See the LICENSE file for details.