* UN-2470 [MISC] Remove Django dependency from Celery workers This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead. Key changes: - Created separate worker modules (api-deployment, callback, file_processing, general) - Added internal API endpoints for worker communication - Implemented Django-free task execution where appropriate - Added shared utilities and client facades - Updated container configurations for new worker architecture 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * Fix pre-commit issues: file permissions and ruff errors Setup the docker for new workers - Add executable permissions to worker entrypoint files - Fix import order in namespace package __init__.py - Remove unused variable api_status in general worker - Address ruff E402 and F841 errors 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * refactoreed, Dockerfiles,fixes * flexibility on celery run commands * added debug logs * handled filehistory for API * cleanup * cleanup * cloud plugin structure * minor changes in import plugin * added notification and logger workers under new worker module * add docker compatibility for new workers * handled docker issues * log consumer worker fixes * added scheduler worker * minor env changes * cleanup the logs * minor changes in logs * resolved scheduler worker issues * cleanup and refactor * ensuring backward compatibbility to existing wokers * added configuration internal apis and cache utils * optimization * Fix API client singleton pattern to share HTTP sessions - Fix flawed singleton implementation that was trying to share BaseAPIClient instances - Now properly shares HTTP sessions between specialized clients - Eliminates 6x BaseAPIClient initialization by reusing the same underlying session - Should 
reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session) - Added debug logging to verify singleton pattern activation * cleanup and structuring * cleanup in callback * file system connectors issue * celery env values changes * optional gossip * variables for sync, mingle and gossip * Fix for file type check * Task pipeline issue resolving * api deployement failed response handled * Task pipline fixes * updated file history cleanup with active file execution * pipline status update and workflow ui page execution * cleanup and resolvinf conflicts * remove unstract-core from conenctoprs * Commit uv.lock changes * uv locks updates * resolve migration issues * defer connector-metadtda * Fix connector migration for production scale - Add encryption key handling with defer() to prevent decryption failures - Add final cleanup step to fix duplicate connector names - Optimize for large datasets with batch processing and bulk operations - Ensure unique constraint in migration 0004 can be created successfully 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * hitl fixes * minor fixes on hitl * api_hub related changes * dockerfile fixes * api client cache fixes with actual response class * fix: tags and llm_profile_id * optimized clear cache * cleanup * enhanced logs * added more handling on is file dir and added loggers * cleanup the runplatform script * internal apis are excempting from csrf * sonal cloud issues * sona-cloud issues * resolving sonar cloud issues * resolving sonar cloud issues * Delta: added Batch size fix in workers * comments addressed * celery configurational changes for new workers * fiixes in callback regaurding the pipline type check * change internal url registry logic * gitignore changes * gitignore changes * addressng pr cmmnets and cleanup the codes * adding missed profiles for v2 * sonal cloud blocker issues resolved * imlement otel * Commit uv.lock changes * handle 
execution time and some cleanup * adding user_data in metadata Pr: https://github.com/Zipstack/unstract/pull/1544 * scheduler backward compatibitlity * replace user_data with custom_data * Commit uv.lock changes * celery worker command issue resolved * enhance package imports in connectors by changing to lazy imports * Update runner.py by removing the otel from it Update runner.py by removing the otel from it Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> * added delta changes * handle erro to destination db * resolve tool instances id validation and hitl queu name in API * handled direct execution from workflow page to worker and logs * handle cost logs * Update health.py Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * minor log changes * introducing log consumer scheduler to bulk create, and socket .emit from worker for ws * Commit uv.lock changes * time limit or timeout celery config cleanup * implemented redis client class in worker * pipline status enum mismatch * notification worker fixes * resolve uv lock conflicts * workflow log fixes * ws channel name issue resolved. and handling redis down in status tracker, and removing redis keys * default TTL changed for unified logs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
515 lines
14 KiB
Bash
Executable File
#!/bin/bash
# =============================================================================
# Unstract Workers Runner Script
# =============================================================================
# This script provides a convenient way to run individual or multiple workers
# with proper environment configuration and health monitoring.

set -e

# Colors for output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
NC='\033[0m' # No Color

# Script directory
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
WORKERS_DIR="$SCRIPT_DIR"

# Default environment file
ENV_FILE="$WORKERS_DIR/.env"

# Available workers (CLI alias -> worker directory)
declare -A WORKERS=(
    ["api"]="api-deployment"
    ["api-deployment"]="api-deployment"
    ["general"]="general"
    ["file"]="file_processing"
    ["file-processing"]="file_processing"
    ["callback"]="callback"
    ["log"]="log_consumer"
    ["log-consumer"]="log_consumer"
    ["logs"]="log_consumer"
    ["notification"]="notification"
    ["notifications"]="notification"
    ["notify"]="notification"
    ["scheduler"]="scheduler"
    ["schedule"]="scheduler"
    ["all"]="all"
)

# Worker queue mappings
declare -A WORKER_QUEUES=(
    ["api-deployment"]="celery_api_deployments"
    ["general"]="celery"
    ["file_processing"]="file_processing,api_file_processing"
    ["callback"]="file_processing_callback,api_file_processing_callback"
    ["log_consumer"]="celery_log_task_queue"
    ["notification"]="notifications,notifications_webhook,notifications_email,notifications_sms,notifications_priority"
    ["scheduler"]="scheduler"
)

# Worker health ports
declare -A WORKER_HEALTH_PORTS=(
    ["api-deployment"]="8080"
    ["general"]="8081"
    ["file_processing"]="8082"
    ["callback"]="8083"
    ["log_consumer"]="8084"
    ["notification"]="8085"
    ["scheduler"]="8087"
)

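# Example (illustrative only, not executed): the alias "file" resolves through
# the three maps above as
#   WORKERS[file]                         -> file_processing
#   WORKER_QUEUES[file_processing]        -> file_processing,api_file_processing
#   WORKER_HEALTH_PORTS[file_processing]  -> 8082
# For a single worker, -q and -p override the queue list and health port.
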
# Function to display usage
usage() {
    cat << EOF
Usage: $0 [OPTIONS] WORKER_TYPE

Run Unstract Celery workers with proper environment configuration.

WORKER_TYPE:
    api, api-deployment         Run API deployment worker
    general                     Run general worker (webhooks, background tasks)
    file, file-processing       Run file processing worker
    callback                    Run callback worker
    log, logs, log-consumer     Run log consumer worker
    notification, notifications, notify
                                Run notification worker
    scheduler, schedule         Run scheduler worker (scheduled pipeline tasks)
    all                         Run all workers, each as a separate background process

OPTIONS:
    -e, --env-file FILE     Use specific environment file (default: .env in the script directory)
    -d, --detach            Run worker in background (daemon mode)
    -l, --log-level LEVEL   Set log level (DEBUG, INFO, WARNING, ERROR; default: INFO)
    -c, --concurrency N     Set worker concurrency (default: 2 or 4, depending on the worker)
    -q, --queues QUEUES     Override default queues (comma-separated; single worker only)
    -p, --health-port N     Override health check port (single worker only)
    -n, --hostname NAME     Set custom worker hostname/name (single worker only)
    -k, --kill              Kill running workers and exit
    -s, --status            Show status of running workers
    -h, --help              Show this help message

EXAMPLES:
    # Run API deployment worker
    $0 api

    # Run general worker with debug logging
    $0 -l DEBUG general

    # Run file processing worker in background
    $0 -d file

    # Run with custom environment file
    $0 -e production.env all

    # Run with custom concurrency
    $0 -c 4 general

    # Run with custom worker name (useful for scaling)
    $0 -n api-01 api
    $0 -n api-02 api

    # Check worker status
    $0 -s

    # Kill all running workers
    $0 -k

ENVIRONMENT:
    Environment variables are loaded from the .env file (or the file given with -e) if it exists.
    Required variables:
        - INTERNAL_SERVICE_API_KEY
        - INTERNAL_API_BASE_URL
        - CELERY_BROKER_BASE_URL
        - DB_HOST, DB_USER, DB_PASSWORD, DB_NAME (for PostgreSQL result backend)

    Plugin availability is detected dynamically via the plugin registry.
    See sample.env for full configuration options.

HEALTH CHECKS:
    Each worker exposes a health check endpoint (default ports; override with -p):
        - API Deployment:  http://localhost:8080/health
        - General:         http://localhost:8081/health
        - File Processing: http://localhost:8082/health
        - Callback:        http://localhost:8083/health
        - Log Consumer:    http://localhost:8084/health
        - Notification:    http://localhost:8085/health
        - Scheduler:       http://localhost:8087/health

EOF
}

# Function to print colored output
print_status() {
    local color=$1
    local message=$2
    echo -e "${color}${message}${NC}"
}

# Function to load environment file
load_env() {
    local env_file=$1

    if [[ -f "$env_file" ]]; then
        print_status $GREEN "Loading environment from: $env_file"
        set -a # automatically export all variables
        source "$env_file"
        set +a
    else
        print_status $YELLOW "Warning: Environment file not found: $env_file"
        print_status $YELLOW "Make sure required environment variables are set"
    fi
}

# Function to validate environment
validate_env() {
    local required_vars=(
        "INTERNAL_SERVICE_API_KEY"
        "INTERNAL_API_BASE_URL"
        "CELERY_BROKER_BASE_URL"
        "DB_HOST"
        "DB_USER"
        "DB_PASSWORD"
        "DB_NAME"
    )

    local missing_vars=()

    for var in "${required_vars[@]}"; do
        if [[ -z "${!var}" ]]; then
            missing_vars+=("$var")
        fi
    done

    if [[ ${#missing_vars[@]} -gt 0 ]]; then
        print_status $RED "Error: Missing required environment variables:"
        for var in "${missing_vars[@]}"; do
            print_status $RED "  - $var"
        done
        print_status $YELLOW "Please check your .env file or set these variables manually"
        exit 1
    fi
}

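# Function to get worker PIDs
# Matches on the --hostname value that run_worker assigns ("<type>-worker",
# optionally suffixed with -$WORKER_INSTANCE_ID). Matching on the bare worker
# type would also hit other workers' queue names: "file_processing" appears in
# the callback worker's "file_processing_callback" queue. Workers started with
# a custom -n hostname are not matched.
get_worker_pids() {
    local worker_type=$1
    pgrep -f "uv run celery.*worker.*--hostname=${worker_type}-worker" || true
}
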
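# Example (illustrative only, not executed): to list the full command lines
# that the process patterns in this script are matched against, run
#   pgrep -af "uv run celery.*worker"     # procps-ng (Linux)
#   pgrep -lf "uv run celery.*worker"     # BSD/macOS
# Each entry carries the worker's --queues and --hostname arguments, e.g.
# "--hostname=callback-worker@%h" for a default callback worker.
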
# Function to kill workers
kill_workers() {
    print_status $YELLOW "Killing all running workers..."

    # Every worker is launched as "uv run celery ... worker ...", so a single
    # pattern matches them all; there is no need to loop over the aliases.
    local pids
    pids=$(pgrep -f "uv run celery.*worker" || true)

    if [[ -n "$pids" ]]; then
        print_status $YELLOW "Killing worker processes: $pids"
        echo "$pids" | xargs kill -TERM 2>/dev/null || true
        sleep 2
        # Force kill anything still running after the grace period
        pids=$(pgrep -f "uv run celery.*worker" || true)
        if [[ -n "$pids" ]]; then
            echo "$pids" | xargs kill -KILL 2>/dev/null || true
        fi
    fi

    print_status $GREEN "All workers stopped"
}

# Function to show worker status
show_status() {
    print_status $BLUE "Worker Status:"
    echo "=============="

    for worker in api-deployment general file_processing callback log_consumer notification scheduler; do
        local health_port="${WORKER_HEALTH_PORTS[$worker]}"
        local pids=$(get_worker_pids "$worker")

        echo -n "  $worker: "

        if [[ -n "$pids" ]]; then
            print_status $GREEN "RUNNING (PID: $pids)"

            # Probe the health endpoint if curl is available; -f makes HTTP
            # error responses (4xx/5xx) count as failures
            if command -v curl >/dev/null 2>&1; then
                local health_url="http://localhost:$health_port/health"
                if curl -sf --max-time 2 "$health_url" >/dev/null 2>&1; then
                    echo "    Health: $health_url - OK"
                else
                    echo "    Health: $health_url - UNREACHABLE"
                fi
            fi
        else
            print_status $RED "STOPPED"
        fi
    done
}

# Function to run a single worker
run_worker() {
    local worker_type=$1
    local detach=$2
    local log_level=$3
    local concurrency=$4
    local custom_queues=$5
    local health_port=$6
    local custom_hostname=$7

    local worker_dir="$WORKERS_DIR/$worker_type"

    if [[ ! -d "$worker_dir" ]]; then
        print_status $RED "Error: Worker directory not found: $worker_dir"
        exit 1
    fi

    # Set worker-specific environment variables
    export WORKER_NAME="${worker_type}-worker"
    export WORKER_TYPE="$(echo "$worker_type" | tr '-' '_')" # Convert hyphens to underscores for Python module names
    export LOG_LEVEL="${log_level:-INFO}"

    # Set health port if specified
    if [[ -n "$health_port" ]]; then
        case "$worker_type" in
            "api-deployment")
                export API_DEPLOYMENT_HEALTH_PORT="$health_port"
                ;;
            "general")
                export GENERAL_HEALTH_PORT="$health_port"
                ;;
            "file_processing")
                export FILE_PROCESSING_HEALTH_PORT="$health_port"
                ;;
            "callback")
                export CALLBACK_HEALTH_PORT="$health_port"
                ;;
            "log_consumer")
                export LOG_CONSUMER_HEALTH_PORT="$health_port"
                ;;
            "notification")
                export NOTIFICATION_HEALTH_PORT="$health_port"
                ;;
            "scheduler")
                export SCHEDULER_HEALTH_PORT="$health_port"
                ;;
        esac
    fi

    # Determine queues
    local queues="${custom_queues:-${WORKER_QUEUES[$worker_type]}}"

    # Build a meaningful worker name: custom hostname, else instance ID suffix, else default
    local worker_instance_name="${worker_type}-worker"
    if [[ -n "$custom_hostname" ]]; then
        worker_instance_name="$custom_hostname"
    elif [[ -n "$WORKER_INSTANCE_ID" ]]; then
        worker_instance_name="${worker_type}-worker-${WORKER_INSTANCE_ID}"
    fi

    # Build celery command
    local cmd_args=(
        "uv" "run" "celery" "-A" "worker" "worker"
        "--loglevel=${log_level:-info}"
        "--queues=$queues"
        "--hostname=${worker_instance_name}@%h"
    )

    # Use the requested concurrency, otherwise fall back to per-worker defaults
    if [[ -n "$concurrency" ]]; then
        cmd_args+=("--concurrency=$concurrency")
    else
        case "$worker_type" in
            "general"|"file_processing"|"callback")
                cmd_args+=("--concurrency=4")
                ;;
            "api-deployment"|"log_consumer"|"notification"|"scheduler")
                cmd_args+=("--concurrency=2")
                ;;
        esac
    fi

    print_status $GREEN "Starting $worker_type worker..."
    print_status $BLUE "Directory: $worker_dir"
    print_status $BLUE "Worker Name: $worker_instance_name"
    print_status $BLUE "Queues: $queues"
    print_status $BLUE "Health Port: ${health_port:-${WORKER_HEALTH_PORTS[$worker_type]}}"
    print_status $BLUE "Command: ${cmd_args[*]}"

    cd "$worker_dir"

    if [[ "$detach" == "true" ]]; then
        # Run in background; output goes to <worker_dir>/<worker_type>.log
        nohup "${cmd_args[@]}" > "$worker_type.log" 2>&1 &
        local pid=$!
        print_status $GREEN "$worker_type worker started in background (PID: $pid)"
        print_status $BLUE "Logs: $worker_dir/$worker_type.log"
    else
        # Run in foreground, replacing this shell process
        exec "${cmd_args[@]}"
    fi
}

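# Example (illustrative only, not executed): with no options, "$0 file"
# resolves to worker_type=file_processing and, from
# $WORKERS_DIR/file_processing, runs (assuming WORKER_INSTANCE_ID is unset)
#   uv run celery -A worker worker --loglevel=info \
#       --queues=file_processing,api_file_processing \
#       --hostname=file_processing-worker@%h --concurrency=4
# Celery expands %h to the machine's hostname. Several instances on one host
# therefore need distinct names via -n or WORKER_INSTANCE_ID.
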
# Function to run all workers
run_all_workers() {
    local detach=$1
    local log_level=$2
    local concurrency=$3

    print_status $GREEN "Starting all workers..."

    # Always run all workers in background when using "all"
    for worker in api-deployment general file_processing callback log_consumer notification scheduler; do
        print_status $BLUE "Starting $worker worker in background..."

        # Run each worker in a subshell so its exports and cd do not leak
        (
            run_worker "$worker" "true" "$log_level" "$concurrency" "" ""
        ) &

        sleep 2 # Give each worker time to start
    done

    if [[ "$detach" != "true" ]]; then
        print_status $GREEN "All workers started. Press Ctrl+C to stop all workers."
        print_status $BLUE "Worker status:"
        sleep 3
        show_status

        # The workers were started with nohup inside subshells that exit right
        # away, so they are not children of this shell and `wait` would return
        # immediately. Poll until they exit, and stop them all on Ctrl+C.
        trap 'kill_workers; exit 0' INT TERM
        while pgrep -f "uv run celery.*worker" >/dev/null; do
            sleep 5
        done
    else
        print_status $GREEN "All workers started in background"
        show_status
    fi
}

# Parse command line arguments
DETACH=false
LOG_LEVEL=""
CONCURRENCY=""
CUSTOM_QUEUES=""
HEALTH_PORT=""
CUSTOM_HOSTNAME=""
KILL_WORKERS=false
SHOW_STATUS=false

while [[ $# -gt 0 ]]; do
    case $1 in
        -e|--env-file)
            ENV_FILE="$2"
            shift 2
            ;;
        -d|--detach)
            DETACH=true
            shift
            ;;
        -l|--log-level)
            LOG_LEVEL="$2"
            shift 2
            ;;
        -c|--concurrency)
            CONCURRENCY="$2"
            shift 2
            ;;
        -q|--queues)
            CUSTOM_QUEUES="$2"
            shift 2
            ;;
        -p|--health-port)
            HEALTH_PORT="$2"
            shift 2
            ;;
        -n|--hostname)
            CUSTOM_HOSTNAME="$2"
            shift 2
            ;;
        -k|--kill)
            KILL_WORKERS=true
            shift
            ;;
        -s|--status)
            SHOW_STATUS=true
            shift
            ;;
        -h|--help)
            usage
            exit 0
            ;;
        -*)
            print_status $RED "Unknown option: $1"
            usage
            exit 1
            ;;
        *)
            WORKER_TYPE="$1"
            shift
            ;;
    esac
done

# Handle special actions
if [[ "$KILL_WORKERS" == "true" ]]; then
    kill_workers
    exit 0
fi

if [[ "$SHOW_STATUS" == "true" ]]; then
    show_status
    exit 0
fi

# Validate worker type
if [[ -z "$WORKER_TYPE" ]]; then
    print_status $RED "Error: Worker type is required"
    usage
    exit 1
fi

if [[ -z "${WORKERS[$WORKER_TYPE]}" ]]; then
    print_status $RED "Error: Unknown worker type: $WORKER_TYPE"
    print_status $BLUE "Available workers: ${!WORKERS[*]}"
    exit 1
fi

# Load environment
load_env "$ENV_FILE"

# Validate environment
validate_env

# Add the workers directory to PYTHONPATH for imports
export PYTHONPATH="$WORKERS_DIR:${PYTHONPATH:-}"

# Run the requested worker(s)
if [[ "$WORKER_TYPE" == "all" ]]; then
    run_all_workers "$DETACH" "$LOG_LEVEL" "$CONCURRENCY"
else
    WORKER_DIR_NAME="${WORKERS[$WORKER_TYPE]}"
    run_worker "$WORKER_DIR_NAME" "$DETACH" "$LOG_LEVEL" "$CONCURRENCY" "$CUSTOM_QUEUES" "$HEALTH_PORT" "$CUSTOM_HOSTNAME"
fi