* UN-2470 [MISC] Remove Django dependency from Celery workers

  This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead. Key changes:
  - Created separate worker modules (api-deployment, callback, file_processing, general)
  - Added internal API endpoints for worker communication
  - Implemented Django-free task execution where appropriate
  - Added shared utilities and client facades
  - Updated container configurations for the new worker architecture

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* Fix pre-commit issues: file permissions and ruff errors

  Set up Docker for the new workers:
  - Add executable permissions to worker entrypoint files
  - Fix import order in namespace package __init__.py
  - Remove unused variable api_status in general worker
  - Address ruff E402 and F841 errors

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* Refactored Dockerfiles, fixes
* Flexibility on Celery run commands
* Added debug logs
* Handled file history for API
* Cleanup
* Cleanup
* Cloud plugin structure
* Minor changes in import plugin
* Added notification and logger workers under the new worker module
* Added Docker compatibility for new workers
* Handled Docker issues
* Log consumer worker fixes
* Added scheduler worker
* Minor env changes
* Cleaned up the logs
* Minor changes in logs
* Resolved scheduler worker issues
* Cleanup and refactor
* Ensured backward compatibility with existing workers
* Added configuration internal APIs and cache utils
* Optimization
* Fix API client singleton pattern to share HTTP sessions
  - Fix flawed singleton implementation that tried to share BaseAPIClient instances
  - Now properly shares HTTP sessions between specialized clients
  - Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
  - Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
  - Added debug logging to verify singleton pattern activation
* Cleanup and structuring
* Cleanup in callback
* File system connectors issue
* Celery env value changes
* Optional gossip
* Variables for sync, mingle and gossip
* Fix for file type check
* Task pipeline issue resolution
* API deployment failed response handled
* Task pipeline fixes
* Updated file history cleanup with active file execution
* Pipeline status update and workflow UI page execution
* Cleanup and resolving conflicts
* Remove unstract-core from connectors
* Commit uv.lock changes
* uv lock updates
* Resolve migration issues
* Defer connector metadata
* Fix connector migration for production scale
  - Add encryption key handling with defer() to prevent decryption failures
  - Add final cleanup step to fix duplicate connector names
  - Optimize for large datasets with batch processing and bulk operations
  - Ensure unique constraint in migration 0004 can be created successfully

  🤖 Generated with [Claude Code](https://claude.ai/code)
  Co-Authored-By: Claude <noreply@anthropic.com>

* HITL fixes
* Minor fixes on HITL
* api_hub related changes
* Dockerfile fixes
* API client cache fixes with actual response class
* fix: tags and llm_profile_id
* Optimized clear cache
* Cleanup
* Enhanced logs
* Added more handling on the is-file-dir check and added loggers
* Cleaned up the runplatform script
* Internal APIs are exempted from CSRF
* SonarCloud issues
* SonarCloud issues
* Resolving SonarCloud issues
* Resolving SonarCloud issues
* Delta: added batch size fix in workers
* Comments addressed
* Celery configuration changes for new workers
* Fixes in callback regarding the pipeline type check
* Change internal URL registry logic
* gitignore changes
* gitignore changes
* Addressed PR comments and cleaned up the code
* Added missing profiles for v2
* SonarCloud blocker issues resolved
* Implement OTel
* Commit uv.lock changes
* Handle execution time and some cleanup
* Adding user_data in metadata (PR: https://github.com/Zipstack/unstract/pull/1544)
* Scheduler backward compatibility
* Replace user_data with custom_data
* Commit uv.lock changes
* Celery worker command issue resolved
* Enhance package imports in connectors by changing to lazy imports
* Update runner.py by removing the OTel from it

  Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>

* Added delta changes
* Handle error to destination DB
* Resolve tool instance ID validation and HITL queue name in API
* Handled direct execution from workflow page to worker and logs
* Handle cost logs
* Update health.py

  Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

* Minor log changes
* Introducing log consumer scheduler to bulk create, and socket.emit from worker for WS
* Commit uv.lock changes
* Time limit / timeout Celery config cleanup
* Implemented Redis client class in worker
* Pipeline status enum mismatch
* Notification worker fixes
* Resolve uv lock conflicts
* Workflow log fixes
* WS channel name issue resolved; handle Redis being down in the status tracker and remove Redis keys
* Default TTL changed for unified logs
* [pre-commit.ci] auto fixes from pre-commit.com hooks

  for more information, see https://pre-commit.ci

---------

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
459 lines · 16 KiB · Bash · Executable File
#!/bin/bash
# =============================================================================
# Unstract Workers Runner Script - Docker Version
# =============================================================================
# This script is optimized for running workers inside Docker containers
# where all dependencies are pre-installed during image build.
#
# For local development, use run-worker.sh instead.

set -e

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Script directory - in Docker, everything runs from /app
WORKERS_DIR="/app"

# Default environment file
ENV_FILE="/app/.env"

# Available workers
declare -A WORKERS=(
    ["api"]="api_deployment"
    ["api-deployment"]="api_deployment"
    ["general"]="general"
    ["file"]="file_processing"
    ["file-processing"]="file_processing"
    ["callback"]="callback"
    ["notification"]="notification"
    ["log"]="log_consumer"
    ["log-consumer"]="log_consumer"
    ["scheduler"]="scheduler"
    ["schedule"]="scheduler"
    ["all"]="all"
)

# Worker queue mappings
declare -A WORKER_QUEUES=(
    ["api_deployment"]="celery_api_deployments"
    ["general"]="celery"
    ["file_processing"]="file_processing,api_file_processing"
    ["callback"]="file_processing_callback,api_file_processing_callback"
    ["notification"]="notifications,notifications_webhook,notifications_email,notifications_sms,notifications_priority"
    ["log_consumer"]="celery_log_task_queue"
    ["scheduler"]="scheduler"
)

# Worker health ports
declare -A WORKER_HEALTH_PORTS=(
    ["api_deployment"]="8080"
    ["general"]="8081"
    ["file_processing"]="8082"
    ["callback"]="8083"
    ["log_consumer"]="8084"
    ["notification"]="8085"
    ["scheduler"]="8087"
)

# Function to print colored output
print_status() {
    local color=$1
    local message=$2
    echo -e "${color}${message}${NC}"
}

# Function to load environment file
load_env() {
    local env_file=$1

    if [[ -f "$env_file" ]]; then
        print_status $GREEN "Loading environment from: $env_file"
        set -a # automatically export all variables
        source "$env_file"
        set +a
    else
        print_status $YELLOW "Warning: Environment file not found: $env_file"
        print_status $YELLOW "Make sure required environment variables are set"
    fi
}
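
# -----------------------------------------------------------------------------
# Example /app/.env (a minimal, illustrative sketch - the values below are
# placeholders, not project defaults). Each variable shown is read by this
# script when building the Celery command:
#
#   CELERY_LOG_LEVEL=INFO             # falls back to LOG_LEVEL, then INFO
#   CELERY_POOL=prefork               # legacy override for the pool type
#   CELERY_WORKER_GOSSIP=false        # adds --without-gossip
#   CELERY_QUEUES_GENERAL=celery      # per-worker queue override
#   WORKER_GENERAL_CONCURRENCY=4      # per-worker concurrency default
#   CELERY_EXTRA_ARGS="-O fair"       # appended verbatim to the celery command
# -----------------------------------------------------------------------------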


# Function to detect worker type from command-line arguments
detect_worker_type_from_args() {
    local -n args_ref=$1

    # Look for --queues argument to infer worker type
    local queues=""
    local i=0
    while [[ $i -lt ${#args_ref[@]} ]]; do
        local arg="${args_ref[$i]}"
        case "$arg" in
            --queues=*)
                queues="${arg#--queues=}"
                break
                ;;
            --queues)
                ((i++))
                if [[ $i -lt ${#args_ref[@]} ]]; then
                    queues="${args_ref[$i]}"
                    break
                fi
                ;;
        esac
        ((i++))
    done

    # Map queue patterns to worker types
    # Note: callback is matched before file_processing because its queue names
    # contain the "file_processing" substring
    case "$queues" in
        *"file_processing_callback"*) echo "callback" ;;
        *"file_processing"*) echo "file_processing" ;;
        *"celery_api_deployments"*) echo "api_deployment" ;;
        *"notifications"*) echo "notification" ;;
        *"celery_log_task_queue"*) echo "log_consumer" ;;
        *"scheduler"*) echo "scheduler" ;;
        *"celery"*) echo "general" ;;
        *) echo "general" ;; # fallback
    esac
}
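
# Example (illustrative): a full Celery command whose arguments include
#   --queues=celery_api_deployments
# is detected as "api_deployment", while a bare --queues=celery falls through
# to the generic pattern and yields "general".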

# Function to run a single worker
run_worker() {
    local worker_type=$1

    # Normalize worker type - convert hyphens to underscores for consistency
    case "$worker_type" in
        "api-deployment"|"api")
            worker_type="api_deployment"
            ;;
        "file-processing"|"file")
            worker_type="file_processing"
            ;;
        "log-consumer"|"log")
            worker_type="log_consumer"
            ;;
        "schedule")
            worker_type="scheduler"
            ;;
        # general, callback, notification, and scheduler stay the same
    esac

    # Set worker-specific environment variables
    export WORKER_TYPE="$worker_type"
    export WORKER_NAME="${worker_type}-worker"

    # Determine instance name
    local worker_instance_name="${worker_type}-worker"
    if [[ -n "$HOSTNAME" ]]; then
        # In Docker/K8s, use the container hostname
        worker_instance_name="${worker_type}-${HOSTNAME}"
    elif [[ -n "$WORKER_INSTANCE_ID" ]]; then
        worker_instance_name="${worker_type}-worker-${WORKER_INSTANCE_ID}"
    else
        # Default naming for production
        worker_instance_name="${worker_type}-worker-prod-01"
    fi

    # Get queues for this worker - allow environment override
    local queues="${WORKER_QUEUES[$worker_type]}"
    case "$worker_type" in
        "api_deployment")
            queues="${CELERY_QUEUES_API_DEPLOYMENT:-$queues}"
            ;;
        "general")
            queues="${CELERY_QUEUES_GENERAL:-$queues}"
            ;;
        "file_processing")
            queues="${CELERY_QUEUES_FILE_PROCESSING:-$queues}"
            ;;
        "callback")
            queues="${CELERY_QUEUES_CALLBACK:-$queues}"
            ;;
        "notification")
            queues="${CELERY_QUEUES_NOTIFICATION:-$queues}"
            ;;
        "log_consumer")
            queues="${CELERY_QUEUES_LOG_CONSUMER:-$queues}"
            ;;
        "scheduler")
            queues="${CELERY_QUEUES_SCHEDULER:-$queues}"
            ;;
    esac
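
    # Example (illustrative): setting
    #   CELERY_QUEUES_FILE_PROCESSING=file_processing
    # makes a file_processing worker consume only the file_processing queue
    # instead of the default "file_processing,api_file_processing" pair.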

    # Get health port
    local health_port="${WORKER_HEALTH_PORTS[$worker_type]}"

    # Set health port environment variable
    case "$worker_type" in
        "api_deployment")
            export API_DEPLOYMENT_HEALTH_PORT="${health_port}"
            export API_DEPLOYMENT_METRICS_PORT="${health_port}"
            ;;
        "general")
            export GENERAL_HEALTH_PORT="${health_port}"
            export GENERAL_METRICS_PORT="${health_port}"
            ;;
        "file_processing")
            export FILE_PROCESSING_HEALTH_PORT="${health_port}"
            export FILE_PROCESSING_METRICS_PORT="${health_port}"
            ;;
        "callback")
            export CALLBACK_HEALTH_PORT="${health_port}"
            export CALLBACK_METRICS_PORT="${health_port}"
            ;;
        "notification")
            export NOTIFICATION_HEALTH_PORT="${health_port}"
            export NOTIFICATION_METRICS_PORT="${health_port}"
            ;;
        "log_consumer")
            export LOG_CONSUMER_HEALTH_PORT="${health_port}"
            export LOG_CONSUMER_METRICS_PORT="${health_port}"
            ;;
        "scheduler")
            export SCHEDULER_HEALTH_PORT="${health_port}"
            export SCHEDULER_METRICS_PORT="${health_port}"
            ;;
    esac

    # Determine concurrency settings
    local concurrency=""
    case "$worker_type" in
        "api_deployment")
            concurrency="${WORKER_API_DEPLOYMENT_CONCURRENCY:-2}"
            ;;
        "general")
            concurrency="${WORKER_GENERAL_CONCURRENCY:-4}"
            ;;
        "file_processing")
            concurrency="${WORKER_FILE_PROCESSING_CONCURRENCY:-4}"
            ;;
        "callback")
            concurrency="${WORKER_CALLBACK_CONCURRENCY:-4}"
            ;;
        "notification")
            concurrency="${WORKER_NOTIFICATION_CONCURRENCY:-2}"
            ;;
        "log_consumer")
            concurrency="${WORKER_LOG_CONSUMER_CONCURRENCY:-2}"
            ;;
        "scheduler")
            concurrency="${WORKER_SCHEDULER_CONCURRENCY:-2}"
            ;;
    esac

    print_status $GREEN "Starting $worker_type worker..."
    print_status $BLUE "Working Directory: /app"
    print_status $BLUE "Worker Name: $worker_instance_name"
    print_status $BLUE "Queues: $queues"
    print_status $BLUE "Health Port: $health_port"
    print_status $BLUE "Concurrency: $concurrency"

    # Build Celery command with configurable options
    local app_module="${CELERY_APP_MODULE:-worker}"

    # Initial command without specific args - they'll be resolved with priority system
    local celery_cmd="/app/.venv/bin/celery -A $app_module worker"
    local celery_args=""

    # =============================================================================
    # Hierarchical Configuration Resolution (4-tier priority system)
    # =============================================================================
    # Resolve worker-specific overrides using the hierarchical configuration pattern:
    # 1. Command-line arguments (highest priority)
    # 2. {WORKER_TYPE}_{SETTING_NAME} (high priority)
    # 3. CELERY_{SETTING_NAME} (medium priority)
    # 4. Default value (lowest priority)

    # Traditional environment-based command building (no CLI parsing needed)

    # Convert worker_type to uppercase for environment variable resolution
    local worker_type_upper=$(echo "$worker_type" | tr '[:lower:]' '[:upper:]' | tr '-' '_')

    # Helper function for hierarchical configuration resolution (environment-based)
    resolve_config() {
        local setting_name=$1
        local default_value=$2

        # Check worker-specific setting (highest priority)
        local worker_specific_var="${worker_type_upper}_${setting_name}"
        local worker_value=$(eval echo "\${${worker_specific_var}:-}")
        if [[ -n "$worker_value" ]]; then
            echo "$worker_value"
            return
        fi

        # Check global Celery setting (medium priority)
        local global_var="CELERY_${setting_name}"
        local global_value=$(eval echo "\${${global_var}:-}")
        if [[ -n "$global_value" ]]; then
            echo "$global_value"
            return
        fi

        # Use default value (lowest priority)
        echo "$default_value"
    }
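
    # Example (illustrative) of the resolution order for a file_processing
    # worker calling: resolve_config "POOL_TYPE" "prefork"
    #   FILE_PROCESSING_POOL_TYPE=gevent  -> "gevent"  (worker-specific wins)
    #   else CELERY_POOL_TYPE=solo        -> "solo"    (global Celery setting)
    #   otherwise                         -> "prefork" (default argument)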

    # Resolve configuration using environment variables only
    local resolved_queues="$queues"
    celery_args="$celery_args --queues=$resolved_queues"

    # Resolve log level
    local resolved_loglevel="${CELERY_LOG_LEVEL:-${LOG_LEVEL:-INFO}}"
    celery_args="$celery_args --loglevel=$resolved_loglevel"

    # Resolve hostname
    local resolved_hostname="${CELERY_HOSTNAME:-${worker_instance_name}@%h}"
    celery_args="$celery_args --hostname=$resolved_hostname"

    # Apply hierarchical configuration for pool type
    local pool_type=$(resolve_config "POOL_TYPE" "prefork")
    # Override with legacy CELERY_POOL for backward compatibility
    pool_type="${CELERY_POOL:-$pool_type}"
    celery_args="$celery_args --pool=$pool_type"

    # Configure concurrency with hierarchical resolution
    local resolved_concurrency=$(resolve_config "CONCURRENCY" "$concurrency")
    # Apply legacy CELERY_CONCURRENCY
    resolved_concurrency="${CELERY_CONCURRENCY:-$resolved_concurrency}"
    celery_args="$celery_args --concurrency=$resolved_concurrency"
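
    # Example (illustrative): the Django-free workers can run on green-thread
    # pools, e.g. for the api_deployment worker:
    #   API_DEPLOYMENT_POOL_TYPE=gevent
    #   API_DEPLOYMENT_CONCURRENCY=50
    # Note that the legacy CELERY_POOL / CELERY_CONCURRENCY variables, when set,
    # take precedence over these hierarchical values.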

    # Apply hierarchical configuration for optional parameters

    # Prefetch multiplier
    local prefetch_multiplier=$(resolve_config "PREFETCH_MULTIPLIER" "")
    prefetch_multiplier="${CELERY_PREFETCH_MULTIPLIER:-$prefetch_multiplier}"
    if [[ -n "$prefetch_multiplier" ]]; then
        celery_args="$celery_args --prefetch-multiplier=$prefetch_multiplier"
    fi

    # Max tasks per child
    local max_tasks_per_child=$(resolve_config "MAX_TASKS_PER_CHILD" "")
    max_tasks_per_child="${CELERY_MAX_TASKS_PER_CHILD:-$max_tasks_per_child}"
    if [[ -n "$max_tasks_per_child" ]]; then
        celery_args="$celery_args --max-tasks-per-child=$max_tasks_per_child"
    fi

    # Task time limit
    local time_limit=$(resolve_config "TASK_TIME_LIMIT" "")
    time_limit="${CELERY_TIME_LIMIT:-$time_limit}"
    if [[ -n "$time_limit" ]]; then
        celery_args="$celery_args --time-limit=$time_limit"
    fi

    # Task soft time limit
    local soft_time_limit=$(resolve_config "TASK_SOFT_TIME_LIMIT" "")
    soft_time_limit="${CELERY_SOFT_TIME_LIMIT:-$soft_time_limit}"
    if [[ -n "$soft_time_limit" ]]; then
        celery_args="$celery_args --soft-time-limit=$soft_time_limit"
    fi

    # Add gossip, mingle, and heartbeat control flags based on environment variables
    # Default: gossip=true, mingle=true, heartbeat=true (Celery defaults)

    if [[ "${CELERY_WORKER_GOSSIP:-true}" == "false" ]]; then
        celery_args="$celery_args --without-gossip"
    fi

    if [[ "${CELERY_WORKER_MINGLE:-true}" == "false" ]]; then
        celery_args="$celery_args --without-mingle"
    fi

    if [[ "${CELERY_WORKER_HEARTBEAT:-true}" == "false" ]]; then
        celery_args="$celery_args --without-heartbeat"
    fi
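
    # Example (illustrative): on larger deployments gossip and mingle are often
    # disabled to reduce broker chatter and speed up worker startup:
    #   CELERY_WORKER_GOSSIP=false   -> --without-gossip
    #   CELERY_WORKER_MINGLE=false   -> --without-mingle
    # Heartbeats are typically left at the Celery default (enabled).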

    # Add any additional custom Celery arguments
    if [[ -n "$CELERY_EXTRA_ARGS" ]]; then
        celery_args="$celery_args $CELERY_EXTRA_ARGS"
    fi

    # Execute the command
    exec $celery_cmd $celery_args
}

# Main execution
# Load environment first for any needed variables
load_env "$ENV_FILE"

# Add PYTHONPATH for imports - include both /app and /unstract for packages
export PYTHONPATH="/app:/unstract/core/src:/unstract/connectors/src:/unstract/filesystem/src:/unstract/flags/src:/unstract/tool-registry/src:/unstract/tool-sandbox/src:/unstract/workflow-execution/src:${PYTHONPATH:-}"

# Two-path logic: Full Celery command vs Traditional worker type
if [[ "$1" == *"celery"* ]] || [[ "$1" == *".venv"* ]]; then
    # =============================================================================
    # PATH 1: Full Celery Command Detected - Use Directly
    # =============================================================================
    print_status $BLUE "🚀 Full Celery command detected - executing directly"

    # Extract worker type for environment setup
    ALL_ARGS=("$@")
    WORKER_TYPE=$(detect_worker_type_from_args ALL_ARGS)

    print_status $BLUE "Detected worker type: $WORKER_TYPE"
    print_status $BLUE "Command: $*"

    # Set essential environment variables for worker identification
    export WORKER_TYPE="$WORKER_TYPE"
    export WORKER_NAME="${WORKER_TYPE}-worker"

    # Set worker instance name for identification
    if [[ -n "$HOSTNAME" ]]; then
        worker_instance_name="${WORKER_TYPE}-${HOSTNAME}"
    elif [[ -n "$WORKER_INSTANCE_ID" ]]; then
        worker_instance_name="${WORKER_TYPE}-worker-${WORKER_INSTANCE_ID}"
    else
        worker_instance_name="${WORKER_TYPE}-worker-docker"
    fi
    export WORKER_NAME="$worker_instance_name"

    # Set health port environment variable based on worker type
    case "$WORKER_TYPE" in
        "api_deployment")
            export API_DEPLOYMENT_HEALTH_PORT="8080"
            export API_DEPLOYMENT_METRICS_PORT="8080"
            ;;
        "general")
            export GENERAL_HEALTH_PORT="8081"
            export GENERAL_METRICS_PORT="8081"
            ;;
        "file_processing")
            export FILE_PROCESSING_HEALTH_PORT="8082"
            export FILE_PROCESSING_METRICS_PORT="8082"
            ;;
        "callback")
            export CALLBACK_HEALTH_PORT="8083"
            export CALLBACK_METRICS_PORT="8083"
            ;;
        "notification")
            export NOTIFICATION_HEALTH_PORT="8085"
            export NOTIFICATION_METRICS_PORT="8085"
            ;;
        "log_consumer")
            export LOG_CONSUMER_HEALTH_PORT="8084"
            export LOG_CONSUMER_METRICS_PORT="8084"
            ;;
        "scheduler")
            export SCHEDULER_HEALTH_PORT="8087"
            export SCHEDULER_METRICS_PORT="8087"
            ;;
    esac

    print_status $GREEN "✅ Executing Celery command with highest priority..."

    # Execute the full command directly - Celery will handle all arguments
    exec "$@"

else
    # =============================================================================
    # PATH 2: Traditional Worker Type - Build from Environment
    # =============================================================================
    WORKER_TYPE="${1:-general}"
    print_status $BLUE "🔧 Traditional worker type detected: $WORKER_TYPE"
    print_status $BLUE "Building command from environment variables..."

    # Use existing run_worker function for environment-based building
    run_worker "$WORKER_TYPE"
fi
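
# =============================================================================
# Usage examples (illustrative; the script path below is an assumption - use
# whatever name this file is installed under in the image, typically as the
# container entrypoint or command)
# =============================================================================
#   # PATH 2: traditional worker type - the command is built from env vars
#   /app/run-worker-docker.sh file-processing
#
#   # PATH 1: a full Celery command is detected and exec'd directly
#   /app/run-worker-docker.sh /app/.venv/bin/celery -A worker worker \
#       --queues=celery_api_deployments --pool=gevent --concurrency=2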