unstract/workers/run-worker-docker.sh
ali 0c5997f9a9 UN-2470 [FEAT] Remove Django dependency from Celery workers with internal APIs (#1494)
* UN-2470 [MISC] Remove Django dependency from Celery workers

This commit introduces a new worker architecture that decouples
Celery workers from Django where possible, enabling support for
gevent/eventlet pool types and reducing worker startup overhead.

Key changes:
- Created separate worker modules (api-deployment, callback, file_processing, general)
- Added internal API endpoints for worker communication
- Implemented Django-free task execution where appropriate
- Added shared utilities and client facades
- Updated container configurations for new worker architecture
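Because the decoupled workers are plain Celery apps, pool types that do not sit well under Django's request machinery become usable. A minimal launch sketch (queue names follow run-worker-docker.sh in this PR; the pool, concurrency, and log-level values are illustrative, not prescribed by the commit):

```shell
/app/.venv/bin/celery -A worker worker \
  --queues=file_processing,api_file_processing \
  --pool=gevent \
  --concurrency=4 \
  --loglevel=INFO
```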

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix pre-commit issues: file permissions and ruff errors

Set up Docker for the new workers

- Add executable permissions to worker entrypoint files
- Fix import order in namespace package __init__.py
- Remove unused variable api_status in general worker
- Address ruff E402 and F841 errors

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactored, Dockerfiles, fixes

* flexibility on celery run commands

* added debug logs

* handled filehistory for API

* cleanup

* cleanup

* cloud plugin structure

* minor changes in import plugin

* added notification and logger workers under new worker module

* add docker compatibility for new workers

* handled docker issues

* log consumer worker fixes

* added scheduler worker

* minor env changes

* cleanup the logs

* minor changes in logs

* resolved scheduler worker issues

* cleanup and refactor

* ensuring backward compatibility with existing workers

* added configuration internal apis and cache utils

* optimization

* Fix API client singleton pattern to share HTTP sessions

- Fix flawed singleton implementation that was trying to share BaseAPIClient instances
- Now properly shares HTTP sessions between specialized clients
- Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
- Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
- Added debug logging to verify singleton pattern activation

* cleanup and structuring

* cleanup in callback

* file system connectors issue

* celery env values changes

* optional gossip

* variables for sync, mingle and gossip

* Fix for file type check

* Task pipeline issue resolving

* api deployment failed response handled

* Task pipeline fixes

* updated file history cleanup with active file execution

* pipeline status update and workflow ui page execution

* cleanup and resolving conflicts

* remove unstract-core from connectors

* Commit uv.lock changes

* uv locks updates

* resolve migration issues

* defer connector-metadata

* Fix connector migration for production scale

- Add encryption key handling with defer() to prevent decryption failures
- Add final cleanup step to fix duplicate connector names
- Optimize for large datasets with batch processing and bulk operations
- Ensure unique constraint in migration 0004 can be created successfully

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* hitl fixes

* minor fixes on hitl

* api_hub related changes

* dockerfile fixes

* api client cache fixes with actual response class

* fix: tags and llm_profile_id

* optimized clear cache

* cleanup

* enhanced logs

* added more handling for the is-file/is-dir check and added loggers

* cleanup the runplatform script

* internal APIs are exempted from CSRF

* sonar cloud issues

* sonar-cloud issues

* resolving sonar cloud issues

* resolving sonar cloud issues

* Delta: added Batch size fix in workers

* comments addressed

* celery configurational changes for new workers

* fixes in callback regarding the pipeline type check

* change internal url registry logic

* gitignore changes

* gitignore changes

* addressing PR comments and cleaning up the code

* adding missed profiles for v2

* sonar cloud blocker issues resolved

* implement otel

* Commit uv.lock changes

* handle execution time and some cleanup

* adding user_data in metadata, PR: https://github.com/Zipstack/unstract/pull/1544

* scheduler backward compatibility

* replace user_data with custom_data

* Commit uv.lock changes

* celery worker command issue resolved

* enhance package imports in connectors by changing to lazy imports

* Update runner.py by removing the otel from it

Update runner.py by removing the otel from it

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>

* added delta changes

* handle errors to destination db

* resolve tool instance id validation and hitl queue name in API

* handled direct execution from workflow page to worker and logs

* handle cost logs

* Update health.py

Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* minor log changes

* introducing log consumer scheduler for bulk create, and socket.emit from worker for ws

* Commit uv.lock changes

* time limit or timeout celery config cleanup

* implemented redis client class in worker

* pipeline status enum mismatch

* notification worker fixes

* resolve uv lock conflicts

* workflow log fixes

* ws channel name issue resolved; handle redis down in status tracker and remove redis keys

* default TTL changed for unified logs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2025-10-03 11:24:07 +05:30

459 lines · 16 KiB · Bash · Executable File

#!/bin/bash
# =============================================================================
# Unstract Workers Runner Script - Docker Version
# =============================================================================
# This script is optimized for running workers inside Docker containers
# where all dependencies are pre-installed during image build.
#
# For local development, use run-worker.sh instead.
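#
# Usage (illustrative):
#   ./run-worker-docker.sh <worker-type>
#       e.g. "file-processing", "callback" - the Celery command is then
#       built from environment variables (see run_worker below).
#   ./run-worker-docker.sh /app/.venv/bin/celery -A worker worker --queues=celery
#       a full Celery command is executed as-is (see the two-path logic at
#       the bottom of this script).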
set -e
# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color
# Script directory - in Docker, everything runs from /app
WORKERS_DIR="/app"
# Default environment file
ENV_FILE="/app/.env"
# Available workers
declare -A WORKERS=(
    ["api"]="api_deployment"
    ["api-deployment"]="api_deployment"
    ["general"]="general"
    ["file"]="file_processing"
    ["file-processing"]="file_processing"
    ["callback"]="callback"
    ["notification"]="notification"
    ["log"]="log_consumer"
    ["log-consumer"]="log_consumer"
    ["scheduler"]="scheduler"
    ["schedule"]="scheduler"
    ["all"]="all"
)
# Worker queue mappings
declare -A WORKER_QUEUES=(
    ["api_deployment"]="celery_api_deployments"
    ["general"]="celery"
    ["file_processing"]="file_processing,api_file_processing"
    ["callback"]="file_processing_callback,api_file_processing_callback"
    ["notification"]="notifications,notifications_webhook,notifications_email,notifications_sms,notifications_priority"
    ["log_consumer"]="celery_log_task_queue"
    ["scheduler"]="scheduler"
)
# Worker health ports
declare -A WORKER_HEALTH_PORTS=(
    ["api_deployment"]="8080"
    ["general"]="8081"
    ["file_processing"]="8082"
    ["callback"]="8083"
    ["log_consumer"]="8084"
    ["notification"]="8085"
    ["scheduler"]="8087"
)
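# Example (illustrative - the "/health" path is an assumption of this comment,
# not something this script defines): with a health server listening on these
# ports, a container healthcheck could probe
#   curl -f "http://localhost:${WORKER_HEALTH_PORTS[file_processing]}/health"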
# Function to print colored output
print_status() {
    local color=$1
    local message=$2
    echo -e "${color}${message}${NC}"
}

# Function to load environment file
load_env() {
    local env_file=$1
    if [[ -f "$env_file" ]]; then
        print_status "$GREEN" "Loading environment from: $env_file"
        set -a # automatically export all variables
        source "$env_file"
        set +a
    else
        print_status "$YELLOW" "Warning: Environment file not found: $env_file"
        print_status "$YELLOW" "Make sure required environment variables are set"
    fi
}
# Function to detect worker type from command-line arguments
detect_worker_type_from_args() {
    local -n args_ref=$1
    # Look for --queues argument to infer worker type
    local queues=""
    local i=0
    while [[ $i -lt ${#args_ref[@]} ]]; do
        local arg="${args_ref[$i]}"
        case "$arg" in
            --queues=*)
                queues="${arg#--queues=}"
                break
                ;;
            --queues)
                i=$((i + 1))
                if [[ $i -lt ${#args_ref[@]} ]]; then
                    queues="${args_ref[$i]}"
                    break
                fi
                ;;
        esac
        i=$((i + 1))
    done
    # Map queue patterns to worker types.
    # Note: the callback pattern must be matched before file_processing, since
    # "file_processing_callback" also contains the substring "file_processing".
    case "$queues" in
        *"file_processing_callback"*) echo "callback" ;;
        *"file_processing"*) echo "file_processing" ;;
        *"celery_api_deployments"*) echo "api_deployment" ;;
        *"notifications"*) echo "notification" ;;
        *"celery_log_task_queue"*) echo "log_consumer" ;;
        *"scheduler"*) echo "scheduler" ;;
        *"celery"*) echo "general" ;;
        *) echo "general" ;; # fallback
    esac
}
# Function to run a single worker
run_worker() {
    local worker_type=$1
    # Normalize worker type - convert hyphenated aliases to canonical names
    case "$worker_type" in
        "api-deployment"|"api")
            worker_type="api_deployment"
            ;;
        "file-processing"|"file")
            worker_type="file_processing"
            ;;
        "log-consumer"|"log")
            worker_type="log_consumer"
            ;;
        "schedule")
            worker_type="scheduler"
            ;;
        # general, callback, notification, and scheduler stay the same
    esac
    # Set worker-specific environment variables
    export WORKER_TYPE="$worker_type"
    export WORKER_NAME="${worker_type}-worker"
    # Determine instance name
    local worker_instance_name="${worker_type}-worker"
    if [[ -n "$HOSTNAME" ]]; then
        # In Docker/K8s, use the container hostname
        worker_instance_name="${worker_type}-${HOSTNAME}"
    elif [[ -n "$WORKER_INSTANCE_ID" ]]; then
        worker_instance_name="${worker_type}-worker-${WORKER_INSTANCE_ID}"
    else
        # Default naming for production
        worker_instance_name="${worker_type}-worker-prod-01"
    fi
    # Get queues for this worker - allow environment override
    local queues="${WORKER_QUEUES[$worker_type]}"
    case "$worker_type" in
        "api_deployment")
            queues="${CELERY_QUEUES_API_DEPLOYMENT:-$queues}"
            ;;
        "general")
            queues="${CELERY_QUEUES_GENERAL:-$queues}"
            ;;
        "file_processing")
            queues="${CELERY_QUEUES_FILE_PROCESSING:-$queues}"
            ;;
        "callback")
            queues="${CELERY_QUEUES_CALLBACK:-$queues}"
            ;;
        "notification")
            queues="${CELERY_QUEUES_NOTIFICATION:-$queues}"
            ;;
        "log_consumer")
            queues="${CELERY_QUEUES_LOG_CONSUMER:-$queues}"
            ;;
        "scheduler")
            queues="${CELERY_QUEUES_SCHEDULER:-$queues}"
            ;;
    esac
    # Get health port
    local health_port="${WORKER_HEALTH_PORTS[$worker_type]}"
    # Set health port environment variable
    case "$worker_type" in
        "api_deployment")
            export API_DEPLOYMENT_HEALTH_PORT="${health_port}"
            export API_DEPLOYMENT_METRICS_PORT="${health_port}"
            ;;
        "general")
            export GENERAL_HEALTH_PORT="${health_port}"
            export GENERAL_METRICS_PORT="${health_port}"
            ;;
        "file_processing")
            export FILE_PROCESSING_HEALTH_PORT="${health_port}"
            export FILE_PROCESSING_METRICS_PORT="${health_port}"
            ;;
        "callback")
            export CALLBACK_HEALTH_PORT="${health_port}"
            export CALLBACK_METRICS_PORT="${health_port}"
            ;;
        "notification")
            export NOTIFICATION_HEALTH_PORT="${health_port}"
            export NOTIFICATION_METRICS_PORT="${health_port}"
            ;;
        "log_consumer")
            export LOG_CONSUMER_HEALTH_PORT="${health_port}"
            export LOG_CONSUMER_METRICS_PORT="${health_port}"
            ;;
        "scheduler")
            export SCHEDULER_HEALTH_PORT="${health_port}"
            export SCHEDULER_METRICS_PORT="${health_port}"
            ;;
    esac
    # Determine concurrency settings
    local concurrency=""
    case "$worker_type" in
        "api_deployment")
            concurrency="${WORKER_API_DEPLOYMENT_CONCURRENCY:-2}"
            ;;
        "general")
            concurrency="${WORKER_GENERAL_CONCURRENCY:-4}"
            ;;
        "file_processing")
            concurrency="${WORKER_FILE_PROCESSING_CONCURRENCY:-4}"
            ;;
        "callback")
            concurrency="${WORKER_CALLBACK_CONCURRENCY:-4}"
            ;;
        "notification")
            concurrency="${WORKER_NOTIFICATION_CONCURRENCY:-2}"
            ;;
        "log_consumer")
            concurrency="${WORKER_LOG_CONSUMER_CONCURRENCY:-2}"
            ;;
        "scheduler")
            concurrency="${WORKER_SCHEDULER_CONCURRENCY:-2}"
            ;;
    esac
    print_status "$GREEN" "Starting $worker_type worker..."
    print_status "$BLUE" "Working Directory: /app"
    print_status "$BLUE" "Worker Name: $worker_instance_name"
    print_status "$BLUE" "Queues: $queues"
    print_status "$BLUE" "Health Port: $health_port"
    print_status "$BLUE" "Concurrency: $concurrency"
    # Build Celery command with configurable options
    local app_module="${CELERY_APP_MODULE:-worker}"
    # Initial command without specific args - they'll be resolved with priority system
    local celery_cmd="/app/.venv/bin/celery -A $app_module worker"
    local celery_args=""
    # =============================================================================
    # Hierarchical Configuration Resolution (4-tier priority system)
    # =============================================================================
    # Settings are resolved with the following priority (highest first):
    # 1. Command-line arguments (handled by PATH 1 below; this path does no CLI parsing)
    # 2. {WORKER_TYPE}_{SETTING_NAME}
    # 3. CELERY_{SETTING_NAME}
    # 4. Default value
    # Convert worker_type to uppercase for environment variable resolution
    local worker_type_upper=$(echo "$worker_type" | tr '[:lower:]' '[:upper:]' | tr '-' '_')
    # Helper function for hierarchical configuration resolution (environment-based)
    resolve_config() {
        local setting_name=$1
        local default_value=$2
        # Check worker-specific setting, e.g. FILE_PROCESSING_POOL_TYPE (highest env priority);
        # ${!var:-} is bash indirect expansion, used here instead of eval
        local worker_specific_var="${worker_type_upper}_${setting_name}"
        local worker_value="${!worker_specific_var:-}"
        if [[ -n "$worker_value" ]]; then
            echo "$worker_value"
            return
        fi
        # Check global Celery setting, e.g. CELERY_POOL_TYPE (medium priority)
        local global_var="CELERY_${setting_name}"
        local global_value="${!global_var:-}"
        if [[ -n "$global_value" ]]; then
            echo "$global_value"
            return
        fi
        # Fall back to the default value (lowest priority)
        echo "$default_value"
    }
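    # Example (illustrative): for the file_processing worker, with
    # FILE_PROCESSING_POOL_TYPE=gevent exported and CELERY_POOL_TYPE unset,
    #   resolve_config "POOL_TYPE" "prefork"   # prints "gevent"
    # With neither variable set, the default "prefork" is printed.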
    # Resolve configuration using environment variables only
    local resolved_queues="$queues"
    celery_args="$celery_args --queues=$resolved_queues"
    # Resolve log level
    local resolved_loglevel="${CELERY_LOG_LEVEL:-${LOG_LEVEL:-INFO}}"
    celery_args="$celery_args --loglevel=$resolved_loglevel"
    # Resolve hostname
    local resolved_hostname="${CELERY_HOSTNAME:-${worker_instance_name}@%h}"
    celery_args="$celery_args --hostname=$resolved_hostname"
    # Apply hierarchical configuration for pool type
    local pool_type=$(resolve_config "POOL_TYPE" "prefork")
    # Override with legacy CELERY_POOL for backward compatibility
    pool_type="${CELERY_POOL:-$pool_type}"
    celery_args="$celery_args --pool=$pool_type"
    # Configure concurrency with hierarchical resolution
    local resolved_concurrency=$(resolve_config "CONCURRENCY" "$concurrency")
    # Apply legacy CELERY_CONCURRENCY
    resolved_concurrency="${CELERY_CONCURRENCY:-$resolved_concurrency}"
    celery_args="$celery_args --concurrency=$resolved_concurrency"
    # Apply hierarchical configuration for optional parameters
    # Prefetch multiplier
    local prefetch_multiplier=$(resolve_config "PREFETCH_MULTIPLIER" "")
    prefetch_multiplier="${CELERY_PREFETCH_MULTIPLIER:-$prefetch_multiplier}"
    if [[ -n "$prefetch_multiplier" ]]; then
        celery_args="$celery_args --prefetch-multiplier=$prefetch_multiplier"
    fi
    # Max tasks per child
    local max_tasks_per_child=$(resolve_config "MAX_TASKS_PER_CHILD" "")
    max_tasks_per_child="${CELERY_MAX_TASKS_PER_CHILD:-$max_tasks_per_child}"
    if [[ -n "$max_tasks_per_child" ]]; then
        celery_args="$celery_args --max-tasks-per-child=$max_tasks_per_child"
    fi
    # Task time limit
    local time_limit=$(resolve_config "TASK_TIME_LIMIT" "")
    time_limit="${CELERY_TIME_LIMIT:-$time_limit}"
    if [[ -n "$time_limit" ]]; then
        celery_args="$celery_args --time-limit=$time_limit"
    fi
    # Task soft time limit
    local soft_time_limit=$(resolve_config "TASK_SOFT_TIME_LIMIT" "")
    soft_time_limit="${CELERY_SOFT_TIME_LIMIT:-$soft_time_limit}"
    if [[ -n "$soft_time_limit" ]]; then
        celery_args="$celery_args --soft-time-limit=$soft_time_limit"
    fi
    # Add gossip, mingle, and heartbeat control flags based on environment variables
    # Default: gossip=true, mingle=true, heartbeat=true (Celery defaults)
    if [[ "${CELERY_WORKER_GOSSIP:-true}" == "false" ]]; then
        celery_args="$celery_args --without-gossip"
    fi
    if [[ "${CELERY_WORKER_MINGLE:-true}" == "false" ]]; then
        celery_args="$celery_args --without-mingle"
    fi
    if [[ "${CELERY_WORKER_HEARTBEAT:-true}" == "false" ]]; then
        celery_args="$celery_args --without-heartbeat"
    fi
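    # Example (illustrative): large fleets often disable the gossip and mingle
    # handshakes to reduce broker chatter, e.g.
    #   CELERY_WORKER_GOSSIP=false CELERY_WORKER_MINGLE=false ./run-worker-docker.sh general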
    # Add any additional custom Celery arguments
    if [[ -n "$CELERY_EXTRA_ARGS" ]]; then
        celery_args="$celery_args $CELERY_EXTRA_ARGS"
    fi
    # Execute the command (unquoted on purpose: celery_args must word-split)
    exec $celery_cmd $celery_args
}
# Main execution
# Load environment first for any needed variables
load_env "$ENV_FILE"
# Add PYTHONPATH for imports - include both /app and /unstract for packages
export PYTHONPATH="/app:/unstract/core/src:/unstract/connectors/src:/unstract/filesystem/src:/unstract/flags/src:/unstract/tool-registry/src:/unstract/tool-sandbox/src:/unstract/workflow-execution/src:${PYTHONPATH:-}"
# Two-path logic: Full Celery command vs Traditional worker type
if [[ "$1" == *"celery"* ]] || [[ "$1" == *".venv"* ]]; then
    # =============================================================================
    # PATH 1: Full Celery Command Detected - Use Directly
    # =============================================================================
    print_status "$BLUE" "🚀 Full Celery command detected - executing directly"
    # Extract worker type for environment setup
    ALL_ARGS=("$@")
    WORKER_TYPE=$(detect_worker_type_from_args ALL_ARGS)
    print_status "$BLUE" "Detected worker type: $WORKER_TYPE"
    print_status "$BLUE" "Command: $*"
    # Set essential environment variables for worker identification
    export WORKER_TYPE="$WORKER_TYPE"
    # Set worker instance name for identification
    if [[ -n "$HOSTNAME" ]]; then
        worker_instance_name="${WORKER_TYPE}-${HOSTNAME}"
    elif [[ -n "$WORKER_INSTANCE_ID" ]]; then
        worker_instance_name="${WORKER_TYPE}-worker-${WORKER_INSTANCE_ID}"
    else
        worker_instance_name="${WORKER_TYPE}-worker-docker"
    fi
    export WORKER_NAME="$worker_instance_name"
    # Set health port environment variable based on worker type
    case "$WORKER_TYPE" in
        "api_deployment")
            export API_DEPLOYMENT_HEALTH_PORT="8080"
            export API_DEPLOYMENT_METRICS_PORT="8080"
            ;;
        "general")
            export GENERAL_HEALTH_PORT="8081"
            export GENERAL_METRICS_PORT="8081"
            ;;
        "file_processing")
            export FILE_PROCESSING_HEALTH_PORT="8082"
            export FILE_PROCESSING_METRICS_PORT="8082"
            ;;
        "callback")
            export CALLBACK_HEALTH_PORT="8083"
            export CALLBACK_METRICS_PORT="8083"
            ;;
        "notification")
            export NOTIFICATION_HEALTH_PORT="8085"
            export NOTIFICATION_METRICS_PORT="8085"
            ;;
        "log_consumer")
            export LOG_CONSUMER_HEALTH_PORT="8084"
            export LOG_CONSUMER_METRICS_PORT="8084"
            ;;
        "scheduler")
            export SCHEDULER_HEALTH_PORT="8087"
            export SCHEDULER_METRICS_PORT="8087"
            ;;
    esac
    print_status "$GREEN" "✅ Executing Celery command with highest priority..."
    # Execute the full command directly - Celery will handle all arguments
    exec "$@"
else
    # =============================================================================
    # PATH 2: Traditional Worker Type - Build from Environment
    # =============================================================================
    WORKER_TYPE="${1:-general}"
    print_status "$BLUE" "🔧 Traditional worker type detected: $WORKER_TYPE"
    print_status "$BLUE" "Building command from environment variables..."
    # Use existing run_worker function for environment-based building
    run_worker "$WORKER_TYPE"
fi