- Deleted final.md, fix_async_fixtures.py, fix_cog_tests.py, fix_fixture_scoping.py, and run_race_condition_tests.sh as they are no longer needed. - Updated main.py to enhance logging and error handling. - Refactored various components for improved performance and maintainability. - Introduced new logging utilities for better context and performance monitoring. - Ensured all datetime operations are timezone-aware and consistent across the codebase.
1156 lines
46 KiB
Python
1156 lines
46 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Discord Voice Chat Quote Bot - Main Entry Point
|
|
|
|
An advanced AI-powered Discord bot that continuously records, analyzes, and curates
|
|
memorable quotes from voice channel conversations using persistent 120-second audio clips.
|
|
"""
|
|
|
|
import asyncio
|
|
import logging
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING, Any, Dict, Optional, TypeVar
|
|
|
|
import discord
|
|
from discord.ext import commands
|
|
from dotenv import load_dotenv
|
|
|
|
if TYPE_CHECKING:
|
|
from services.audio.laughter_detection import LaughterDetector
|
|
from services.audio.speaker_recognition import SpeakerRecognitionService
|
|
from services.audio.transcription_service import TranscriptionService
|
|
from services.audio.tts_service import TTSService
|
|
from services.interaction.feedback_system import FeedbackSystem
|
|
from services.interaction.user_assisted_tagging import UserAssistedTaggingService
|
|
from services.monitoring.health_monitor import HealthMonitor
|
|
from services.quotes.quote_explanation import QuoteExplanationService
|
|
|
|
from config.settings import Settings
|
|
from core.ai_manager import AIProviderManager
|
|
from core.consent_manager import ConsentManager
|
|
from core.database import DatabaseManager
|
|
from core.memory_manager import MemoryManager
|
|
from services.audio.audio_recorder import AudioRecorderService
|
|
from services.automation.response_scheduler import ResponseScheduler
|
|
from services.quotes.quote_analyzer import QuoteAnalyzer
|
|
from utils.audio_processor import AudioProcessor
|
|
from utils.logging_config import get_context_logger, get_performance_logger, setup_logging
|
|
from utils.metrics import MetricsCollector
|
|
|
|
# Temporary: Comment out due to ONNX/ml_dtypes compatibility issue
|
|
# from services.audio.speaker_diarization import SpeakerDiarizationService
|
|
|
|
|
|
# Temporary stub
|
|
class SpeakerDiarizationService:
    """No-op placeholder standing in for the real diarization service.

    The constructor accepts and discards any arguments; ``initialize`` and
    ``close`` are awaitable and do nothing.
    """

    def __init__(self, *args, **kwargs):
        # All arguments are intentionally ignored.
        return None

    async def initialize(self):
        """Do nothing (placeholder)."""
        return None

    async def close(self):
        """Do nothing (placeholder)."""
        return None
|
|
|
|
|
|
# Minimal console logging so that anything emitted before the settings are
# loaded is still visible. QuoteBot.__init__ calls setup_logging(settings)
# afterwards to install the enhanced configuration.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)

logger = logging.getLogger(__name__)

# Enhanced loggers; they stay None until QuoteBot.__init__ assigns them.
# Annotated with typing.Any (the builtin function `any` is not a type).
bot_logger: Optional[Any] = None
perf_logger: Optional[Any] = None

BotT = TypeVar("BotT", bound=commands.Bot)
|
|
|
|
|
|
class QuoteBot(commands.Bot):
|
|
"""
|
|
Discord Voice Chat Quote Bot
|
|
|
|
Main bot class that orchestrates all components including audio recording,
|
|
speaker recognition, AI analysis, and user interaction systems.
|
|
"""
|
|
|
|
def __init__(self):
    """Load settings, configure logging and intents, and declare component slots.

    Managers, services and utilities are only declared here (set to ``None``);
    they are constructed later by the ``_initialize_*`` methods.
    """
    global bot_logger, perf_logger

    # Environment variables must be loaded before the settings are read.
    load_dotenv()
    from config.settings import get_settings

    self.settings = get_settings()

    # Install the enhanced logging system and publish the module-level loggers.
    self.loggers = setup_logging(self.settings)
    bot_logger = get_context_logger('disbord.main')
    perf_logger = get_performance_logger('disbord.main')

    bot_logger.info("QuoteBot initializing", {
        'log_level': self.settings.log_level,
        'debug_mode': self.settings.debug_mode,
        'verbose_logging': self.settings.verbose_logging
    })

    # Gateway intents: defaults plus the flags this bot explicitly enables.
    gateway_intents = discord.Intents.default()
    for flag_name in ("message_content", "voice_states", "guilds", "members"):
        setattr(gateway_intents, flag_name, True)

    super().__init__(
        command_prefix="!",
        intents=gateway_intents,
        help_command=None,
        case_insensitive=True,
    )

    # Core managers (populated by _initialize_core_managers).
    self.db_manager: Optional[DatabaseManager] = None
    self.ai_manager: Optional[AIProviderManager] = None
    self.memory_manager: Optional[MemoryManager] = None
    self.consent_manager: Optional[ConsentManager] = None

    # Per-component context loggers.
    self.db_logger = get_context_logger('disbord.database')
    self.ai_logger = get_context_logger('disbord.ai_manager')
    self.memory_logger = get_context_logger('disbord.memory_manager')
    self.audio_logger = get_context_logger('disbord.audio_recorder')
    self.transcription_logger = get_context_logger('disbord.transcription')
    self.analysis_logger = get_context_logger('disbord.quote_analyzer')

    # Services (populated by _initialize_services).
    self.audio_recorder: Optional[AudioRecorderService] = None
    self.speaker_diarization: Optional[SpeakerDiarizationService] = None
    self.transcription_service: Optional["TranscriptionService"] = None
    self.laughter_detector: Optional["LaughterDetector"] = None
    self.quote_analyzer: Optional[QuoteAnalyzer] = None
    self.tts_service: Optional["TTSService"] = None
    self.response_scheduler: Optional[ResponseScheduler] = None
    self.speaker_recognition: Optional["SpeakerRecognitionService"] = None
    self.user_tagging: Optional["UserAssistedTaggingService"] = None
    self.quote_explanation: Optional["QuoteExplanationService"] = None
    self.feedback_system: Optional["FeedbackSystem"] = None
    self.health_monitor: Optional["HealthMonitor"] = None

    # Utilities (populated by _initialize_utilities).
    self.metrics: Optional[MetricsCollector] = None
    self.audio_processor: Optional[AudioProcessor] = None

    # Runtime state.
    self.ready: bool = False
    self.active_recordings: Dict[int, object] = {}
    self.processing_queue: asyncio.Queue = asyncio.Queue()

    # Make sure the working directories exist before anything writes to them.
    self._create_directories()

    bot_logger.info("Bot configuration loaded", {
        'guilds_configured': bool(self.settings.guild_id),
        'ai_providers': list(self.settings.ai_providers.keys()),
        'recording_duration': self.settings.recording_clip_duration,
        'quote_thresholds': self.settings.thresholds
    })
|
|
|
|
def _create_directories(self):
|
|
"""Create required directories for data, logs, and temporary files"""
|
|
directories = [
|
|
"data",
|
|
"logs",
|
|
"temp",
|
|
"migrations/versions",
|
|
"config",
|
|
"core",
|
|
"services",
|
|
"utils",
|
|
"cogs",
|
|
"extensions",
|
|
]
|
|
|
|
for directory in directories:
|
|
Path(directory).mkdir(parents=True, exist_ok=True)
|
|
|
|
async def setup_hook(self):
    """Initialize every bot component, load cogs and sync slash commands.

    Each phase is timed and logged. If any phase raises, the failure is
    logged, the bot is closed and the exception is re-raised.
    """

    def elapsed_ms(since):
        # Milliseconds elapsed since `since`, rounded to two decimals.
        return round((time.perf_counter() - since) * 1000, 2)

    start_time = time.perf_counter()
    bot_logger.info("Starting Discord Quote Bot initialization")

    try:
        # Core managers
        phase_start = time.perf_counter()
        await self._initialize_core_managers()
        core_init_time = elapsed_ms(phase_start)
        bot_logger.info("Core managers initialized", {'duration_ms': core_init_time})

        # Services
        phase_start = time.perf_counter()
        await self._initialize_services()
        services_init_time = elapsed_ms(phase_start)
        bot_logger.info("Services initialized", {'duration_ms': services_init_time})

        # Utilities
        phase_start = time.perf_counter()
        await self._initialize_utilities()
        utils_init_time = elapsed_ms(phase_start)
        bot_logger.info("Utilities initialized", {'duration_ms': utils_init_time})

        # Cogs
        phase_start = time.perf_counter()
        await self._load_cogs()
        cogs_init_time = elapsed_ms(phase_start)
        bot_logger.info("Cogs loaded", {'duration_ms': cogs_init_time})

        # Background tasks
        phase_start = time.perf_counter()
        await self._start_background_tasks()
        tasks_init_time = elapsed_ms(phase_start)
        bot_logger.info("Background tasks started", {'duration_ms': tasks_init_time})

        # Slash command sync
        phase_start = time.perf_counter()
        synced_commands = await self.tree.sync()
        sync_time = elapsed_ms(phase_start)
        bot_logger.info("Slash commands synced", {
            'commands_synced': len(synced_commands),
            'duration_ms': sync_time
        })

        bot_logger.info("Bot initialization completed successfully", {
            'total_duration_ms': elapsed_ms(start_time),
            'core_init_ms': core_init_time,
            'services_init_ms': services_init_time,
            'utils_init_ms': utils_init_time,
            'cogs_init_ms': cogs_init_time,
            'tasks_init_ms': tasks_init_time,
            'sync_ms': sync_time
        })

    except Exception as e:
        bot_logger.error("Failed to initialize bot", {
            'duration_ms': elapsed_ms(start_time),
            'error_type': type(e).__name__,
            'error_message': str(e)
        }, exc_info=True)
        # Release whatever was already set up before propagating the failure.
        await self.close()
        raise
|
|
|
|
async def _initialize_core_managers(self):
    """Create and initialize the database, AI, memory and consent managers.

    Order matters: the memory manager is given the AI and database managers,
    and the consent manager is given the database manager.
    """
    logger.info("Initializing core managers...")

    # Database
    self.db_manager = DatabaseManager(self.settings.database_url)
    await self.db_manager.initialize()

    # AI providers
    self.ai_manager = AIProviderManager(self.settings)
    await self.ai_manager.initialize()

    # Long-term context memory (built on the two managers above)
    self.memory_manager = MemoryManager(self.ai_manager, self.db_manager, self.settings)
    await self.memory_manager.initialize()

    # Consent tracking for privacy compliance
    self.consent_manager = ConsentManager(self.db_manager)
    await self.consent_manager.initialize()
|
|
|
|
async def _initialize_services(self):
    """Construct and initialize the audio, analysis and interaction services.

    Services are created in the order their dependencies require. Only the
    TTS service is optional: if it fails to initialize, a warning is logged
    and ``self.tts_service`` is left as ``None``. Any other failure propagates.
    """
    logger.info("Initializing processing services...")

    # -- Health monitor (required by slash commands) --
    assert self.db_manager is not None
    from services.monitoring.health_monitor import HealthMonitor

    self.health_monitor = HealthMonitor(self.db_manager)
    await self.health_monitor.initialize()

    # -- Speaker diarization (passed to several services below) --
    assert self.db_manager is not None
    assert self.consent_manager is not None
    self.speaker_diarization = SpeakerDiarizationService(
        self.db_manager, self.consent_manager, AudioProcessor()
    )
    await self.speaker_diarization.initialize()

    # -- Speaker recognition --
    assert self.ai_manager is not None
    from services.audio.speaker_recognition import SpeakerRecognitionService

    self.speaker_recognition = SpeakerRecognitionService(
        self.ai_manager, self.db_manager, AudioProcessor()
    )
    await self.speaker_recognition.initialize()

    # -- Audio recording --
    assert self.consent_manager is not None
    assert self.speaker_diarization is not None
    self.audio_recorder = AudioRecorderService(
        self.settings, self.consent_manager, self.speaker_diarization
    )
    # The recorder receives its remaining dependencies at initialize() time.
    await self.audio_recorder.initialize(self.db_manager, AudioProcessor())

    # -- Transcription --
    from services.audio.transcription_service import TranscriptionService

    assert self.ai_manager is not None
    assert self.db_manager is not None
    assert self.speaker_diarization is not None
    self.transcription_service = TranscriptionService(
        self.ai_manager, self.db_manager, self.speaker_diarization, AudioProcessor()
    )
    await self.transcription_service.initialize()

    # -- Laughter detection --
    from services.audio.laughter_detection import LaughterDetector

    assert self.db_manager is not None
    self.laughter_detector = LaughterDetector(AudioProcessor(), self.db_manager)
    await self.laughter_detector.initialize()

    # -- Text-to-speech (optional) --
    try:
        from services.audio.tts_service import TTSService

        assert self.ai_manager is not None
        self.tts_service = TTSService(self.ai_manager, self.settings)
        await self.tts_service.initialize()
        logger.info("TTS service initialized")
    except Exception as e:
        logger.warning(f"TTS service initialization failed (non-critical): {e}")
        self.tts_service = None

    # -- Quote analysis engine --
    assert self.ai_manager is not None
    assert self.memory_manager is not None
    assert self.db_manager is not None
    self.quote_analyzer = QuoteAnalyzer(
        self.ai_manager, self.memory_manager, self.db_manager, self.settings
    )
    await self.quote_analyzer.initialize()

    # -- Response scheduling (receives the bot instance as its last argument) --
    assert self.db_manager is not None
    assert self.ai_manager is not None
    self.response_scheduler = ResponseScheduler(
        self.db_manager,
        self.ai_manager,
        self.settings,
        self,
    )
    await self.response_scheduler.initialize()

    # -- User-assisted tagging --
    from services.interaction.user_assisted_tagging import UserAssistedTaggingService

    self.user_tagging = UserAssistedTaggingService(
        self,
        self.db_manager,
        self.speaker_diarization,
        self.transcription_service,
        self.consent_manager,
    )
    await self.user_tagging.initialize()

    # -- Quote explanation --
    from services.quotes.quote_explanation import QuoteExplanationService

    self.quote_explanation = QuoteExplanationService(
        self, self.db_manager, self.ai_manager
    )
    await self.quote_explanation.initialize()

    # -- Feedback system --
    from services.interaction.feedback_system import FeedbackSystem

    self.feedback_system = FeedbackSystem(
        self, self.db_manager, self.ai_manager, self.consent_manager
    )
    await self.feedback_system.initialize()
|
|
|
|
async def _initialize_utilities(self):
    """Create the metrics collector and the shared audio processor."""
    logger.info("Initializing utilities...")

    # Both constructors are called without arguments.
    self.metrics = MetricsCollector()
    self.audio_processor = AudioProcessor()
|
|
|
|
async def _load_cogs(self):
    """Load the command cog extensions and the service-aware slash commands.

    Loading is best-effort: a failure for one cog (or for the slash command
    setup) is logged and does not prevent the others from loading.
    """
    logger.info("Loading command cogs...")

    extension_names = (
        "cogs.voice_cog",
        "cogs.quotes_cog",
        "cogs.consent_cog",
        "cogs.admin_cog",
        "cogs.tasks_cog",
    )

    for cog in extension_names:
        try:
            await self.load_extension(cog)
            logger.info(f"Loaded cog: {cog}")
        except Exception as e:
            logger.error(f"Failed to load cog {cog}: {e}")

    # The slash commands module is handed the already-built services.
    try:
        from commands import slash_commands

        service_kwargs = {
            "db_manager": self.db_manager,
            "consent_manager": self.consent_manager,
            "memory_manager": self.memory_manager,
            "audio_recorder": self.audio_recorder,
            "speaker_recognition": self.speaker_recognition,
            "user_tagging": self.user_tagging,
            "quote_analyzer": self.quote_analyzer,
            "tts_service": self.tts_service,
            "quote_explanation": self.quote_explanation,
            "feedback_system": self.feedback_system,
            "health_monitor": self.health_monitor,
        }
        await slash_commands.setup(self, **service_kwargs)
        logger.info("Loaded slash commands cog with services")
    except Exception as e:
        logger.error(f"Failed to load slash commands cog: {e}")
|
|
|
|
async def _start_background_tasks(self):
    """Start the long-running background workers on the bot's event loop.

    Starts, in order: the audio processing worker, the response scheduler
    (if present), metrics collection (if present) and the health check worker.

    The event loop only holds weak references to tasks, so every task created
    here is kept in ``self._quote_bot_tasks`` until it finishes; otherwise a
    worker could be garbage-collected while still running.
    """
    logger.info("Starting background tasks...")

    # Created lazily so this method does not depend on __init__ declaring it.
    tracked_tasks = getattr(self, "_quote_bot_tasks", None)
    if tracked_tasks is None:
        tracked_tasks = set()
        self._quote_bot_tasks = tracked_tasks

    def spawn(coro):
        # Schedule `coro` and hold a strong reference until it completes.
        task = self.loop.create_task(coro)
        tracked_tasks.add(task)
        task.add_done_callback(tracked_tasks.discard)
        return task

    # Audio processing task
    spawn(self._audio_processing_worker())

    # Response scheduler task
    if self.response_scheduler:
        spawn(self.response_scheduler.start_scheduler())

    # Metrics collection task
    if self.metrics:
        spawn(self.metrics.start_collection())

    # Health check task
    spawn(self._health_check_worker())
|
|
|
|
async def _audio_processing_worker(self):
    """Consume audio clips from ``self.processing_queue`` until shutdown.

    A ``None`` item is the shutdown signal. Every dequeued item, including the
    shutdown signal and clips whose processing raised, is acknowledged with
    ``task_done()`` so that ``processing_queue.join()`` cannot hang on an
    unacknowledged item. Cancellation ends the loop quietly; any other error
    is logged and the worker retries after a one-second pause.
    """
    logger.info("Audio processing worker started")

    while not self.is_closed():
        try:
            # Wait for the next clip (or the shutdown signal).
            audio_clip = await self.processing_queue.get()

            try:
                if audio_clip is None:  # Shutdown signal
                    break

                await self._process_audio_clip(audio_clip)
            finally:
                # Runs on success, on error and on the shutdown break, but
                # never when get() itself failed (nothing was dequeued then).
                self.processing_queue.task_done()

        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Error in audio processing worker: {e}")
            await asyncio.sleep(1)  # Brief pause before retrying
|
|
|
|
async def _process_audio_clip(self, audio_clip):
    """Run one audio clip through transcription, laughter and quote analysis.

    Steps: read the clip's diarization result, transcribe it, detect laughter,
    analyze each quote-candidate segment, hand the resulting quotes to the
    response scheduler, then record metrics. The method never raises: any
    failure is logged and counted under ``audio_processing_errors``.

    Args:
        audio_clip: Clip object; the code reads ``id``, ``file_path``,
            ``guild_id``, ``channel_id``, ``participants`` and, optionally,
            ``diarization_result``.
    """
    try:
        logger.info(f"Processing audio clip: {audio_clip.id}")

        # Step 1: diarization result, if the clip carries one.
        diarization_result = getattr(audio_clip, "diarization_result", None)

        # Step 2: transcribe audio with speaker segment mapping.
        transcription_session = None
        if self.transcription_service and hasattr(
            self.transcription_service, "transcribe_audio_clip"
        ):
            transcription_session = await self.transcription_service.transcribe_audio_clip(  # type: ignore
                audio_clip.file_path,
                audio_clip.guild_id,
                audio_clip.channel_id,
                diarization_result,
                audio_clip.id,
            )

        if not transcription_session:
            logger.warning(
                f"No transcription results for audio clip: {audio_clip.id}"
            )
            return

        # Step 2.5: analyze laughter in the audio clip.
        laughter_analysis = None
        if self.laughter_detector and hasattr(
            self.laughter_detector, "detect_laughter"
        ):
            laughter_analysis = await self.laughter_detector.detect_laughter(  # type: ignore
                audio_clip.file_path, audio_clip.participants
            )

        # The session timestamp may be a number, an object exposing
        # .timestamp() (e.g. a datetime), or absent; fall back to 0.0 rather
        # than raising when it is missing. It is the same for every segment,
        # so resolve it once.
        raw_timestamp = getattr(transcription_session, "timestamp", None)
        if isinstance(raw_timestamp, (int, float)):
            session_timestamp = raw_timestamp
        elif hasattr(raw_timestamp, "timestamp"):
            session_timestamp = raw_timestamp.timestamp()
        else:
            session_timestamp = 0.0

        # Steps 3-4: score every quote-candidate segment.
        quote_candidates = []
        for segment in transcription_session.transcribed_segments:
            if not (segment.is_quote_candidate and segment.text.strip()):
                continue

            # Laughter overlapping this segment feeds into the analysis.
            segment_laughter = self._get_segment_laughter_data(
                segment, laughter_analysis
            )

            if not self.quote_analyzer:
                continue

            quote_data = await self.quote_analyzer.analyze_quote(
                segment.text,
                segment.speaker_label,
                {
                    "user_id": segment.user_id,
                    "confidence": segment.confidence,
                    "timestamp": session_timestamp,
                    "duration": segment.end_time - segment.start_time,
                    "word_count": segment.word_count,
                    "language": segment.language,
                    "channel_id": audio_clip.channel_id,
                    "guild_id": audio_clip.guild_id,
                    "laughter_duration": segment_laughter.get("duration", 0),
                    "laughter_intensity": segment_laughter.get("intensity", 0),
                    "laughter_data": segment_laughter,
                },
            )

            if quote_data:
                quote_candidates.append(quote_data)

        # Step 5: let the scheduler decide how to respond to each quote.
        if self.response_scheduler:
            for quote_data in quote_candidates:
                await self.response_scheduler.process_quote_score(quote_data)

        # Metrics and summary log.
        total_laughter = (
            laughter_analysis.total_laughter_duration if laughter_analysis else 0
        )
        segment_count = len(transcription_session.transcribed_segments)

        if self.metrics:
            self.metrics.increment(
                "audio_clips_processed",
                {
                    "guild_id": str(audio_clip.guild_id),
                    "segments_processed": str(segment_count),
                    "quote_candidates": str(len(quote_candidates)),
                    "laughter_detected": str(
                        laughter_analysis is not None
                        and laughter_analysis.total_laughter_duration > 0
                    ),
                },
            )

        logger.info(
            f"Processed audio clip {audio_clip.id}: "
            f"{segment_count} segments, "
            f"{len(quote_candidates)} quote candidates, "
            f"{total_laughter:.2f}s laughter"
        )

    except Exception as e:
        # Use getattr so a malformed clip cannot raise again inside the handler.
        logger.error(
            f"Failed to process audio clip {getattr(audio_clip, 'id', 'unknown')}: {e}"
        )
        if self.metrics:
            self.metrics.increment(
                "audio_processing_errors",
                {"guild_id": str(getattr(audio_clip, "guild_id", "unknown"))},
            )
|
|
|
|
def _get_segment_laughter_data(
|
|
self, segment, laughter_analysis
|
|
) -> Dict[str, float]:
|
|
"""Get laughter data that overlaps with a transcribed segment"""
|
|
try:
|
|
if not laughter_analysis or not laughter_analysis.laughter_segments:
|
|
return {"duration": 0.0, "intensity": 0.0, "count": 0}
|
|
|
|
overlapping_laughter = []
|
|
segment_start = segment.start_time
|
|
segment_end = segment.end_time
|
|
|
|
for laughter_seg in laughter_analysis.laughter_segments:
|
|
# Check for overlap
|
|
overlap_start = max(segment_start, laughter_seg.start_time)
|
|
overlap_end = min(segment_end, laughter_seg.end_time)
|
|
|
|
if overlap_start < overlap_end: # There is overlap
|
|
overlap_duration = overlap_end - overlap_start
|
|
overlapping_laughter.append(
|
|
{
|
|
"duration": overlap_duration,
|
|
"intensity": laughter_seg.intensity,
|
|
"confidence": laughter_seg.confidence,
|
|
}
|
|
)
|
|
|
|
if not overlapping_laughter:
|
|
return {"duration": 0.0, "intensity": 0.0, "count": 0}
|
|
|
|
# Aggregate overlapping laughter
|
|
total_duration = sum(laugh["duration"] for laugh in overlapping_laughter)
|
|
weighted_intensity = (
|
|
sum(
|
|
laugh["duration"] * laugh["intensity"]
|
|
for laugh in overlapping_laughter
|
|
)
|
|
/ total_duration
|
|
if total_duration > 0
|
|
else 0
|
|
)
|
|
|
|
return {
|
|
"duration": total_duration,
|
|
"intensity": weighted_intensity,
|
|
"count": len(overlapping_laughter),
|
|
}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get segment laughter data: {e}")
|
|
return {"duration": 0.0, "intensity": 0.0, "count": 0}
|
|
|
|
async def _health_check_worker(self):
|
|
"""Background worker for health monitoring"""
|
|
while not self.is_closed():
|
|
try:
|
|
# Check system health
|
|
health_status = await self._check_system_health()
|
|
|
|
# Update metrics
|
|
if self.metrics:
|
|
overall_score = health_status.get("overall_score")
|
|
if isinstance(overall_score, (int, float)):
|
|
self.metrics.set_gauge("system_health", overall_score)
|
|
|
|
# Log warnings for unhealthy components
|
|
if isinstance(health_status.get("components"), dict):
|
|
for component, status in health_status["components"].items(): # type: ignore
|
|
if isinstance(status, dict) and status.get("healthy") is False:
|
|
logger.warning(
|
|
f"Component {component} is unhealthy: {status.get('error', 'Unknown error')}"
|
|
)
|
|
|
|
await asyncio.sleep(30) # Check every 30 seconds
|
|
|
|
except asyncio.CancelledError:
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error in health check worker: {e}")
|
|
await asyncio.sleep(30)
|
|
|
|
async def _check_system_health(self) -> Dict[str, object]:
|
|
"""Check health of all system components"""
|
|
components: Dict[str, object] = {}
|
|
|
|
# Database health
|
|
try:
|
|
if self.db_manager:
|
|
await self.db_manager.check_health()
|
|
components["database"] = {"healthy": True, "latency": 0}
|
|
except Exception as e:
|
|
components["database"] = {"healthy": False, "error": str(e)}
|
|
|
|
# AI providers health
|
|
try:
|
|
if self.ai_manager:
|
|
provider_health = await self.ai_manager.check_health()
|
|
if isinstance(provider_health, dict):
|
|
components["ai_providers"] = provider_health
|
|
else:
|
|
components["ai_providers"] = {"healthy": True}
|
|
else:
|
|
components["ai_providers"] = {
|
|
"healthy": False,
|
|
"error": "AI manager not initialized",
|
|
}
|
|
except Exception as e:
|
|
components["ai_providers"] = {"healthy": False, "error": str(e)}
|
|
|
|
# Memory system health
|
|
try:
|
|
if self.memory_manager:
|
|
memory_health = await self.memory_manager.check_health()
|
|
if isinstance(memory_health, dict):
|
|
components["memory_system"] = memory_health
|
|
else:
|
|
components["memory_system"] = {"healthy": True}
|
|
else:
|
|
components["memory_system"] = {
|
|
"healthy": False,
|
|
"error": "Memory manager not initialized",
|
|
}
|
|
except Exception as e:
|
|
components["memory_system"] = {"healthy": False, "error": str(e)}
|
|
|
|
# Transcription service health
|
|
try:
|
|
if self.transcription_service and hasattr(
|
|
self.transcription_service, "check_health"
|
|
):
|
|
transcription_health = await self.transcription_service.check_health() # type: ignore
|
|
if isinstance(transcription_health, dict):
|
|
components["transcription_service"] = transcription_health
|
|
else:
|
|
components["transcription_service"] = {"healthy": True}
|
|
else:
|
|
components["transcription_service"] = {
|
|
"healthy": False,
|
|
"error": "Transcription service not initialized",
|
|
}
|
|
except Exception as e:
|
|
components["transcription_service"] = {"healthy": False, "error": str(e)}
|
|
|
|
# Quote analyzer health
|
|
try:
|
|
if self.quote_analyzer:
|
|
analyzer_health = await self.quote_analyzer.check_health()
|
|
if isinstance(analyzer_health, dict):
|
|
components["quote_analyzer"] = analyzer_health
|
|
else:
|
|
components["quote_analyzer"] = {"healthy": True}
|
|
else:
|
|
components["quote_analyzer"] = {
|
|
"healthy": False,
|
|
"error": "Quote analyzer not initialized",
|
|
}
|
|
except Exception as e:
|
|
components["quote_analyzer"] = {"healthy": False, "error": str(e)}
|
|
|
|
# Laughter detector health
|
|
try:
|
|
if self.laughter_detector and hasattr(
|
|
self.laughter_detector, "check_health"
|
|
):
|
|
laughter_health = await self.laughter_detector.check_health() # type: ignore
|
|
if isinstance(laughter_health, dict):
|
|
components["laughter_detector"] = laughter_health
|
|
else:
|
|
components["laughter_detector"] = {"healthy": True}
|
|
else:
|
|
components["laughter_detector"] = {
|
|
"healthy": False,
|
|
"error": "Laughter detector not initialized",
|
|
}
|
|
except Exception as e:
|
|
components["laughter_detector"] = {"healthy": False, "error": str(e)}
|
|
|
|
# Calculate overall health score
|
|
healthy_count = sum(
|
|
1
|
|
for comp in components.values()
|
|
if isinstance(comp, dict) and comp.get("healthy", False)
|
|
)
|
|
overall_score = healthy_count / len(components) if components else 0.0
|
|
|
|
return {
|
|
"overall_score": overall_score,
|
|
"components": components,
|
|
"timestamp": asyncio.get_event_loop().time(),
|
|
}
|
|
|
|
async def on_ready(self):
    """Log connection details, set the presence and mark the bot as ready."""
    # Guilds with an unknown (falsy) member count are skipped in the total.
    total_members = sum(
        guild.member_count for guild in self.guilds if guild.member_count
    )

    bot_logger.info("Bot connected to Discord", {
        'bot_user': str(self.user),
        'bot_id': self.user.id if self.user else None,
        'guild_count': len(self.guilds),
        'user_count': total_members,
        'latency_ms': round(self.latency * 1000, 2)
    })

    # Per-guild details are only logged in debug mode.
    if self.settings.debug_mode:
        for guild in self.guilds:
            voice_channel_count = sum(
                1 for channel in guild.channels
                if isinstance(channel, discord.VoiceChannel)
            )
            bot_logger.debug("Connected to guild", {
                'guild_id': guild.id,
                'guild_name': guild.name,
                'member_count': guild.member_count,
                'voice_channels': voice_channel_count
            })

    # Presence shown in the member list.
    listening_activity = discord.Activity(
        type=discord.ActivityType.listening, name="for memorable quotes 🎤"
    )
    await self.change_presence(activity=listening_activity)
    bot_logger.info("Bot presence updated", {'activity': 'listening for memorable quotes'})

    self.ready = True
    bot_logger.info("Bot is fully ready and operational")
|
|
|
|
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Log voice channel join, leave, move and mute/deafen events.

    Args:
        member: The member whose voice state changed.
        before: Voice state prior to the change.
        after: Voice state after the change.

    The bot's own state changes are ignored unless debug mode is enabled.
    On a join, the audio recorder's ``should_auto_record`` (if it has one) is
    consulted and a positive answer is logged; this method does not start a
    recording itself.
    """
    # Skip bot's own voice state changes unless in debug mode
    if member == self.user and not self.settings.debug_mode:
        return

    context = {
        'user_id': member.id,
        'username': member.display_name,
        'guild_id': member.guild.id
    }
    old_channel = before.channel
    new_channel = after.channel

    # User joined a voice channel
    if old_channel is None and new_channel is not None:
        context.update({
            'action': 'joined',
            'channel_id': new_channel.id,
            'channel_name': new_channel.name,
            'member_count': len(new_channel.members)
        })
        bot_logger.info("User joined voice channel", context)

        # Check if recording should start automatically
        if self.audio_recorder and hasattr(self.audio_recorder, 'should_auto_record'):
            try:
                should_record = await self.audio_recorder.should_auto_record(new_channel)
                if should_record:
                    bot_logger.info("Auto-recording triggered by voice join", context)
            except Exception as e:
                bot_logger.warning("Failed to check auto-record condition", {
                    **context, 'error': str(e)
                })

    # User left a voice channel
    elif old_channel is not None and new_channel is None:
        # Count everyone except the departing member explicitly. Whether the
        # cached member list still contains them depends on when the library
        # updates its cache relative to this event (NOTE(review): discord.py
        # appears to update it before dispatching - confirm), so subtracting
        # a fixed 1 could under-count and even yield -1.
        remaining = sum(1 for other in old_channel.members if other != member)
        context.update({
            'action': 'left',
            'channel_id': old_channel.id,
            'channel_name': old_channel.name,
            'remaining_members': remaining
        })
        bot_logger.info("User left voice channel", context)

    # User moved between voice channels
    elif old_channel != new_channel and old_channel and new_channel:
        context.update({
            'action': 'moved',
            'from_channel_id': old_channel.id,
            'from_channel_name': old_channel.name,
            'to_channel_id': new_channel.id,
            'to_channel_name': new_channel.name
        })
        bot_logger.info("User moved between voice channels", context)

    # Voice state changes (mute, deafen, etc.) within the same channel
    elif old_channel == new_channel and old_channel:
        tracked_flags = (
            ("self_mute", before.self_mute, after.self_mute),
            ("self_deaf", before.self_deaf, after.self_deaf),
            ("server_mute", before.mute, after.mute),
            ("server_deaf", before.deaf, after.deaf),
        )
        changes = [
            f"{label}: {old} -> {new}"
            for label, old, new in tracked_flags
            if old != new
        ]

        if changes:
            context.update({
                'action': 'state_change',
                'channel_id': new_channel.id,
                'channel_name': new_channel.name,
                'changes': ' | '.join(changes)
            })
            bot_logger.debug("Voice state changed", context)
|
|
|
|
async def on_guild_join(self, guild: discord.Guild):
    """Log a newly joined guild and count it in the ``guild_joins`` metric."""
    voice_channel_count = sum(
        1 for channel in guild.channels if isinstance(channel, discord.VoiceChannel)
    )
    text_channel_count = sum(
        1 for channel in guild.channels if isinstance(channel, discord.TextChannel)
    )

    bot_logger.info("Bot joined new guild", {
        'guild_id': guild.id,
        'guild_name': guild.name,
        'member_count': guild.member_count,
        'voice_channels': voice_channel_count,
        'text_channels': text_channel_count
    })

    if self.metrics:
        self.metrics.increment("guild_joins")
|
|
|
|
async def on_guild_remove(self, guild: discord.Guild):
    """Log removal from a guild and count it in the ``guild_leaves`` metric."""
    removal_details = {
        'guild_id': guild.id,
        'guild_name': guild.name
    }
    bot_logger.info("Bot removed from guild", removal_details)

    if self.metrics:
        self.metrics.increment("guild_leaves")
|
|
|
|
async def on_app_command_completion(self, interaction: discord.Interaction, command: discord.app_commands.Command):
    """Log a slash command that finished without error and count it in metrics.

    Args:
        interaction: The interaction that invoked the command.
        command: The application command that completed.
    """
    # Substrings that mark a parameter name as holding a secret.
    sensitive_markers = ('token', 'key', 'password', 'secret')

    guild = interaction.guild
    channel = interaction.channel
    context = {
        'command': command.name,
        'user_id': interaction.user.id,
        'username': interaction.user.display_name,
        'guild_id': guild.id if guild else None,
        'channel_id': channel.id if channel else None
    }

    # Parameters are only attached to the log entry in debug mode.
    if self.settings.debug_mode and hasattr(interaction, 'namespace'):
        context['parameters'] = {
            name: (
                '[REDACTED]'
                if any(marker in name.lower() for marker in sensitive_markers)
                else (str(value) if value is not None else None)
            )
            for name, value in vars(interaction.namespace).items()
        }

    bot_logger.info("Slash command completed successfully", context)

    if not self.metrics:
        return
    self.metrics.increment("commands_completed", {'command': command.name})
|
|
|
|
async def on_app_command_error(self, interaction: discord.Interaction, error: discord.app_commands.AppCommandError):
    """Log a failed slash command together with who ran it and where.

    Args:
        interaction: The interaction whose command raised.
        error: The application-command error that was raised.
    """
    invoked = interaction.command
    command_name = invoked.name if invoked else 'unknown'
    error_type = type(error).__name__

    guild = interaction.guild
    channel = interaction.channel
    context = {
        'command': command_name,
        'user_id': interaction.user.id,
        'username': interaction.user.display_name,
        'guild_id': guild.id if guild else None,
        'channel_id': channel.id if channel else None,
        'error_type': error_type
    }

    bot_logger.error("Slash command failed", context, exc_info=True)

    if not self.metrics:
        return
    self.metrics.increment("app_command_errors", {
        'command': command_name,
        'error_type': error_type
    })
|
|
|
|
async def on_message(self, message: discord.Message):
    """Emit a debug log entry for user messages posted in a channel being recorded.

    Nothing is logged for bot authors, when verbose logging is off, or when
    the message's channel is not among the active recordings.

    Args:
        message: The message that was received.
    """
    # NOTE(review): on a commands.Bot subclass, defining on_message replaces the
    # default handler that calls process_commands() — confirm that prefix
    # commands are intentionally not processed here.
    if message.author.bot:
        return
    if not self.settings.verbose_logging:
        return

    recordings = self.active_recordings
    if not recordings:
        return

    channel_id = message.channel.id
    recorded_channel_ids = [entry['channel_id'] for entry in recordings.values()]
    if channel_id not in recorded_channel_ids:
        return

    guild = message.guild
    bot_logger.debug("Message in recorded channel", {
        'user_id': message.author.id,
        'username': message.author.display_name,
        'channel_id': message.channel.id,
        'guild_id': guild.id if guild else None,
        'message_length': len(message.content),
        'has_attachments': bool(message.attachments)
    })
|
|
|
|
async def on_error(self, event, *args, **kwargs):
    """Log an exception that escaped an event handler and count it in metrics.

    Args:
        event: Name of the event whose handler raised.
        *args: Positional arguments the event was dispatched with; only the
            first one is inspected for guild/channel/author context.
        **kwargs: Keyword arguments the event was dispatched with (unused).
    """
    context = {'event': event}

    # (attribute on the first event argument, key to store its id under)
    id_sources = (
        ('guild', 'guild_id'),
        ('channel', 'channel_id'),
        ('author', 'user_id'),
    )
    if args:
        subject = args[0]
        for attribute, context_key in id_sources:
            related = getattr(subject, attribute, None)
            if related:
                context[context_key] = related.id

    bot_logger.error("Uncaught error in Discord event", context, exc_info=True)

    if not self.metrics:
        return
    self.metrics.increment("bot_errors", {'event': event})
|
|
|
|
async def on_command_error(
    self, context: commands.Context[BotT], exception: commands.CommandError, /
) -> None:
    """Log a failed command, count it in metrics and notify the invoking user.

    ``CommandNotFound`` errors are ignored entirely. For every other error the
    failure is logged with its invocation context, a ``command_errors`` metric
    is incremented (when metrics are enabled) and an error embed is sent back.
    A failure to deliver that embed is logged as a warning and never re-raised.

    Args:
        context: Invocation context of the command that failed.
        exception: The error raised while running the command.
    """
    if isinstance(exception, commands.CommandNotFound):
        return  # Ignore unknown commands

    command_name = context.command.name if context.command else 'unknown'
    error_type = type(exception).__name__

    # The context may lack a message, or carry None; only slice real content.
    message = getattr(context, 'message', None)

    error_context = {
        'command': command_name,
        'user_id': context.author.id,
        'guild_id': context.guild.id if context.guild else None,
        'channel_id': context.channel.id,
        'error_type': error_type,
        'message_content': message.content[:100] if message is not None else None
    }

    bot_logger.error("Command execution failed", error_context, exc_info=True)

    if self.metrics:
        self.metrics.increment("command_errors", {
            'command': command_name,
            'error_type': error_type
        })

    # Send user-friendly error message (error text capped at 200 characters)
    embed = discord.Embed(
        title="❌ Command Error",
        description=f"An error occurred: {str(exception)[:200]}",
        color=0xFF0000,
    )

    try:
        # Prefer an application-command style response when the context offers
        # one, otherwise fall back to a regular message. The method is looked
        # up first instead of catching AttributeError around the call, so an
        # AttributeError raised *inside* respond() is reported as a send
        # failure rather than triggering a second delivery attempt.
        respond = getattr(context, 'respond', None)
        if respond is not None:
            await respond(embed=embed, ephemeral=True)
        else:
            await context.send(embed=embed)

        bot_logger.debug("Error response sent to user", {'user_id': context.author.id})
    except Exception as send_error:
        # Best effort only: the original error has already been logged above.
        bot_logger.warning("Failed to send error response", {
            'user_id': context.author.id,
            'send_error': str(send_error)
        })
|
|
|
|
async def close(self):
    """Stop background work, shut down each service, then close the client.

    A ``None`` sentinel is put on ``processing_queue`` (when the bot has one)
    to signal shutdown. Each service in the list below that exists on the bot
    and exposes the named method is then shut down in turn; a failure in one
    service is logged as a warning and does not stop the remaining ones. The
    parent class's ``close()`` always runs last, even if cleanup fails or is
    cancelled.
    """
    import inspect  # local import: only needed to detect awaitable results

    start_time = time.perf_counter()
    bot_logger.info("Initiating bot shutdown")

    # Close services in reverse initialization order
    # (attribute name on the bot, name of its shutdown method)
    services_to_close = [
        ('audio_recorder', 'cleanup'),
        ('transcription_service', 'close'),
        ('quote_analyzer', 'close'),
        ('laughter_detector', 'close'),
        ('tts_service', 'close'),
        ('speaker_diarization', 'close'),
        ('speaker_recognition', 'close'),
        ('user_tagging', 'close'),
        ('feedback_system', 'close'),
        ('health_monitor', 'close'),
        ('response_scheduler', 'stop'),
        ('memory_manager', 'close'),
        ('db_manager', 'close'),
        ('ai_manager', 'close')
    ]

    try:
        # Stop background tasks
        if hasattr(self, "processing_queue"):
            bot_logger.info("Stopping background processing tasks")
            await self.processing_queue.put(None)  # Signal shutdown

        for service_name, method_name in services_to_close:
            service = getattr(self, service_name, None)
            if not service:
                continue
            shutdown = getattr(service, method_name, None)
            if not callable(shutdown):
                continue

            try:
                close_start = time.perf_counter()
                outcome = shutdown()
                # Shutdown methods may be plain functions or coroutines;
                # awaiting a non-awaitable result would raise TypeError and
                # be misreported as a failed close.
                if inspect.isawaitable(outcome):
                    await outcome
                close_time = round((time.perf_counter() - close_start) * 1000, 2)
                bot_logger.debug(f"Service {service_name} closed", {'duration_ms': close_time})
            except Exception as e:
                bot_logger.warning(f"Error closing {service_name}", {
                    'service': service_name,
                    'error': str(e)
                })

        total_time = round((time.perf_counter() - start_time) * 1000, 2)
        bot_logger.info("Bot shutdown completed", {'shutdown_duration_ms': total_time})

    except Exception as e:
        total_time = round((time.perf_counter() - start_time) * 1000, 2)
        bot_logger.error("Error during shutdown", {
            'shutdown_duration_ms': total_time,
            'error_type': type(e).__name__
        }, exc_info=True)

    finally:
        # Runs even when cleanup is cancelled (CancelledError is not an
        # Exception), so the underlying client is always closed.
        await super().close()
|
|
|
|
|
|
async def main():
    """Create the bot, install shutdown signal handlers and run it until it stops.

    SIGINT and SIGTERM schedule ``bot.close()`` on the running event loop.
    Whatever way the run ends, the bot is closed in ``finally`` if it is not
    already closed.

    Raises:
        ValueError: If the ``DISCORD_TOKEN`` environment variable is unset or empty.
        Exception: Any other error raised while the bot runs is logged and re-raised.
    """
    bot = QuoteBot()
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks; hold the shutdown
    # task here so it cannot be garbage-collected before it finishes.
    shutdown_tasks = set()

    def request_shutdown(signum):
        """Schedule the bot's shutdown; must be called from the loop thread."""
        bot_logger.info(f"Received shutdown signal {signum}")
        task = loop.create_task(bot.close())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    def signal_handler(signum, _frame):
        """Fallback handler for platforms without loop-level signal support."""
        # A signal.signal() handler must not touch the loop directly; hand the
        # request over through the thread-safe entry point.
        loop.call_soon_threadsafe(request_shutdown, signum)

    # Setup signal handlers for graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Callbacks registered this way are allowed to interact with the loop.
            loop.add_signal_handler(sig, request_shutdown, int(sig))
        except NotImplementedError:
            # Raised by event loops that do not implement add_signal_handler.
            signal.signal(sig, signal_handler)

    try:
        # Start the bot
        discord_token = os.getenv("DISCORD_TOKEN")
        if not discord_token:
            raise ValueError("DISCORD_TOKEN environment variable is required")

        bot_logger.info("Starting bot with Discord API")

        await bot.start(discord_token)

    except KeyboardInterrupt:
        bot_logger.info("Received keyboard interrupt, shutting down gracefully")
    except Exception as e:
        bot_logger.error("Fatal error during bot execution", {
            'error_type': type(e).__name__,
            'error_message': str(e)
        }, exc_info=True)
        raise
    finally:
        if not bot.is_closed():
            final_close_start = time.perf_counter()
            await bot.close()
            final_close_time = round((time.perf_counter() - final_close_start) * 1000, 2)
            bot_logger.info("Final cleanup completed", {'duration_ms': final_close_time})
|
|
|
|
|
|
if __name__ == "__main__":
    # Basic logger for messages emitted around the bot's own run.
    startup_logger = logging.getLogger("disbord.startup")

    try:
        startup_logger.info("Discord Quote Bot starting up...")
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBot stopped by user")
    except Exception as startup_error:
        print(f"Fatal error during startup: {startup_error}")
        startup_logger.error("Fatal startup error", exc_info=True)
        sys.exit(1)
    finally:
        # Printed on every exit path, including the sys.exit(1) above.
        print("Discord Quote Bot shutdown complete")
|