Files
disbord/main.py
Travis Vasceannie 8f6f958426 chore: remove deprecated files and scripts
- Deleted final.md, fix_async_fixtures.py, fix_cog_tests.py, fix_fixture_scoping.py, and run_race_condition_tests.sh as they are no longer needed.
- Updated main.py to enhance logging and error handling.
- Refactored various components for improved performance and maintainability.
- Introduced new logging utilities for better context and performance monitoring.
- Ensured all datetime operations are timezone-aware and consistent across the codebase.
2025-08-28 00:54:28 -04:00

1156 lines
46 KiB
Python

#!/usr/bin/env python3
"""
Discord Voice Chat Quote Bot - Main Entry Point
An advanced AI-powered Discord bot that continuously records, analyzes, and curates
memorable quotes from voice channel conversations using persistent 120-second audio clips.
"""
import asyncio
import logging
import os
import signal
import sys
import time
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, Optional, TypeVar

import discord
from discord.ext import commands
from dotenv import load_dotenv
if TYPE_CHECKING:
from services.audio.laughter_detection import LaughterDetector
from services.audio.speaker_recognition import SpeakerRecognitionService
from services.audio.transcription_service import TranscriptionService
from services.audio.tts_service import TTSService
from services.interaction.feedback_system import FeedbackSystem
from services.interaction.user_assisted_tagging import UserAssistedTaggingService
from services.monitoring.health_monitor import HealthMonitor
from services.quotes.quote_explanation import QuoteExplanationService
from config.settings import Settings
from core.ai_manager import AIProviderManager
from core.consent_manager import ConsentManager
from core.database import DatabaseManager
from core.memory_manager import MemoryManager
from services.audio.audio_recorder import AudioRecorderService
from services.automation.response_scheduler import ResponseScheduler
from services.quotes.quote_analyzer import QuoteAnalyzer
from utils.audio_processor import AudioProcessor
from utils.logging_config import get_context_logger, get_performance_logger, setup_logging
from utils.metrics import MetricsCollector
# Temporary: Comment out due to ONNX/ml_dtypes compatibility issue
# from services.audio.speaker_diarization import SpeakerDiarizationService
# Temporary stub
class SpeakerDiarizationService:
    """No-op stand-in used while the real diarization import is commented out.

    The constructor accepts and discards any arguments, and both lifecycle
    hooks are awaitable coroutines that do nothing.
    """

    def __init__(self, *args, **kwargs):
        # Arguments are intentionally ignored.
        return None

    async def initialize(self):
        """Do nothing; present so callers can await it."""
        return None

    async def close(self):
        """Do nothing; present so callers can await it."""
        return None
# Basic logging setup - will be enhanced after settings are loaded
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)

# Enhanced loggers; QuoteBot.__init__ rebinds these module globals once
# settings are available. They are annotated with typing.Any because the
# previous ``Optional[any]`` named the builtin ``any()`` function, not a type.
bot_logger: Optional[Any] = None
perf_logger: Optional[Any] = None

# Type variable bounded by commands.Bot, used in command-context annotations.
BotT = TypeVar("BotT", bound=commands.Bot)
class QuoteBot(commands.Bot):
"""
Discord Voice Chat Quote Bot
Main bot class that orchestrates all components including audio recording,
speaker recognition, AI analysis, and user interaction systems.
"""
def __init__(self):
    """Load settings, set up logging and intents, and declare component slots.

    Every manager, service and utility attribute starts as ``None``; the
    ``_initialize_*`` coroutines fill them in later. The module-level
    ``bot_logger`` and ``perf_logger`` globals are rebound here.
    """
    global bot_logger, perf_logger

    # Environment configuration, then settings
    load_dotenv()
    from config.settings import get_settings

    self.settings = get_settings()

    # Enhanced logging system
    self.loggers = setup_logging(self.settings)
    bot_logger = get_context_logger('disbord.main')
    perf_logger = get_performance_logger('disbord.main')

    startup_details = {
        'log_level': self.settings.log_level,
        'debug_mode': self.settings.debug_mode,
        'verbose_logging': self.settings.verbose_logging
    }
    bot_logger.info("QuoteBot initializing", startup_details)

    # Discord gateway intents: defaults plus the flags enabled below
    gateway_intents = discord.Intents.default()
    for flag_name in ("message_content", "voice_states", "guilds", "members"):
        setattr(gateway_intents, flag_name, True)

    super().__init__(
        command_prefix="!",
        intents=gateway_intents,
        help_command=None,
        case_insensitive=True,
    )

    # Core manager slots
    self.db_manager: Optional[DatabaseManager] = None
    self.ai_manager: Optional[AIProviderManager] = None
    self.memory_manager: Optional[MemoryManager] = None
    self.consent_manager: Optional[ConsentManager] = None

    # Per-component context loggers
    self.db_logger = get_context_logger('disbord.database')
    self.ai_logger = get_context_logger('disbord.ai_manager')
    self.memory_logger = get_context_logger('disbord.memory_manager')
    self.audio_logger = get_context_logger('disbord.audio_recorder')
    self.transcription_logger = get_context_logger('disbord.transcription')
    self.analysis_logger = get_context_logger('disbord.quote_analyzer')

    # Service slots
    self.audio_recorder: Optional[AudioRecorderService] = None
    self.speaker_diarization: Optional[SpeakerDiarizationService] = None
    self.transcription_service: Optional["TranscriptionService"] = None
    self.laughter_detector: Optional["LaughterDetector"] = None
    self.quote_analyzer: Optional[QuoteAnalyzer] = None
    self.tts_service: Optional["TTSService"] = None
    self.response_scheduler: Optional[ResponseScheduler] = None
    self.speaker_recognition: Optional["SpeakerRecognitionService"] = None
    self.user_tagging: Optional["UserAssistedTaggingService"] = None
    self.quote_explanation: Optional["QuoteExplanationService"] = None
    self.feedback_system: Optional["FeedbackSystem"] = None
    self.health_monitor: Optional["HealthMonitor"] = None

    # Utility slots
    self.metrics: Optional[MetricsCollector] = None
    self.audio_processor: Optional[AudioProcessor] = None

    # Runtime state
    self.ready: bool = False
    self.active_recordings: Dict[int, object] = {}
    self.processing_queue: asyncio.Queue = asyncio.Queue()

    # Working directories
    self._create_directories()

    # Configuration summary
    configuration_summary = {
        'guilds_configured': bool(self.settings.guild_id),
        'ai_providers': list(self.settings.ai_providers.keys()),
        'recording_duration': self.settings.recording_clip_duration,
        'quote_thresholds': self.settings.thresholds
    }
    bot_logger.info("Bot configuration loaded", configuration_summary)
def _create_directories(self):
"""Create required directories for data, logs, and temporary files"""
directories = [
"data",
"logs",
"temp",
"migrations/versions",
"config",
"core",
"services",
"utils",
"cogs",
"extensions",
]
for directory in directories:
Path(directory).mkdir(parents=True, exist_ok=True)
async def setup_hook(self):
    """Run every start-up phase in order, timing each one.

    Phases: core managers, services, utilities, cogs, background tasks, then
    the slash-command sync. Each phase logs its own duration and a final
    summary logs them all. On any exception the failure is logged, the bot
    is closed, and the exception is re-raised.
    """
    overall_began = time.perf_counter()
    bot_logger.info("Starting Discord Quote Bot initialization")

    def elapsed_ms(since):
        # Milliseconds since ``since``, rounded to two decimals.
        return round((time.perf_counter() - since) * 1000, 2)

    try:
        # (summary key, completion message, coroutine function) per phase
        phases = (
            ('core_init_ms', "Core managers initialized", self._initialize_core_managers),
            ('services_init_ms', "Services initialized", self._initialize_services),
            ('utils_init_ms', "Utilities initialized", self._initialize_utilities),
            ('cogs_init_ms', "Cogs loaded", self._load_cogs),
            ('tasks_init_ms', "Background tasks started", self._start_background_tasks),
        )
        timings = {}
        for summary_key, done_message, run_phase in phases:
            phase_began = time.perf_counter()
            await run_phase()
            timings[summary_key] = elapsed_ms(phase_began)
            bot_logger.info(done_message, {'duration_ms': timings[summary_key]})

        # Slash command sync
        sync_began = time.perf_counter()
        registered = await self.tree.sync()
        timings['sync_ms'] = elapsed_ms(sync_began)
        bot_logger.info("Slash commands synced", {
            'commands_synced': len(registered),
            'duration_ms': timings['sync_ms']
        })

        bot_logger.info("Bot initialization completed successfully", {
            'total_duration_ms': elapsed_ms(overall_began),
            **timings
        })
    except Exception as e:
        failure_details = {
            'duration_ms': elapsed_ms(overall_began),
            'error_type': type(e).__name__,
            'error_message': str(e)
        }
        bot_logger.error("Failed to initialize bot", failure_details, exc_info=True)
        await self.close()
        raise
async def _initialize_core_managers(self):
    """Create and initialize the database, AI, memory and consent managers.

    Each manager is stored on ``self`` before its ``initialize()`` is awaited,
    and later managers receive the earlier ones as constructor arguments.
    """
    logger.info("Initializing core managers...")
    settings = self.settings

    # Database
    self.db_manager = database = DatabaseManager(settings.database_url)
    await database.initialize()

    # AI providers
    self.ai_manager = providers = AIProviderManager(settings)
    await providers.initialize()

    # Long-term context memory
    self.memory_manager = memory = MemoryManager(providers, database, settings)
    await memory.initialize()

    # Consent / privacy compliance
    self.consent_manager = consent = ConsentManager(database)
    await consent.initialize()
async def _initialize_services(self):
    """Create and initialize the audio, analysis and interaction services.

    Services are built in a fixed order because later ones receive earlier
    ones as constructor arguments. The core managers must already be set;
    each is asserted the first time it is needed. TTS is optional: a failure
    there is logged as a warning and ``self.tts_service`` is left ``None``.
    """
    logger.info("Initializing processing services...")
    settings = self.settings

    # Health monitor (required by slash commands)
    assert self.db_manager is not None
    db = self.db_manager
    from services.monitoring.health_monitor import HealthMonitor

    self.health_monitor = HealthMonitor(db)
    await self.health_monitor.initialize()

    # Speaker diarization first (required by other services)
    assert self.consent_manager is not None
    consent = self.consent_manager
    self.speaker_diarization = diarization = SpeakerDiarizationService(
        db, consent, AudioProcessor()
    )
    await diarization.initialize()

    # Speaker recognition
    assert self.ai_manager is not None
    ai = self.ai_manager
    from services.audio.speaker_recognition import (
        SpeakerRecognitionService,
    )

    self.speaker_recognition = SpeakerRecognitionService(ai, db, AudioProcessor())
    await self.speaker_recognition.initialize()

    # Audio recorder; its remaining dependencies are passed to initialize()
    self.audio_recorder = AudioRecorderService(settings, consent, diarization)
    await self.audio_recorder.initialize(db, AudioProcessor())

    # Transcription
    from services.audio.transcription_service import TranscriptionService

    self.transcription_service = transcription = TranscriptionService(
        ai, db, diarization, AudioProcessor()
    )
    await transcription.initialize()

    # Laughter detection
    from services.audio.laughter_detection import LaughterDetector

    self.laughter_detector = LaughterDetector(AudioProcessor(), db)
    await self.laughter_detector.initialize()

    # TTS (optional)
    try:
        from services.audio.tts_service import TTSService

        self.tts_service = TTSService(ai, settings)
        await self.tts_service.initialize()
        logger.info("TTS service initialized")
    except Exception as e:
        logger.warning(f"TTS service initialization failed (non-critical): {e}")
        self.tts_service = None

    # Quote analysis engine
    assert self.memory_manager is not None
    self.quote_analyzer = QuoteAnalyzer(ai, self.memory_manager, db, settings)
    await self.quote_analyzer.initialize()

    # Response scheduling; the bot instance itself is the last argument
    self.response_scheduler = ResponseScheduler(db, ai, settings, self)
    await self.response_scheduler.initialize()

    # User-assisted tagging
    from services.interaction.user_assisted_tagging import (
        UserAssistedTaggingService,
    )

    self.user_tagging = UserAssistedTaggingService(
        self, db, diarization, transcription, consent
    )
    await self.user_tagging.initialize()

    # Quote explanation
    from services.quotes.quote_explanation import QuoteExplanationService

    self.quote_explanation = QuoteExplanationService(self, db, ai)
    await self.quote_explanation.initialize()

    # Feedback system
    from services.interaction.feedback_system import FeedbackSystem

    self.feedback_system = FeedbackSystem(self, db, ai, consent)
    await self.feedback_system.initialize()
async def _initialize_utilities(self):
    """Create the metrics collector and the shared audio processor."""
    logger.info("Initializing utilities...")

    # Metrics collection
    collector = MetricsCollector()
    self.metrics = collector

    # Audio processing helpers
    processor = AudioProcessor()
    self.audio_processor = processor
async def _load_cogs(self):
    """Load the command cogs, then the slash-command module.

    Loading is best-effort: a failure for one cog or for the slash-command
    module is logged and does not stop the remaining work.
    """
    logger.info("Loading command cogs...")

    for extension_path in (
        "cogs.voice_cog",
        "cogs.quotes_cog",
        "cogs.consent_cog",
        "cogs.admin_cog",
        "cogs.tasks_cog",
    ):
        try:
            await self.load_extension(extension_path)
            logger.info(f"Loaded cog: {extension_path}")
        except Exception as e:
            logger.error(f"Failed to load cog {extension_path}: {e}")

    # The slash-command module receives the already-built services
    try:
        from commands import slash_commands

        shared_services = dict(
            db_manager=self.db_manager,
            consent_manager=self.consent_manager,
            memory_manager=self.memory_manager,
            audio_recorder=self.audio_recorder,
            speaker_recognition=self.speaker_recognition,
            user_tagging=self.user_tagging,
            quote_analyzer=self.quote_analyzer,
            tts_service=self.tts_service,
            quote_explanation=self.quote_explanation,
            feedback_system=self.feedback_system,
            health_monitor=self.health_monitor,
        )
        await slash_commands.setup(self, **shared_services)
        logger.info("Loaded slash commands cog with services")
    except Exception as e:
        logger.error(f"Failed to load slash commands cog: {e}")
async def _start_background_tasks(self):
"""Start background processing tasks"""
logger.info("Starting background tasks...")
# Audio processing task
self.loop.create_task(self._audio_processing_worker())
# Response scheduler task
if self.response_scheduler:
self.loop.create_task(self.response_scheduler.start_scheduler())
# Metrics collection task
if self.metrics:
self.loop.create_task(self.metrics.start_collection())
# Health check task
self.loop.create_task(self._health_check_worker())
async def _audio_processing_worker(self):
"""Background worker for processing audio clips"""
logger.info("Audio processing worker started")
while not self.is_closed():
try:
# Wait for audio clips to process
audio_clip = await self.processing_queue.get()
if audio_clip is None: # Shutdown signal
break
# Process the audio clip
await self._process_audio_clip(audio_clip)
# Mark task as done
self.processing_queue.task_done()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in audio processing worker: {e}")
await asyncio.sleep(1) # Brief pause before retrying
async def _process_audio_clip(self, audio_clip):
"""Process a single audio clip through the full pipeline"""
try:
logger.info(f"Processing audio clip: {audio_clip.id}")
# Step 1: Get speaker diarization result (already performed in audio recorder)
diarization_result = getattr(audio_clip, "diarization_result", None)
# Step 2: Transcribe audio with speaker segment mapping
if self.transcription_service and hasattr(
self.transcription_service, "transcribe_audio_clip"
):
transcription_session = await self.transcription_service.transcribe_audio_clip( # type: ignore
audio_clip.file_path,
audio_clip.guild_id,
audio_clip.channel_id,
diarization_result,
audio_clip.id,
)
else:
transcription_session = None
if not transcription_session:
logger.warning(
f"No transcription results for audio clip: {audio_clip.id}"
)
return
# Step 2.5: Analyze laughter in the audio clip
if self.laughter_detector and hasattr(
self.laughter_detector, "detect_laughter"
):
laughter_analysis = await self.laughter_detector.detect_laughter( # type: ignore
audio_clip.file_path, audio_clip.participants
)
else:
laughter_analysis = None
# Step 3: Process each transcribed segment for quote analysis
quote_candidates = []
for segment in transcription_session.transcribed_segments:
if segment.is_quote_candidate and segment.text.strip():
# Find overlapping laughter for this segment
segment_laughter = self._get_segment_laughter_data(
segment, laughter_analysis
)
# Step 4: Quote analysis and scoring with laughter data
if self.quote_analyzer:
quote_data = await self.quote_analyzer.analyze_quote(
segment.text,
segment.speaker_label,
{
"user_id": segment.user_id,
"confidence": segment.confidence,
"timestamp": getattr(transcription_session, 'timestamp', 0.0) if isinstance(getattr(transcription_session, 'timestamp', None), (int, float)) else (transcription_session.timestamp.timestamp() if hasattr(transcription_session.timestamp, 'timestamp') else 0.0),
"duration": segment.end_time - segment.start_time,
"word_count": segment.word_count,
"language": segment.language,
"channel_id": audio_clip.channel_id,
"guild_id": audio_clip.guild_id,
"laughter_duration": segment_laughter.get(
"duration", 0
),
"laughter_intensity": segment_laughter.get(
"intensity", 0
),
"laughter_data": segment_laughter,
},
)
else:
quote_data = None
if quote_data:
quote_candidates.append(quote_data)
# Step 5: Process quote scores and determine responses
if self.response_scheduler:
for quote_data in quote_candidates:
await self.response_scheduler.process_quote_score(quote_data)
# Update metrics
laughter_info = {
"total_laughter_duration": (
laughter_analysis.total_laughter_duration
if laughter_analysis
else 0
),
"laughter_segments": (
len(laughter_analysis.laughter_segments) if laughter_analysis else 0
),
}
if self.metrics:
self.metrics.increment(
"audio_clips_processed",
{
"guild_id": str(audio_clip.guild_id),
"segments_processed": str(
len(transcription_session.transcribed_segments)
),
"quote_candidates": str(len(quote_candidates)),
"laughter_detected": str(
laughter_analysis is not None
and laughter_analysis.total_laughter_duration > 0
),
},
)
logger.info(
f"Processed audio clip {audio_clip.id}: "
f"{len(transcription_session.transcribed_segments)} segments, "
f"{len(quote_candidates)} quote candidates, "
f"{laughter_info['total_laughter_duration']:.2f}s laughter"
)
except Exception as e:
logger.error(f"Failed to process audio clip {audio_clip.id}: {e}")
if self.metrics:
self.metrics.increment(
"audio_processing_errors",
{"guild_id": str(getattr(audio_clip, "guild_id", "unknown"))},
)
def _get_segment_laughter_data(
self, segment, laughter_analysis
) -> Dict[str, float]:
"""Get laughter data that overlaps with a transcribed segment"""
try:
if not laughter_analysis or not laughter_analysis.laughter_segments:
return {"duration": 0.0, "intensity": 0.0, "count": 0}
overlapping_laughter = []
segment_start = segment.start_time
segment_end = segment.end_time
for laughter_seg in laughter_analysis.laughter_segments:
# Check for overlap
overlap_start = max(segment_start, laughter_seg.start_time)
overlap_end = min(segment_end, laughter_seg.end_time)
if overlap_start < overlap_end: # There is overlap
overlap_duration = overlap_end - overlap_start
overlapping_laughter.append(
{
"duration": overlap_duration,
"intensity": laughter_seg.intensity,
"confidence": laughter_seg.confidence,
}
)
if not overlapping_laughter:
return {"duration": 0.0, "intensity": 0.0, "count": 0}
# Aggregate overlapping laughter
total_duration = sum(laugh["duration"] for laugh in overlapping_laughter)
weighted_intensity = (
sum(
laugh["duration"] * laugh["intensity"]
for laugh in overlapping_laughter
)
/ total_duration
if total_duration > 0
else 0
)
return {
"duration": total_duration,
"intensity": weighted_intensity,
"count": len(overlapping_laughter),
}
except Exception as e:
logger.error(f"Failed to get segment laughter data: {e}")
return {"duration": 0.0, "intensity": 0.0, "count": 0}
async def _health_check_worker(self):
    """Poll system health every 30 seconds until the bot closes.

    Each pass publishes the overall score to the ``system_health`` gauge when
    metrics are available, and logs a warning for every component explicitly
    reported as unhealthy. Cancellation ends the loop; any other error is
    logged and followed by the same 30-second wait.
    """
    while not self.is_closed():
        try:
            report = await self._check_system_health()

            # Publish the overall score
            if self.metrics:
                score = report.get("overall_score")
                if isinstance(score, (int, float)):
                    self.metrics.set_gauge("system_health", score)

            # Warn about components whose status says healthy is False
            component_map = report.get("components")
            if isinstance(component_map, dict):
                for label, state in component_map.items():
                    if not isinstance(state, dict):
                        continue
                    if state.get("healthy") is not False:
                        continue
                    logger.warning(
                        f"Component {label} is unhealthy: {state.get('error', 'Unknown error')}"
                    )

            await asyncio.sleep(30)  # Check every 30 seconds

        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Error in health check worker: {e}")
            await asyncio.sleep(30)
async def _check_system_health(self) -> Dict[str, object]:
"""Check health of all system components"""
components: Dict[str, object] = {}
# Database health
try:
if self.db_manager:
await self.db_manager.check_health()
components["database"] = {"healthy": True, "latency": 0}
except Exception as e:
components["database"] = {"healthy": False, "error": str(e)}
# AI providers health
try:
if self.ai_manager:
provider_health = await self.ai_manager.check_health()
if isinstance(provider_health, dict):
components["ai_providers"] = provider_health
else:
components["ai_providers"] = {"healthy": True}
else:
components["ai_providers"] = {
"healthy": False,
"error": "AI manager not initialized",
}
except Exception as e:
components["ai_providers"] = {"healthy": False, "error": str(e)}
# Memory system health
try:
if self.memory_manager:
memory_health = await self.memory_manager.check_health()
if isinstance(memory_health, dict):
components["memory_system"] = memory_health
else:
components["memory_system"] = {"healthy": True}
else:
components["memory_system"] = {
"healthy": False,
"error": "Memory manager not initialized",
}
except Exception as e:
components["memory_system"] = {"healthy": False, "error": str(e)}
# Transcription service health
try:
if self.transcription_service and hasattr(
self.transcription_service, "check_health"
):
transcription_health = await self.transcription_service.check_health() # type: ignore
if isinstance(transcription_health, dict):
components["transcription_service"] = transcription_health
else:
components["transcription_service"] = {"healthy": True}
else:
components["transcription_service"] = {
"healthy": False,
"error": "Transcription service not initialized",
}
except Exception as e:
components["transcription_service"] = {"healthy": False, "error": str(e)}
# Quote analyzer health
try:
if self.quote_analyzer:
analyzer_health = await self.quote_analyzer.check_health()
if isinstance(analyzer_health, dict):
components["quote_analyzer"] = analyzer_health
else:
components["quote_analyzer"] = {"healthy": True}
else:
components["quote_analyzer"] = {
"healthy": False,
"error": "Quote analyzer not initialized",
}
except Exception as e:
components["quote_analyzer"] = {"healthy": False, "error": str(e)}
# Laughter detector health
try:
if self.laughter_detector and hasattr(
self.laughter_detector, "check_health"
):
laughter_health = await self.laughter_detector.check_health() # type: ignore
if isinstance(laughter_health, dict):
components["laughter_detector"] = laughter_health
else:
components["laughter_detector"] = {"healthy": True}
else:
components["laughter_detector"] = {
"healthy": False,
"error": "Laughter detector not initialized",
}
except Exception as e:
components["laughter_detector"] = {"healthy": False, "error": str(e)}
# Calculate overall health score
healthy_count = sum(
1
for comp in components.values()
if isinstance(comp, dict) and comp.get("healthy", False)
)
overall_score = healthy_count / len(components) if components else 0.0
return {
"overall_score": overall_score,
"components": components,
"timestamp": asyncio.get_event_loop().time(),
}
async def on_ready(self):
    """Log connection details, set the presence, and mark the bot ready.

    In debug mode one extra debug entry is logged per connected guild.
    """
    member_counts = (guild.member_count for guild in self.guilds)
    connection_details = {
        'bot_user': str(self.user),
        'bot_id': self.user.id if self.user else None,
        'guild_count': len(self.guilds),
        'user_count': sum(count for count in member_counts if count),
        'latency_ms': round(self.latency * 1000, 2)
    }
    bot_logger.info("Bot connected to Discord", connection_details)

    # Per-guild details, debug mode only
    if self.settings.debug_mode:
        for guild in self.guilds:
            voice_total = sum(
                1 for channel in guild.channels
                if isinstance(channel, discord.VoiceChannel)
            )
            bot_logger.debug("Connected to guild", {
                'guild_id': guild.id,
                'guild_name': guild.name,
                'member_count': guild.member_count,
                'voice_channels': voice_total
            })

    # Presence shown next to the bot's name
    listening = discord.Activity(
        type=discord.ActivityType.listening, name="for memorable quotes 🎤"
    )
    await self.change_presence(activity=listening)
    bot_logger.info("Bot presence updated", {'activity': 'listening for memorable quotes'})

    self.ready = True
    bot_logger.info("Bot is fully ready and operational")
async def on_voice_state_update(self, member: discord.Member, before: discord.VoiceState, after: discord.VoiceState):
    """Log voice channel joins, leaves, moves and mute/deafen changes.

    The bot's own voice state changes are skipped unless debug mode is on.
    On a join, the audio recorder (when it offers ``should_auto_record``) is
    asked whether auto-recording applies; the answer is only logged here.

    ``remaining_members`` on a leave counts the channel's members other than
    the one who left. It no longer subtracts 1 unconditionally, which
    undercounted (and could report -1) whenever the channel's member list
    had already dropped the leaving member.
    """
    # Skip bot's own voice state changes unless in debug mode
    if member == self.user and not self.settings.debug_mode:
        return

    context = {
        'user_id': member.id,
        'username': member.display_name,
        'guild_id': member.guild.id
    }

    # User joined a voice channel
    if before.channel is None and after.channel is not None:
        context.update({
            'action': 'joined',
            'channel_id': after.channel.id,
            'channel_name': after.channel.name,
            'member_count': len(after.channel.members)
        })
        bot_logger.info("User joined voice channel", context)

        # Check if recording should start automatically
        if self.audio_recorder and hasattr(self.audio_recorder, 'should_auto_record'):
            try:
                should_record = await self.audio_recorder.should_auto_record(after.channel)
                if should_record:
                    bot_logger.info("Auto-recording triggered by voice join", context)
            except Exception as e:
                bot_logger.warning("Failed to check auto-record condition", {
                    **context, 'error': str(e)
                })

    # User left a voice channel
    elif before.channel is not None and after.channel is None:
        # Exclude the leaver by id so the count is right whether or not the
        # member list still contains them.
        remaining = sum(
            1 for other in before.channel.members if other.id != member.id
        )
        context.update({
            'action': 'left',
            'channel_id': before.channel.id,
            'channel_name': before.channel.name,
            'remaining_members': remaining
        })
        bot_logger.info("User left voice channel", context)

    # User moved between voice channels
    elif before.channel != after.channel and before.channel and after.channel:
        context.update({
            'action': 'moved',
            'from_channel_id': before.channel.id,
            'from_channel_name': before.channel.name,
            'to_channel_id': after.channel.id,
            'to_channel_name': after.channel.name
        })
        bot_logger.info("User moved between voice channels", context)

    # Voice state changes (mute, deafen, etc.)
    elif before.channel == after.channel and before.channel:
        changes = []
        if before.self_mute != after.self_mute:
            changes.append(f"self_mute: {before.self_mute} -> {after.self_mute}")
        if before.self_deaf != after.self_deaf:
            changes.append(f"self_deaf: {before.self_deaf} -> {after.self_deaf}")
        if before.mute != after.mute:
            changes.append(f"server_mute: {before.mute} -> {after.mute}")
        if before.deaf != after.deaf:
            changes.append(f"server_deaf: {before.deaf} -> {after.deaf}")

        if changes:
            context.update({
                'action': 'state_change',
                'channel_id': after.channel.id,
                'channel_name': after.channel.name,
                'changes': ' | '.join(changes)
            })
            bot_logger.debug("Voice state changed", context)
async def on_guild_join(self, guild: discord.Guild):
    """Log a summary of a guild the bot just joined and count the join."""
    voice_total = 0
    text_total = 0
    for channel in guild.channels:
        if isinstance(channel, discord.VoiceChannel):
            voice_total += 1
        if isinstance(channel, discord.TextChannel):
            text_total += 1

    guild_summary = {
        'guild_id': guild.id,
        'guild_name': guild.name,
        'member_count': guild.member_count,
        'voice_channels': voice_total,
        'text_channels': text_total
    }
    bot_logger.info("Bot joined new guild", guild_summary)

    if self.metrics:
        self.metrics.increment("guild_joins")
async def on_guild_remove(self, guild: discord.Guild):
    """Log the guild the bot was removed from and count the removal."""
    departed = {
        'guild_id': guild.id,
        'guild_name': guild.name
    }
    bot_logger.info("Bot removed from guild", departed)

    collector = self.metrics
    if collector:
        collector.increment("guild_leaves")
async def on_app_command_completion(self, interaction: discord.Interaction, command: discord.app_commands.Command):
    """Log a successful slash command and count it per command name.

    In debug mode the command's parameters are included, with any parameter
    whose name contains token/key/password/secret replaced by a redaction
    marker.
    """
    context = {
        'command': command.name,
        'user_id': interaction.user.id,
        'username': interaction.user.display_name,
        'guild_id': interaction.guild.id if interaction.guild else None,
        'channel_id': interaction.channel.id if interaction.channel else None
    }

    # Parameters are only logged in debug mode
    if self.settings.debug_mode and hasattr(interaction, 'namespace'):
        redact_markers = ('token', 'key', 'password', 'secret')

        def render(name, raw):
            # Hide values of sensitive-looking parameters; stringify the rest.
            lowered = name.lower()
            if any(marker in lowered for marker in redact_markers):
                return '[REDACTED]'
            return str(raw) if raw is not None else None

        context['parameters'] = {
            name: render(name, raw)
            for name, raw in vars(interaction.namespace).items()
        }

    bot_logger.info("Slash command completed successfully", context)

    if self.metrics:
        self.metrics.increment("commands_completed", {'command': command.name})
async def on_app_command_error(self, interaction: discord.Interaction, error: discord.app_commands.AppCommandError):
    """Log a failed slash command with its context and count the failure."""
    command_name = interaction.command.name if interaction.command else 'unknown'
    error_kind = type(error).__name__
    origin_guild = interaction.guild
    origin_channel = interaction.channel

    context = {
        'command': command_name,
        'user_id': interaction.user.id,
        'username': interaction.user.display_name,
        'guild_id': origin_guild.id if origin_guild else None,
        'channel_id': origin_channel.id if origin_channel else None,
        'error_type': error_kind
    }
    bot_logger.error("Slash command failed", context, exc_info=True)

    if self.metrics:
        self.metrics.increment("app_command_errors", {
            'command': command_name,
            'error_type': error_kind
        })
async def on_message(self, message: discord.Message):
    """Optionally log the message, then hand it to command processing.

    Debug logging happens only in verbose mode, for non-bot authors, and for
    channels that appear in ``self.active_recordings``.

    ``process_commands`` is always awaited afterwards. Overriding
    ``on_message`` replaces the handler that normally does this, so without
    the call no prefix (``!``) command would ever run.
    """
    try:
        if not message.author.bot and self.settings.verbose_logging:
            # Log message in channels where bot is recording
            # NOTE(review): assumes each recording entry is a mapping with a
            # 'channel_id' key; confirm against whatever populates it.
            if self.active_recordings and any(
                message.channel.id == recording['channel_id']
                for recording in self.active_recordings.values()
            ):
                bot_logger.debug("Message in recorded channel", {
                    'user_id': message.author.id,
                    'username': message.author.display_name,
                    'channel_id': message.channel.id,
                    'guild_id': message.guild.id if message.guild else None,
                    'message_length': len(message.content),
                    'has_attachments': len(message.attachments) > 0
                })
    finally:
        # A logging problem must not stop commands from being dispatched.
        await self.process_commands(message)
async def on_error(self, event, *args, **kwargs):
    """Log an uncaught event-handler error and count it by event name.

    Guild, channel and author ids are added to the log context when the
    first positional event argument carries them.
    """
    context = {'event': event}

    # Pull ids off the first event argument where available
    if args:
        subject = args[0]
        for attribute, context_key in (
            ('guild', 'guild_id'),
            ('channel', 'channel_id'),
            ('author', 'user_id'),
        ):
            related = getattr(subject, attribute, None)
            if related:
                context[context_key] = related.id

    bot_logger.error("Uncaught error in Discord event", context, exc_info=True)

    if self.metrics:
        self.metrics.increment("bot_errors", {'event': event})
async def on_command_error(
    self, context: commands.Context[BotT], exception: commands.CommandError, /
) -> None:
    """Log a failed command, count it in metrics, and notify the invoking user.

    ``commands.CommandNotFound`` is ignored entirely. Every other error is
    logged with command/user/guild/channel context, counted under the
    ``command_errors`` metric when ``self.metrics`` is set, and reported to
    the user as a red embed. A failure while sending that embed is logged as
    a warning and never propagates.

    Args:
        context: Invocation context of the command that failed.
        exception: The error raised while processing the command.
    """
    if isinstance(exception, commands.CommandNotFound):
        return  # Ignore unknown commands

    command_name = context.command.name if context.command else 'unknown'
    error_type = type(exception).__name__

    error_context = {
        'command': command_name,
        'user_id': context.author.id,
        'guild_id': context.guild.id if context.guild else None,
        'channel_id': context.channel.id,
        'error_type': error_type,
        'message_content': context.message.content[:100] if hasattr(context, 'message') else None
    }
    # Pass the exception object rather than exc_info=True: True makes logging
    # read sys.exc_info(), which is empty unless this handler runs inside the
    # except block that caught the error, so no traceback would be recorded.
    # NOTE(review): assumes bot_logger forwards exc_info to the standard
    # logging machinery, which accepts an exception instance - confirm.
    bot_logger.error("Command execution failed", error_context, exc_info=exception)

    if self.metrics:
        self.metrics.increment("command_errors", {
            'command': command_name,
            'error_type': error_type
        })

    # Send user-friendly error message (truncated to keep the embed short)
    embed = discord.Embed(
        title="❌ Command Error",
        description=f"An error occurred: {str(exception)[:200]}",
        color=0xFF0000,
    )
    try:
        # Try application command response first, fall back to regular message
        try:
            await context.respond(embed=embed, ephemeral=True)  # type: ignore
        except AttributeError:
            await context.send(embed=embed)
        bot_logger.debug("Error response sent to user", {'user_id': context.author.id})
    except Exception as send_error:
        # Best effort only: failing to notify the user must not raise from
        # the error handler itself.
        bot_logger.warning("Failed to send error response", {
            'user_id': context.author.id,
            'send_error': str(send_error)
        })
async def close(self):
    """Stop background work, close every service, then close the Discord client.

    Each service is closed independently: an error from one is logged as a
    warning and the remaining services are still closed. The parent class's
    ``close()`` runs in a ``finally`` clause so the client is closed even if
    the cleanup above is interrupted (for example by task cancellation).
    """
    import inspect  # local import: only needed to detect awaitable results

    start_time = time.perf_counter()
    bot_logger.info("Initiating bot shutdown")
    try:
        # Stop background tasks
        if hasattr(self, "processing_queue"):
            bot_logger.info("Stopping background processing tasks")
            await self.processing_queue.put(None)  # Signal shutdown

        # (attribute name, shutdown method) pairs, closed in the order listed.
        services_to_close = (
            ('audio_recorder', 'cleanup'),
            ('transcription_service', 'close'),
            ('quote_analyzer', 'close'),
            ('laughter_detector', 'close'),
            ('tts_service', 'close'),
            ('speaker_diarization', 'close'),
            ('speaker_recognition', 'close'),
            ('user_tagging', 'close'),
            ('feedback_system', 'close'),
            ('health_monitor', 'close'),
            ('response_scheduler', 'stop'),
            ('memory_manager', 'close'),
            ('db_manager', 'close'),
            ('ai_manager', 'close'),
        )
        for service_name, method_name in services_to_close:
            service = getattr(self, service_name, None)
            # Skip services that were never created or lack the method.
            if not service or not hasattr(service, method_name):
                continue
            try:
                close_start = time.perf_counter()
                result = getattr(service, method_name)()
                # A synchronous close/stop method returns a non-awaitable;
                # awaiting it would raise TypeError and misreport a
                # successful close as a failure.
                if inspect.isawaitable(result):
                    await result
                close_time = round((time.perf_counter() - close_start) * 1000, 2)
                bot_logger.debug(f"Service {service_name} closed", {'duration_ms': close_time})
            except Exception as e:
                bot_logger.warning(f"Error closing {service_name}", {
                    'service': service_name,
                    'error': str(e)
                })

        total_time = round((time.perf_counter() - start_time) * 1000, 2)
        bot_logger.info("Bot shutdown completed", {'shutdown_duration_ms': total_time})
    except Exception as e:
        total_time = round((time.perf_counter() - start_time) * 1000, 2)
        bot_logger.error("Error during shutdown", {
            'shutdown_duration_ms': total_time,
            'error_type': type(e).__name__
        }, exc_info=True)
    finally:
        # Always close the underlying client, including when a BaseException
        # such as asyncio.CancelledError escapes the cleanup above.
        await super().close()
async def main():
    """Create the bot, install shutdown signal handlers, and run it until it stops.

    SIGINT and SIGTERM schedule ``bot.close()`` on the running event loop.
    Whatever way ``bot.start`` ends, the bot is closed in the ``finally``
    clause if it has not been closed already.

    Raises:
        ValueError: If the ``DISCORD_TOKEN`` environment variable is unset or empty.
        Exception: Any other error raised while starting or running the bot
            is logged and re-raised.
    """
    bot = QuoteBot()
    loop = asyncio.get_running_loop()

    # The event loop keeps only weak references to tasks, so hold the
    # shutdown task here until it finishes to keep it from being collected.
    shutdown_tasks = set()

    def request_shutdown(signum):
        # Runs inside the event loop, so creating a task here is safe.
        bot_logger.info(f"Received shutdown signal {signum}")
        task = loop.create_task(bot.close())
        shutdown_tasks.add(task)
        task.add_done_callback(shutdown_tasks.discard)

    def signal_handler(signum, _frame):
        # Fallback handler: executes outside the loop's control, so only hand
        # the request over to the loop thread-safely.
        loop.call_soon_threadsafe(request_shutdown, signum)

    # Setup signal handlers for graceful shutdown
    for sig in (signal.SIGINT, signal.SIGTERM):
        try:
            # Preferred: the callback is invoked by the event loop itself.
            loop.add_signal_handler(sig, request_shutdown, int(sig))
        except NotImplementedError:
            # Event loops without signal support (e.g. on Windows).
            signal.signal(sig, signal_handler)

    try:
        # Start the bot
        discord_token = os.getenv("DISCORD_TOKEN")
        if not discord_token:
            raise ValueError("DISCORD_TOKEN environment variable is required")
        bot_logger.info("Starting bot with Discord API")
        await bot.start(discord_token)
    except KeyboardInterrupt:
        bot_logger.info("Received keyboard interrupt, shutting down gracefully")
    except Exception as e:
        bot_logger.error("Fatal error during bot execution", {
            'error_type': type(e).__name__,
            'error_message': str(e)
        }, exc_info=True)
        raise
    finally:
        # Skip the final close when a signal-triggered close already finished.
        if not bot.is_closed():
            final_close_start = time.perf_counter()
            await bot.close()
            final_close_time = round((time.perf_counter() - final_close_start) * 1000, 2)
            bot_logger.info("Final cleanup completed", {'duration_ms': final_close_time})
if __name__ == "__main__":
    # Script entry point: run the bot and report how the process ended.
    try:
        # Logger used for messages emitted before the bot itself is running.
        startup_logger = logging.getLogger("disbord.startup")
        startup_logger.info("Discord Quote Bot starting up...")
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nBot stopped by user")
    except Exception as startup_error:
        print(f"Fatal error during startup: {startup_error}")
        # exception() logs at ERROR level with the active traceback attached.
        logging.getLogger("disbord.startup").exception("Fatal startup error")
        sys.exit(1)
    finally:
        # Printed on every exit path, including the sys.exit(1) above.
        print("Discord Quote Bot shutdown complete")