- Deleted final.md, fix_async_fixtures.py, fix_cog_tests.py, fix_fixture_scoping.py, and run_race_condition_tests.sh as they are no longer needed. - Updated main.py to enhance logging and error handling. - Refactored various components for improved performance and maintainability. - Introduced new logging utilities for better context and performance monitoring. - Ensured all datetime operations are timezone-aware and consistent across the codebase.
1328 lines
52 KiB
Python
1328 lines
52 KiB
Python
"""
|
|
Database Manager for Discord Voice Chat Quote Bot
|
|
|
|
Handles PostgreSQL connections, schema management, and all database operations
|
|
including user consent, quotes, speaker profiles, and feedback tracking.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
from contextlib import asynccontextmanager
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import asyncpg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
@dataclass
|
|
class QuoteData:
|
|
"""Data structure for quote information"""
|
|
|
|
id: Optional[int] = None
|
|
user_id: Optional[int] = None
|
|
speaker_label: str = ""
|
|
username: Optional[str] = None
|
|
quote: str = ""
|
|
timestamp: Optional[datetime] = None
|
|
guild_id: int = 0
|
|
channel_id: int = 0
|
|
funny_score: float = 0.0
|
|
dark_score: float = 0.0
|
|
silly_score: float = 0.0
|
|
suspicious_score: float = 0.0
|
|
asinine_score: float = 0.0
|
|
overall_score: float = 0.0
|
|
laughter_duration: float = 0.0
|
|
laughter_intensity: float = 0.0
|
|
response_type: str = "none"
|
|
audio_clip_path: Optional[str] = None
|
|
speaker_confidence: float = 0.0
|
|
user_feedback: Optional[int] = None
|
|
created_at: Optional[datetime] = None
|
|
|
|
|
|
@dataclass
|
|
class UserConsent:
|
|
"""Data structure for user consent information"""
|
|
|
|
user_id: int
|
|
guild_id: int
|
|
consent_given: bool = False
|
|
consent_timestamp: Optional[datetime] = None
|
|
global_opt_out: bool = False
|
|
first_name: Optional[str] = None
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
|
|
@dataclass
|
|
class SpeakerProfile:
|
|
"""Data structure for speaker profile information"""
|
|
|
|
id: Optional[int] = None
|
|
user_id: int = 0
|
|
voice_embedding: Optional[bytes] = None
|
|
enrollment_status: str = "none"
|
|
enrollment_phrase: Optional[str] = None
|
|
personality_summary: Optional[str] = None
|
|
quote_count: int = 0
|
|
avg_humor_score: float = 0.0
|
|
last_seen: Optional[datetime] = None
|
|
training_samples: int = 0
|
|
recognition_accuracy: float = 0.0
|
|
created_at: Optional[datetime] = None
|
|
updated_at: Optional[datetime] = None
|
|
|
|
|
|
class DatabaseManager:
|
|
"""
|
|
PostgreSQL database manager for the Discord Quote Bot
|
|
|
|
Manages connections, schema, and all database operations with proper
|
|
connection pooling, error handling, and transaction management.
|
|
"""
|
|
|
|
def __init__(
|
|
self, database_url: str, pool_min_size: int = 5, pool_max_size: int = 20
|
|
):
|
|
self.database_url = database_url
|
|
self.pool_min_size = pool_min_size
|
|
self.pool_max_size = pool_max_size
|
|
self.pool: Optional[asyncpg.Pool] = None
|
|
self._initialized = False
|
|
|
|
async def initialize(self):
|
|
"""Initialize database connection pool and schema"""
|
|
if self._initialized:
|
|
return
|
|
|
|
try:
|
|
logger.info("Initializing database connection pool...")
|
|
|
|
# Create connection pool
|
|
self.pool = await asyncpg.create_pool(
|
|
self.database_url,
|
|
min_size=self.pool_min_size,
|
|
max_size=self.pool_max_size,
|
|
command_timeout=60,
|
|
server_settings={"jit": "off"}, # Disable JIT for better compatibility
|
|
)
|
|
|
|
# Initialize schema
|
|
await self._initialize_schema()
|
|
|
|
# Run any pending migrations
|
|
await self._run_migrations()
|
|
|
|
self._initialized = True
|
|
logger.info("Database initialization completed successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize database: {e}")
|
|
raise
|
|
|
|
async def close(self):
|
|
"""Close database connection pool"""
|
|
if self.pool:
|
|
await self.pool.close()
|
|
logger.info("Database connection pool closed")
|
|
|
|
@asynccontextmanager
|
|
async def acquire_connection(self):
|
|
"""Context manager for acquiring database connections"""
|
|
if not self.pool:
|
|
raise RuntimeError("Database not initialized")
|
|
|
|
async with self.pool.acquire() as connection:
|
|
yield connection
|
|
|
|
async def execute_query(
|
|
self, query: str, *args, fetch_one: bool = False, fetch_all: bool = False
|
|
):
|
|
"""Execute a database query with proper error handling"""
|
|
try:
|
|
async with self.acquire_connection() as conn:
|
|
if fetch_one:
|
|
return await conn.fetchrow(query, *args)
|
|
elif fetch_all:
|
|
result = await conn.fetch(query, *args)
|
|
query_time = round((time.perf_counter() - start_time) * 1000, 2)
|
|
db_logger.debug("Query executed successfully", {
|
|
'query_type': query_type,
|
|
'duration_ms': query_time,
|
|
'result_type': 'multiple_rows',
|
|
'row_count': len(result)
|
|
})
|
|
return result
|
|
else:
|
|
result = await conn.execute(query, *args)
|
|
query_time = round((time.perf_counter() - start_time) * 1000, 2)
|
|
db_logger.debug("Query executed successfully", {
|
|
'query_type': query_type,
|
|
'duration_ms': query_time,
|
|
'result_type': 'execute',
|
|
'status': result
|
|
})
|
|
return result
|
|
|
|
except Exception as e:
|
|
query_time = round((time.perf_counter() - start_time) * 1000, 2)
|
|
db_logger.error("Database query failed", {
|
|
'query_type': query_type,
|
|
'duration_ms': query_time,
|
|
'arg_count': len(args),
|
|
'error_type': type(e).__name__,
|
|
'error_message': str(e)[:200],
|
|
'query_preview': query[:100].replace('\n', ' ') if len(query) > 100 else query.replace('\n', ' ')
|
|
}, exc_info=True)
|
|
raise
|
|
|
|
async def _initialize_schema(self):
|
|
"""Initialize database schema with all required tables"""
|
|
schema_queries = [
|
|
# User Consent Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS user_consent (
|
|
user_id BIGINT PRIMARY KEY,
|
|
guild_id BIGINT NOT NULL,
|
|
consent_given BOOLEAN NOT NULL DEFAULT FALSE,
|
|
consent_timestamp TIMESTAMP WITH TIME ZONE,
|
|
global_opt_out BOOLEAN NOT NULL DEFAULT FALSE,
|
|
first_name VARCHAR(100),
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Quotes Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS quotes (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id BIGINT,
|
|
speaker_label VARCHAR(20) NOT NULL,
|
|
username VARCHAR(100),
|
|
quote TEXT NOT NULL,
|
|
timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
funny_score DECIMAL(3,1) DEFAULT 0.0,
|
|
dark_score DECIMAL(3,1) DEFAULT 0.0,
|
|
silly_score DECIMAL(3,1) DEFAULT 0.0,
|
|
suspicious_score DECIMAL(3,1) DEFAULT 0.0,
|
|
asinine_score DECIMAL(3,1) DEFAULT 0.0,
|
|
overall_score DECIMAL(4,2) NOT NULL,
|
|
laughter_duration DECIMAL(5,2) DEFAULT 0.0,
|
|
laughter_intensity DECIMAL(3,2) DEFAULT 0.0,
|
|
response_type VARCHAR(20) NOT NULL,
|
|
audio_clip_path VARCHAR(500),
|
|
speaker_confidence DECIMAL(3,2) DEFAULT 0.0,
|
|
user_feedback INTEGER,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Speaker Profiles Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_profiles (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id BIGINT NOT NULL UNIQUE,
|
|
voice_embedding BYTEA,
|
|
enrollment_status VARCHAR(20) DEFAULT 'none',
|
|
enrollment_phrase VARCHAR(200),
|
|
personality_summary TEXT,
|
|
quote_count INTEGER DEFAULT 0,
|
|
avg_humor_score DECIMAL(3,1) DEFAULT 0.0,
|
|
last_seen TIMESTAMP WITH TIME ZONE,
|
|
training_samples INTEGER DEFAULT 0,
|
|
recognition_accuracy DECIMAL(3,2) DEFAULT 0.0,
|
|
recognition_threshold DECIMAL(3,2) DEFAULT 0.75,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Quote Feedback Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS quote_feedback (
|
|
id SERIAL PRIMARY KEY,
|
|
quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
|
|
user_id BIGINT NOT NULL,
|
|
feedback_type VARCHAR(20) NOT NULL,
|
|
feedback_value TEXT NOT NULL,
|
|
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Audio Clips Table (for tracking temporary files)
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS audio_clips (
|
|
id SERIAL PRIMARY KEY,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
file_path VARCHAR(500) NOT NULL,
|
|
duration DECIMAL(5,2) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
processed BOOLEAN DEFAULT FALSE,
|
|
delete_after TIMESTAMP WITH TIME ZONE NOT NULL
|
|
)
|
|
""",
|
|
# Memory Embeddings Table (for long-term memory)
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS memory_embeddings (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id BIGINT NOT NULL,
|
|
content TEXT NOT NULL,
|
|
context_type VARCHAR(50) NOT NULL,
|
|
embedding_vector BYTEA,
|
|
metadata JSONB,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Speaker Diarization Results Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_diarizations (
|
|
id SERIAL PRIMARY KEY,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
audio_file_path TEXT NOT NULL,
|
|
total_duration DECIMAL(8,3) NOT NULL,
|
|
unique_speakers INTEGER NOT NULL,
|
|
processing_time DECIMAL(6,3) NOT NULL,
|
|
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Individual Speaker Segments Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_segments (
|
|
id SERIAL PRIMARY KEY,
|
|
diarization_id INTEGER NOT NULL REFERENCES speaker_diarizations(id) ON DELETE CASCADE,
|
|
start_time DECIMAL(8,3) NOT NULL,
|
|
end_time DECIMAL(8,3) NOT NULL,
|
|
speaker_label VARCHAR(100) NOT NULL,
|
|
confidence DECIMAL(4,3) NOT NULL,
|
|
user_id BIGINT,
|
|
needs_tagging BOOLEAN DEFAULT TRUE,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Transcription Sessions Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS transcription_sessions (
|
|
id SERIAL PRIMARY KEY,
|
|
clip_id VARCHAR(100) NOT NULL UNIQUE,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
audio_file_path TEXT NOT NULL,
|
|
total_duration DECIMAL(8,3) NOT NULL,
|
|
processing_time DECIMAL(6,3) NOT NULL,
|
|
ai_provider_used VARCHAR(50) NOT NULL,
|
|
ai_model_used VARCHAR(100) NOT NULL,
|
|
total_words INTEGER NOT NULL DEFAULT 0,
|
|
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Transcribed Segments Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS transcribed_segments (
|
|
id SERIAL PRIMARY KEY,
|
|
transcription_id INTEGER NOT NULL REFERENCES transcription_sessions(id) ON DELETE CASCADE,
|
|
start_time DECIMAL(8,3) NOT NULL,
|
|
end_time DECIMAL(8,3) NOT NULL,
|
|
speaker_label VARCHAR(100) NOT NULL,
|
|
text TEXT NOT NULL,
|
|
confidence DECIMAL(4,3) NOT NULL,
|
|
user_id BIGINT,
|
|
language VARCHAR(10) DEFAULT 'en',
|
|
word_count INTEGER NOT NULL DEFAULT 0,
|
|
is_quote_candidate BOOLEAN DEFAULT FALSE,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Quote Analysis Metadata Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS quote_analysis_metadata (
|
|
id SERIAL PRIMARY KEY,
|
|
quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
|
|
ai_provider VARCHAR(50) NOT NULL,
|
|
ai_model VARCHAR(100) NOT NULL,
|
|
processing_time DECIMAL(6,3) NOT NULL,
|
|
reasoning TEXT,
|
|
category_tags JSONB,
|
|
speaker_context JSONB,
|
|
conversation_context JSONB,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Laughter Analyses Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS laughter_analyses (
|
|
id SERIAL PRIMARY KEY,
|
|
audio_file_path TEXT NOT NULL,
|
|
total_duration DECIMAL(8,3) NOT NULL,
|
|
total_laughter_duration DECIMAL(8,3) NOT NULL,
|
|
average_intensity DECIMAL(4,3) NOT NULL,
|
|
peak_intensity DECIMAL(4,3) NOT NULL,
|
|
laughter_density DECIMAL(5,4) NOT NULL,
|
|
processing_time DECIMAL(6,3) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Laughter Segments Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS laughter_segments (
|
|
id SERIAL PRIMARY KEY,
|
|
analysis_id INTEGER NOT NULL REFERENCES laughter_analyses(id) ON DELETE CASCADE,
|
|
start_time DECIMAL(8,3) NOT NULL,
|
|
end_time DECIMAL(8,3) NOT NULL,
|
|
duration DECIMAL(8,3) NOT NULL,
|
|
intensity DECIMAL(4,3) NOT NULL,
|
|
confidence DECIMAL(4,3) NOT NULL,
|
|
frequency_characteristics JSONB,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Rotation Queue Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS rotation_queue (
|
|
id SERIAL PRIMARY KEY,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
|
|
quote_score DECIMAL(4,2) NOT NULL,
|
|
queued_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
sent BOOLEAN DEFAULT FALSE,
|
|
sent_at TIMESTAMP WITH TIME ZONE,
|
|
UNIQUE(quote_id)
|
|
)
|
|
""",
|
|
# Daily Queue Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS daily_queue (
|
|
id SERIAL PRIMARY KEY,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
|
|
quote_score DECIMAL(4,2) NOT NULL,
|
|
queued_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
sent BOOLEAN DEFAULT FALSE,
|
|
sent_at TIMESTAMP WITH TIME ZONE,
|
|
UNIQUE(quote_id)
|
|
)
|
|
""",
|
|
# Quote Explanations Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS quote_explanations (
|
|
id SERIAL PRIMARY KEY,
|
|
quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
|
|
explanation_data JSONB NOT NULL,
|
|
explanation_depth VARCHAR(20) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
UNIQUE(quote_id, explanation_depth)
|
|
)
|
|
""",
|
|
# Speaker Tagging Sessions Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_tagging_sessions (
|
|
id SERIAL PRIMARY KEY,
|
|
session_id VARCHAR(100) NOT NULL UNIQUE,
|
|
guild_id BIGINT NOT NULL,
|
|
channel_id BIGINT NOT NULL,
|
|
requestor_id BIGINT NOT NULL,
|
|
clip_id VARCHAR(100) NOT NULL,
|
|
audio_file_path TEXT NOT NULL,
|
|
unknown_speakers JSONB NOT NULL,
|
|
status VARCHAR(20) NOT NULL DEFAULT 'pending',
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
|
|
message_id BIGINT
|
|
)
|
|
""",
|
|
# Speaker Identifications Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_identifications (
|
|
id SERIAL PRIMARY KEY,
|
|
session_id VARCHAR(100) NOT NULL REFERENCES speaker_tagging_sessions(session_id) ON DELETE CASCADE,
|
|
speaker_label VARCHAR(100) NOT NULL,
|
|
identified_user_id BIGINT NOT NULL,
|
|
identifier_username VARCHAR(100) NOT NULL,
|
|
confidence DECIMAL(3,2) NOT NULL,
|
|
timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
UNIQUE(session_id, speaker_label)
|
|
)
|
|
""",
|
|
# Voice Embeddings Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS voice_embeddings (
|
|
id SERIAL PRIMARY KEY,
|
|
user_id BIGINT NOT NULL,
|
|
embedding_vector BYTEA NOT NULL,
|
|
confidence DECIMAL(3,2) NOT NULL,
|
|
method VARCHAR(20) NOT NULL DEFAULT 'embeddings',
|
|
sample_duration DECIMAL(5,2) NOT NULL,
|
|
audio_quality DECIMAL(3,2) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
UNIQUE(user_id, created_at)
|
|
)
|
|
""",
|
|
# Speaker Recognition Results Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS speaker_recognition_results (
|
|
id SERIAL PRIMARY KEY,
|
|
speaker_label VARCHAR(100) NOT NULL,
|
|
identified_user_id BIGINT,
|
|
confidence DECIMAL(4,3) NOT NULL,
|
|
method VARCHAR(20) NOT NULL,
|
|
embedding_similarity DECIMAL(4,3) NOT NULL,
|
|
processing_time DECIMAL(6,3) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Memory Entries Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS memory_entries (
|
|
id VARCHAR(36) PRIMARY KEY,
|
|
user_id BIGINT NOT NULL,
|
|
guild_id BIGINT NOT NULL,
|
|
memory_type VARCHAR(30) NOT NULL,
|
|
content TEXT NOT NULL,
|
|
metadata JSONB,
|
|
importance_score DECIMAL(3,2) NOT NULL,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
last_accessed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
access_count INTEGER DEFAULT 0
|
|
)
|
|
""",
|
|
# Personality Profiles Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS personality_profiles (
|
|
user_id BIGINT PRIMARY KEY,
|
|
humor_preferences JSONB DEFAULT '{}',
|
|
communication_style JSONB DEFAULT '{}',
|
|
interaction_patterns JSONB DEFAULT '{}',
|
|
topic_interests JSONB DEFAULT '[]',
|
|
activity_periods JSONB DEFAULT '[]',
|
|
personality_keywords JSONB DEFAULT '[]',
|
|
last_updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
# Server Configuration Table
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS server_config (
|
|
guild_id BIGINT PRIMARY KEY,
|
|
quote_threshold DECIMAL(3,1) NOT NULL DEFAULT 6.0,
|
|
auto_record BOOLEAN NOT NULL DEFAULT FALSE,
|
|
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
|
|
updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
""",
|
|
]
|
|
|
|
# Create indexes for performance
|
|
index_queries = [
|
|
"CREATE INDEX IF NOT EXISTS idx_quotes_guild_id ON quotes(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quotes_user_id ON quotes(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quotes_timestamp ON quotes(timestamp)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quotes_overall_score ON quotes(overall_score)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quotes_response_type ON quotes(response_type)",
|
|
"CREATE INDEX IF NOT EXISTS idx_user_consent_guild_id ON user_consent(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_profiles_user_id ON speaker_profiles(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quote_feedback_quote_id ON quote_feedback(quote_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_audio_clips_delete_after ON audio_clips(delete_after)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_embeddings_user_id ON memory_embeddings(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_diarizations_guild_id ON speaker_diarizations(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_diarizations_channel_id ON speaker_diarizations(channel_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_diarizations_timestamp ON speaker_diarizations(timestamp)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_segments_diarization_id ON speaker_segments(diarization_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_segments_user_id ON speaker_segments(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_segments_needs_tagging ON speaker_segments(needs_tagging)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcription_sessions_clip_id ON transcription_sessions(clip_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcription_sessions_guild_id ON transcription_sessions(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcription_sessions_timestamp ON transcription_sessions(timestamp)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcribed_segments_transcription_id ON transcribed_segments(transcription_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcribed_segments_user_id ON transcribed_segments(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_transcribed_segments_is_quote_candidate ON transcribed_segments(is_quote_candidate)",
|
|
"CREATE INDEX IF NOT EXISTS idx_quote_analysis_metadata_quote_id ON quote_analysis_metadata(quote_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_laughter_analyses_audio_file_path ON laughter_analyses(audio_file_path)",
|
|
"CREATE INDEX IF NOT EXISTS idx_laughter_analyses_created_at ON laughter_analyses(created_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_laughter_segments_analysis_id ON laughter_segments(analysis_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_rotation_queue_guild_id ON rotation_queue(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_rotation_queue_sent ON rotation_queue(sent)",
|
|
"CREATE INDEX IF NOT EXISTS idx_rotation_queue_quote_score ON rotation_queue(quote_score)",
|
|
"CREATE INDEX IF NOT EXISTS idx_daily_queue_guild_id ON daily_queue(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_daily_queue_sent ON daily_queue(sent)",
|
|
"CREATE INDEX IF NOT EXISTS idx_daily_queue_quote_score ON daily_queue(quote_score)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_tagging_sessions_session_id ON speaker_tagging_sessions(session_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_tagging_sessions_guild_id ON speaker_tagging_sessions(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_tagging_sessions_status ON speaker_tagging_sessions(status)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_tagging_sessions_expires_at ON speaker_tagging_sessions(expires_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_identifications_session_id ON speaker_identifications(session_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_identifications_identified_user_id ON speaker_identifications(identified_user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_voice_embeddings_user_id ON voice_embeddings(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_voice_embeddings_created_at ON voice_embeddings(created_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_voice_embeddings_method ON voice_embeddings(method)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_recognition_results_identified_user_id ON speaker_recognition_results(identified_user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_recognition_results_created_at ON speaker_recognition_results(created_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_speaker_recognition_results_method ON speaker_recognition_results(method)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_user_id ON memory_entries(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_guild_id ON memory_entries(guild_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_memory_type ON memory_entries(memory_type)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_created_at ON memory_entries(created_at)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_importance_score ON memory_entries(importance_score)",
|
|
"CREATE INDEX IF NOT EXISTS idx_memory_entries_last_accessed ON memory_entries(last_accessed)",
|
|
"CREATE INDEX IF NOT EXISTS idx_personality_profiles_user_id ON personality_profiles(user_id)",
|
|
"CREATE INDEX IF NOT EXISTS idx_personality_profiles_last_updated ON personality_profiles(last_updated)",
|
|
]
|
|
|
|
try:
|
|
async with self.acquire_connection() as conn:
|
|
# Create tables
|
|
for query in schema_queries:
|
|
await conn.execute(query)
|
|
|
|
# Create indexes
|
|
for query in index_queries:
|
|
await conn.execute(query)
|
|
|
|
logger.info("Database schema initialized successfully")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to initialize schema: {e}")
|
|
raise
|
|
|
|
async def _run_migrations(self):
|
|
"""Run any pending database migrations"""
|
|
# Create migrations table if it doesn't exist
|
|
await self.execute_query(
|
|
"""
|
|
CREATE TABLE IF NOT EXISTS schema_migrations (
|
|
version VARCHAR(50) PRIMARY KEY,
|
|
executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
|
|
)
|
|
"""
|
|
)
|
|
|
|
# Add any migration logic here
|
|
logger.info("Database migrations completed")
|
|
|
|
async def check_health(self) -> Dict[str, Any]:
|
|
"""Check database health and return status"""
|
|
try:
|
|
start_time = asyncio.get_event_loop().time()
|
|
|
|
# Simple connectivity test
|
|
await self.execute_query("SELECT 1", fetch_one=True)
|
|
|
|
end_time = asyncio.get_event_loop().time()
|
|
latency = (end_time - start_time) * 1000 # Convert to milliseconds
|
|
|
|
# Check pool status
|
|
pool_info = {}
|
|
if self.pool:
|
|
pool_info = {
|
|
"size": self.pool.get_size(),
|
|
"min_size": self.pool.get_min_size(),
|
|
"max_size": self.pool.get_max_size(),
|
|
"idle_connections": self.pool.get_idle_size(),
|
|
}
|
|
|
|
return {"healthy": True, "latency_ms": latency, "pool_info": pool_info}
|
|
|
|
except Exception as e:
|
|
return {"healthy": False, "error": str(e)}
|
|
|
|
# User Consent Management Methods
|
|
|
|
async def grant_consent(
|
|
self, user_id: int, guild_id: int, first_name: Optional[str] = None
|
|
) -> bool:
|
|
"""Grant recording consent for a user in a guild"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
INSERT INTO user_consent (user_id, guild_id, consent_given, consent_timestamp, first_name)
|
|
VALUES ($1, $2, TRUE, NOW(), $3)
|
|
ON CONFLICT (user_id)
|
|
DO UPDATE SET
|
|
consent_given = TRUE,
|
|
consent_timestamp = NOW(),
|
|
first_name = COALESCE(EXCLUDED.first_name, user_consent.first_name),
|
|
updated_at = NOW()
|
|
""",
|
|
user_id,
|
|
guild_id,
|
|
first_name,
|
|
)
|
|
|
|
logger.info(f"Consent granted for user {user_id} in guild {guild_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to grant consent: {e}")
|
|
return False
|
|
|
|
async def revoke_consent(self, user_id: int, guild_id: int) -> bool:
|
|
"""Revoke recording consent for a user in a guild"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
UPDATE user_consent
|
|
SET consent_given = FALSE, updated_at = NOW()
|
|
WHERE user_id = $1 AND guild_id = $2
|
|
""",
|
|
user_id,
|
|
guild_id,
|
|
)
|
|
|
|
logger.info(f"Consent revoked for user {user_id} in guild {guild_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to revoke consent: {e}")
|
|
return False
|
|
|
|
async def check_consent(self, user_id: int, guild_id: int) -> bool:
|
|
"""Check if user has given consent for recording in guild"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"""
|
|
SELECT consent_given, global_opt_out
|
|
FROM user_consent
|
|
WHERE user_id = $1 AND guild_id = $2
|
|
""",
|
|
user_id,
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
|
|
if result:
|
|
return result["consent_given"] and not result["global_opt_out"]
|
|
return False
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to check consent: {e}")
|
|
return False
|
|
|
|
async def set_global_opt_out(self, user_id: int, opt_out: bool) -> bool:
|
|
"""Set global opt-out status for user across all guilds"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
UPDATE user_consent
|
|
SET global_opt_out = $2, updated_at = NOW()
|
|
WHERE user_id = $1
|
|
""",
|
|
user_id,
|
|
opt_out,
|
|
)
|
|
|
|
logger.info(f"Global opt-out set to {opt_out} for user {user_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to set global opt-out: {e}")
|
|
return False
|
|
|
|
async def get_consented_users(self, guild_id: int) -> List[int]:
|
|
"""Get list of users who have consented to recording in guild"""
|
|
try:
|
|
results = await self.execute_query(
|
|
"""
|
|
SELECT user_id
|
|
FROM user_consent
|
|
WHERE guild_id = $1 AND consent_given = TRUE AND global_opt_out = FALSE
|
|
""",
|
|
guild_id,
|
|
fetch_all=True,
|
|
)
|
|
|
|
return [row["user_id"] for row in results]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get consented users: {e}")
|
|
return []
|
|
|
|
# Quote Management Methods
|
|
|
|
async def save_quote(self, quote_data: QuoteData) -> Optional[int]:
|
|
"""Save a quote to the database and return the quote ID"""
|
|
try:
|
|
async with self.acquire_connection() as conn:
|
|
async with conn.transaction():
|
|
# Insert quote
|
|
result = await conn.fetchrow(
|
|
"""
|
|
INSERT INTO quotes (
|
|
user_id, speaker_label, username, quote, timestamp, guild_id, channel_id,
|
|
funny_score, dark_score, silly_score, suspicious_score, asinine_score,
|
|
overall_score, laughter_duration, laughter_intensity, response_type,
|
|
audio_clip_path, speaker_confidence
|
|
) VALUES (
|
|
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18
|
|
) RETURNING id
|
|
""",
|
|
quote_data.user_id,
|
|
quote_data.speaker_label,
|
|
quote_data.username,
|
|
quote_data.quote,
|
|
quote_data.timestamp or datetime.now(timezone.utc),
|
|
quote_data.guild_id,
|
|
quote_data.channel_id,
|
|
quote_data.funny_score,
|
|
quote_data.dark_score,
|
|
quote_data.silly_score,
|
|
quote_data.suspicious_score,
|
|
quote_data.asinine_score,
|
|
quote_data.overall_score,
|
|
quote_data.laughter_duration,
|
|
quote_data.laughter_intensity,
|
|
quote_data.response_type,
|
|
quote_data.audio_clip_path,
|
|
quote_data.speaker_confidence,
|
|
)
|
|
|
|
quote_id = result["id"]
|
|
logger.info(f"Quote saved with ID: {quote_id}")
|
|
|
|
# Update speaker profile stats if user is known (within transaction)
|
|
if quote_data.user_id:
|
|
await conn.execute(
|
|
"""
|
|
INSERT INTO speaker_profiles (user_id, quote_count, avg_humor_score, last_seen)
|
|
VALUES ($1, 1, $2, NOW())
|
|
ON CONFLICT (user_id) DO UPDATE SET
|
|
quote_count = speaker_profiles.quote_count + 1,
|
|
avg_humor_score = (
|
|
(speaker_profiles.avg_humor_score * speaker_profiles.quote_count + $2) /
|
|
(speaker_profiles.quote_count + 1)
|
|
),
|
|
last_seen = NOW(),
|
|
updated_at = NOW()
|
|
""",
|
|
quote_data.user_id,
|
|
quote_data.funny_score,
|
|
)
|
|
|
|
return quote_id
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to save quote: {e}")
|
|
return None
|
|
|
|
async def get_quotes_by_score(
|
|
self, guild_id: int, min_score: float, limit: int = 50
|
|
) -> List[Dict]:
|
|
"""Get quotes above a minimum score threshold"""
|
|
try:
|
|
results = await self.execute_query(
|
|
"""
|
|
SELECT * FROM quotes
|
|
WHERE guild_id = $1 AND overall_score >= $2
|
|
ORDER BY overall_score DESC, timestamp DESC
|
|
LIMIT $3
|
|
""",
|
|
guild_id,
|
|
min_score,
|
|
limit,
|
|
fetch_all=True,
|
|
)
|
|
|
|
quotes = [dict(row) for row in results] if results else []
|
|
query_time = round((time.perf_counter() - start_time) * 1000, 2)
|
|
|
|
db_logger.info("Retrieved quotes by score", {
|
|
'guild_id': guild_id,
|
|
'min_score': min_score,
|
|
'limit': limit,
|
|
'result_count': len(quotes),
|
|
'duration_ms': query_time
|
|
})
|
|
|
|
return quotes
|
|
|
|
except Exception as e:
|
|
query_time = round((time.perf_counter() - start_time) * 1000, 2)
|
|
db_logger.error("Failed to get quotes by score", {
|
|
'guild_id': guild_id,
|
|
'min_score': min_score,
|
|
'limit': limit,
|
|
'duration_ms': query_time,
|
|
'error_type': type(e).__name__,
|
|
'error_message': str(e)[:200]
|
|
}, exc_info=True)
|
|
return []
|
|
|
|
async def get_user_quotes(
|
|
self, user_id: int, guild_id: int, limit: int = 50
|
|
) -> List[Dict]:
|
|
"""Get quotes from a specific user"""
|
|
try:
|
|
results = await self.execute_query(
|
|
"""
|
|
SELECT * FROM quotes
|
|
WHERE user_id = $1 AND guild_id = $2
|
|
ORDER BY timestamp DESC
|
|
LIMIT $3
|
|
""",
|
|
user_id,
|
|
guild_id,
|
|
limit,
|
|
fetch_all=True,
|
|
)
|
|
|
|
return [dict(row) for row in results]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get user quotes: {e}")
|
|
return []
|
|
|
|
async def delete_user_quotes(
|
|
self, user_id: int, guild_id: Optional[int] = None
|
|
) -> int:
|
|
"""Delete all quotes from a user, optionally in a specific guild"""
|
|
try:
|
|
if guild_id:
|
|
result = await self.execute_query(
|
|
"""
|
|
DELETE FROM quotes WHERE user_id = $1 AND guild_id = $2
|
|
""",
|
|
user_id,
|
|
guild_id,
|
|
)
|
|
else:
|
|
result = await self.execute_query(
|
|
"""
|
|
DELETE FROM quotes WHERE user_id = $1
|
|
""",
|
|
user_id,
|
|
)
|
|
|
|
# Extract number of deleted rows from result string
|
|
deleted_count = int(result.split()[-1]) if result else 0
|
|
logger.info(f"Deleted {deleted_count} quotes for user {user_id}")
|
|
return deleted_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to delete user quotes: {e}")
|
|
return 0
|
|
|
|
async def update_quote_speaker(
|
|
self, quote_id: int, user_id: int, tagger_id: int
|
|
) -> bool:
|
|
"""Update quote with identified speaker information"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
UPDATE quotes
|
|
SET user_id = $2, speaker_confidence = 1.0
|
|
WHERE id = $1
|
|
""",
|
|
quote_id,
|
|
user_id,
|
|
)
|
|
|
|
# Record the tagging feedback
|
|
await self.execute_query(
|
|
"""
|
|
INSERT INTO quote_feedback (quote_id, user_id, feedback_type, feedback_value)
|
|
VALUES ($1, $2, 'tag_speaker', $3)
|
|
""",
|
|
quote_id,
|
|
tagger_id,
|
|
json.dumps({"tagged_user_id": user_id}),
|
|
)
|
|
|
|
logger.info(f"Quote {quote_id} tagged as user {user_id} by {tagger_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update quote speaker: {e}")
|
|
return False
|
|
|
|
# Speaker Profile Management Methods
|
|
|
|
async def store_speaker_profile(
|
|
self, user_id: int, voice_embedding: bytes, metadata: Dict
|
|
) -> bool:
|
|
"""Store speaker voice embedding and metadata"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
INSERT INTO speaker_profiles (
|
|
user_id, voice_embedding, enrollment_status, enrollment_phrase,
|
|
training_samples
|
|
) VALUES ($1, $2, $3, $4, 1)
|
|
ON CONFLICT (user_id) DO UPDATE SET
|
|
voice_embedding = EXCLUDED.voice_embedding,
|
|
enrollment_status = EXCLUDED.enrollment_status,
|
|
enrollment_phrase = EXCLUDED.enrollment_phrase,
|
|
training_samples = speaker_profiles.training_samples + 1,
|
|
updated_at = NOW()
|
|
""",
|
|
user_id,
|
|
voice_embedding,
|
|
metadata.get("status", "enrolled"),
|
|
metadata.get("phrase"),
|
|
)
|
|
|
|
logger.info(f"Speaker profile stored for user {user_id}")
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to store speaker profile: {e}")
|
|
return False
|
|
|
|
async def get_speaker_profile(self, user_id: int) -> Optional[Dict]:
|
|
"""Get speaker profile for a user"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"""
|
|
SELECT * FROM speaker_profiles WHERE user_id = $1
|
|
""",
|
|
user_id,
|
|
fetch_one=True,
|
|
)
|
|
|
|
return dict(result) if result else None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get speaker profile: {e}")
|
|
return None
|
|
|
|
# Audio Clip Management Methods
|
|
|
|
async def register_audio_clip(
|
|
self,
|
|
guild_id: int,
|
|
channel_id: int,
|
|
file_path: str,
|
|
duration: float,
|
|
delete_after_hours: int = 24,
|
|
) -> int:
|
|
"""Register an audio clip for tracking and cleanup"""
|
|
try:
|
|
delete_after = datetime.now(timezone.utc) + timedelta(
|
|
hours=delete_after_hours
|
|
)
|
|
|
|
result = await self.execute_query(
|
|
"""
|
|
INSERT INTO audio_clips (guild_id, channel_id, file_path, duration, delete_after)
|
|
VALUES ($1, $2, $3, $4, $5)
|
|
RETURNING id
|
|
""",
|
|
guild_id,
|
|
channel_id,
|
|
file_path,
|
|
duration,
|
|
delete_after,
|
|
fetch_one=True,
|
|
)
|
|
|
|
return result["id"]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to register audio clip: {e}")
|
|
return 0
|
|
|
|
async def get_expired_audio_clips(self) -> List[Dict]:
|
|
"""Get audio clips that should be deleted"""
|
|
try:
|
|
results = await self.execute_query(
|
|
"""
|
|
SELECT * FROM audio_clips
|
|
WHERE delete_after <= NOW() AND processed = TRUE
|
|
""",
|
|
fetch_all=True,
|
|
)
|
|
|
|
return [dict(row) for row in results]
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get expired audio clips: {e}")
|
|
return []
|
|
|
|
async def mark_audio_clip_processed(self, clip_id: int) -> bool:
|
|
"""Mark an audio clip as processed"""
|
|
try:
|
|
await self.execute_query(
|
|
"""
|
|
UPDATE audio_clips SET processed = TRUE WHERE id = $1
|
|
""",
|
|
clip_id,
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to mark audio clip as processed: {e}")
|
|
return False
|
|
|
|
async def cleanup_expired_clips(self) -> int:
|
|
"""Remove expired audio clip records from database"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"""
|
|
DELETE FROM audio_clips WHERE delete_after <= NOW()
|
|
"""
|
|
)
|
|
|
|
deleted_count = int(result.split()[-1]) if result else 0
|
|
logger.info(f"Cleaned up {deleted_count} expired audio clip records")
|
|
return deleted_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to cleanup expired clips: {e}")
|
|
return 0
|
|
|
|
async def search_quotes(
|
|
self,
|
|
guild_id: int,
|
|
search_term: Optional[str] = None,
|
|
user_id: Optional[int] = None,
|
|
limit: int = 5,
|
|
) -> List[Dict]:
|
|
"""Search quotes with optional filters"""
|
|
try:
|
|
query = """
|
|
SELECT q.*, u.username as speaker_name
|
|
FROM quotes q
|
|
LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
|
|
WHERE q.guild_id = $1
|
|
"""
|
|
params: List[Any] = [guild_id]
|
|
param_count = 2
|
|
|
|
if search_term:
|
|
query += f" AND (q.quote ILIKE ${param_count} OR u.username ILIKE ${param_count})"
|
|
params.append(f"%{search_term}%")
|
|
param_count += 1
|
|
|
|
if user_id:
|
|
query += f" AND q.user_id = ${param_count}"
|
|
params.append(user_id)
|
|
param_count += 1
|
|
|
|
query += " ORDER BY q.overall_score DESC, q.timestamp DESC"
|
|
query += f" LIMIT ${param_count}"
|
|
params.append(limit)
|
|
|
|
result = await self.execute_query(query, *params, fetch_all=True)
|
|
return [dict(row) for row in result] if result else []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to search quotes: {e}")
|
|
return []
|
|
|
|
async def get_quote_stats(self, guild_id: int) -> Dict[str, Any]:
|
|
"""Get quote statistics for a guild"""
|
|
try:
|
|
stats = {}
|
|
|
|
# Total quotes
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(*) FROM quotes WHERE guild_id = $1",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["total_quotes"] = result[0] if result else 0
|
|
|
|
# Unique speakers
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(DISTINCT user_id) FROM quotes WHERE guild_id = $1",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["unique_speakers"] = result[0] if result else 0
|
|
|
|
# Average score
|
|
result = await self.execute_query(
|
|
"SELECT AVG(overall_score) FROM quotes WHERE guild_id = $1",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["avg_score"] = float(result[0]) if result and result[0] else 0.0
|
|
|
|
# Max score
|
|
result = await self.execute_query(
|
|
"SELECT MAX(overall_score) FROM quotes WHERE guild_id = $1",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["max_score"] = float(result[0]) if result and result[0] else 0.0
|
|
|
|
# This week
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(*) FROM quotes WHERE guild_id = $1 AND timestamp >= NOW() - INTERVAL '7 days'",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["quotes_this_week"] = result[0] if result else 0
|
|
|
|
# This month
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(*) FROM quotes WHERE guild_id = $1 AND timestamp >= NOW() - INTERVAL '30 days'",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
stats["quotes_this_month"] = result[0] if result else 0
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get quote stats: {e}")
|
|
return {}
|
|
|
|
async def get_top_quotes(self, guild_id: int, limit: int = 5) -> List[Dict]:
|
|
"""Get top-rated quotes for a guild"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"""
|
|
SELECT q.*, u.username as speaker_name
|
|
FROM quotes q
|
|
LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
|
|
WHERE q.guild_id = $1
|
|
ORDER BY q.overall_score DESC, q.timestamp DESC
|
|
LIMIT $2
|
|
""",
|
|
guild_id,
|
|
limit,
|
|
fetch_all=True,
|
|
)
|
|
|
|
return [dict(row) for row in result] if result else []
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get top quotes: {e}")
|
|
return []
|
|
|
|
async def get_random_quote(self, guild_id: int) -> Optional[Dict]:
|
|
"""Get a random quote from a guild"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"""
|
|
SELECT q.*, u.username as speaker_name
|
|
FROM quotes q
|
|
LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
|
|
WHERE q.guild_id = $1
|
|
ORDER BY RANDOM()
|
|
LIMIT 1
|
|
""",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
|
|
return dict(result) if result else None
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get random quote: {e}")
|
|
return None
|
|
|
|
async def get_admin_stats(self) -> Dict[str, Any]:
|
|
"""Get comprehensive admin statistics"""
|
|
try:
|
|
stats = {}
|
|
|
|
# Total quotes across all guilds
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(*) FROM quotes", fetch_one=True
|
|
)
|
|
stats["total_quotes"] = result[0] if result else 0
|
|
|
|
# Unique speakers across all guilds
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(DISTINCT user_id) FROM quotes", fetch_one=True
|
|
)
|
|
stats["unique_speakers"] = result[0] if result else 0
|
|
|
|
# Active consents
|
|
result = await self.execute_query(
|
|
"SELECT COUNT(*) FROM user_consent WHERE consent_given = TRUE AND global_opt_out = FALSE",
|
|
fetch_one=True,
|
|
)
|
|
stats["active_consents"] = result[0] if result else 0
|
|
|
|
return stats
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get admin stats: {e}")
|
|
return {}
|
|
|
|
async def get_server_config(self, guild_id: int) -> Dict[str, Any]:
|
|
"""Get server configuration"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"SELECT * FROM server_config WHERE guild_id = $1",
|
|
guild_id,
|
|
fetch_one=True,
|
|
)
|
|
|
|
if result:
|
|
return dict(result)
|
|
else:
|
|
# Return defaults
|
|
return {"quote_threshold": 6.0, "auto_record": False}
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to get server config: {e}")
|
|
return {"quote_threshold": 6.0, "auto_record": False}
|
|
|
|
async def update_server_config(self, guild_id: int, config: Dict[str, Any]) -> bool:
|
|
"""Update server configuration"""
|
|
try:
|
|
# Insert or update configuration
|
|
query = """
|
|
INSERT INTO server_config (guild_id, quote_threshold, auto_record)
|
|
VALUES ($1, $2, $3)
|
|
ON CONFLICT (guild_id)
|
|
DO UPDATE SET
|
|
quote_threshold = COALESCE($2, server_config.quote_threshold),
|
|
auto_record = COALESCE($3, server_config.auto_record)
|
|
"""
|
|
|
|
await self.execute_query(
|
|
query,
|
|
guild_id,
|
|
config.get("quote_threshold"),
|
|
config.get("auto_record"),
|
|
)
|
|
|
|
return True
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to update server config: {e}")
|
|
return False
|
|
|
|
async def purge_user_quotes(self, guild_id: int, user_id: int) -> int:
|
|
"""Purge all quotes from a specific user in a guild"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"DELETE FROM quotes WHERE guild_id = $1 AND user_id = $2",
|
|
guild_id,
|
|
user_id,
|
|
)
|
|
|
|
# Extract count from result string
|
|
deleted_count = int(result.split()[-1]) if result else 0
|
|
return deleted_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to purge user quotes: {e}")
|
|
return 0
|
|
|
|
async def purge_old_quotes(self, guild_id: int, days: int) -> int:
|
|
"""Purge quotes older than specified days"""
|
|
try:
|
|
result = await self.execute_query(
|
|
"DELETE FROM quotes WHERE guild_id = $1 AND timestamp < NOW() - $2 * INTERVAL '1 day'",
|
|
guild_id,
|
|
days,
|
|
)
|
|
|
|
# Extract count from result string
|
|
deleted_count = int(result.split()[-1]) if result else 0
|
|
return deleted_count
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to purge old quotes: {e}")
|
|
return 0
|