Files
disbord/core/database.py
Travis Vasceannie 8f6f958426 chore: remove deprecated files and scripts
- Deleted final.md, fix_async_fixtures.py, fix_cog_tests.py, fix_fixture_scoping.py, and run_race_condition_tests.sh as they are no longer needed.
- Updated main.py to enhance logging and error handling.
- Refactored various components for improved performance and maintainability.
- Introduced new logging utilities for better context and performance monitoring.
- Ensured all datetime operations are timezone-aware and consistent across the codebase.
2025-08-28 00:54:28 -04:00

1328 lines
52 KiB
Python

"""
Database Manager for Discord Voice Chat Quote Bot
Handles PostgreSQL connections, schema management, and all database operations
including user consent, quotes, speaker profiles, and feedback tracking.
"""
import asyncio
import json
import logging
import time
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import asyncpg
logger = logging.getLogger(__name__)
@dataclass
class QuoteData:
    """One captured quote; fields line up with the columns of the ``quotes`` table.

    Every field has a default, so an instance can be filled in piecemeal.
    """

    # Row identity and attribution
    id: Optional[int] = None
    user_id: Optional[int] = None
    speaker_label: str = ""
    username: Optional[str] = None

    # Quote text and where/when it was captured
    quote: str = ""
    timestamp: Optional[datetime] = None
    guild_id: int = 0
    channel_id: int = 0

    # Per-category scores and the combined score
    funny_score: float = 0.0
    dark_score: float = 0.0
    silly_score: float = 0.0
    suspicious_score: float = 0.0
    asinine_score: float = 0.0
    overall_score: float = 0.0

    # Laughter measurements and the response type stored with the quote
    laughter_duration: float = 0.0
    laughter_intensity: float = 0.0
    response_type: str = "none"

    # Audio reference, speaker-match confidence, feedback and row timestamp
    audio_clip_path: Optional[str] = None
    speaker_confidence: float = 0.0
    user_feedback: Optional[int] = None
    created_at: Optional[datetime] = None
@dataclass
class UserConsent:
    """Consent record for a user; fields match the ``user_consent`` table columns.

    ``user_id`` and ``guild_id`` are required, everything else is optional.
    """

    # Required keys
    user_id: int
    guild_id: int

    # Consent state
    consent_given: bool = False
    consent_timestamp: Optional[datetime] = None
    global_opt_out: bool = False

    # Optional display name and row timestamps
    first_name: Optional[str] = None
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
@dataclass
class SpeakerProfile:
    """Per-user speaker data; fields correspond to ``speaker_profiles`` columns.

    The table's ``recognition_threshold`` column has no field here.
    """

    # Row identity
    id: Optional[int] = None
    user_id: int = 0

    # Voice enrollment
    voice_embedding: Optional[bytes] = None
    enrollment_status: str = "none"
    enrollment_phrase: Optional[str] = None

    # Aggregates kept per speaker
    personality_summary: Optional[str] = None
    quote_count: int = 0
    avg_humor_score: float = 0.0
    last_seen: Optional[datetime] = None
    training_samples: int = 0
    recognition_accuracy: float = 0.0

    # Row timestamps
    created_at: Optional[datetime] = None
    updated_at: Optional[datetime] = None
class DatabaseManager:
"""
PostgreSQL database manager for the Discord Quote Bot
Manages connections, schema, and all database operations with proper
connection pooling, error handling, and transaction management.
"""
def __init__(
    self, database_url: str, pool_min_size: int = 5, pool_max_size: int = 20
):
    """Store connection settings; no connection is opened here.

    Args:
        database_url: connection string later passed to ``asyncpg.create_pool``.
        pool_min_size: lower bound handed to the pool as ``min_size``.
        pool_max_size: upper bound handed to the pool as ``max_size``.
    """
    self.database_url = database_url
    self.pool_min_size, self.pool_max_size = pool_min_size, pool_max_size
    # Filled in by initialize(); None means "no pool yet".
    self.pool: Optional[asyncpg.Pool] = None
    self._initialized = False
async def initialize(self):
    """Create the connection pool, then set up the schema and run migrations.

    Returns immediately if a previous call already completed. When any step
    fails after the pool was created, the pool is closed and ``self.pool`` is
    reset to ``None`` so a failed start-up neither leaks connections nor
    leaves a half-initialised pool behind for a retry to overwrite.

    Raises:
        Exception: whatever pool creation, schema setup or migrations raised;
            the error is logged and re-raised unchanged.
    """
    if self._initialized:
        return
    try:
        logger.info("Initializing database connection pool...")
        # Create connection pool
        self.pool = await asyncpg.create_pool(
            self.database_url,
            min_size=self.pool_min_size,
            max_size=self.pool_max_size,
            command_timeout=60,
            server_settings={"jit": "off"},  # Disable JIT for better compatibility
        )
        # Initialize schema
        await self._initialize_schema()
        # Run any pending migrations
        await self._run_migrations()
        self._initialized = True
        logger.info("Database initialization completed successfully")
    except Exception as e:
        logger.error(f"Failed to initialize database: {e}")
        # Release the pool created above (if any) before propagating.
        pool, self.pool = self.pool, None
        if pool is not None:
            try:
                await pool.close()
            except Exception as close_error:
                # The original failure is the one the caller must see.
                logger.warning(
                    "Failed to close pool after initialization error: %s",
                    close_error,
                )
        raise
async def close(self):
    """Close the connection pool, if one is open, and reset the manager.

    ``self.pool`` and ``self._initialized`` are cleared so that
    ``acquire_connection`` reports "not initialized" afterwards and a later
    ``initialize()`` builds a fresh pool instead of returning early with a
    closed one. Calling this without a pool is a no-op.
    """
    if self.pool is None:
        return
    # Detach first so nothing can pick up the pool while it is closing.
    pool, self.pool = self.pool, None
    self._initialized = False
    await pool.close()
    logger.info("Database connection pool closed")
@asynccontextmanager
async def acquire_connection(self):
    """Yield a connection acquired from the pool for the duration of the block.

    Raises:
        RuntimeError: if no pool is set on the manager.
    """
    pool = self.pool
    if not pool:
        raise RuntimeError("Database not initialized")
    async with pool.acquire() as conn:
        yield conn
async def execute_query(
    self, query: str, *args, fetch_one: bool = False, fetch_all: bool = False
):
    """Run one SQL statement on a pooled connection and log how long it took.

    Args:
        query: SQL text using ``$n`` placeholders.
        *args: values bound to the placeholders, in order.
        fetch_one: return a single row via ``conn.fetchrow``.
        fetch_all: return all rows via ``conn.fetch``; only consulted when
            ``fetch_one`` is false.

    Returns:
        The row, the list of rows, or - when neither flag is set - the value
        returned by ``conn.execute``.

    Raises:
        Exception: any error from acquiring the connection or running the
            statement is logged with timing context and re-raised.
    """
    start_time = time.perf_counter()
    # Leading keyword of the statement (SELECT, INSERT, ...); used for logs only.
    leading_words = query.split(None, 1)
    query_type = leading_words[0].upper() if leading_words else "UNKNOWN"
    try:
        async with self.acquire_connection() as conn:
            if fetch_one:
                result = await conn.fetchrow(query, *args)
                outcome = "single_row"
            elif fetch_all:
                result = await conn.fetch(query, *args)
                outcome = f"multiple_rows row_count={len(result)}"
            else:
                result = await conn.execute(query, *args)
                outcome = f"execute status={result}"
    except Exception as e:
        query_time = round((time.perf_counter() - start_time) * 1000, 2)
        logger.error(
            "Database query failed: query_type=%s duration_ms=%s arg_count=%s "
            "error_type=%s error_message=%s query_preview=%s",
            query_type,
            query_time,
            len(args),
            type(e).__name__,
            str(e)[:200],
            # Cap the preview so huge statements do not flood the log.
            query[:100].replace("\n", " "),
            exc_info=True,
        )
        raise
    query_time = round((time.perf_counter() - start_time) * 1000, 2)
    logger.debug(
        "Query executed successfully: query_type=%s duration_ms=%s result_type=%s",
        query_type,
        query_time,
        outcome,
    )
    return result
async def _initialize_schema(self):
    """Create every table and index the bot needs, if not already present.

    All statements use ``IF NOT EXISTS`` and run on one connection inside a
    single transaction, so a failure part-way through rolls back instead of
    leaving a partially created schema. Tables are created first (in list
    order, which satisfies the foreign-key references), then the indexes.

    Raises:
        Exception: any error from acquiring the connection or executing a
            statement is logged and re-raised.
    """
    schema_queries = [
        # User Consent Table
        """
        CREATE TABLE IF NOT EXISTS user_consent (
            user_id BIGINT PRIMARY KEY,
            guild_id BIGINT NOT NULL,
            consent_given BOOLEAN NOT NULL DEFAULT FALSE,
            consent_timestamp TIMESTAMP WITH TIME ZONE,
            global_opt_out BOOLEAN NOT NULL DEFAULT FALSE,
            first_name VARCHAR(100),
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Quotes Table
        """
        CREATE TABLE IF NOT EXISTS quotes (
            id SERIAL PRIMARY KEY,
            user_id BIGINT,
            speaker_label VARCHAR(20) NOT NULL,
            username VARCHAR(100),
            quote TEXT NOT NULL,
            timestamp TIMESTAMP WITH TIME ZONE NOT NULL,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            funny_score DECIMAL(3,1) DEFAULT 0.0,
            dark_score DECIMAL(3,1) DEFAULT 0.0,
            silly_score DECIMAL(3,1) DEFAULT 0.0,
            suspicious_score DECIMAL(3,1) DEFAULT 0.0,
            asinine_score DECIMAL(3,1) DEFAULT 0.0,
            overall_score DECIMAL(4,2) NOT NULL,
            laughter_duration DECIMAL(5,2) DEFAULT 0.0,
            laughter_intensity DECIMAL(3,2) DEFAULT 0.0,
            response_type VARCHAR(20) NOT NULL,
            audio_clip_path VARCHAR(500),
            speaker_confidence DECIMAL(3,2) DEFAULT 0.0,
            user_feedback INTEGER,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Speaker Profiles Table
        """
        CREATE TABLE IF NOT EXISTS speaker_profiles (
            id SERIAL PRIMARY KEY,
            user_id BIGINT NOT NULL UNIQUE,
            voice_embedding BYTEA,
            enrollment_status VARCHAR(20) DEFAULT 'none',
            enrollment_phrase VARCHAR(200),
            personality_summary TEXT,
            quote_count INTEGER DEFAULT 0,
            avg_humor_score DECIMAL(3,1) DEFAULT 0.0,
            last_seen TIMESTAMP WITH TIME ZONE,
            training_samples INTEGER DEFAULT 0,
            recognition_accuracy DECIMAL(3,2) DEFAULT 0.0,
            recognition_threshold DECIMAL(3,2) DEFAULT 0.75,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Quote Feedback Table
        """
        CREATE TABLE IF NOT EXISTS quote_feedback (
            id SERIAL PRIMARY KEY,
            quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
            user_id BIGINT NOT NULL,
            feedback_type VARCHAR(20) NOT NULL,
            feedback_value TEXT NOT NULL,
            timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Audio Clips Table (for tracking temporary files)
        """
        CREATE TABLE IF NOT EXISTS audio_clips (
            id SERIAL PRIMARY KEY,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            file_path VARCHAR(500) NOT NULL,
            duration DECIMAL(5,2) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            processed BOOLEAN DEFAULT FALSE,
            delete_after TIMESTAMP WITH TIME ZONE NOT NULL
        )
        """,
        # Memory Embeddings Table (for long-term memory)
        """
        CREATE TABLE IF NOT EXISTS memory_embeddings (
            id SERIAL PRIMARY KEY,
            user_id BIGINT NOT NULL,
            content TEXT NOT NULL,
            context_type VARCHAR(50) NOT NULL,
            embedding_vector BYTEA,
            metadata JSONB,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Speaker Diarization Results Table
        """
        CREATE TABLE IF NOT EXISTS speaker_diarizations (
            id SERIAL PRIMARY KEY,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            audio_file_path TEXT NOT NULL,
            total_duration DECIMAL(8,3) NOT NULL,
            unique_speakers INTEGER NOT NULL,
            processing_time DECIMAL(6,3) NOT NULL,
            timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Individual Speaker Segments Table
        """
        CREATE TABLE IF NOT EXISTS speaker_segments (
            id SERIAL PRIMARY KEY,
            diarization_id INTEGER NOT NULL REFERENCES speaker_diarizations(id) ON DELETE CASCADE,
            start_time DECIMAL(8,3) NOT NULL,
            end_time DECIMAL(8,3) NOT NULL,
            speaker_label VARCHAR(100) NOT NULL,
            confidence DECIMAL(4,3) NOT NULL,
            user_id BIGINT,
            needs_tagging BOOLEAN DEFAULT TRUE,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Transcription Sessions Table
        """
        CREATE TABLE IF NOT EXISTS transcription_sessions (
            id SERIAL PRIMARY KEY,
            clip_id VARCHAR(100) NOT NULL UNIQUE,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            audio_file_path TEXT NOT NULL,
            total_duration DECIMAL(8,3) NOT NULL,
            processing_time DECIMAL(6,3) NOT NULL,
            ai_provider_used VARCHAR(50) NOT NULL,
            ai_model_used VARCHAR(100) NOT NULL,
            total_words INTEGER NOT NULL DEFAULT 0,
            timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Transcribed Segments Table
        """
        CREATE TABLE IF NOT EXISTS transcribed_segments (
            id SERIAL PRIMARY KEY,
            transcription_id INTEGER NOT NULL REFERENCES transcription_sessions(id) ON DELETE CASCADE,
            start_time DECIMAL(8,3) NOT NULL,
            end_time DECIMAL(8,3) NOT NULL,
            speaker_label VARCHAR(100) NOT NULL,
            text TEXT NOT NULL,
            confidence DECIMAL(4,3) NOT NULL,
            user_id BIGINT,
            language VARCHAR(10) DEFAULT 'en',
            word_count INTEGER NOT NULL DEFAULT 0,
            is_quote_candidate BOOLEAN DEFAULT FALSE,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Quote Analysis Metadata Table
        """
        CREATE TABLE IF NOT EXISTS quote_analysis_metadata (
            id SERIAL PRIMARY KEY,
            quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
            ai_provider VARCHAR(50) NOT NULL,
            ai_model VARCHAR(100) NOT NULL,
            processing_time DECIMAL(6,3) NOT NULL,
            reasoning TEXT,
            category_tags JSONB,
            speaker_context JSONB,
            conversation_context JSONB,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Laughter Analyses Table
        """
        CREATE TABLE IF NOT EXISTS laughter_analyses (
            id SERIAL PRIMARY KEY,
            audio_file_path TEXT NOT NULL,
            total_duration DECIMAL(8,3) NOT NULL,
            total_laughter_duration DECIMAL(8,3) NOT NULL,
            average_intensity DECIMAL(4,3) NOT NULL,
            peak_intensity DECIMAL(4,3) NOT NULL,
            laughter_density DECIMAL(5,4) NOT NULL,
            processing_time DECIMAL(6,3) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Laughter Segments Table
        """
        CREATE TABLE IF NOT EXISTS laughter_segments (
            id SERIAL PRIMARY KEY,
            analysis_id INTEGER NOT NULL REFERENCES laughter_analyses(id) ON DELETE CASCADE,
            start_time DECIMAL(8,3) NOT NULL,
            end_time DECIMAL(8,3) NOT NULL,
            duration DECIMAL(8,3) NOT NULL,
            intensity DECIMAL(4,3) NOT NULL,
            confidence DECIMAL(4,3) NOT NULL,
            frequency_characteristics JSONB,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Rotation Queue Table
        """
        CREATE TABLE IF NOT EXISTS rotation_queue (
            id SERIAL PRIMARY KEY,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
            quote_score DECIMAL(4,2) NOT NULL,
            queued_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            sent BOOLEAN DEFAULT FALSE,
            sent_at TIMESTAMP WITH TIME ZONE,
            UNIQUE(quote_id)
        )
        """,
        # Daily Queue Table
        """
        CREATE TABLE IF NOT EXISTS daily_queue (
            id SERIAL PRIMARY KEY,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
            quote_score DECIMAL(4,2) NOT NULL,
            queued_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            sent BOOLEAN DEFAULT FALSE,
            sent_at TIMESTAMP WITH TIME ZONE,
            UNIQUE(quote_id)
        )
        """,
        # Quote Explanations Table
        """
        CREATE TABLE IF NOT EXISTS quote_explanations (
            id SERIAL PRIMARY KEY,
            quote_id INTEGER NOT NULL REFERENCES quotes(id) ON DELETE CASCADE,
            explanation_data JSONB NOT NULL,
            explanation_depth VARCHAR(20) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            UNIQUE(quote_id, explanation_depth)
        )
        """,
        # Speaker Tagging Sessions Table
        """
        CREATE TABLE IF NOT EXISTS speaker_tagging_sessions (
            id SERIAL PRIMARY KEY,
            session_id VARCHAR(100) NOT NULL UNIQUE,
            guild_id BIGINT NOT NULL,
            channel_id BIGINT NOT NULL,
            requestor_id BIGINT NOT NULL,
            clip_id VARCHAR(100) NOT NULL,
            audio_file_path TEXT NOT NULL,
            unknown_speakers JSONB NOT NULL,
            status VARCHAR(20) NOT NULL DEFAULT 'pending',
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            expires_at TIMESTAMP WITH TIME ZONE NOT NULL,
            message_id BIGINT
        )
        """,
        # Speaker Identifications Table
        """
        CREATE TABLE IF NOT EXISTS speaker_identifications (
            id SERIAL PRIMARY KEY,
            session_id VARCHAR(100) NOT NULL REFERENCES speaker_tagging_sessions(session_id) ON DELETE CASCADE,
            speaker_label VARCHAR(100) NOT NULL,
            identified_user_id BIGINT NOT NULL,
            identifier_username VARCHAR(100) NOT NULL,
            confidence DECIMAL(3,2) NOT NULL,
            timestamp TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            UNIQUE(session_id, speaker_label)
        )
        """,
        # Voice Embeddings Table
        """
        CREATE TABLE IF NOT EXISTS voice_embeddings (
            id SERIAL PRIMARY KEY,
            user_id BIGINT NOT NULL,
            embedding_vector BYTEA NOT NULL,
            confidence DECIMAL(3,2) NOT NULL,
            method VARCHAR(20) NOT NULL DEFAULT 'embeddings',
            sample_duration DECIMAL(5,2) NOT NULL,
            audio_quality DECIMAL(3,2) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            UNIQUE(user_id, created_at)
        )
        """,
        # Speaker Recognition Results Table
        """
        CREATE TABLE IF NOT EXISTS speaker_recognition_results (
            id SERIAL PRIMARY KEY,
            speaker_label VARCHAR(100) NOT NULL,
            identified_user_id BIGINT,
            confidence DECIMAL(4,3) NOT NULL,
            method VARCHAR(20) NOT NULL,
            embedding_similarity DECIMAL(4,3) NOT NULL,
            processing_time DECIMAL(6,3) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Memory Entries Table
        """
        CREATE TABLE IF NOT EXISTS memory_entries (
            id VARCHAR(36) PRIMARY KEY,
            user_id BIGINT NOT NULL,
            guild_id BIGINT NOT NULL,
            memory_type VARCHAR(30) NOT NULL,
            content TEXT NOT NULL,
            metadata JSONB,
            importance_score DECIMAL(3,2) NOT NULL,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            last_accessed TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            access_count INTEGER DEFAULT 0
        )
        """,
        # Personality Profiles Table
        """
        CREATE TABLE IF NOT EXISTS personality_profiles (
            user_id BIGINT PRIMARY KEY,
            humor_preferences JSONB DEFAULT '{}',
            communication_style JSONB DEFAULT '{}',
            interaction_patterns JSONB DEFAULT '{}',
            topic_interests JSONB DEFAULT '[]',
            activity_periods JSONB DEFAULT '[]',
            personality_keywords JSONB DEFAULT '[]',
            last_updated TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
        # Server Configuration Table
        """
        CREATE TABLE IF NOT EXISTS server_config (
            guild_id BIGINT PRIMARY KEY,
            quote_threshold DECIMAL(3,1) NOT NULL DEFAULT 6.0,
            auto_record BOOLEAN NOT NULL DEFAULT FALSE,
            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
            updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
        """,
    ]
    # Single-column indexes for performance, as table -> indexed columns.
    # Every index is named idx_<table>_<column>; dict order is creation order.
    indexed_columns = {
        "quotes": (
            "guild_id",
            "user_id",
            "timestamp",
            "overall_score",
            "response_type",
        ),
        "user_consent": ("guild_id",),
        "speaker_profiles": ("user_id",),
        "quote_feedback": ("quote_id",),
        "audio_clips": ("delete_after",),
        "memory_embeddings": ("user_id",),
        "speaker_diarizations": ("guild_id", "channel_id", "timestamp"),
        "speaker_segments": ("diarization_id", "user_id", "needs_tagging"),
        "transcription_sessions": ("clip_id", "guild_id", "timestamp"),
        "transcribed_segments": (
            "transcription_id",
            "user_id",
            "is_quote_candidate",
        ),
        "quote_analysis_metadata": ("quote_id",),
        "laughter_analyses": ("audio_file_path", "created_at"),
        "laughter_segments": ("analysis_id",),
        "rotation_queue": ("guild_id", "sent", "quote_score"),
        "daily_queue": ("guild_id", "sent", "quote_score"),
        "speaker_tagging_sessions": (
            "session_id",
            "guild_id",
            "status",
            "expires_at",
        ),
        "speaker_identifications": ("session_id", "identified_user_id"),
        "voice_embeddings": ("user_id", "created_at", "method"),
        "speaker_recognition_results": (
            "identified_user_id",
            "created_at",
            "method",
        ),
        "memory_entries": (
            "user_id",
            "guild_id",
            "memory_type",
            "created_at",
            "importance_score",
            "last_accessed",
        ),
        "personality_profiles": ("user_id", "last_updated"),
    }
    index_queries = [
        f"CREATE INDEX IF NOT EXISTS idx_{table}_{column} ON {table}({column})"
        for table, columns in indexed_columns.items()
        for column in columns
    ]
    try:
        async with self.acquire_connection() as conn:
            # One transaction: either the whole schema is applied or none of it.
            async with conn.transaction():
                # Create tables
                for query in schema_queries:
                    await conn.execute(query)
                # Create indexes
                for query in index_queries:
                    await conn.execute(query)
        logger.info("Database schema initialized successfully")
    except Exception as e:
        logger.error(f"Failed to initialize schema: {e}")
        raise
async def _run_migrations(self):
    """Make sure the ``schema_migrations`` bookkeeping table exists.

    No individual migration is applied here yet; the table only records
    a version string and the time it was executed.
    """
    create_migrations_table = """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            version VARCHAR(50) PRIMARY KEY,
            executed_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW()
        )
    """
    await self.execute_query(create_migrations_table)
    # Add any migration logic here
    logger.info("Database migrations completed")
async def check_health(self) -> Dict[str, Any]:
    """Probe the database with ``SELECT 1`` and report latency and pool usage.

    Returns:
        ``{"healthy": True, "latency_ms": ..., "pool_info": ...}`` when the
        probe succeeds (``pool_info`` is empty if no pool is set), otherwise
        ``{"healthy": False, "error": <message>}``. Never raises.
    """
    try:
        clock = asyncio.get_event_loop().time
        started = clock()
        # Simple connectivity test
        await self.execute_query("SELECT 1", fetch_one=True)
        latency = (clock() - started) * 1000  # seconds -> milliseconds
        pool = self.pool
        if pool:
            pool_info = {
                "size": pool.get_size(),
                "min_size": pool.get_min_size(),
                "max_size": pool.get_max_size(),
                "idle_connections": pool.get_idle_size(),
            }
        else:
            pool_info = {}
        return {"healthy": True, "latency_ms": latency, "pool_info": pool_info}
    except Exception as e:
        return {"healthy": False, "error": str(e)}
# User Consent Management Methods
async def grant_consent(
    self, user_id: int, guild_id: int, first_name: Optional[str] = None
) -> bool:
    """Record that a user consents to recording (insert or update).

    Args:
        user_id: the consenting user.
        guild_id: guild stored with a newly inserted row.
        first_name: optional name; an existing name is kept when this is None.

    Returns:
        True when the statement ran, False if it raised (the error is logged).
    """
    # NOTE(review): user_consent is keyed on user_id alone, and the conflict
    # branch below does not touch guild_id, so an existing row keeps the guild
    # it was first created with - confirm this is intended.
    upsert_sql = """
        INSERT INTO user_consent (user_id, guild_id, consent_given, consent_timestamp, first_name)
        VALUES ($1, $2, TRUE, NOW(), $3)
        ON CONFLICT (user_id)
        DO UPDATE SET
            consent_given = TRUE,
            consent_timestamp = NOW(),
            first_name = COALESCE(EXCLUDED.first_name, user_consent.first_name),
            updated_at = NOW()
    """
    try:
        await self.execute_query(upsert_sql, user_id, guild_id, first_name)
    except Exception as e:
        logger.error(f"Failed to grant consent: {e}")
        return False
    else:
        logger.info(f"Consent granted for user {user_id} in guild {guild_id}")
        return True
async def revoke_consent(self, user_id: int, guild_id: int) -> bool:
    """Clear the consent flag on the row matching this user and guild.

    Returns:
        True when the UPDATE ran (whether or not a row matched), False if it
        raised; the error is logged.
    """
    revoke_sql = """
        UPDATE user_consent
        SET consent_given = FALSE, updated_at = NOW()
        WHERE user_id = $1 AND guild_id = $2
    """
    try:
        await self.execute_query(revoke_sql, user_id, guild_id)
    except Exception as e:
        logger.error(f"Failed to revoke consent: {e}")
        return False
    else:
        logger.info(f"Consent revoked for user {user_id} in guild {guild_id}")
        return True
async def check_consent(self, user_id: int, guild_id: int) -> bool:
    """Tell whether recording is currently allowed for this user in this guild.

    Returns:
        The row's ``consent_given`` combined with "not globally opted out";
        False when no row matches or the lookup raised (the error is logged).
    """
    lookup_sql = """
        SELECT consent_given, global_opt_out
        FROM user_consent
        WHERE user_id = $1 AND guild_id = $2
    """
    try:
        row = await self.execute_query(
            lookup_sql, user_id, guild_id, fetch_one=True
        )
        if not row:
            return False
        return row["consent_given"] and not row["global_opt_out"]
    except Exception as e:
        logger.error(f"Failed to check consent: {e}")
        return False
async def set_global_opt_out(self, user_id: int, opt_out: bool) -> bool:
    """Set the ``global_opt_out`` flag on the user's consent row.

    Returns:
        True when the UPDATE ran, False if it raised (the error is logged).
    """
    # NOTE(review): this is an UPDATE only - a user without a consent row is
    # not changed, yet True is still returned. Confirm callers expect that.
    opt_out_sql = """
        UPDATE user_consent
        SET global_opt_out = $2, updated_at = NOW()
        WHERE user_id = $1
    """
    try:
        await self.execute_query(opt_out_sql, user_id, opt_out)
    except Exception as e:
        logger.error(f"Failed to set global opt-out: {e}")
        return False
    else:
        logger.info(f"Global opt-out set to {opt_out} for user {user_id}")
        return True
async def get_consented_users(self, guild_id: int) -> List[int]:
    """List the ids of users in a guild who consented and did not opt out.

    Returns:
        The ``user_id`` of every matching row; an empty list if the query
        raised (the error is logged).
    """
    consented_sql = """
        SELECT user_id
        FROM user_consent
        WHERE guild_id = $1 AND consent_given = TRUE AND global_opt_out = FALSE
    """
    try:
        rows = await self.execute_query(consented_sql, guild_id, fetch_all=True)
        user_ids = []
        for row in rows:
            user_ids.append(row["user_id"])
        return user_ids
    except Exception as e:
        logger.error(f"Failed to get consented users: {e}")
        return []
# Quote Management Methods
async def save_quote(self, quote_data: QuoteData) -> Optional[int]:
    """Insert a quote and, for a known speaker, update their running stats.

    Both statements run on one connection inside a single transaction.

    Args:
        quote_data: the quote to store; a missing ``timestamp`` is replaced
            by the current UTC time.

    Returns:
        The id of the new ``quotes`` row, or None if anything raised (the
        error is logged).
    """
    insert_quote_sql = """
        INSERT INTO quotes (
            user_id, speaker_label, username, quote, timestamp, guild_id, channel_id,
            funny_score, dark_score, silly_score, suspicious_score, asinine_score,
            overall_score, laughter_duration, laughter_intensity, response_type,
            audio_clip_path, speaker_confidence
        ) VALUES (
            $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18
        ) RETURNING id
    """
    # Creates the profile on first quote, otherwise folds the new funny score
    # into the stored running average.
    upsert_profile_sql = """
        INSERT INTO speaker_profiles (user_id, quote_count, avg_humor_score, last_seen)
        VALUES ($1, 1, $2, NOW())
        ON CONFLICT (user_id) DO UPDATE SET
            quote_count = speaker_profiles.quote_count + 1,
            avg_humor_score = (
                (speaker_profiles.avg_humor_score * speaker_profiles.quote_count + $2) /
                (speaker_profiles.quote_count + 1)
            ),
            last_seen = NOW(),
            updated_at = NOW()
    """
    try:
        async with self.acquire_connection() as conn:
            async with conn.transaction():
                # Values in the same order as the column list above.
                quote_values = (
                    quote_data.user_id,
                    quote_data.speaker_label,
                    quote_data.username,
                    quote_data.quote,
                    quote_data.timestamp or datetime.now(timezone.utc),
                    quote_data.guild_id,
                    quote_data.channel_id,
                    quote_data.funny_score,
                    quote_data.dark_score,
                    quote_data.silly_score,
                    quote_data.suspicious_score,
                    quote_data.asinine_score,
                    quote_data.overall_score,
                    quote_data.laughter_duration,
                    quote_data.laughter_intensity,
                    quote_data.response_type,
                    quote_data.audio_clip_path,
                    quote_data.speaker_confidence,
                )
                inserted = await conn.fetchrow(insert_quote_sql, *quote_values)
                quote_id = inserted["id"]
                logger.info(f"Quote saved with ID: {quote_id}")
                # Update speaker profile stats if user is known (within transaction)
                if quote_data.user_id:
                    await conn.execute(
                        upsert_profile_sql,
                        quote_data.user_id,
                        quote_data.funny_score,
                    )
        return quote_id
    except Exception as e:
        logger.error(f"Failed to save quote: {e}")
        return None
async def get_quotes_by_score(
    self, guild_id: int, min_score: float, limit: int = 50
) -> List[Dict]:
    """Fetch a guild's quotes whose overall score is at least ``min_score``.

    Args:
        guild_id: guild whose quotes are searched.
        min_score: inclusive lower bound on ``overall_score``.
        limit: maximum number of rows returned.

    Returns:
        Quote rows as dicts, best score first and newest first within a
        score; an empty list when nothing matches or the query raised (the
        error is logged with timing context).
    """
    start_time = time.perf_counter()
    try:
        results = await self.execute_query(
            """
            SELECT * FROM quotes
            WHERE guild_id = $1 AND overall_score >= $2
            ORDER BY overall_score DESC, timestamp DESC
            LIMIT $3
            """,
            guild_id,
            min_score,
            limit,
            fetch_all=True,
        )
        quotes = [dict(row) for row in results] if results else []
        query_time = round((time.perf_counter() - start_time) * 1000, 2)
        logger.info(
            "Retrieved quotes by score: guild_id=%s min_score=%s limit=%s "
            "result_count=%s duration_ms=%s",
            guild_id,
            min_score,
            limit,
            len(quotes),
            query_time,
        )
        return quotes
    except Exception as e:
        query_time = round((time.perf_counter() - start_time) * 1000, 2)
        logger.error(
            "Failed to get quotes by score: guild_id=%s min_score=%s limit=%s "
            "duration_ms=%s error_type=%s error_message=%s",
            guild_id,
            min_score,
            limit,
            query_time,
            type(e).__name__,
            str(e)[:200],
            exc_info=True,
        )
        return []
async def get_user_quotes(
    self, user_id: int, guild_id: int, limit: int = 50
) -> List[Dict]:
    """Fetch one user's quotes in a guild, newest first.

    Returns:
        Up to ``limit`` quote rows as dicts; an empty list if the query
        raised (the error is logged).
    """
    user_quotes_sql = """
        SELECT * FROM quotes
        WHERE user_id = $1 AND guild_id = $2
        ORDER BY timestamp DESC
        LIMIT $3
    """
    try:
        rows = await self.execute_query(
            user_quotes_sql, user_id, guild_id, limit, fetch_all=True
        )
        return list(map(dict, rows))
    except Exception as e:
        logger.error(f"Failed to get user quotes: {e}")
        return []
async def delete_user_quotes(
    self, user_id: int, guild_id: Optional[int] = None
) -> int:
    """Delete a user's quotes, in one guild or everywhere.

    Args:
        user_id: user whose quotes are removed.
        guild_id: restrict the deletion to this guild. Only ``None`` means
            "all guilds"; ``0`` is treated as a real guild id so that a
            falsy id can never widen the deletion to every guild.

    Returns:
        The row count parsed from the DELETE status, or 0 if the statement
        raised (the error is logged).
    """
    try:
        if guild_id is not None:
            result = await self.execute_query(
                "DELETE FROM quotes WHERE user_id = $1 AND guild_id = $2",
                user_id,
                guild_id,
            )
        else:
            result = await self.execute_query(
                "DELETE FROM quotes WHERE user_id = $1",
                user_id,
            )
        # The status looks like "DELETE <n>"; the last token is the row count.
        deleted_count = int(result.split()[-1]) if result else 0
        logger.info(f"Deleted {deleted_count} quotes for user {user_id}")
        return deleted_count
    except Exception as e:
        logger.error(f"Failed to delete user quotes: {e}")
        return 0
async def update_quote_speaker(
    self, quote_id: int, user_id: int, tagger_id: int
) -> bool:
    """Attribute a quote to a user and record who made the identification.

    The quote update and the ``tag_speaker`` feedback row are written in one
    transaction, so a failure in either leaves the quote unchanged instead of
    re-attributing it without an audit record.

    Args:
        quote_id: quote being tagged.
        user_id: user the quote is attributed to.
        tagger_id: user who performed the tagging.

    Returns:
        True when both writes were committed, False if anything raised (the
        error is logged).
    """
    try:
        async with self.acquire_connection() as conn:
            async with conn.transaction():
                await conn.execute(
                    """
                    UPDATE quotes
                    SET user_id = $2, speaker_confidence = 1.0
                    WHERE id = $1
                    """,
                    quote_id,
                    user_id,
                )
                # Record the tagging feedback
                await conn.execute(
                    """
                    INSERT INTO quote_feedback (quote_id, user_id, feedback_type, feedback_value)
                    VALUES ($1, $2, 'tag_speaker', $3)
                    """,
                    quote_id,
                    tagger_id,
                    json.dumps({"tagged_user_id": user_id}),
                )
    except Exception as e:
        logger.error(f"Failed to update quote speaker: {e}")
        return False
    logger.info(f"Quote {quote_id} tagged as user {user_id} by {tagger_id}")
    return True
# Speaker Profile Management Methods
async def store_speaker_profile(
    self, user_id: int, voice_embedding: bytes, metadata: Dict
) -> bool:
    """Insert or refresh a user's voice embedding and enrollment details.

    Args:
        user_id: owner of the profile.
        voice_embedding: raw embedding bytes stored as-is.
        metadata: read for ``"status"`` (defaults to ``"enrolled"``) and
            ``"phrase"`` (defaults to None).

    Returns:
        True when the upsert ran, False if anything raised (the error is
        logged). An existing profile has its ``training_samples`` incremented.
    """
    upsert_sql = """
        INSERT INTO speaker_profiles (
            user_id, voice_embedding, enrollment_status, enrollment_phrase,
            training_samples
        ) VALUES ($1, $2, $3, $4, 1)
        ON CONFLICT (user_id) DO UPDATE SET
            voice_embedding = EXCLUDED.voice_embedding,
            enrollment_status = EXCLUDED.enrollment_status,
            enrollment_phrase = EXCLUDED.enrollment_phrase,
            training_samples = speaker_profiles.training_samples + 1,
            updated_at = NOW()
    """
    try:
        status = metadata.get("status", "enrolled")
        phrase = metadata.get("phrase")
        await self.execute_query(upsert_sql, user_id, voice_embedding, status, phrase)
    except Exception as e:
        logger.error(f"Failed to store speaker profile: {e}")
        return False
    else:
        logger.info(f"Speaker profile stored for user {user_id}")
        return True
async def get_speaker_profile(self, user_id: int) -> Optional[Dict]:
    """Fetch the ``speaker_profiles`` row for a user.

    Returns:
        The row as a dict, or None when no row exists or the query raised
        (the error is logged).
    """
    try:
        row = await self.execute_query(
            """
            SELECT * FROM speaker_profiles WHERE user_id = $1
            """,
            user_id,
            fetch_one=True,
        )
        if row:
            return dict(row)
        return None
    except Exception as e:
        logger.error(f"Failed to get speaker profile: {e}")
        return None
# Audio Clip Management Methods
async def register_audio_clip(
    self,
    guild_id: int,
    channel_id: int,
    file_path: str,
    duration: float,
    delete_after_hours: int = 24,
) -> int:
    """Record an audio clip together with the time it becomes deletable.

    Args:
        guild_id: guild the clip belongs to.
        channel_id: channel the clip belongs to.
        file_path: location of the clip file.
        duration: clip length as stored in the ``duration`` column.
        delete_after_hours: hours from now (UTC) until ``delete_after``.

    Returns:
        The id of the new ``audio_clips`` row, or 0 if anything raised (the
        error is logged).
    """
    insert_sql = """
        INSERT INTO audio_clips (guild_id, channel_id, file_path, duration, delete_after)
        VALUES ($1, $2, $3, $4, $5)
        RETURNING id
    """
    try:
        retention = timedelta(hours=delete_after_hours)
        delete_after = datetime.now(timezone.utc) + retention
        row = await self.execute_query(
            insert_sql,
            guild_id,
            channel_id,
            file_path,
            duration,
            delete_after,
            fetch_one=True,
        )
        return row["id"]
    except Exception as e:
        logger.error(f"Failed to register audio clip: {e}")
        return 0
async def get_expired_audio_clips(self) -> List[Dict]:
    """List clips that are past ``delete_after`` and already marked processed.

    Returns:
        Matching ``audio_clips`` rows as dicts; an empty list if the query
        raised (the error is logged).
    """
    try:
        rows = await self.execute_query(
            """
            SELECT * FROM audio_clips
            WHERE delete_after <= NOW() AND processed = TRUE
            """,
            fetch_all=True,
        )
        return list(map(dict, rows))
    except Exception as e:
        logger.error(f"Failed to get expired audio clips: {e}")
        return []
async def mark_audio_clip_processed(self, clip_id: int) -> bool:
    """Set ``processed = TRUE`` on the clip with the given id.

    Returns:
        True when the UPDATE ran, False if it raised (the error is logged).
    """
    mark_sql = """
        UPDATE audio_clips SET processed = TRUE WHERE id = $1
    """
    try:
        await self.execute_query(mark_sql, clip_id)
    except Exception as e:
        logger.error(f"Failed to mark audio clip as processed: {e}")
        return False
    else:
        return True
async def cleanup_expired_clips(self) -> int:
    """Delete every ``audio_clips`` row whose ``delete_after`` has passed.

    Returns:
        The row count parsed from the DELETE status, or 0 if the statement
        raised (the error is logged).
    """
    # NOTE(review): unlike get_expired_audio_clips, this does not filter on
    # processed = TRUE, so records of unprocessed clips are removed as well -
    # confirm that is intended.
    try:
        status = await self.execute_query(
            """
            DELETE FROM audio_clips WHERE delete_after <= NOW()
            """
        )
        # The status looks like "DELETE <n>"; the last token is the row count.
        if status:
            deleted_count = int(status.split()[-1])
        else:
            deleted_count = 0
        logger.info(f"Cleaned up {deleted_count} expired audio clip records")
        return deleted_count
    except Exception as e:
        logger.error(f"Failed to cleanup expired clips: {e}")
        return 0
async def search_quotes(
    self,
    guild_id: int,
    search_term: Optional[str] = None,
    user_id: Optional[int] = None,
    limit: int = 5,
) -> List[Dict]:
    """Search a guild's quotes, optionally filtered by text and/or speaker.

    Args:
        guild_id: Guild whose quotes are searched.
        search_term: Case-insensitive substring matched against the quote
            text and the joined ``user_consent.username``. ``%``, ``_`` and
            ``\\`` in the term are matched literally. Ignored when falsy.
        user_id: Restrict results to this speaker. Ignored when falsy.
        limit: Maximum number of rows to return.

    Returns:
        Matching rows as dicts (each with an extra ``speaker_name`` key),
        ordered by ``overall_score`` then ``timestamp``, both descending.
        An empty list when nothing matches or the query fails (the error
        is logged, not raised).
    """
    try:
        query = """
            SELECT q.*, u.username as speaker_name
            FROM quotes q
            LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
            WHERE q.guild_id = $1
            """
        params: List[Any] = [guild_id]
        # Next free positional placeholder; $1 is already taken by guild_id.
        param_count = 2
        if search_term:
            # Escape LIKE metacharacters so user input such as "100%" or "_"
            # is matched literally instead of acting as a wildcard. Backslash
            # is PostgreSQL's default LIKE/ILIKE escape character and must be
            # escaped first so the escapes added below are not doubled.
            escaped_term = (
                search_term.replace("\\", "\\\\")
                .replace("%", "\\%")
                .replace("_", "\\_")
            )
            query += f" AND (q.quote ILIKE ${param_count} OR u.username ILIKE ${param_count})"
            params.append(f"%{escaped_term}%")
            param_count += 1
        if user_id:
            query += f" AND q.user_id = ${param_count}"
            params.append(user_id)
            param_count += 1
        query += " ORDER BY q.overall_score DESC, q.timestamp DESC"
        query += f" LIMIT ${param_count}"
        params.append(limit)
        result = await self.execute_query(query, *params, fetch_all=True)
        return [dict(row) for row in result] if result else []
    except Exception as e:
        logger.error(f"Failed to search quotes: {e}")
        return []
async def get_quote_stats(self, guild_id: int) -> Dict[str, Any]:
    """Collect aggregate quote statistics for one guild.

    Runs one single-value query per statistic, in order, each filtered by
    ``guild_id``.

    Args:
        guild_id: Guild whose quotes are summarised.

    Returns:
        A dict with keys ``total_quotes``, ``unique_speakers``,
        ``avg_score``, ``max_score``, ``quotes_this_week`` and
        ``quotes_this_month``. Count values default to 0 and score values
        to 0.0 when a query yields nothing. An empty dict is returned if
        any query fails (the error is logged, not raised).
    """
    # (result key, SQL, convert-to-float?) - executed in this order.
    stat_queries = (
        (
            "total_quotes",
            "SELECT COUNT(*) FROM quotes WHERE guild_id = $1",
            False,
        ),
        (
            "unique_speakers",
            "SELECT COUNT(DISTINCT user_id) FROM quotes WHERE guild_id = $1",
            False,
        ),
        (
            "avg_score",
            "SELECT AVG(overall_score) FROM quotes WHERE guild_id = $1",
            True,
        ),
        (
            "max_score",
            "SELECT MAX(overall_score) FROM quotes WHERE guild_id = $1",
            True,
        ),
        (
            "quotes_this_week",
            "SELECT COUNT(*) FROM quotes WHERE guild_id = $1 AND timestamp >= NOW() - INTERVAL '7 days'",
            False,
        ),
        (
            "quotes_this_month",
            "SELECT COUNT(*) FROM quotes WHERE guild_id = $1 AND timestamp >= NOW() - INTERVAL '30 days'",
            False,
        ),
    )
    try:
        stats = {}
        for key, sql, as_float in stat_queries:
            row = await self.execute_query(sql, guild_id, fetch_one=True)
            if as_float:
                # A NULL (or zero) aggregate falls back to 0.0.
                stats[key] = float(row[0]) if row and row[0] else 0.0
            else:
                stats[key] = row[0] if row else 0
        return stats
    except Exception as e:
        logger.error(f"Failed to get quote stats: {e}")
        return {}
async def get_top_quotes(self, guild_id: int, limit: int = 5) -> List[Dict]:
    """Return a guild's highest-scoring quotes.

    Args:
        guild_id: Guild whose quotes are ranked.
        limit: Maximum number of rows to return.

    Returns:
        Quote rows as dicts (each with an extra ``speaker_name`` key taken
        from the joined ``user_consent.username``), ordered by
        ``overall_score`` then ``timestamp``, both descending. An empty
        list when there are no rows or the query fails (the error is
        logged, not raised).
    """
    top_sql = """
        SELECT q.*, u.username as speaker_name
        FROM quotes q
        LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
        WHERE q.guild_id = $1
        ORDER BY q.overall_score DESC, q.timestamp DESC
        LIMIT $2
        """
    try:
        rows = await self.execute_query(top_sql, guild_id, limit, fetch_all=True)
        if not rows:
            return []
        return [dict(entry) for entry in rows]
    except Exception as e:
        logger.error(f"Failed to get top quotes: {e}")
        return []
async def get_random_quote(self, guild_id: int) -> Optional[Dict]:
    """Pick one quote at random from a guild.

    Args:
        guild_id: Guild to draw the quote from.

    Returns:
        The chosen row as a dict (with an extra ``speaker_name`` key taken
        from the joined ``user_consent.username``), or None when the guild
        has no quotes or the query fails (the error is logged, not raised).
    """
    random_sql = """
        SELECT q.*, u.username as speaker_name
        FROM quotes q
        LEFT JOIN user_consent u ON q.user_id = u.user_id AND q.guild_id = u.guild_id
        WHERE q.guild_id = $1
        ORDER BY RANDOM()
        LIMIT 1
        """
    try:
        row = await self.execute_query(random_sql, guild_id, fetch_one=True)
        if not row:
            return None
        return dict(row)
    except Exception as e:
        logger.error(f"Failed to get random quote: {e}")
        return None
async def get_admin_stats(self) -> Dict[str, Any]:
    """Collect bot-wide statistics across all guilds.

    Returns:
        A dict with keys ``total_quotes``, ``unique_speakers`` and
        ``active_consents`` (consent given and not globally opted out).
        Each value defaults to 0 when its query yields nothing. An empty
        dict is returned if any query fails (the error is logged, not
        raised).
    """
    # (result key, SQL) - executed in this order, one round trip each.
    counters = (
        ("total_quotes", "SELECT COUNT(*) FROM quotes"),
        ("unique_speakers", "SELECT COUNT(DISTINCT user_id) FROM quotes"),
        (
            "active_consents",
            "SELECT COUNT(*) FROM user_consent WHERE consent_given = TRUE AND global_opt_out = FALSE",
        ),
    )
    try:
        stats = {}
        for key, sql in counters:
            row = await self.execute_query(sql, fetch_one=True)
            stats[key] = row[0] if row else 0
        return stats
    except Exception as e:
        logger.error(f"Failed to get admin stats: {e}")
        return {}
async def get_server_config(self, guild_id: int) -> Dict[str, Any]:
    """Fetch a guild's ``server_config`` row, falling back to defaults.

    Args:
        guild_id: Guild whose configuration is requested.

    Returns:
        The stored row as a dict when one exists. Otherwise, or when the
        query fails (the error is logged, not raised), a fresh dict of
        defaults: ``quote_threshold`` 6.0 and ``auto_record`` False.
    """
    try:
        row = await self.execute_query(
            "SELECT * FROM server_config WHERE guild_id = $1",
            guild_id,
            fetch_one=True,
        )
        if not row:
            # No stored configuration for this guild: return defaults.
            return {"quote_threshold": 6.0, "auto_record": False}
        return dict(row)
    except Exception as e:
        logger.error(f"Failed to get server config: {e}")
        return {"quote_threshold": 6.0, "auto_record": False}
async def update_server_config(self, guild_id: int, config: Dict[str, Any]) -> bool:
    """Insert or update a guild's ``server_config`` row.

    Only the ``quote_threshold`` and ``auto_record`` keys of ``config``
    are read; a missing key is sent as NULL. When a row for the guild
    already exists, a NULL leaves the stored column value untouched
    (COALESCE). On first insert the NULL is written as given.

    Args:
        guild_id: Guild whose configuration is written.
        config: Mapping that may contain ``quote_threshold`` and/or
            ``auto_record``.

    Returns:
        True if the statement ran without raising, False otherwise (the
        error is logged).
    """
    upsert_sql = """
        INSERT INTO server_config (guild_id, quote_threshold, auto_record)
        VALUES ($1, $2, $3)
        ON CONFLICT (guild_id)
        DO UPDATE SET
        quote_threshold = COALESCE($2, server_config.quote_threshold),
        auto_record = COALESCE($3, server_config.auto_record)
        """
    try:
        threshold = config.get("quote_threshold")
        auto_record = config.get("auto_record")
        await self.execute_query(upsert_sql, guild_id, threshold, auto_record)
    except Exception as e:
        logger.error(f"Failed to update server config: {e}")
        return False
    else:
        return True
async def purge_user_quotes(self, guild_id: int, user_id: int) -> int:
    """Delete every quote attributed to one user within one guild.

    Args:
        guild_id: Guild to purge from.
        user_id: User whose quotes are removed.

    Returns:
        The number of deleted rows, parsed from the last token of the
        status value returned by ``execute_query``; 0 if that value is
        falsy or if anything fails (the error is logged, not raised).
    """
    try:
        status = await self.execute_query(
            "DELETE FROM quotes WHERE guild_id = $1 AND user_id = $2",
            guild_id,
            user_id,
        )
        if not status:
            return 0
        # The row count is the final whitespace-separated token.
        return int(status.split()[-1])
    except Exception as e:
        logger.error(f"Failed to purge user quotes: {e}")
        return 0
async def purge_old_quotes(self, guild_id: int, days: int) -> int:
    """Delete a guild's quotes whose timestamp is older than ``days`` days.

    Args:
        guild_id: Guild to purge from.
        days: Age cutoff; rows with ``timestamp`` earlier than
            ``NOW() - days`` days are removed.

    Returns:
        The number of deleted rows, parsed from the last token of the
        status value returned by ``execute_query``; 0 if that value is
        falsy or if anything fails (the error is logged, not raised).
    """
    try:
        status = await self.execute_query(
            "DELETE FROM quotes WHERE guild_id = $1 AND timestamp < NOW() - $2 * INTERVAL '1 day'",
            guild_id,
            days,
        )
        if not status:
            return 0
        # The row count is the final whitespace-separated token.
        return int(status.split()[-1])
    except Exception as e:
        logger.error(f"Failed to purge old quotes: {e}")
        return 0