UN-2889 [FIX] Handle Celery logger with empty request_id to prevent SIGSEGV crashes (#1591)

* UN-2889 [FIX] Handle Celery logger with empty request_id to prevent SIGSEGV crashes

- Simplified logging filters into RequestIDFilter and OTelFieldFilter
- Removed custom DjangoStyleFormatter and StructuredFormatter classes
- Removed Celery's worker_log_format config that created formatters without filters
- Removed LOG_FORMAT environment variable and all format options
- All workers now use single standardized format with filters always applied

* addressed CodeRabbit comment

* addressed CodeRabbit comment
This commit is contained in:
ali
2025-10-15 15:00:10 +05:30
committed by GitHub
parent 7661377e00
commit ebadc955a3
7 changed files with 74 additions and 285 deletions

View File

@@ -190,8 +190,8 @@ LOG_CONSUMER_AUTOSCALE=2,1
# =============================================================================
LOG_LEVEL=INFO
# structured or django
LOG_FORMAT=django
# Note: LOG_FORMAT removed - format is now hardcoded (not configurable)
# All workers use a single standardized format matching Django backend
DEFAULT_LOG_LEVEL=INFO
WORKER_VERSION=1.0.0
WORKER_INSTANCE_ID=dev-01

View File

@@ -51,5 +51,5 @@ class EnvVars:
# Logging configuration
LOG_LEVEL = "LOG_LEVEL"
LOG_FORMAT = "LOG_FORMAT"
LOG_FILE = "LOG_FILE"
# Note: LOG_FORMAT removed - format is now hardcoded (not configurable)

View File

@@ -70,29 +70,15 @@ class WorkerBuilder:
result_backend=config.celery_result_backend,
)
# Add Django-style log format configuration to override Celery's default formats
logging_config = WorkerRegistry.get_logging_config(worker_type)
log_format = os.getenv("LOG_FORMAT", logging_config.get("log_format", "django"))
if log_format.lower() == "django":
# Use Django-style format for both worker and task logs
django_log_format = (
"%(levelname)s : [%(asctime)s]"
"{module:%(module)s process:%(process)d "
"thread:%(thread)d request_id:%(request_id)s "
"trace_id:%(otelTraceID)s span_id:%(otelSpanID)s} :- %(message)s"
)
# Override Celery's default log formats
celery_config.update(
{
"worker_log_format": django_log_format,
"worker_task_log_format": f"[%(task_name)s(%(task_id)s)] {django_log_format}",
# Disable Celery's default logging setup to prevent conflicts
"worker_hijack_root_logger": False,
"worker_log_color": False,
}
)
# Prevent Celery from hijacking our logging configuration
# Our root logger (configured in WorkerBuilder.setup_logging) has the proper
# filters to prevent KeyError on missing fields
celery_config.update(
{
"worker_hijack_root_logger": False, # Use our root logger config
"worker_log_color": False, # Disable colors for consistency
}
)
# Apply any additional overrides
if override_config:
@@ -119,66 +105,37 @@ class WorkerBuilder:
Configured logger instance
"""
logging_config = WorkerRegistry.get_logging_config(worker_type)
# Determine log format from environment or config
log_format = os.getenv("LOG_FORMAT", logging_config.get("log_format", "django"))
log_level = os.getenv("LOG_LEVEL", logging_config.get("log_level", "INFO"))
# Configure worker logging
WorkerLogger.configure(
log_level=log_level,
log_format=log_format,
worker_name=worker_type.to_worker_name(),
)
# Configure Celery's built-in loggers to use the same format
WorkerBuilder._configure_celery_loggers(log_format, log_level)
WorkerBuilder._configure_celery_loggers(log_level)
return WorkerLogger.get_logger(worker_type.to_worker_name())
@staticmethod
def _configure_celery_loggers(log_format: str, log_level: str) -> None:
"""Configure Celery's built-in loggers and root logger to use consistent formatting.
def _configure_celery_loggers(log_level: str) -> None:
"""Configure Celery's built-in loggers to propagate to root logger.
This ensures that ALL loggers (including task execution loggers) use the same format
as the rest of the application, eliminating mixed log formats completely.
Sets log levels for Celery-specific loggers and enables propagation so they
forward log records to the root logger's handlers. Root logger is already
configured by WorkerLogger.configure() with proper filters to prevent
KeyError/SIGSEGV.
This approach avoids clearing root logger handlers (which would drop handlers
from external libraries like OpenTelemetry) and prevents duplicate log output.
Args:
log_format: Log format to use ('django' or 'structured')
log_level: Log level to set
"""
from ..logging.logger import (
DjangoStyleFormatter,
StructuredFormatter,
WorkerFieldFilter,
)
# Choose the appropriate formatter
if log_format.lower() == "django":
formatter = DjangoStyleFormatter()
elif log_format.lower() == "structured":
formatter = StructuredFormatter()
else:
formatter = DjangoStyleFormatter() # Default to Django format
# CRITICAL: Configure root logger to catch ALL loggers
root_logger = logging.getLogger()
root_logger.setLevel(getattr(logging, log_level.upper()))
# Clear all existing handlers on root logger
root_logger.handlers.clear()
# Add our custom handler to root logger
root_handler = logging.StreamHandler()
root_handler.setFormatter(formatter)
# Add field filter for Django format to ensure required fields
if log_format.lower() == "django":
root_handler.addFilter(WorkerFieldFilter())
root_logger.addHandler(root_handler)
# Configure specific Celery loggers for extra assurance
# Configure specific Celery loggers to propagate to root
# Root logger is already configured by WorkerLogger.configure() with proper
# filters and handlers. We just need to set log levels and enable propagation.
celery_loggers = [
"celery",
"celery.worker",
@@ -204,21 +161,9 @@ class WorkerBuilder:
celery_logger = logging.getLogger(logger_name)
celery_logger.setLevel(getattr(logging, log_level.upper()))
# Remove existing handlers to avoid duplication
celery_logger.handlers.clear()
# Add our custom handler with consistent formatting
handler = logging.StreamHandler()
handler.setFormatter(formatter)
# Add field filter for Django format to ensure required fields
if log_format.lower() == "django":
handler.addFilter(WorkerFieldFilter())
celery_logger.addHandler(handler)
# Disable propagation to avoid duplicate logs (each logger has its own handler)
celery_logger.propagate = False
# Enable propagation so logs flow to root logger's handler
# Root logger already has filters to prevent KeyError/ValueError
celery_logger.propagate = True
@staticmethod
def setup_health_monitoring(

View File

@@ -139,34 +139,29 @@ class WorkerRegistry:
# See shared/models/worker_models.py:get_celery_setting() for hierarchical config
# Use environment variables like CALLBACK_TASK_TIME_LIMIT=3600 or CELERY_TASK_TIME_LIMIT=300
# Logging configurations - All workers use Django format for consistency
# Logging configurations
# Note: Log format is hardcoded (no longer configurable) - see logger.py
# All workers use the same standardized format matching Django backend
_LOGGING_CONFIGS: dict[WorkerType, dict] = {
WorkerType.API_DEPLOYMENT: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.GENERAL: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.FILE_PROCESSING: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.CALLBACK: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.NOTIFICATION: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.LOG_CONSUMER: {
"log_format": "django",
"log_level": "INFO",
},
WorkerType.SCHEDULER: {
"log_format": "django",
"log_level": "INFO",
},
}
@@ -256,12 +251,12 @@ class WorkerRegistry:
worker_type: Type of worker
Returns:
Logging configuration dict
Logging configuration dict with log_level
Note: log_format is no longer configurable - hardcoded in logger.py
"""
return cls._LOGGING_CONFIGS.get(
worker_type,
{
"log_format": "structured",
"log_level": "INFO",
},
)

View File

@@ -212,8 +212,8 @@ class WorkerConfig:
# Logging Configuration
log_level: str = field(default_factory=lambda: os.getenv("LOG_LEVEL", "INFO"))
log_format: str = field(default_factory=lambda: os.getenv("LOG_FORMAT", "structured"))
log_file: str | None = field(default_factory=lambda: os.getenv("LOG_FILE"))
# Note: log_format removed - we now use a single standardized format everywhere
# Circuit Breaker Settings
circuit_breaker_failure_threshold: int = field(
@@ -557,8 +557,9 @@ class WorkerConfig:
== "true",
"worker_send_task_events": True,
"task_send_sent_event": True,
"worker_log_format": "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s",
"worker_task_log_format": "[%(asctime)s: %(levelname)s/%(processName)s][%(task_name)s(%(task_id)s)] %(message)s",
# Note: worker_log_format and worker_task_log_format are not set here
# to prevent conflicts. Logging is configured via WorkerBuilder.setup_logging()
# which uses the proven pattern from unstract/core/flask/logging.py
}
def __repr__(self) -> str:

View File

@@ -1,68 +0,0 @@
"""Shared logging configuration for workers to match Django backend format."""
import logging
import logging.config
import os
# Default log level from environment
DEFAULT_LOG_LEVEL = os.environ.get("DEFAULT_LOG_LEVEL", "INFO")
class WorkerFieldFilter(logging.Filter):
"""Filter to add missing fields for worker logging."""
def filter(self, record):
# Add missing fields with default values
for attr in ["request_id", "otelTraceID", "otelSpanID"]:
if not hasattr(record, attr):
setattr(record, attr, "-")
return True
def setup_worker_logging():
"""Setup logging configuration that matches Django backend format."""
logging_config = {
"version": 1,
"disable_existing_loggers": False,
"filters": {
"worker_fields": {"()": "shared.logging_config.WorkerFieldFilter"},
},
"formatters": {
"enriched": {
"format": (
"%(levelname)s : [%(asctime)s]"
"{module:%(module)s process:%(process)d "
"thread:%(thread)d request_id:%(request_id)s "
"trace_id:%(otelTraceID)s span_id:%(otelSpanID)s} :- %(message)s"
),
},
"simple": {
"format": "{levelname} {message}",
"style": "{",
},
},
"handlers": {
"console": {
"level": DEFAULT_LOG_LEVEL,
"class": "logging.StreamHandler",
"filters": ["worker_fields"],
"formatter": "enriched",
},
},
"root": {
"handlers": ["console"],
"level": DEFAULT_LOG_LEVEL,
},
}
# Configure logging
logging.config.dictConfig(logging_config)
return logging.getLogger()
def get_worker_logger(name: str = None) -> logging.Logger:
"""Get a logger configured for worker use."""
if name:
return logging.getLogger(name)
return logging.getLogger()

View File

@@ -4,12 +4,10 @@ Provides structured logging, performance monitoring, and metrics collection for
"""
import functools
import json
import logging
import os
import sys
import time
import traceback
from collections.abc import Callable
from contextlib import contextmanager
from dataclasses import asdict, dataclass
@@ -33,112 +31,33 @@ class LogContext:
request_id: str | None = None
class WorkerFieldFilter(logging.Filter):
"""Filter to add missing fields for worker logging."""
class RequestIDFilter(logging.Filter):
"""Filter to inject request_id into log records.
Adopts the proven pattern from unstract/core/flask/logging.py for consistency.
Normalizes missing or falsy values (None, empty string, etc.) to "-" for
consistent log formatting.
"""
def filter(self, record):
# Add missing fields with default values to match Django backend
for attr in ["request_id", "otelTraceID", "otelSpanID"]:
if not hasattr(record, attr):
setattr(record, attr, "-")
if not getattr(record, "request_id", None):
record.request_id = "-"
return True
class DjangoStyleFormatter(logging.Formatter):
"""Custom formatter to match Django backend logging format exactly."""
class OTelFieldFilter(logging.Filter):
"""Filter to inject OpenTelemetry fields into log records.
def __init__(self, include_context: bool = True):
"""Initialize formatter.
Adopts the proven pattern from unstract/core/flask/logging.py for consistency.
Normalizes missing or falsy values (None, empty string, etc.) to "-" for
consistent log formatting.
"""
Args:
include_context: Whether to include thread-local context in logs
"""
# Use Django backend's exact format
format_string = (
"%(levelname)s : [%(asctime)s]"
"{module:%(module)s process:%(process)d "
"thread:%(thread)d request_id:%(request_id)s "
"trace_id:%(otelTraceID)s span_id:%(otelSpanID)s} :- %(message)s"
)
super().__init__(fmt=format_string)
self.include_context = include_context
class StructuredFormatter(logging.Formatter):
"""Custom formatter for structured JSON logging."""
def __init__(self, include_context: bool = True):
"""Initialize formatter.
Args:
include_context: Whether to include thread-local context in logs
"""
super().__init__()
self.include_context = include_context
def format(self, record: logging.LogRecord) -> str:
"""Format log record as structured JSON."""
# Base log entry
log_entry = {
"timestamp": datetime.now(UTC).isoformat() + "Z",
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
"process": record.process,
"thread": record.thread,
}
# Add exception information if present
if record.exc_info:
log_entry["exception"] = {
"type": record.exc_info[0].__name__,
"message": str(record.exc_info[1]),
"traceback": traceback.format_exception(*record.exc_info),
}
# Add extra fields from log record
extra_fields = {}
for key, value in record.__dict__.items():
if key not in {
"name",
"msg",
"args",
"levelname",
"levelno",
"pathname",
"filename",
"module",
"lineno",
"funcName",
"created",
"msecs",
"relativeCreated",
"thread",
"threadName",
"processName",
"process",
"getMessage",
"exc_info",
"exc_text",
"stack_info",
}:
extra_fields[key] = value
if extra_fields:
log_entry["extra"] = extra_fields
# Add context information if available and enabled
if self.include_context and hasattr(_context, "log_context"):
context_dict = asdict(_context.log_context)
# Only include non-None values
context_dict = {k: v for k, v in context_dict.items() if v is not None}
if context_dict:
log_entry["context"] = context_dict
return json.dumps(log_entry, default=str)
def filter(self, record):
for attr in ["otelTraceID", "otelSpanID"]:
if not getattr(record, attr, None):
setattr(record, attr, "-")
return True
class WorkerLogger:
@@ -151,15 +70,16 @@ class WorkerLogger:
def configure(
cls,
log_level: str = "INFO",
log_format: str = "structured",
log_file: str | None = None,
worker_name: str | None = None,
):
"""Configure global logging settings.
Uses a single standardized format matching the Django backend.
Filters ensure required fields are always present to prevent KeyError/SIGSEGV.
Args:
log_level: Logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
log_format: Log format ('structured' for JSON, 'simple' for text)
log_file: Optional log file path
worker_name: Worker name for context
"""
@@ -173,23 +93,22 @@ class WorkerLogger:
# Clear existing handlers
root_logger.handlers.clear()
# Choose formatter
if log_format.lower() == "structured":
formatter = StructuredFormatter()
elif log_format.lower() == "django":
formatter = DjangoStyleFormatter()
else:
formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
# Use single standardized format (same as Django backend)
formatter = logging.Formatter(
"%(levelname)s : [%(asctime)s]"
"{module:%(module)s process:%(process)d "
"thread:%(thread)d request_id:%(request_id)s "
"trace_id:%(otelTraceID)s span_id:%(otelSpanID)s} :- %(message)s"
)
# Console handler
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
# Add filter for Django-style format to ensure required fields are present
if log_format.lower() == "django":
console_handler.addFilter(WorkerFieldFilter())
# ALWAYS add filters to ensure required fields are present
# This prevents KeyError/ValueError that cause SIGSEGV
console_handler.addFilter(RequestIDFilter())
console_handler.addFilter(OTelFieldFilter())
root_logger.addHandler(console_handler)
@@ -198,9 +117,9 @@ class WorkerLogger:
file_handler = logging.FileHandler(log_file)
file_handler.setFormatter(formatter)
# Add filter for Django-style format to ensure required fields are present
if log_format.lower() == "django":
file_handler.addFilter(WorkerFieldFilter())
# Add filters to file handler as well
file_handler.addFilter(RequestIDFilter())
file_handler.addFilter(OTelFieldFilter())
root_logger.addHandler(file_handler)
@@ -259,9 +178,6 @@ class WorkerLogger:
# Configure logging with registry settings
cls.configure(
log_level=os.getenv("LOG_LEVEL", logging_config.get("log_level", "INFO")),
log_format=os.getenv(
"LOG_FORMAT", logging_config.get("log_format", "structured")
),
worker_name=worker_type.to_worker_name(),
)