* UN-2470 [MISC] Remove Django dependency from Celery workers This commit introduces a new worker architecture that decouples Celery workers from Django where possible, enabling support for gevent/eventlet pool types and reducing worker startup overhead. Key changes: - Created separate worker modules (api-deployment, callback, file_processing, general) - Added internal API endpoints for worker communication - Implemented Django-free task execution where appropriate - Added shared utilities and client facades - Updated container configurations for new worker architecture 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * Fix pre-commit issues: file permissions and ruff errors Setup the docker for new workers - Add executable permissions to worker entrypoint files - Fix import order in namespace package __init__.py - Remove unused variable api_status in general worker - Address ruff E402 and F841 errors 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * refactoreed, Dockerfiles,fixes * flexibility on celery run commands * added debug logs * handled filehistory for API * cleanup * cleanup * cloud plugin structure * minor changes in import plugin * added notification and logger workers under new worker module * add docker compatibility for new workers * handled docker issues * log consumer worker fixes * added scheduler worker * minor env changes * cleanup the logs * minor changes in logs * resolved scheduler worker issues * cleanup and refactor * ensuring backward compatibbility to existing wokers * added configuration internal apis and cache utils * optimization * Fix API client singleton pattern to share HTTP sessions - Fix flawed singleton implementation that was trying to share BaseAPIClient instances - Now properly shares HTTP sessions between specialized clients - Eliminates 6x BaseAPIClient initialization by reusing the same underlying session - Should 
reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session) - Added debug logging to verify singleton pattern activation * cleanup and structuring * cleanup in callback * file system connectors issue * celery env values changes * optional gossip * variables for sync, mingle and gossip * Fix for file type check * Task pipeline issue resolving * api deployement failed response handled * Task pipline fixes * updated file history cleanup with active file execution * pipline status update and workflow ui page execution * cleanup and resolvinf conflicts * remove unstract-core from conenctoprs * Commit uv.lock changes * uv locks updates * resolve migration issues * defer connector-metadtda * Fix connector migration for production scale - Add encryption key handling with defer() to prevent decryption failures - Add final cleanup step to fix duplicate connector names - Optimize for large datasets with batch processing and bulk operations - Ensure unique constraint in migration 0004 can be created successfully 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com> * hitl fixes * minor fixes on hitl * api_hub related changes * dockerfile fixes * api client cache fixes with actual response class * fix: tags and llm_profile_id * optimized clear cache * cleanup * enhanced logs * added more handling on is file dir and added loggers * cleanup the runplatform script * internal apis are excempting from csrf * sonal cloud issues * sona-cloud issues * resolving sonar cloud issues * resolving sonar cloud issues * Delta: added Batch size fix in workers * comments addressed * celery configurational changes for new workers * fiixes in callback regaurding the pipline type check * change internal url registry logic * gitignore changes * gitignore changes * addressng pr cmmnets and cleanup the codes * adding missed profiles for v2 * sonal cloud blocker issues resolved * imlement otel * Commit uv.lock changes * handle 
execution time and some cleanup * adding user_data in metadata Pr: https://github.com/Zipstack/unstract/pull/1544 * scheduler backward compatibitlity * replace user_data with custom_data * Commit uv.lock changes * celery worker command issue resolved * enhance package imports in connectors by changing to lazy imports * Update runner.py by removing the otel from it Update runner.py by removing the otel from it Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> * added delta changes * handle erro to destination db * resolve tool instances id validation and hitl queu name in API * handled direct execution from workflow page to worker and logs * handle cost logs * Update health.py Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * minor log changes * introducing log consumer scheduler to bulk create, and socket .emit from worker for ws * Commit uv.lock changes * time limit or timeout celery config cleanup * implemented redis client class in worker * pipline status enum mismatch * notification worker fixes * resolve uv lock conflicts * workflow log fixes * ws channel name issue resolved. and handling redis down in status tracker, and removing redis keys * default TTL changed for unified logs * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com> Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> Co-authored-by: Claude <noreply@anthropic.com> Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
104 lines
3.5 KiB
Python
104 lines
3.5 KiB
Python
import logging
|
|
import os
|
|
from importlib import import_module
|
|
from typing import Any
|
|
|
|
from django.apps import apps
|
|
|
|
# Module-level logger, named after this module per Python convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class ProcessorConfig:
    """Loader config for processor plugins.

    Central place for the magic strings used while discovering and
    registering processor plugins.
    """

    # Label of the Django app that hosts the plugin packages.
    PLUGINS_APP = "plugins"
    # Sub-directory of the plugins app that holds processor plugins.
    PLUGIN_DIR = "processor"
    # Key under which the imported module is stored in a plugin entry.
    MODULE = "module"
    # Name of the attribute each plugin module must expose, and the key
    # under which that mapping is stored in a plugin entry.
    METADATA = "metadata"
    # Keys expected inside a plugin's metadata mapping.
    METADATA_NAME = "name"
    METADATA_SERVICE_CLASS = "service_class"
    METADATA_IS_ACTIVE = "is_active"
|
|
|
|
|
|
# Cache for loaded plugins to avoid repeated loading
# (populated by load_plugins() on its first completed scan).
_processor_plugins_cache: list[Any] = []
# Guard for the cache: True once load_plugins() has finished a scan.
_plugins_loaded = False
|
|
|
|
|
|
def load_plugins() -> list[Any]:
    """Discover, import and register processor plugins.

    Scans the ``processor`` directory of the ``plugins`` Django app for
    plugin packages (sub-directories) and compiled shared libraries
    (``*.so`` files), imports each candidate, and keeps those whose
    ``metadata`` marks them as active. The scan result is cached at
    module level so subsequent calls do no filesystem or import work.

    Returns:
        list[Any]: One ``{MODULE: <module>, METADATA: <dict>}`` entry per
            active plugin; an empty list when none are found.
    """
    global _processor_plugins_cache, _plugins_loaded

    # Serve from the module-level cache once a scan has completed.
    if _plugins_loaded:
        return _processor_plugins_cache

    plugins_app = apps.get_app_config(ProcessorConfig.PLUGINS_APP)
    package_path = plugins_app.module.__package__
    processor_dir = os.path.join(plugins_app.path, ProcessorConfig.PLUGIN_DIR)
    processor_package_path = f"{package_path}.{ProcessorConfig.PLUGIN_DIR}"
    processor_plugins: list[Any] = []

    if not os.path.exists(processor_dir):
        logger.info("No processor directory found at %s.", processor_dir)
        # Fix: cache the empty result too; previously this early return
        # skipped the cache, so every call repeated the filesystem check.
        _processor_plugins_cache = []
        _plugins_loaded = True
        return []

    for item in os.listdir(processor_dir):
        if os.path.isdir(os.path.join(processor_dir, item)):
            # Plugin shipped as a package directory.
            processor_module_name = item
        elif item.endswith(".so"):
            # Plugin shipped as a shared library; the module name is the
            # part before the first dot, e.g. for a file named
            # `processor.platform_architecture.so` the module is `processor`.
            processor_module_name = item.split(".")[0]
        else:
            continue

        try:
            module = import_module(
                f"{processor_package_path}.{processor_module_name}"
            )
            metadata = getattr(module, ProcessorConfig.METADATA, {})

            if metadata.get(ProcessorConfig.METADATA_IS_ACTIVE, False):
                # Reuse the already-fetched `metadata` instead of reading
                # `module.metadata` again (same object, one lookup).
                processor_plugins.append(
                    {
                        ProcessorConfig.MODULE: module,
                        ProcessorConfig.METADATA: metadata,
                    }
                )
                logger.info(
                    "Loaded processor plugin: %s, is_active: %s",
                    metadata[ProcessorConfig.METADATA_NAME],
                    metadata[ProcessorConfig.METADATA_IS_ACTIVE],
                )
            else:
                logger.info(
                    "Processor plugin %s is not active.",
                    processor_module_name,
                )
        except ModuleNotFoundError as exception:
            # A broken plugin must not take down plugin loading as a whole.
            logger.error(
                "Error while importing processor plugin: %s",
                exception,
            )

    if not processor_plugins:
        logger.info("No processor plugins found.")

    # Cache the results for future requests.
    _processor_plugins_cache = processor_plugins
    _plugins_loaded = True

    return processor_plugins
|
|
|
|
|
|
def get_plugin_class_by_name(name: str, plugins: list[Any]) -> Any:
    """Look up the service class of the plugin called *name*.

    Args:
        name: Value of the ``name`` key in the plugin's metadata.
        plugins: Plugin entries as produced by ``load_plugins()``.

    Returns:
        The ``service_class`` entry of the first matching plugin's metadata
        (``None`` if that key is absent), or ``None`` — after logging a
        warning — when no plugin matches *name*.
    """
    matching_metadata = next(
        (
            entry[ProcessorConfig.METADATA]
            for entry in plugins
            if entry[ProcessorConfig.METADATA].get(ProcessorConfig.METADATA_NAME)
            == name
        ),
        None,
    )
    if matching_metadata is None:
        logger.warning("Plugin with name '%s' not found.", name)
        return None
    return matching_metadata.get(ProcessorConfig.METADATA_SERVICE_CLASS)
|