Unstract Workers

Lightweight Celery workers for distributed task processing in the Unstract platform.

Overview

Independent, microservices-based workers that communicate with the Unstract backend via internal APIs, providing:

  • 75% memory reduction compared to Django-based workers
  • Independent scaling and deployment
  • Better fault isolation and resilience
  • Simplified dependencies without Django ORM

Workers

Worker            Queue                       Purpose
api-deployment    celery_api_deployments      API workflow deployments and executions
general           celery                      General tasks, webhooks, standard workflows
file-processing   file_processing             File processing through workflow tools
callback          file_processing_callback    Result aggregation and workflow finalization
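As a hedged illustration, the worker-to-queue mapping in the table above can be expressed as a plain dict (illustrative only; the real routing lives in each worker's Celery configuration):

```python
# Worker-to-queue mapping from the table above (sketch, not the actual config).
WORKER_QUEUES = {
    "api-deployment": "celery_api_deployments",
    "general": "celery",
    "file-processing": "file_processing",
    "callback": "file_processing_callback",
}

def queue_for(worker: str) -> str:
    """Return the Celery queue a given worker consumes from."""
    return WORKER_QUEUES[worker]
```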

Quick Start

1. Install Dependencies

cd unstract/workers  # from the repository root
uv sync

# Install all workers
for dir in api-deployment general file-processing callback; do
    cd $dir && uv sync && cd ..
done

2. Configuration

Quick Setup (Recommended):

# Copy environment file
cp sample.env .env

# Automatic configuration for your development setup
./setup-dev-env.sh

Manual Setup:

# Edit .env based on your setup:
# 1. Full Docker:      DJANGO_APP_BACKEND_URL=http://unstract-backend:8000
# 2. Backend on host:  DJANGO_APP_BACKEND_URL=http://172.17.0.1:8000 (Linux)
#                      DJANGO_APP_BACKEND_URL=http://host.docker.internal:8000 (Mac/Win)
# 3. Local dev:        DJANGO_APP_BACKEND_URL=http://localhost:8000

# Or use environment variables
export INTERNAL_API_BASE_URL="http://localhost:8000/internal"
export INTERNAL_SERVICE_API_KEY="internal-celery-worker-key-123"
export CELERY_BROKER_URL="redis://localhost:6379/0"
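As a stdlib-only sketch, the three environment variables above could be read into a small settings object like this (the variable names are the real ones from sample.env; the defaults mirror the sample values and the `WorkerConfig` class itself is illustrative, not the actual `shared/config.py` API):

```python
import os
from dataclasses import dataclass

@dataclass
class WorkerConfig:
    """Minimal worker settings read from the environment (sketch only)."""
    internal_api_base_url: str
    internal_service_api_key: str
    celery_broker_url: str

def load_config(env=None) -> WorkerConfig:
    # Fall back to sample defaults when a variable is unset.
    env = os.environ if env is None else env
    return WorkerConfig(
        internal_api_base_url=env.get("INTERNAL_API_BASE_URL", "http://localhost:8000/internal"),
        internal_service_api_key=env.get("INTERNAL_SERVICE_API_KEY", ""),
        celery_broker_url=env.get("CELERY_BROKER_URL", "redis://localhost:6379/0"),
    )
```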

Test Configuration:

python test_backend_connection.py  # Verify backend connectivity

3. Run Workers

# Quick start - run all workers
./run-worker.sh all

# Or run individual workers
./run-worker.sh api           # API deployment worker
./run-worker.sh general       # General worker
./run-worker.sh file          # File processing worker
./run-worker.sh callback      # Callback worker

# With options
./run-worker.sh -l DEBUG api  # Debug logging
./run-worker.sh -d general    # Background mode
./run-worker.sh -s            # Show status
./run-worker.sh -k            # Kill all

Health Monitoring

# Check worker health
curl http://localhost:8080/health  # API deployment
curl http://localhost:8081/health  # General
curl http://localhost:8082/health  # File processing
curl http://localhost:8083/health  # Callback
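The four curl commands above can be wrapped in a small stdlib-only poller; the ports come from the commands, while the poller itself is a hedged sketch rather than an official tool:

```python
import urllib.request

# Health ports taken from the curl commands above.
WORKER_HEALTH_PORTS = {
    "api-deployment": 8080,
    "general": 8081,
    "file-processing": 8082,
    "callback": 8083,
}

def health_url(worker: str, host: str = "localhost") -> str:
    """Build the health endpoint URL for a worker."""
    return f"http://{host}:{WORKER_HEALTH_PORTS[worker]}/health"

def is_healthy(worker: str, timeout: float = 2.0) -> bool:
    """Return True if the worker's health endpoint answers HTTP 200."""
    try:
        with urllib.request.urlopen(health_url(worker), timeout=timeout) as resp:
            return resp.status == 200
    except OSError:
        return False
```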

Architecture

See ARCHITECTURE.md for detailed architecture decisions and design patterns.

Operations

For deployment, monitoring, and troubleshooting, see OPERATIONS.md.

Development

Project Structure

workers/
├── shared/              # Common utilities and API clients
│   ├── api_client.py    # Main internal API client
│   ├── clients/         # Modular API clients
│   ├── config.py        # Configuration management
│   └── utils/           # Helper utilities
├── api-deployment/      # API workflow deployment worker
├── general/             # General purpose worker
├── file-processing/     # File processing worker
└── callback/            # Callback aggregation worker
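Workers talk to the backend through the shared internal API client. As a hedged, stdlib-only sketch of that pattern (the class name, endpoint path, and Authorization header scheme here are assumptions, not the actual contract of shared/api_client.py):

```python
import os
import urllib.request

class InternalAPIClient:
    """Illustrative internal API client; real implementation lives in shared/."""

    def __init__(self, base_url=None, api_key=None):
        # Defaults mirror the environment variables used in Configuration above.
        self.base_url = (base_url or os.environ.get(
            "INTERNAL_API_BASE_URL", "http://localhost:8000/internal")).rstrip("/")
        self.api_key = api_key or os.environ.get("INTERNAL_SERVICE_API_KEY", "")

    def build_request(self, path: str) -> urllib.request.Request:
        """Build an authenticated request for an internal API path."""
        req = urllib.request.Request(f"{self.base_url}/{path.lstrip('/')}")
        req.add_header("Authorization", f"Bearer {self.api_key}")
        return req
```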

Adding New Workers

  1. Create worker directory with pyproject.toml
  2. Implement worker.py and tasks.py
  3. Add to run-worker.sh script
  4. Create deployment configurations
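To make step 2 concrete, here is a stdlib-only sketch of the shape a new worker's tasks.py might take; the `register` decorator stands in for Celery's @app.task, and every name below is hypothetical:

```python
# Illustrative tasks.py shape for a new worker. `register` is a stand-in for
# Celery's @app.task decorator so this sketch stays runnable without a broker.
TASKS = {}

def register(name: str):
    """Record a task function under its fully qualified task name."""
    def wrap(fn):
        TASKS[name] = fn
        return fn
    return wrap

@register("my_worker.process_item")
def process_item(item_id: str) -> dict:
    # A real task would call the backend via the shared internal API client.
    return {"item_id": item_id, "status": "PROCESSED"}
```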

Testing

# Run tests
cd unstract/workers  # from the repository root
uv run pytest

# Test individual worker
cd api-deployment
uv run pytest tests/

Docker Deployment

# Build all workers
VERSION=local docker compose -f docker-compose.build.yaml build \
    worker-api-deployment worker-callback worker-file-processing worker-general

# Run workers
VERSION=local docker compose --profile workers-new up -d

# Check status
docker compose --profile workers-new ps

# View logs
docker compose --profile workers-new logs -f

Contributing

  1. Follow the architecture principles in ARCHITECTURE_PRINCIPLES.md
  2. Ensure backward compatibility with existing workers
  3. Add tests for new functionality
  4. Update documentation as needed

License

AGPL-3.0 - See LICENSE file for details