
Unstract Workers

Lightweight Celery workers for distributed task processing in the Unstract platform.

Overview

The workers are independent, microservices-style processes that communicate with the Unstract backend via internal APIs, providing:

  • 75% memory reduction compared to Django-based workers
  • Independent scaling and deployment
  • Better fault isolation and resilience
  • Simplified dependencies without Django ORM

Workers

Worker            Queue                      Purpose
api-deployment    celery_api_deployments     API workflow deployments and executions
general           celery                     General tasks, webhooks, standard workflows
file-processing   file_processing            File processing through workflow tools
callback          file_processing_callback   Result aggregation and workflow finalization
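The worker-to-queue mapping above can be expressed as Celery-style task routing. This is a sketch: only the queue names come from the table; the dotted task-name patterns and the `queue_for` helper are hypothetical.

```python
# Hypothetical task routing mirroring the worker/queue table above.
# Only the queue names are from the table; the patterns are illustrative.
import fnmatch

TASK_ROUTES = {
    "api_deployment.tasks.*": {"queue": "celery_api_deployments"},
    "general.tasks.*": {"queue": "celery"},
    "file_processing.tasks.*": {"queue": "file_processing"},
    "callback.tasks.*": {"queue": "file_processing_callback"},
}


def queue_for(task_name: str, default: str = "celery") -> str:
    """Resolve which queue a task name would be routed to."""
    for pattern, route in TASK_ROUTES.items():
        if fnmatch.fnmatch(task_name, pattern):
            return route["queue"]
    return default
```

A mapping in this shape can be handed to Celery as its `task_routes` setting, so each worker only consumes the queue it is responsible for.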

Quick Start

1. Install Dependencies

cd workers
uv sync

# Install all workers
for dir in api-deployment general file-processing callback; do
    cd $dir && uv sync && cd ..
done

2. Configuration

Quick Setup (Recommended):

# Copy environment file
cp sample.env .env

# Automatic configuration for your development setup
./setup-dev-env.sh

Manual Setup:

# Edit .env based on your setup:
# 1. Full Docker:     DJANGO_APP_BACKEND_URL=http://unstract-backend:8000
# 2. Backend on host:  DJANGO_APP_BACKEND_URL=http://172.17.0.1:8000 (Linux)
#                      DJANGO_APP_BACKEND_URL=http://host.docker.internal:8000 (Mac/Win)
# 3. Local dev:       DJANGO_APP_BACKEND_URL=http://localhost:8000

# Or use environment variables
export INTERNAL_API_BASE_URL="http://localhost:8000/internal"
export INTERNAL_SERVICE_API_KEY="internal-celery-worker-key-123"
export CELERY_BROKER_URL="redis://localhost:6379/0"
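A worker might load this configuration along the following lines. This is a minimal sketch, assuming defaults that match the sample values above; the dataclass and field names are illustrative, not the actual `shared/config.py`.

```python
# Sketch of environment-based worker configuration; field names are
# illustrative, only the variable names come from the README above.
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class WorkerConfig:
    api_base_url: str
    api_key: str
    broker_url: str

    @classmethod
    def from_env(cls) -> "WorkerConfig":
        """Read settings from the environment, with sample defaults."""
        return cls(
            api_base_url=os.environ.get(
                "INTERNAL_API_BASE_URL", "http://localhost:8000/internal"
            ),
            api_key=os.environ.get("INTERNAL_SERVICE_API_KEY", ""),
            broker_url=os.environ.get(
                "CELERY_BROKER_URL", "redis://localhost:6379/0"
            ),
        )
```

Centralizing environment reads in one frozen object keeps the rest of the worker code free of scattered `os.environ` lookups.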

Test Configuration:

python test_backend_connection.py  # Verify backend connectivity

3. Run Workers

# Quick start - run all workers
./run-worker.sh all

# Or run individual workers
./run-worker.sh api           # API deployment worker
./run-worker.sh general       # General worker
./run-worker.sh file          # File processing worker
./run-worker.sh callback      # Callback worker

# With options
./run-worker.sh -l DEBUG api  # Debug logging
./run-worker.sh -d general    # Background mode
./run-worker.sh -s            # Show status
./run-worker.sh -k            # Kill all

Health Monitoring

# Check worker health
curl http://localhost:8080/health  # API deployment
curl http://localhost:8081/health  # General
curl http://localhost:8082/health  # File processing
curl http://localhost:8083/health  # Callback
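The health endpoints polled above could be served by something as small as the sketch below. The port, JSON shape, and handler are assumptions for illustration; they are not the workers' actual health-server implementation.

```python
# Minimal sketch of a worker /health endpoint; the JSON shape and port
# are assumptions, not the actual worker implementation.
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def health_payload(worker: str) -> dict:
    """Build the JSON body returned by /health (illustrative shape)."""
    return {"status": "healthy", "worker": worker}


class HealthHandler(BaseHTTPRequestHandler):
    worker_name = "general"

    def do_GET(self):
        if self.path == "/health":
            body = json.dumps(health_payload(self.worker_name)).encode()
            self.send_response(200)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, *args):
        # Keep request logging quiet in this sketch.
        pass


def serve(port: int = 8081) -> None:
    """Block serving /health forever (one port per worker, as above)."""
    HTTPServer(("127.0.0.1", port), HealthHandler).serve_forever()
```

Running the HTTP listener in a side thread of the Celery worker process is one common way to expose liveness without touching task code.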

Architecture

See ARCHITECTURE.md for detailed architecture decisions and design patterns.

Operations

For deployment, monitoring, and troubleshooting, see OPERATIONS.md.

Development

Project Structure

workers/
├── shared/             # Common utilities and API clients
│   ├── api_client.py   # Main internal API client
│   ├── clients/        # Modular API clients
│   ├── config.py       # Configuration management
│   └── utils/          # Helper utilities
├── api-deployment/     # API workflow deployment worker
├── general/            # General purpose worker
├── file-processing/    # File processing worker
└── callback/           # Callback aggregation worker

Adding New Workers

  1. Create worker directory with pyproject.toml
  2. Implement worker.py and tasks.py
  3. Add to run-worker.sh script
  4. Create deployment configurations
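A new worker's pyproject.toml could start from a skeleton like the following. The package name, version, Python constraint, and dependency list are all illustrative assumptions, not a prescribed manifest.

```toml
# Illustrative pyproject.toml skeleton for a new worker directory.
[project]
name = "unstract-worker-example"   # hypothetical package name
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "celery>=5.3",
    "redis>=5.0",
]
```

With this in place, `uv sync` in the worker directory resolves and installs its dependencies, matching the per-worker install loop in Quick Start.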

Testing

# Run tests
cd workers
uv run pytest

# Test individual worker
cd api-deployment
uv run pytest tests/

Docker Deployment

# Build all workers
VERSION=local docker compose -f docker-compose.build.yaml build \
    worker-api-deployment worker-callback worker-file-processing worker-general

# Run workers
VERSION=local docker compose --profile workers-new up -d

# Check status
docker compose --profile workers-new ps

# View logs
docker compose --profile workers-new logs -f

Contributing

  1. Follow the architecture principles in ARCHITECTURE_PRINCIPLES.md
  2. Ensure backward compatibility with existing workers
  3. Add tests for new functionality
  4. Update documentation as needed

License

AGPL-3.0 - See LICENSE file for details