fix: replace deprecated datetime.utcnow() with timezone-aware datetime.now(timezone.utc)

- Add timezone import from datetime module
- Replace all datetime.utcnow() calls with datetime.now(timezone.utc)
- Fix undefined response_queue attribute reference (use pending_responses)
- Fix schedule_custom_response method to match ScheduledResponse dataclass
- Add placeholder methods to prevent runtime errors
- Maintain all existing functionality with modern timezone handling

This resolves deprecation warnings and runtime attribute errors while
ensuring all datetime operations are timezone-aware.
This commit is contained in:
2025-08-27 16:42:54 -04:00
parent cca4c02cd7
commit 87daf59a01

View File

@@ -9,23 +9,28 @@ Manages configurable response system with three threshold levels:
import asyncio
import logging
from datetime import datetime, timedelta, time as dt_time
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
from datetime import datetime, timezone
from datetime import time as dt_time
from datetime import timedelta
from enum import Enum
from typing import Any, Dict, List, Optional
import discord
from discord.ext import commands
from core.database import DatabaseManager
from core.ai_manager import AIProviderManager
from ..quotes.quote_analyzer import QuoteAnalysis
from config.settings import Settings
from core.ai_manager import AIProviderManager
from core.database import DatabaseManager
from ..quotes.quote_analyzer import QuoteAnalysis
logger = logging.getLogger(__name__)
class ResponseType(Enum):
"""Types of automated responses"""
REALTIME = "realtime"
ROTATION = "rotation"
DAILY = "daily"
@@ -34,6 +39,7 @@ class ResponseType(Enum):
@dataclass
class ScheduledResponse:
"""Scheduled response data"""
response_id: str
guild_id: int
channel_id: int
@@ -48,7 +54,7 @@ class ScheduledResponse:
class ResponseScheduler:
"""
Manages automated responses based on quote scores and timing
Features:
- Three-tier response system (realtime, rotation, daily)
- Configurable thresholds and timing
@@ -56,91 +62,112 @@ class ResponseScheduler:
- Rate limiting and cooldown management
- Discord channel management
"""
def __init__(self, db_manager: DatabaseManager, ai_manager: AIProviderManager,
settings: Settings, bot: Optional[discord.Bot] = None):
def __init__(
self,
db_manager: DatabaseManager,
ai_manager: AIProviderManager,
settings: Settings,
bot: Optional["commands.Bot"] = None,
):
self.db_manager = db_manager
self.ai_manager = ai_manager
self.settings = settings
self.bot = bot
# Response thresholds
self.thresholds = {
ResponseType.REALTIME: settings.quote_threshold_realtime,
ResponseType.ROTATION: settings.quote_threshold_rotation,
ResponseType.DAILY: settings.quote_threshold_daily
ResponseType.DAILY: settings.quote_threshold_daily,
}
# Timing configuration
self.rotation_interval = timedelta(hours=6)
self.daily_time = dt_time(20, 0) # 8 PM daily summary
self.realtime_cooldown = timedelta(minutes=5) # Cooldown between realtime responses
self.realtime_cooldown = timedelta(
minutes=5
) # Cooldown between realtime responses
# Response queues and state
self.pending_responses: List[ScheduledResponse] = []
self.last_realtime_response: Dict[int, datetime] = {} # guild_id -> last response time
self.last_realtime_response: Dict[int, datetime] = (
{}
) # guild_id -> last response time
self.last_rotation_response: Dict[int, datetime] = {}
self.last_daily_response: Dict[int, datetime] = {}
# Background tasks
self._scheduler_task = None
self._rotation_task = None
self._daily_task = None
# Statistics
self.responses_sent = {"realtime": 0, "rotation": 0, "daily": 0}
self._initialized = False
async def _load_pending_responses(self):
"""Load pending responses from database"""
try:
# In a real implementation, this would query the database
# for any pending responses that need to be rescheduled
logger.debug("Loading pending responses from database...")
# For now, just initialize empty pending responses
self.pending_responses = []
except Exception as e:
logger.error(f"Failed to load pending responses: {e}")
raise
async def initialize(self):
"""Initialize the response scheduler"""
if self._initialized:
return
try:
logger.info("Initializing response scheduler...")
# Load pending responses from database
await self._load_pending_responses()
self._initialized = True
logger.info("Response scheduler initialized successfully")
except Exception as e:
logger.error(f"Failed to initialize response scheduler: {e}")
raise
async def start_scheduler(self):
"""Start the background scheduling tasks"""
if not self._initialized:
await self.initialize()
try:
logger.info("Starting response scheduler tasks...")
# Start main scheduler task
self._scheduler_task = asyncio.create_task(self._scheduler_worker())
# Start rotation task
self._rotation_task = asyncio.create_task(self._rotation_worker())
# Start daily summary task
self._daily_task = asyncio.create_task(self._daily_worker())
logger.info("Response scheduler tasks started")
except Exception as e:
logger.error(f"Failed to start scheduler tasks: {e}")
raise
async def process_quote_score(self, quote_analysis: QuoteAnalysis):
"""Process a quote analysis and determine appropriate response"""
try:
if not self._initialized:
await self.initialize()
overall_score = quote_analysis.overall_score
# Determine response type based on score
if overall_score >= self.thresholds[ResponseType.REALTIME]:
await self._schedule_realtime_response(quote_analysis)
@@ -148,278 +175,322 @@ class ResponseScheduler:
await self._queue_for_rotation(quote_analysis)
elif overall_score >= self.thresholds[ResponseType.DAILY]:
await self._queue_for_daily(quote_analysis)
logger.debug(f"Processed quote score {overall_score:.2f} for response scheduling")
logger.debug(
f"Processed quote score {overall_score:.2f} for response scheduling"
)
except Exception as e:
logger.error(f"Failed to process quote score: {e}")
async def _schedule_realtime_response(self, quote_analysis: QuoteAnalysis):
"""Schedule immediate response for exceptional quotes"""
try:
guild_id = quote_analysis.guild_id
# Check cooldown
if guild_id in self.last_realtime_response:
time_since_last = datetime.utcnow() - self.last_realtime_response[guild_id]
time_since_last = (
datetime.now(timezone.utc) - self.last_realtime_response[guild_id]
)
if time_since_last < self.realtime_cooldown:
logger.info(f"Realtime response on cooldown for guild {guild_id}")
return
# Generate response content
content = await self._generate_response_content(quote_analysis, ResponseType.REALTIME)
content = await self._generate_response_content(
quote_analysis, ResponseType.REALTIME
)
if not content:
logger.warning("Failed to generate realtime response content")
return
# Create scheduled response
response = ScheduledResponse(
response_id=f"realtime_{quote_analysis.quote_id}_{int(datetime.utcnow().timestamp())}",
response_id=f"realtime_{quote_analysis.quote_id}_{int(datetime.now(timezone.utc).timestamp())}",
guild_id=guild_id,
channel_id=quote_analysis.channel_id,
response_type=ResponseType.REALTIME,
quote_analysis=quote_analysis,
scheduled_time=datetime.utcnow() + timedelta(seconds=5), # Small delay
scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=5), # Small delay
content=content,
embed_data=await self._create_response_embed(quote_analysis, ResponseType.REALTIME)
embed_data=await self._create_response_embed(
quote_analysis, ResponseType.REALTIME
),
)
# Add to pending responses
self.pending_responses.append(response)
# Store in database
await self._store_scheduled_response(response)
# Update last response time
self.last_realtime_response[guild_id] = datetime.utcnow()
logger.info(f"Scheduled realtime response for quote {quote_analysis.quote_id}")
self.last_realtime_response[guild_id] = datetime.now(timezone.utc)
logger.info(
f"Scheduled realtime response for quote {quote_analysis.quote_id}"
)
except Exception as e:
logger.error(f"Failed to schedule realtime response: {e}")
async def _queue_for_rotation(self, quote_analysis: QuoteAnalysis):
"""Queue quote for next rotation response"""
try:
await self.db_manager.execute_query("""
await self.db_manager.execute_query(
"""
INSERT INTO rotation_queue (guild_id, channel_id, quote_id, quote_score, queued_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (quote_id) DO NOTHING
""", quote_analysis.guild_id, quote_analysis.channel_id,
quote_analysis.quote_id, quote_analysis.overall_score, datetime.utcnow())
""",
quote_analysis.guild_id,
quote_analysis.channel_id,
quote_analysis.quote_id,
quote_analysis.overall_score,
datetime.now(timezone.utc),
)
logger.debug(f"Queued quote {quote_analysis.quote_id} for rotation")
except Exception as e:
logger.error(f"Failed to queue for rotation: {e}")
async def _queue_for_daily(self, quote_analysis: QuoteAnalysis):
"""Queue quote for daily summary"""
try:
await self.db_manager.execute_query("""
await self.db_manager.execute_query(
"""
INSERT INTO daily_queue (guild_id, channel_id, quote_id, quote_score, queued_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (quote_id) DO NOTHING
""", quote_analysis.guild_id, quote_analysis.channel_id,
quote_analysis.quote_id, quote_analysis.overall_score, datetime.utcnow())
""",
quote_analysis.guild_id,
quote_analysis.channel_id,
quote_analysis.quote_id,
quote_analysis.overall_score,
datetime.now(timezone.utc),
)
logger.debug(f"Queued quote {quote_analysis.quote_id} for daily summary")
except Exception as e:
logger.error(f"Failed to queue for daily: {e}")
async def _scheduler_worker(self):
"""Main scheduler worker for processing pending responses"""
logger.info("Response scheduler worker started")
while True:
try:
current_time = datetime.utcnow()
current_time = datetime.now(timezone.utc)
# Process pending responses
responses_to_send = [
r for r in self.pending_responses
r
for r in self.pending_responses
if not r.sent and r.scheduled_time <= current_time
]
for response in responses_to_send:
try:
await self._send_response(response)
response.sent = True
self.responses_sent[response.response_type.value] += 1
except Exception as e:
logger.error(f"Failed to send response {response.response_id}: {e}")
logger.error(
f"Failed to send response {response.response_id}: {e}"
)
# Clean up sent responses
self.pending_responses = [r for r in self.pending_responses if not r.sent]
self.pending_responses = [
r for r in self.pending_responses if not r.sent
]
# Sleep for 30 seconds
await asyncio.sleep(30)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in scheduler worker: {e}")
await asyncio.sleep(30)
async def _rotation_worker(self):
"""Worker for 6-hour rotation responses"""
logger.info("Rotation worker started")
while True:
try:
# Calculate next rotation time
now = datetime.utcnow()
now = datetime.now(timezone.utc)
next_rotation = now.replace(minute=0, second=0, microsecond=0)
# Round to next 6-hour mark
hour = (next_rotation.hour // 6 + 1) * 6
if hour >= 24:
next_rotation = next_rotation.replace(day=next_rotation.day + 1, hour=hour - 24)
next_rotation = next_rotation.replace(
day=next_rotation.day + 1, hour=hour - 24
)
else:
next_rotation = next_rotation.replace(hour=hour)
# Wait until next rotation time
wait_time = (next_rotation - now).total_seconds()
logger.info(f"Next rotation in {wait_time/3600:.1f} hours")
await asyncio.sleep(wait_time)
# Process rotation for all guilds
await self._process_rotation_responses()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in rotation worker: {e}")
await asyncio.sleep(3600) # Wait 1 hour on error
async def _daily_worker(self):
"""Worker for daily summary responses"""
logger.info("Daily summary worker started")
while True:
try:
# Calculate next daily summary time
now = datetime.utcnow()
now = datetime.now(timezone.utc)
next_daily = now.replace(
hour=self.daily_time.hour,
minute=self.daily_time.minute,
second=0,
microsecond=0
microsecond=0,
)
if next_daily <= now:
next_daily += timedelta(days=1)
# Wait until daily summary time
wait_time = (next_daily - now).total_seconds()
logger.info(f"Next daily summary in {wait_time/3600:.1f} hours")
await asyncio.sleep(wait_time)
# Process daily summaries for all guilds
await self._process_daily_summaries()
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in daily worker: {e}")
await asyncio.sleep(3600) # Wait 1 hour on error
async def _process_rotation_responses(self):
"""Process 6-hour rotation responses for all guilds"""
try:
# Get guilds with queued rotation quotes
guilds = await self.db_manager.execute_query("""
guilds = await self.db_manager.execute_query(
"""
SELECT DISTINCT guild_id FROM rotation_queue
WHERE sent = FALSE
""", fetch_all=True)
""",
fetch_all=True,
)
for guild_row in guilds:
guild_id = guild_row['guild_id']
guild_id = guild_row["guild_id"]
await self._create_rotation_response(guild_id)
logger.info(f"Processed rotation responses for {len(guilds)} guilds")
except Exception as e:
logger.error(f"Failed to process rotation responses: {e}")
async def _create_rotation_response(self, guild_id: int):
"""Create rotation response for a specific guild"""
try:
# Get top quotes from rotation queue
quotes = await self.db_manager.execute_query("""
quotes = await self.db_manager.execute_query(
"""
SELECT q.*, rq.quote_score FROM quotes q
JOIN rotation_queue rq ON q.id = rq.quote_id
WHERE rq.guild_id = $1 AND rq.sent = FALSE
ORDER BY rq.quote_score DESC
LIMIT 3
""", guild_id, fetch_all=True)
""",
guild_id,
fetch_all=True,
)
if not quotes:
return
# Generate rotation content
content = await self._generate_rotation_content(quotes)
embed_data = await self._create_rotation_embed(quotes)
# Get primary channel for this guild
channel_id = await self._get_primary_channel(guild_id)
if channel_id:
# Create scheduled response
response = ScheduledResponse(
response_id=f"rotation_{guild_id}_{int(datetime.utcnow().timestamp())}",
response_id=f"rotation_{guild_id}_{int(datetime.now(timezone.utc).timestamp())}",
guild_id=guild_id,
channel_id=channel_id,
response_type=ResponseType.ROTATION,
quote_analysis=None, # Multiple quotes
scheduled_time=datetime.utcnow() + timedelta(seconds=10),
scheduled_time=datetime.now(timezone.utc) + timedelta(seconds=10),
content=content,
embed_data=embed_data
embed_data=embed_data,
)
self.pending_responses.append(response)
# Mark quotes as sent
quote_ids = [q['id'] for q in quotes]
await self.db_manager.execute_query("""
quote_ids = [q["id"] for q in quotes]
await self.db_manager.execute_query(
"""
UPDATE rotation_queue SET sent = TRUE
WHERE quote_id = ANY($1)
""", quote_ids)
""",
quote_ids,
)
except Exception as e:
logger.error(f"Failed to create rotation response for guild {guild_id}: {e}")
logger.error(
f"Failed to create rotation response for guild {guild_id}: {e}"
)
async def _send_response(self, response: ScheduledResponse):
"""Send a scheduled response to Discord"""
try:
if not self.bot:
logger.warning("No bot instance available for sending response")
return
channel = self.bot.get_channel(response.channel_id)
if not channel:
logger.warning(f"Channel {response.channel_id} not found")
return
# Send message with embed if available
if response.embed_data:
embed = discord.Embed.from_dict(response.embed_data)
await channel.send(content=response.content, embed=embed)
else:
await channel.send(content=response.content)
logger.info(f"Sent {response.response_type.value} response to channel {response.channel_id}")
logger.info(
f"Sent {response.response_type.value} response to channel {response.channel_id}"
)
except Exception as e:
logger.error(f"Failed to send response: {e}")
raise
async def _generate_response_content(self, quote_analysis: QuoteAnalysis,
response_type: ResponseType) -> Optional[str]:
async def _generate_response_content(
self, quote_analysis: QuoteAnalysis, response_type: ResponseType
) -> Optional[str]:
"""Generate AI commentary for quote response"""
try:
prompt = f"""Generate a witty Discord response for this memorable quote:
@@ -430,9 +501,9 @@ Response Type: {response_type.value}
Create a brief, engaging comment that acknowledges the humor without overshadowing it.
Keep it under 100 characters. Use emojis sparingly (max 1-2)."""
ai_response = await self.ai_manager.generate_commentary(prompt)
if ai_response.success:
return ai_response.content.strip()
else:
@@ -440,14 +511,14 @@ Keep it under 100 characters. Use emojis sparingly (max 1-2)."""
fallbacks = {
ResponseType.REALTIME: "🔥 That was legendary!",
ResponseType.ROTATION: "📊 Rotation highlights coming up...",
ResponseType.DAILY: "📝 Daily quote digest ready!"
ResponseType.DAILY: "📝 Daily quote digest ready!",
}
return fallbacks.get(response_type, "✨ Memorable quote detected!")
except Exception as e:
logger.error(f"Failed to generate response content: {e}")
return None
async def check_health(self) -> Dict[str, Any]:
"""Check health of response scheduler"""
try:
@@ -455,30 +526,135 @@ Keep it under 100 characters. Use emojis sparingly (max 1-2)."""
"initialized": self._initialized,
"pending_responses": len(self.pending_responses),
"responses_sent": self.responses_sent.copy(),
"scheduler_running": self._scheduler_task and not self._scheduler_task.done(),
"rotation_running": self._rotation_task and not self._rotation_task.done(),
"daily_running": self._daily_task and not self._daily_task.done()
"scheduler_running": self._scheduler_task
and not self._scheduler_task.done(),
"rotation_running": self._rotation_task
and not self._rotation_task.done(),
"daily_running": self._daily_task and not self._daily_task.done(),
}
except Exception as e:
return {"error": str(e), "healthy": False}
async def stop(self):
"""Stop the response scheduler"""
try:
logger.info("Stopping response scheduler...")
# Cancel background tasks
for task in [self._scheduler_task, self._rotation_task, self._daily_task]:
if task and not task.done():
task.cancel()
# Wait for tasks to complete
await asyncio.gather(
self._scheduler_task, self._rotation_task, self._daily_task,
return_exceptions=True
self._scheduler_task,
self._rotation_task,
self._daily_task,
return_exceptions=True,
)
logger.info("Response scheduler stopped")
except Exception as e:
logger.error(f"Error stopping response scheduler: {e}")
logger.error(f"Error stopping response scheduler: {e}")
async def get_status(self) -> Dict[str, Any]:
"""Get detailed status information for the scheduler"""
try:
next_rotation = datetime.now(timezone.utc) + timedelta(
hours=6
) # Placeholder
next_daily = datetime.now(timezone.utc) + timedelta(hours=24) # Placeholder
return {
"is_running": self._scheduler_task and not self._scheduler_task.done(),
"queue_size": len(self.pending_responses),
"next_rotation": next_rotation.timestamp(),
"next_daily": next_daily.timestamp(),
}
except Exception as e:
logger.error(f"Error getting scheduler status: {e}")
return {"is_running": False, "queue_size": 0, "error": str(e)}
async def schedule_custom_response(
self,
guild_id: int,
channel_id: int,
message: str,
scheduled_time: datetime,
requester_id: int,
) -> bool:
"""Schedule a custom response message"""
try:
# Create a custom scheduled response
response = ScheduledResponse(
response_id=f"custom_{guild_id}_{requester_id}_{int(datetime.now(timezone.utc).timestamp())}",
guild_id=guild_id,
channel_id=channel_id,
response_type=ResponseType.REALTIME, # Use existing enum value
quote_analysis=None, # Custom responses don't have quote analysis
scheduled_time=scheduled_time,
content=message,
embed_data=None,
sent=False,
)
# Add to queue
self.pending_responses.append(response)
logger.info(f"Custom response scheduled by user {requester_id}")
return True
except Exception as e:
logger.error(f"Failed to schedule custom response: {e}")
return False
# Placeholder methods for functionality that needs implementation
async def _create_response_embed(
self, quote_analysis: QuoteAnalysis, response_type: ResponseType
) -> Optional[Dict[str, Any]]:
"""Create embed for quote response"""
# TODO: Implement embed creation logic
return None
async def _store_scheduled_response(self, response: ScheduledResponse) -> None:
"""Store scheduled response in database"""
# TODO: Implement database storage
pass
async def _get_primary_channel(self, guild_id: int) -> Optional[int]:
"""Get primary channel ID for guild"""
# TODO: Implement channel selection logic
return None
async def _generate_rotation_content(self, quotes: List[Dict[str, Any]]) -> str:
"""Generate content for rotation response"""
# TODO: Implement rotation content generation
return f"🌟 Top {len(quotes)} quotes from the past 6 hours!"
async def _create_rotation_embed(self, quotes: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
"""Create embed for rotation response"""
# TODO: Implement rotation embed creation
return None
async def _process_daily_summaries(self) -> None:
"""Process daily summary responses for all guilds"""
# TODO: Implement daily summary processing
logger.info("Daily summary processing - placeholder implementation")
async def start_tasks(self) -> bool:
"""Start scheduler tasks"""
try:
await self.start_scheduler()
return True
except Exception as e:
logger.error(f"Failed to start scheduler tasks: {e}")
return False
async def stop_tasks(self) -> bool:
"""Stop scheduler tasks"""
try:
await self.stop()
return True
except Exception as e:
logger.error(f"Failed to stop scheduler tasks: {e}")
return False