Files
unstract/backend/workflow_manager/internal_serializers.py
ali 0c5997f9a9 UN-2470 [FEAT] Remove Django dependency from Celery workers with internal APIs (#1494)
* UN-2470 [MISC] Remove Django dependency from Celery workers

This commit introduces a new worker architecture that decouples
Celery workers from Django where possible, enabling support for
gevent/eventlet pool types and reducing worker startup overhead.

Key changes:
- Created separate worker modules (api-deployment, callback, file_processing, general)
- Added internal API endpoints for worker communication
- Implemented Django-free task execution where appropriate
- Added shared utilities and client facades
- Updated container configurations for new worker architecture
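
As a rough illustration of this architecture (module, broker, and task names below are assumptions, not the actual worker code), a Django-free worker boils down to a Celery app that talks to the backend only through the internal APIs:

```python
# Illustrative sketch only: a Celery worker defined without django.setup(),
# so it starts quickly and can run under gevent/eventlet pools.
from celery import Celery

app = Celery("file_processing_worker", broker="redis://localhost:6379/0")  # assumed broker URL


@app.task(name="process_file_batch")  # assumed task name
def process_file_batch(batch_id: str) -> dict:
    # Execution state is read and written through the backend's internal
    # API endpoints instead of the Django ORM.
    ...
    return {"batch_id": batch_id, "status": "COMPLETED"}
```

Such a worker could then be started with, for example, `celery -A file_processing_worker worker --pool=gevent`.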

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix pre-commit issues: file permissions and ruff errors

Set up Docker for the new workers

- Add executable permissions to worker entrypoint files
- Fix import order in namespace package __init__.py
- Remove unused variable api_status in general worker
- Address ruff E402 and F841 errors

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* refactored, Dockerfiles, fixes

* flexibility on celery run commands

* added debug logs

* handled filehistory for API

* cleanup

* cleanup

* cloud plugin structure

* minor changes in import plugin

* added notification and logger workers under new worker module

* add docker compatibility for new workers

* handled docker issues

* log consumer worker fixes

* added scheduler worker

* minor env changes

* cleanup the logs

* minor changes in logs

* resolved scheduler worker issues

* cleanup and refactor

* ensuring backward compatibility with existing workers

* added configuration internal apis and cache utils

* optimization

* Fix API client singleton pattern to share HTTP sessions

- Fix flawed singleton implementation that was trying to share BaseAPIClient instances
- Now properly shares HTTP sessions between specialized clients
- Eliminates 6x BaseAPIClient initialization by reusing the same underlying session
- Should reduce API deployment orchestration time by ~135ms (from 6 clients to 1 session)
- Added debug logging to verify singleton pattern activation
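
A minimal sketch of the shared-session pattern described above (the helper and attribute names are illustrative, not the actual client code):

```python
import requests

_shared_session: requests.Session | None = None


def get_shared_session() -> requests.Session:
    """Create the HTTP session once and hand the same instance to every client."""
    global _shared_session
    if _shared_session is None:
        _shared_session = requests.Session()
    return _shared_session


class BaseAPIClient:
    """Specialized clients reuse one session, and therefore one connection pool."""

    def __init__(self, base_url: str) -> None:
        self.base_url = base_url
        self.session = get_shared_session()
```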

* cleanup and structuring

* cleanup in callback

* file system connectors issue

* celery env values changes

* optional gossip

* variables for sync, mingle and gossip

* Fix for file type check

* Resolving task pipeline issue

* api deployment failed response handled

* Task pipeline fixes

* updated file history cleanup with active file execution

* pipeline status update and workflow UI page execution

* cleanup and resolving conflicts

* remove unstract-core from connectors

* Commit uv.lock changes

* uv locks updates

* resolve migration issues

* defer connector-metadata

* Fix connector migration for production scale

- Add encryption key handling with defer() to prevent decryption failures
- Add final cleanup step to fix duplicate connector names
- Optimize for large datasets with batch processing and bulk operations
- Ensure unique constraint in migration 0004 can be created successfully
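
A hedged sketch of the deferred, batched data-migration pattern these points describe (app label, model, and field names are assumptions, not the real migration):

```python
def dedupe_connector_names(apps, schema_editor):
    """Illustrative only: defer the encrypted metadata column so rows with
    undecryptable credentials can still be read, then rename duplicates in
    batches before the unique constraint is added."""
    Connector = apps.get_model("connector_v2", "ConnectorInstance")  # assumed model
    seen, to_update = set(), []
    for row in Connector.objects.defer("connector_metadata").iterator(chunk_size=1000):
        if row.connector_name in seen:
            row.connector_name = f"{row.connector_name}-{row.pk}"
            to_update.append(row)
        else:
            seen.add(row.connector_name)
        if len(to_update) >= 1000:
            Connector.objects.bulk_update(to_update, ["connector_name"])
            to_update = []
    if to_update:
        Connector.objects.bulk_update(to_update, ["connector_name"])
```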

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>

* hitl fixes

* minor fixes on hitl

* api_hub related changes

* dockerfile fixes

* api client cache fixes with actual response class

* fix: tags and llm_profile_id

* optimized clear cache

* cleanup

* enhanced logs

* added more handling for is-file/is-dir checks and added loggers

* cleanup the runplatform script

* internal APIs are exempted from CSRF

* SonarCloud issues

* SonarCloud issues

* resolving sonar cloud issues

* resolving sonar cloud issues

* Delta: added Batch size fix in workers

* comments addressed

* celery configurational changes for new workers

* fixes in callback regarding the pipeline type check

* change internal url registry logic

* gitignore changes

* gitignore changes

* addressing PR comments and cleaning up the code

* adding missed profiles for v2

* SonarCloud blocker issues resolved

* implement OTel

* Commit uv.lock changes

* handle execution time and some cleanup

* adding user_data in metadata, PR: https://github.com/Zipstack/unstract/pull/1544

* scheduler backward compatibility

* replace user_data with custom_data

* Commit uv.lock changes

* celery worker command issue resolved

* enhance package imports in connectors by changing to lazy imports
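
A small sketch of the lazy-import pattern referenced here (PEP 562 module-level `__getattr__`; the exported names and module paths are illustrative):

```python
import importlib

# Map exported names to the modules that define them (paths assumed).
_LAZY_IMPORTS = {
    "PostgresConnector": "connectors.databases.postgres",
    "S3Connector": "connectors.filesystems.s3",
}


def __getattr__(name: str):
    """Import heavyweight connector modules only on first attribute access."""
    if name in _LAZY_IMPORTS:
        module = importlib.import_module(_LAZY_IMPORTS[name])
        return getattr(module, name)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```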

* Update runner.py by removing the otel from it

Update runner.py by removing the otel from it

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>

* added delta changes

* handle error to destination DB

* resolve tool instance ID validation and HITL queue name in API

* handled direct execution from workflow page to worker and logs

* handle cost logs

* Update health.py

Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* minor log changes

* introducing log consumer scheduler for bulk creation, and socket.emit from worker for WS

* Commit uv.lock changes

* time limit or timeout celery config cleanup

* implemented redis client class in worker
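
A hedged sketch of what such a worker-side Redis client can look like (class name, URL, and the degrade-gracefully behaviour are assumptions; the graceful handling also anticipates the later note about tolerating Redis being down):

```python
import logging

import redis

logger = logging.getLogger(__name__)


class WorkerRedisClient:
    """Thin wrapper so workers keep running even if Redis is unreachable."""

    def __init__(self, url: str = "redis://localhost:6379/0") -> None:  # assumed URL
        self._client = redis.Redis.from_url(url, decode_responses=True)

    def safe_set(self, key: str, value: str, ttl: int = 300) -> bool:
        try:
            self._client.set(key, value, ex=ttl)
            return True
        except redis.exceptions.ConnectionError:
            logger.warning("Redis unavailable; skipping status cache for %s", key)
            return False

    def safe_get(self, key: str) -> str | None:
        try:
            return self._client.get(key)
        except redis.exceptions.ConnectionError:
            return None
```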

* pipeline status enum mismatch

* notification worker fixes

* resolve uv lock conflicts

* workflow log fixes

* WS channel name issue resolved; handle Redis being down in status tracker and remove Redis keys

* default TTL changed for unified logs

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Signed-off-by: ali <117142933+muhammad-ali-e@users.noreply.github.com>
Signed-off-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Ritwik G <100672805+ritwik-g@users.noreply.github.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
2025-10-03 11:24:07 +05:30

221 lines
8.0 KiB
Python

"""Workflow Manager Internal API Serializers
Handles serialization for workflow execution related internal endpoints.
"""
import logging
from pipeline_v2.models import Pipeline
from rest_framework import serializers
# Import shared dataclasses for type safety and consistency
from unstract.core.data_models import (
FileExecutionStatusUpdateRequest,
WorkflowFileExecutionData,
)
from workflow_manager.file_execution.models import WorkflowFileExecution
from workflow_manager.workflow_v2.enums import ExecutionStatus
from workflow_manager.workflow_v2.models.execution import WorkflowExecution
logger = logging.getLogger(__name__)


class WorkflowExecutionSerializer(serializers.ModelSerializer):
    """Serializer for WorkflowExecution model for internal API."""

    workflow_id = serializers.CharField(source="workflow.id", read_only=True)
    workflow_name = serializers.CharField(source="workflow.workflow_name", read_only=True)
    pipeline_id = serializers.SerializerMethodField()
    tags = serializers.SerializerMethodField()

    def get_pipeline_id(self, obj):
        """ROOT CAUSE FIX: Return None for pipeline_id if the referenced pipeline doesn't exist.

        This prevents callback workers from attempting to update deleted pipelines.

        PERFORMANCE: Cache pipeline existence to avoid repeated DB queries.
        """
        if not obj.pipeline_id:
            return None

        # Use instance-level cache to avoid repeated DB queries within same request
        cache_key = f"_pipeline_exists_{obj.pipeline_id}"
        if hasattr(self, cache_key):
            return getattr(self, cache_key)

        # Import here to avoid circular imports
        from api_v2.models import APIDeployment

        try:
            # First check if it's a Pipeline
            Pipeline.objects.get(id=obj.pipeline_id)
            result = str(obj.pipeline_id)
            setattr(self, cache_key, result)
            return result
        except Pipeline.DoesNotExist:
            # Not a Pipeline, check if it's an APIDeployment
            try:
                APIDeployment.objects.get(id=obj.pipeline_id)
                result = str(obj.pipeline_id)
                setattr(self, cache_key, result)
                return result
            except APIDeployment.DoesNotExist:
                # Neither Pipeline nor APIDeployment exists - return None to prevent
                # stale reference usage
                setattr(self, cache_key, None)
                return None

    def get_tags(self, obj):
        """Serialize tags as full objects with id, name, and description.

        This method ensures tags are serialized as:
        [{"id": "uuid", "name": "tag_name", "description": "..."}, ...]
        instead of just ["uuid1", "uuid2", ...]
        """
        try:
            return [
                {
                    "id": str(tag.id),
                    "name": tag.name,
                    "description": tag.description or "",
                }
                for tag in obj.tags.all()
            ]
        except Exception as e:
            logger.warning(f"Failed to serialize tags for execution {obj.id}: {str(e)}")
            return []

    class Meta:
        model = WorkflowExecution
        fields = [
            "id",
            "workflow_id",
            "workflow_name",
            "pipeline_id",
            "task_id",
            "execution_mode",
            "execution_method",
            "execution_type",
            "execution_log_id",
            "status",
            "result_acknowledged",
            "total_files",
            "error_message",
            "attempts",
            "execution_time",
            "created_at",
            "modified_at",
            "tags",
        ]
        read_only_fields = ["id", "created_at", "modified_at"]


class WorkflowFileExecutionSerializer(serializers.ModelSerializer):
    """Serializer for WorkflowFileExecution model for internal API.

    Enhanced with shared dataclass integration for type safety.
    """

    workflow_execution_id = serializers.CharField(
        source="workflow_execution.id", read_only=True
    )

    class Meta:
        model = WorkflowFileExecution
        fields = [
            "id",
            "workflow_execution_id",
            "file_name",
            "file_path",
            "file_size",
            "file_hash",
            "provider_file_uuid",
            "mime_type",
            "fs_metadata",
            "status",
            "execution_error",
            "created_at",
            "modified_at",
        ]
        read_only_fields = ["id", "created_at", "modified_at"]

    def to_dataclass(self, instance=None) -> WorkflowFileExecutionData:
        """Convert serialized data to shared dataclass."""
        if instance is None:
            instance = self.instance
        return WorkflowFileExecutionData.from_dict(self.to_representation(instance))

    @classmethod
    def from_dataclass(cls, data: WorkflowFileExecutionData) -> dict:
        """Convert shared dataclass to serializer-compatible dict."""
        return data.to_dict()
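
# Illustrative round trip (assumed caller): serialize a model instance and move
# it across the worker boundary as a typed dataclass, e.g.
#     payload = WorkflowFileExecutionSerializer(file_execution).to_dataclass()
#     as_dict = WorkflowFileExecutionSerializer.from_dataclass(payload)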


class FileExecutionStatusUpdateSerializer(serializers.Serializer):
    """Serializer for updating file execution status.

    Enhanced with shared dataclass integration for type safety.
    """

    status = serializers.ChoiceField(choices=ExecutionStatus.choices)
    error_message = serializers.CharField(required=False, allow_blank=True)
    result = serializers.CharField(required=False, allow_blank=True)
    execution_time = serializers.FloatField(required=False, min_value=0)

    def to_dataclass(self) -> FileExecutionStatusUpdateRequest:
        """Convert validated data to shared dataclass."""
        return FileExecutionStatusUpdateRequest(
            status=self.validated_data["status"],
            error_message=self.validated_data.get("error_message"),
            result=self.validated_data.get("result"),
        )

    @classmethod
    def from_dataclass(cls, data: FileExecutionStatusUpdateRequest):
        """Create serializer from shared dataclass."""
        return cls(data=data.to_dict())
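
# Illustrative flow (assumed caller): validate the incoming status update and
# convert it into the shared dataclass before handing it to the service layer:
#     serializer = FileExecutionStatusUpdateSerializer(data=request.data)
#     serializer.is_valid(raise_exception=True)
#     update = serializer.to_dataclass()  # FileExecutionStatusUpdateRequest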


class WorkflowExecutionStatusUpdateSerializer(serializers.Serializer):
    """Serializer for updating workflow execution status."""

    status = serializers.ChoiceField(choices=ExecutionStatus.choices)
    error_message = serializers.CharField(required=False, allow_blank=True)
    total_files = serializers.IntegerField(
        required=False, min_value=0
    )  # Allow 0 but backend will only update if > 0
    attempts = serializers.IntegerField(required=False, min_value=0)
    execution_time = serializers.FloatField(required=False, min_value=0)


class OrganizationContextSerializer(serializers.Serializer):
    """Serializer for organization context information."""

    organization_id = serializers.CharField(allow_null=True, required=False)
    organization_name = serializers.CharField(required=False, allow_blank=True)
    settings = serializers.DictField(required=False)


class WorkflowExecutionContextSerializer(serializers.Serializer):
    """Serializer for complete workflow execution context."""

    execution = WorkflowExecutionSerializer()
    workflow_definition = serializers.DictField()
    source_config = serializers.DictField()
    destination_config = serializers.DictField(required=False)
    organization_context = OrganizationContextSerializer()
    file_executions = serializers.ListField(required=False)
    aggregated_usage_cost = serializers.FloatField(required=False, allow_null=True)


class FileBatchCreateSerializer(serializers.Serializer):
    """Serializer for creating file execution batches."""

    workflow_execution_id = serializers.UUIDField()
    files = serializers.ListField(child=serializers.DictField(), allow_empty=False)
    is_api = serializers.BooleanField(default=False)


class FileBatchResponseSerializer(serializers.Serializer):
    """Serializer for file batch creation response."""

    batch_id = serializers.CharField()
    workflow_execution_id = serializers.CharField()
    total_files = serializers.IntegerField()
    created_file_executions = serializers.ListField()
    skipped_files = serializers.ListField(required=False)
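
# Example request body accepted by FileBatchCreateSerializer (values are
# illustrative):
#     {
#         "workflow_execution_id": "4f6e9a2c-...",
#         "files": [{"file_name": "invoice.pdf", "file_path": "input/invoice.pdf"}],
#         "is_api": false
#     }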