feat: Using rabbitMQ as message broker for celery instead of redis (#1340)
* feat: Using rabbitMQ as message broker for celery instead of redis: * minor: Addressed security issue, removed setting of already loaded envs * fix: Updated envs needed to pass CELERY_BROKER_URL * minor: Fixed deps conflict, removed default for broker config in base.py * minor: Added a default to address failing test --------- Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
9fae15ff6f
commit
1f7adb7353
@@ -6,6 +6,7 @@ Contains the backend services for Unstract written with Django and DRF.
|
||||
|
||||
1. Postgres
|
||||
1. Redis
|
||||
1. RabbitMQ
|
||||
|
||||
## Getting started
|
||||
|
||||
@@ -51,7 +52,7 @@ uv run sample_script.py
|
||||
- If you plan to run the django server locally, make sure the dependent services are up (either locally or through docker compose)
|
||||
- Copy `sample.env` into `.env` and update the necessary variables. For example:
|
||||
|
||||
```
|
||||
```bash
|
||||
DJANGO_SETTINGS_MODULE='backend.settings.dev'
|
||||
DB_HOST='localhost'
|
||||
DB_USER='unstract_dev'
|
||||
@@ -123,6 +124,7 @@ celery -A backend worker --loglevel=info -Q <queue_name>
|
||||
```
|
||||
|
||||
### Autoscaling Workers
|
||||
|
||||
```bash
|
||||
celery -A backend worker --loglevel=info -Q <queue_name> --autoscale=<max_workers>,<min_workers>
|
||||
```
|
||||
@@ -135,7 +137,6 @@ Celery supports autoscaling of worker processes, allowing you to dynamically adj
|
||||
|
||||
- **Min Workers (`min_workers`)**: This is the minimum number of worker processes that will always be running.
|
||||
|
||||
|
||||
### Worker Dashboard
|
||||
|
||||
- We have to ensure the package flower is installed in the current environment
|
||||
@@ -144,8 +145,15 @@ Celery supports autoscaling of worker processes, allowing you to dynamically adj
|
||||
```bash
|
||||
celery -A backend flower
|
||||
```
|
||||
|
||||
This command will start Flower on the default port (5555) and can be accessed via a web browser. Flower provides a user-friendly interface for monitoring and managing Celery tasks.
|
||||
|
||||
### Broker Dashboard
|
||||
|
||||
- RabbitMQ provides a web interface for monitoring and managing the broker
|
||||
- Access the dashboard at `http://localhost:15672`
|
||||
- Default credentials: `admin` / `password`
|
||||
- These can be configured with `RABBITMQ_DEFAULT_USER` and `RABBITMQ_DEFAULT_PASS` envs in [essentials.env](../docker/essentials.env)
|
||||
|
||||
## Connecting to Postgres
|
||||
|
||||
@@ -153,17 +161,17 @@ Follow the below steps to connect to the postgres DB running with `docker compos
|
||||
|
||||
1. Exec into a shell within the postgres container
|
||||
|
||||
```
|
||||
```bash
|
||||
docker compose exec -it db bash
|
||||
```
|
||||
|
||||
2. Connect to the db as the specified user
|
||||
1. Connect to the db as the specified user
|
||||
|
||||
```
|
||||
```bash
|
||||
psql -d unstract_db -U unstract_dev
|
||||
```
|
||||
|
||||
3. Execute PSQL commands within the shell.
|
||||
1. Execute PSQL commands within the shell.
|
||||
|
||||
## API Docs
|
||||
|
||||
@@ -179,10 +187,12 @@ a container)
|
||||
## Connectors
|
||||
|
||||
### Google Drive
|
||||
|
||||
The Google Drive connector makes use of [PyDrive2](https://pypi.org/project/PyDrive2/) library and supports only OAuth 2.0 authentication.
|
||||
To set it up, follow the first step highlighted in [Google's docs](https://developers.google.com/identity/protocols/oauth2#1.-obtain-oauth-2.0-credentials-from-the-dynamic_data.setvar.console_name-.) and set the client ID and client secret
|
||||
as envs in `backend/.env`
|
||||
```
|
||||
|
||||
```bash
|
||||
GOOGLE_OAUTH2_KEY="<client-id>"
|
||||
GOOGLE_OAUTH2_SECRET="<client-secret>"
|
||||
```
|
||||
@@ -198,13 +208,13 @@ Information regarding how tools are added and maintained can be found [here](/un
|
||||
|
||||
- If it's the first time, create a super user and follow the on-screen instructions
|
||||
|
||||
```
|
||||
```bash
|
||||
python manage.py createsuperuser
|
||||
```
|
||||
|
||||
- Register your models in `<app>/admin.py`, for example
|
||||
|
||||
```
|
||||
```bash
|
||||
from django.contrib import admin
|
||||
from .models import Prompt
|
||||
|
||||
@@ -217,7 +227,7 @@ admin.site.register(Prompt)
|
||||
|
||||
Unit tests are run with [pytest](https://docs.pytest.org/en/7.3.x/) and [pytest-django](https://pytest-django.readthedocs.io/en/latest/index.html)
|
||||
|
||||
```
|
||||
```bash
|
||||
pytest
|
||||
pytest prompt # To run for an app named prompt
|
||||
```
|
||||
|
||||
@@ -2,8 +2,6 @@ from urllib.parse import quote_plus
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from unstract.core.utilities import UnstractUtils
|
||||
|
||||
|
||||
class CeleryConfig:
|
||||
"""Specifies celery configuration
|
||||
@@ -11,10 +9,6 @@ class CeleryConfig:
|
||||
Refer https://docs.celeryq.dev/en/stable/userguide/configuration.html
|
||||
"""
|
||||
|
||||
# Envs defined for configuration with Unstract
|
||||
CELERY_BROKER_URL = "CELERY_BROKER_URL"
|
||||
CELERY_BROKER_VISIBILITY_TIMEOUT = "CELERY_BROKER_VISIBILITY_TIMEOUT"
|
||||
|
||||
# Result backend configuration
|
||||
result_backend = (
|
||||
f"db+postgresql://{settings.DB_USER}:{quote_plus(settings.DB_PASSWORD)}"
|
||||
@@ -22,11 +16,7 @@ class CeleryConfig:
|
||||
)
|
||||
|
||||
# Broker URL configuration
|
||||
broker_url = UnstractUtils.get_env(
|
||||
CELERY_BROKER_URL,
|
||||
f"redis://{settings.REDIS_HOST}:{settings.REDIS_PORT}",
|
||||
raise_err=True,
|
||||
)
|
||||
broker_url = settings.CELERY_BROKER_URL
|
||||
|
||||
# Task serialization and content settings
|
||||
accept_content = ["json"]
|
||||
@@ -41,9 +31,3 @@ class CeleryConfig:
|
||||
beat_scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
|
||||
|
||||
task_acks_late = True
|
||||
# Large timeout avoids worker to pick up same unacknowledged task again
|
||||
broker_transport_options = {
|
||||
"visibility_timeout": float(
|
||||
UnstractUtils.get_env(CELERY_BROKER_VISIBILITY_TIMEOUT, 7200)
|
||||
)
|
||||
}
|
||||
|
||||
@@ -39,4 +39,10 @@ queues_to_purge = [ExecutionLogConstants.CELERY_QUEUE_NAME]
|
||||
with app.connection() as connection:
|
||||
channel = connection.channel()
|
||||
for queue_name in queues_to_purge:
|
||||
channel.queue_purge(queue_name)
|
||||
try:
|
||||
# Declare the queue (will be created if it doesn't exist)
|
||||
channel.queue_declare(queue=queue_name, durable=True, auto_delete=False)
|
||||
channel.queue_purge(queue_name)
|
||||
logger.info(f"Successfully purged queue: {queue_name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not purge queue {queue_name}: {str(e)}")
|
||||
|
||||
@@ -13,6 +13,7 @@ import os
|
||||
from pathlib import Path
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import httpx
|
||||
from dotenv import find_dotenv, load_dotenv
|
||||
from utils.common_utils import CommonUtils
|
||||
|
||||
@@ -45,6 +46,15 @@ BASE_DIR = Path(__file__).resolve().parent.parent
|
||||
# Load default log from env
|
||||
DEFAULT_LOG_LEVEL = os.environ.get("DEFAULT_LOG_LEVEL", "INFO")
|
||||
|
||||
# Celery Broker Configuration
|
||||
CELERY_BROKER_BASE_URL = get_required_setting("CELERY_BROKER_BASE_URL")
|
||||
CELERY_BROKER_USER = get_required_setting("CELERY_BROKER_USER")
|
||||
CELERY_BROKER_PASS = get_required_setting("CELERY_BROKER_PASS")
|
||||
CELERY_BROKER_URL = str(
|
||||
httpx.URL(CELERY_BROKER_BASE_URL).copy_with(
|
||||
username=CELERY_BROKER_USER, password=CELERY_BROKER_PASS
|
||||
)
|
||||
)
|
||||
|
||||
ENV_FILE = find_dotenv()
|
||||
if ENV_FILE:
|
||||
@@ -397,13 +407,13 @@ AUTH_PASSWORD_VALIDATORS = [
|
||||
"UserAttributeSimilarityValidator",
|
||||
},
|
||||
{
|
||||
"NAME": "django.contrib.auth.password_validation." "MinimumLengthValidator",
|
||||
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
|
||||
},
|
||||
{
|
||||
"NAME": "django.contrib.auth.password_validation." "CommonPasswordValidator",
|
||||
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
|
||||
},
|
||||
{
|
||||
"NAME": "django.contrib.auth.password_validation." "NumericPasswordValidator",
|
||||
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
|
||||
},
|
||||
]
|
||||
|
||||
|
||||
@@ -36,13 +36,10 @@ activate_venv
|
||||
|
||||
# This implementation is easier than docker compose run sanity checks and usage parameters
|
||||
echo -e "${blue_text}Running schema creation command...${default_text}"
|
||||
DB_HOST="localhost" REDIS_HOST="localhost" CELERY_BROKER_URL="redis://localhost:6379" \
|
||||
python manage.py create_schema || { echo "Schema creation failed"; exit 1; }
|
||||
|
||||
echo -e "${blue_text}Running schema migration command...${default_text}"
|
||||
DB_HOST="localhost" REDIS_HOST="localhost" CELERY_BROKER_URL="redis://localhost:6379" \
|
||||
python manage.py migrate || { echo "Schema migration failed"; exit 1; }
|
||||
|
||||
echo -e "${blue_text}Running data migration command...${default_text}"
|
||||
DB_HOST="localhost" REDIS_HOST="localhost" CELERY_BROKER_URL="redis://localhost:6379" \
|
||||
SCHEMAS_TO_MIGRATE=_ALL_ python manage.py migrate_to_v2 || { echo "Data migration failed"; exit 1; }
|
||||
|
||||
@@ -15,7 +15,7 @@ classifiers = [
|
||||
dependencies = [
|
||||
"Authlib==1.2.1", # For Auth plugins
|
||||
"boto3~=1.34.0", # For Unstract-cloud-storage
|
||||
"celery>=5.3.4", # For Celery
|
||||
"celery[amqp]>=5.3.4", # For Celery
|
||||
"flower>=2.0.1", # Celery Monitoring
|
||||
"cron-descriptor==1.4.0", # For cron string description
|
||||
"cryptography>=41.0.7",
|
||||
@@ -44,6 +44,7 @@ dependencies = [
|
||||
"azure-mgmt-apimanagement==3.0.0",
|
||||
"croniter>=3.0.3",
|
||||
"django-filter>=24.3",
|
||||
"httpx>=0.27.0",
|
||||
# Hence required to add all indirect local dependencies too here.
|
||||
"unstract-connectors",
|
||||
"unstract-core",
|
||||
|
||||
@@ -108,7 +108,6 @@ SYSTEM_ADMIN_EMAIL="admin@abc.com"
|
||||
# Set Django Session Expiry Time (in seconds)
|
||||
SESSION_COOKIE_AGE=86400
|
||||
|
||||
|
||||
# Control async extraction of LLMWhisperer
|
||||
# Time in seconds to wait before polling LLMWhisperer's status API
|
||||
ADAPTER_LLMW_POLL_INTERVAL=30
|
||||
@@ -129,9 +128,9 @@ LOGS_EXPIRATION_TIME_IN_SECOND=86400
|
||||
|
||||
# Celery Configuration
|
||||
# Used by celery and to connect to queue to push logs
|
||||
CELERY_BROKER_URL="redis://unstract-redis:6379"
|
||||
# Redis broker visibility set to 1 day
|
||||
CELERY_BROKER_VISIBILITY_TIMEOUT=86400
|
||||
CELERY_BROKER_BASE_URL="amqp://unstract-rabbitmq:5672//"
|
||||
CELERY_BROKER_USER=admin
|
||||
CELERY_BROKER_PASS=password
|
||||
|
||||
# Indexing flag to prevent re-index
|
||||
INDEXING_FLAG_TTL=1800
|
||||
|
||||
6
backend/uv.lock
generated
6
backend/uv.lock
generated
@@ -3879,6 +3879,7 @@ dependencies = [
|
||||
{ name = "drf-yasg" },
|
||||
{ name = "flower" },
|
||||
{ name = "gcsfs" },
|
||||
{ name = "httpx" },
|
||||
{ name = "psycopg2-binary" },
|
||||
{ name = "python-dotenv" },
|
||||
{ name = "python-magic" },
|
||||
@@ -3924,7 +3925,7 @@ requires-dist = [
|
||||
{ name = "azure-identity", specifier = "==1.16.0" },
|
||||
{ name = "azure-mgmt-apimanagement", specifier = "==3.0.0" },
|
||||
{ name = "boto3", specifier = "~=1.34.0" },
|
||||
{ name = "celery", specifier = ">=5.3.4" },
|
||||
{ name = "celery", extras = ["amqp"], specifier = ">=5.3.4" },
|
||||
{ name = "cron-descriptor", specifier = "==1.4.0" },
|
||||
{ name = "croniter", specifier = ">=3.0.3" },
|
||||
{ name = "cryptography", specifier = ">=41.0.7" },
|
||||
@@ -3940,6 +3941,7 @@ requires-dist = [
|
||||
{ name = "drf-yasg", specifier = "==1.21.7" },
|
||||
{ name = "flower", specifier = ">=2.0.1" },
|
||||
{ name = "gcsfs", specifier = "==2024.10.0" },
|
||||
{ name = "httpx", specifier = ">=0.27.0" },
|
||||
{ name = "psycopg2-binary", specifier = "==2.9.9" },
|
||||
{ name = "python-dotenv", specifier = "==1.0.0" },
|
||||
{ name = "python-magic", specifier = "==0.4.27" },
|
||||
@@ -4031,6 +4033,7 @@ name = "unstract-core"
|
||||
version = "0.0.1"
|
||||
source = { editable = "../unstract/core" }
|
||||
dependencies = [
|
||||
{ name = "httpx" },
|
||||
{ name = "kombu" },
|
||||
{ name = "redis" },
|
||||
{ name = "requests" },
|
||||
@@ -4039,6 +4042,7 @@ dependencies = [
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "flask", marker = "extra == 'flask'", specifier = "~=3.1.0" },
|
||||
{ name = "httpx", specifier = ">=0.27.0" },
|
||||
{ name = "kombu", specifier = "~=5.5.3" },
|
||||
{ name = "redis", specifier = "~=5.2.1" },
|
||||
{ name = "requests", specifier = "==2.31.0" },
|
||||
|
||||
@@ -134,6 +134,19 @@ services:
|
||||
env_file:
|
||||
- ./essentials.env
|
||||
|
||||
rabbitmq:
|
||||
image: rabbitmq:4.1.0-management
|
||||
container_name: unstract-rabbitmq
|
||||
hostname: unstract-rabbit
|
||||
restart: unless-stopped
|
||||
env_file:
|
||||
- ./essentials.env
|
||||
ports:
|
||||
- "5672:5672" # AMQP port
|
||||
- "15672:15672" # Management UI port
|
||||
volumes:
|
||||
- rabbitmq_data:/var/lib/rabbitmq
|
||||
|
||||
volumes:
|
||||
flipt_data:
|
||||
minio_data:
|
||||
@@ -141,3 +154,4 @@ volumes:
|
||||
qdrant_data:
|
||||
redis_data:
|
||||
prompt_studio_data:
|
||||
rabbitmq_data:
|
||||
|
||||
@@ -15,6 +15,7 @@ services:
|
||||
depends_on:
|
||||
- db
|
||||
- redis
|
||||
- rabbitmq
|
||||
- reverse-proxy
|
||||
- minio
|
||||
- createbuckets
|
||||
@@ -41,11 +42,12 @@ services:
|
||||
container_name: unstract-worker
|
||||
restart: unless-stopped
|
||||
entrypoint: .venv/bin/celery
|
||||
command: "-A backend worker --loglevel=info -Q celery --autoscale=${WORKER_AUTOSCALE}"
|
||||
command: "-A backend worker --loglevel=info -Q celery,celery_api_deployments --autoscale=${WORKER_AUTOSCALE}"
|
||||
env_file:
|
||||
- ../backend/.env
|
||||
depends_on:
|
||||
- redis
|
||||
- rabbitmq
|
||||
- db
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-worker
|
||||
@@ -66,44 +68,26 @@ services:
|
||||
env_file:
|
||||
- ../backend/.env
|
||||
depends_on:
|
||||
- redis
|
||||
- rabbitmq
|
||||
- db
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-worker-logging
|
||||
labels:
|
||||
- traefik.enable=false
|
||||
|
||||
# Celery worker for handling API deployment tasks
|
||||
worker-api-deployment:
|
||||
image: unstract/backend:${VERSION}
|
||||
container_name: unstract-worker-api-deployment
|
||||
restart: unless-stopped
|
||||
entrypoint: .venv/bin/celery
|
||||
command: "-A backend worker --loglevel=info -Q celery_api_deployments --autoscale=${WORKER_API_DEPLOYMENTS_AUTOSCALE}"
|
||||
env_file:
|
||||
- ../backend/.env
|
||||
depends_on:
|
||||
- redis
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-worker-api-deployment
|
||||
labels:
|
||||
- traefik.enable=false
|
||||
volumes:
|
||||
- ./workflow_data:/data
|
||||
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
|
||||
|
||||
# Celery worker for handling file processing tasks
|
||||
worker-file-processing:
|
||||
image: unstract/backend:${VERSION}
|
||||
container_name: unstract-worker-file-processing
|
||||
restart: unless-stopped
|
||||
entrypoint: .venv/bin/celery
|
||||
command: "-A backend.workers.file_processing worker --loglevel=info -Q file_processing --autoscale=${WORKER_FILE_PROCESSING_AUTOSCALE}"
|
||||
command: "-A backend.workers.file_processing worker --loglevel=info -Q file_processing,api_file_processing --autoscale=${WORKER_FILE_PROCESSING_AUTOSCALE}"
|
||||
env_file:
|
||||
- ../backend/.env
|
||||
depends_on:
|
||||
- redis
|
||||
- rabbitmq
|
||||
- db
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-worker-file-processing
|
||||
@@ -113,21 +97,20 @@ services:
|
||||
- ./workflow_data:/data
|
||||
- ${TOOL_REGISTRY_CONFIG_SRC_PATH}:/data/tool_registry_config
|
||||
|
||||
|
||||
# Celery worker for handling API file processing tasks
|
||||
worker-api-file-processing:
|
||||
worker-file-processing-callback:
|
||||
image: unstract/backend:${VERSION}
|
||||
container_name: unstract-worker-api-file-processing
|
||||
container_name: unstract-worker-file-processing-callback
|
||||
restart: unless-stopped
|
||||
entrypoint: .venv/bin/celery
|
||||
command: "-A backend.workers.file_processing worker --loglevel=info -Q api_file_processing --autoscale=${WORKER_API_FILE_PROCESSING_AUTOSCALE}"
|
||||
command: "-A backend.workers.file_processing_callback worker --loglevel=info -Q file_processing_callback,api_file_processing_callback --autoscale=${WORKER_FILE_PROCESSING_CALLBACK_AUTOSCALE}"
|
||||
env_file:
|
||||
- ../backend/.env
|
||||
depends_on:
|
||||
- redis
|
||||
- rabbitmq
|
||||
- db
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-worker-api-file-processing
|
||||
- APPLICATION_NAME=unstract-worker-file-processing-callback
|
||||
labels:
|
||||
- traefik.enable=false
|
||||
volumes:
|
||||
@@ -146,8 +129,7 @@ services:
|
||||
depends_on:
|
||||
- worker
|
||||
- worker-logging
|
||||
- worker-api-deployment
|
||||
- redis
|
||||
- rabbitmq
|
||||
labels:
|
||||
- traefik.enable=false
|
||||
ports:
|
||||
@@ -172,7 +154,7 @@ services:
|
||||
- ./essentials.env
|
||||
depends_on:
|
||||
- db
|
||||
- redis
|
||||
- rabbitmq
|
||||
environment:
|
||||
- ENVIRONMENT=development
|
||||
- APPLICATION_NAME=unstract-celery-beat
|
||||
@@ -215,6 +197,7 @@ services:
|
||||
- db
|
||||
- minio
|
||||
- createbuckets
|
||||
- rabbitmq
|
||||
ports:
|
||||
- "3003:3003"
|
||||
env_file:
|
||||
@@ -252,6 +235,7 @@ services:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
depends_on:
|
||||
- redis
|
||||
- rabbitmq
|
||||
labels:
|
||||
- traefik.enable=false
|
||||
|
||||
|
||||
@@ -12,7 +12,3 @@ services:
|
||||
worker-logging:
|
||||
profiles:
|
||||
- high_memory
|
||||
|
||||
worker-api-deployment:
|
||||
profiles:
|
||||
- high_memory
|
||||
|
||||
@@ -7,10 +7,7 @@ TOOL_REGISTRY_CONFIG_SRC_PATH="${PWD}/../unstract/tool-registry/tool_registry_co
|
||||
# Format: <max_workers>,<min_workers>
|
||||
# Hint: The max value (max_workers) is related to your CPU resources and the level of concurrency you need.
|
||||
# Always monitor system performance and adjust the max value as needed.
|
||||
WORKER_API_DEPLOYMENTS_AUTOSCALE=4,1
|
||||
WORKER_LOGGING_AUTOSCALE=4,1
|
||||
WORKER_AUTOSCALE=4,1
|
||||
WORKER_FILE_PROCESSING_AUTOSCALE=4,1
|
||||
WORKER_API_FILE_PROCESSING_AUTOSCALE=4,1
|
||||
WORKER_FILE_PROCESSING_CALLBACK_AUTOSCALE=4,1
|
||||
WORKER_API_FILE_PROCESSING_CALLBACK_AUTOSCALE=4,1
|
||||
|
||||
@@ -16,3 +16,7 @@ FLIPT_DB_URL="postgres://${POSTGRES_USER}:${POSTGRES_PASSWORD}@db:5432/${POSTGRE
|
||||
QDRANT_USER=unstract_vector_dev
|
||||
QDRANT_PASS=unstract_vector_pass
|
||||
QDRANT_DB=unstract_vector_db
|
||||
|
||||
# RabbitMQ related envs
|
||||
RABBITMQ_DEFAULT_USER=admin
|
||||
RABBITMQ_DEFAULT_PASS=password
|
||||
|
||||
@@ -18,7 +18,9 @@ LOG_LEVEL=INFO
|
||||
|
||||
### Env from `unstract-core` ###
|
||||
# Celery for PublishLogs
|
||||
CELERY_BROKER_URL="redis://unstract-redis:6379"
|
||||
CELERY_BROKER_BASE_URL="amqp://unstract-rabbitmq:5672//"
|
||||
CELERY_BROKER_USER=admin
|
||||
CELERY_BROKER_PASS=password
|
||||
# Logs Expiry of 24 hours
|
||||
LOGS_EXPIRATION_TIME_IN_SECOND=86400
|
||||
|
||||
|
||||
@@ -39,7 +39,9 @@ sudo ln -s "$HOME/.docker/run/docker.sock" /var/run/docker.sock
|
||||
|
||||
| Variable | Description |
|
||||
| -------------------------- |-----------------------------------------------------------------------------------------------|
|
||||
| `CELERY_BROKER_URL` | URL for Celery's message broker, used to queue tasks. Must match backend configuration. |
|
||||
| `CELERY_BROKER_BASE_URL` | Base URL for Celery's message broker, used to queue tasks. Must match backend configuration. |
|
||||
| `CELERY_BROKER_USER` | Username for Celery's message broker. |
|
||||
| `CELERY_BROKER_PASS` | Password for Celery's message broker. |
|
||||
| `TOOL_CONTAINER_NETWORK` | Network used for running tool containers. |
|
||||
| `TOOL_CONTAINER_LABELS` | Labels applied to tool containers for observability [Optional]. |
|
||||
| `EXECUTION_DATA_DIR` | Target mount directory within tool containers. (Default: "/data") |
|
||||
|
||||
@@ -1,4 +1,7 @@
|
||||
CELERY_BROKER_URL = "redis://unstract-redis:6379"
|
||||
# To pass to tool-sidecar for Kombu's connection
|
||||
CELERY_BROKER_BASE_URL="amqp://unstract-rabbitmq:5672//"
|
||||
CELERY_BROKER_USER=admin
|
||||
CELERY_BROKER_PASS=password
|
||||
|
||||
TOOL_CONTAINER_NETWORK="unstract-network"
|
||||
TOOL_CONTAINER_LABELS="[]"
|
||||
|
||||
@@ -39,4 +39,6 @@ class Env:
|
||||
REDIS_PORT = "REDIS_PORT"
|
||||
REDIS_USER = "REDIS_USER"
|
||||
REDIS_PASSWORD = "REDIS_PASSWORD"
|
||||
CELERY_BROKER_URL = "CELERY_BROKER_URL"
|
||||
CELERY_BROKER_BASE_URL = "CELERY_BROKER_BASE_URL"
|
||||
CELERY_BROKER_USER = "CELERY_BROKER_USER"
|
||||
CELERY_BROKER_PASS = "CELERY_BROKER_PASS"
|
||||
|
||||
@@ -231,7 +231,9 @@ class UnstractRunner:
|
||||
"FILE_EXECUTION_ID": file_execution_id,
|
||||
"MESSAGING_CHANNEL": messaging_channel,
|
||||
"LOG_LEVEL": os.getenv(Env.LOG_LEVEL, "INFO"),
|
||||
"CELERY_BROKER_URL": os.getenv(Env.CELERY_BROKER_URL),
|
||||
"CELERY_BROKER_BASE_URL": os.getenv(Env.CELERY_BROKER_BASE_URL),
|
||||
"CELERY_BROKER_USER": os.getenv(Env.CELERY_BROKER_USER),
|
||||
"CELERY_BROKER_PASS": os.getenv(Env.CELERY_BROKER_PASS),
|
||||
"CONTAINER_NAME": container_name,
|
||||
}
|
||||
sidecar_config = self.client.get_container_run_config(
|
||||
|
||||
@@ -20,7 +20,9 @@ class Env:
|
||||
REDIS_PORT = "REDIS_PORT"
|
||||
REDIS_USER = "REDIS_USER"
|
||||
REDIS_PASSWORD = "REDIS_PASSWORD"
|
||||
CELERY_BROKER_URL = "CELERY_BROKER_URL"
|
||||
CELERY_BROKER_BASE_URL = "CELERY_BROKER_BASE_URL"
|
||||
CELERY_BROKER_USER = "CELERY_BROKER_USER"
|
||||
CELERY_BROKER_PASS = "CELERY_BROKER_PASS"
|
||||
TOOL_INSTANCE_ID = "TOOL_INSTANCE_ID"
|
||||
EXECUTION_ID = "EXECUTION_ID"
|
||||
ORGANIZATION_ID = "ORGANIZATION_ID"
|
||||
|
||||
@@ -255,7 +255,10 @@ def main():
|
||||
redis_port = os.getenv(Env.REDIS_PORT)
|
||||
redis_user = os.getenv(Env.REDIS_USER)
|
||||
redis_password = os.getenv(Env.REDIS_PASSWORD)
|
||||
celery_broker_url = os.getenv(Env.CELERY_BROKER_URL)
|
||||
# Needed for Kombu (used from unstract-core)
|
||||
celery_broker_base_url = os.getenv(Env.CELERY_BROKER_BASE_URL)
|
||||
celery_broker_user = os.getenv(Env.CELERY_BROKER_USER)
|
||||
celery_broker_pass = os.getenv(Env.CELERY_BROKER_PASS)
|
||||
|
||||
# Get execution parameters from environment
|
||||
tool_instance_id = os.getenv(Env.TOOL_INSTANCE_ID)
|
||||
@@ -275,7 +278,9 @@ def main():
|
||||
Env.LOG_PATH: log_path,
|
||||
Env.REDIS_HOST: redis_host,
|
||||
Env.REDIS_PORT: redis_port,
|
||||
Env.CELERY_BROKER_URL: celery_broker_url,
|
||||
Env.CELERY_BROKER_BASE_URL: celery_broker_base_url,
|
||||
Env.CELERY_BROKER_USER: celery_broker_user,
|
||||
Env.CELERY_BROKER_PASS: celery_broker_pass,
|
||||
}
|
||||
|
||||
logger.info(f"Log processor started with params: {required_params}")
|
||||
|
||||
@@ -16,6 +16,7 @@ dependencies = [
|
||||
"redis~=5.2.1",
|
||||
"requests==2.31.0",
|
||||
"kombu~=5.5.3",
|
||||
"httpx>=0.27.0",
|
||||
]
|
||||
|
||||
[project.optional-dependencies]
|
||||
|
||||
@@ -6,6 +6,7 @@ import traceback
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import redis
|
||||
from kombu import Connection
|
||||
|
||||
@@ -13,7 +14,13 @@ from unstract.core.constants import LogEventArgument, LogProcessingTask
|
||||
|
||||
|
||||
class LogPublisher:
|
||||
kombu_conn = Connection(os.environ.get("CELERY_BROKER_URL"))
|
||||
broker_url = str(
|
||||
httpx.URL(os.getenv("CELERY_BROKER_BASE_URL", "amqp://")).copy_with(
|
||||
username=os.getenv("CELERY_BROKER_USER"),
|
||||
password=os.getenv("CELERY_BROKER_PASS"),
|
||||
)
|
||||
)
|
||||
kombu_conn = Connection(broker_url)
|
||||
r = redis.Redis(
|
||||
host=os.environ.get("REDIS_HOST"),
|
||||
port=os.environ.get("REDIS_PORT", 6379),
|
||||
|
||||
71
unstract/core/uv.lock
generated
71
unstract/core/uv.lock
generated
@@ -14,6 +14,20 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/26/99/fc813cd978842c26c82534010ea849eee9ab3a13ea2b74e95cb9c99e747b/amqp-5.3.1-py3-none-any.whl", hash = "sha256:43b3319e1b4e7d1251833a93d672b4af1e40f3d632d479b98661a95f117880a2", size = 50944 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "anyio"
|
||||
version = "4.9.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "idna" },
|
||||
{ name = "sniffio" },
|
||||
{ name = "typing-extensions" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/95/7d/4c1bd541d4dffa1b52bd83fb8527089e097a106fc90b467a7313b105f840/anyio-4.9.0.tar.gz", hash = "sha256:673c0c244e15788651a4ff38710fea9675823028a6f08a5eda409e0c9840a028", size = 190949 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/a1/ee/48ca1a7c89ffec8b6a0c5d02b89c305671d5ffd8d3c94acf8b8c408575bb/anyio-4.9.0-py3-none-any.whl", hash = "sha256:9f76d541cad6e36af7beb62e978876f3b41e3e04f2c1fbf0884604c0a9c4d93c", size = 100916 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "blinker"
|
||||
version = "1.9.0"
|
||||
@@ -91,6 +105,43 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/af/47/93213ee66ef8fae3b93b3e29206f6b251e65c97bd91d8e1c5596ef15af0a/flask-3.1.0-py3-none-any.whl", hash = "sha256:d667207822eb83f1c4b50949b1623c8fc8d51f2341d65f72e1a1815397551136", size = 102979 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h11"
|
||||
version = "0.16.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "httpcore"
|
||||
version = "1.0.9"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "certifi" },
|
||||
{ name = "h11" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "httpx"
|
||||
version = "0.28.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "anyio" },
|
||||
{ name = "certifi" },
|
||||
{ name = "httpcore" },
|
||||
{ name = "idna" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "idna"
|
||||
version = "3.10"
|
||||
@@ -177,6 +228,24 @@ wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/70/8e/0e2d847013cb52cd35b38c009bb167a1a26b2ce6cd6965bf26b47bc0bf44/requests-2.31.0-py3-none-any.whl", hash = "sha256:58cd2187c01e70e6e26505bca751777aa9f2ee0b7f4300988b709f44e013003f", size = 62574 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "sniffio"
|
||||
version = "1.3.1"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash = "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "typing-extensions"
|
||||
version = "4.14.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/d1/bc/51647cd02527e87d05cb083ccc402f93e441606ff1f01739a62c8ad09ba5/typing_extensions-4.14.0.tar.gz", hash = "sha256:8676b788e32f02ab42d9e7c61324048ae4c6d844a399eebace3d4979d75ceef4", size = 107423 }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/69/e0/552843e0d356fbb5256d21449fa957fa4eff3bbc135a74a691ee70c7c5da/typing_extensions-4.14.0-py3-none-any.whl", hash = "sha256:a1514509136dd0b477638fc68d6a91497af5076466ad0fa6c338e44e359944af", size = 43839 },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tzdata"
|
||||
version = "2025.2"
|
||||
@@ -191,6 +260,7 @@ name = "unstract-core"
|
||||
version = "0.0.1"
|
||||
source = { virtual = "." }
|
||||
dependencies = [
|
||||
{ name = "httpx" },
|
||||
{ name = "kombu" },
|
||||
{ name = "redis" },
|
||||
{ name = "requests" },
|
||||
@@ -204,6 +274,7 @@ flask = [
|
||||
[package.metadata]
|
||||
requires-dist = [
|
||||
{ name = "flask", marker = "extra == 'flask'", specifier = "~=3.1.0" },
|
||||
{ name = "httpx", specifier = ">=0.27.0" },
|
||||
{ name = "kombu", specifier = "~=5.5.3" },
|
||||
{ name = "redis", specifier = "~=5.2.1" },
|
||||
{ name = "requests", specifier = "==2.31.0" },
|
||||
|
||||
Reference in New Issue
Block a user