""" Memory System for Discord Voice Chat Quote Bot Implements long-term memory using Qdrant vector database for personality tracking, conversation context, and intelligent quote analysis with semantic understanding. """ import asyncio import json import logging import uuid from dataclasses import dataclass from datetime import datetime, timedelta, timezone from enum import Enum from typing import TYPE_CHECKING, Any, Dict, List, Optional if TYPE_CHECKING: import numpy as np else: try: import numpy as np except ImportError: # Fallback for environments without numpy np = None try: from qdrant_client import QdrantClient from qdrant_client.http import models from qdrant_client.http.models import (Distance, FieldCondition, Filter, PointStruct, VectorParams) except ImportError: # Fallback for environments without qdrant-client QdrantClient = None models = None Distance = None VectorParams = None PointStruct = None Filter = None FieldCondition = None from config.settings import Settings from core.ai_manager import AIProviderManager from core.database import DatabaseManager logger = logging.getLogger(__name__) class MemoryType(Enum): """Types of memories stored in the system""" CONVERSATION = "conversation" PERSONALITY = "personality" QUOTE_CONTEXT = "quote_context" USER_INTERACTION = "user_interaction" BEHAVIORAL_PATTERN = "behavioral_pattern" HUMOR_PREFERENCE = "humor_preference" class RelevanceType(Enum): """Types of relevance scoring for memory retrieval""" SEMANTIC = "semantic" TEMPORAL = "temporal" CONTEXTUAL = "contextual" PERSONALITY = "personality" @dataclass class MemoryEntry: """Individual memory entry""" id: str user_id: int guild_id: int memory_type: MemoryType content: str embedding: Optional["np.ndarray"] metadata: Dict[str, Any] relevance_score: float created_at: datetime last_accessed: datetime access_count: int importance_score: float @dataclass class ConversationContext: """Context for ongoing conversations""" guild_id: int channel_id: int participants: List[int] topic_keywords: List[str] emotional_tone: str start_time: datetime last_activity: datetime message_count: int @dataclass class PersonalityProfile: """User personality profile based on conversation history""" user_id: int humor_preferences: Dict[str, float] # funny, dark, silly, etc. communication_style: Dict[str, float] # formal, casual, sarcastic, etc. interaction_patterns: Dict[str, Any] topic_interests: List[str] activity_periods: List[Dict[str, Any]] personality_keywords: List[str] last_updated: datetime class MemoryManager: """ Long-term memory system using Qdrant vector database Features: - Semantic memory storage and retrieval - Personality profiling and tracking - Conversation context management - Intelligent quote contextualization - Behavioral pattern recognition - Memory importance scoring and pruning - Multi-dimensional similarity search """ def __init__( self, ai_manager: AIProviderManager, db_manager: DatabaseManager, settings: Settings, ): self.ai_manager = ai_manager self.db_manager = db_manager self.settings = settings # Qdrant client self.qdrant_client: Optional["QdrantClient"] = None self.collection_name = "quote_bot_memories" # Memory configuration self.embedding_dimension = 384 # Standard sentence transformer dimension self.max_memories_per_user = 10000 # Memory limit per user self.memory_retention_days = 90 # Days to retain memories self.importance_threshold = 0.3 # Minimum importance to retain # Context tracking self.active_conversations: Dict[str, ConversationContext] = {} self.personality_profiles: Dict[int, PersonalityProfile] = {} # Background tasks self._memory_consolidation_task: Optional[asyncio.Task[None]] = None self._personality_update_task: Optional[asyncio.Task[None]] = None self._context_cleanup_task: Optional[asyncio.Task[None]] = None # Statistics self.total_memories = 0 self.total_retrievals = 0 self.cache_hits = 0 self.embedding_generations = 0 self._initialized = False async def initialize(self): """Initialize the memory system""" if self._initialized: logger.debug("Memory system already initialized, skipping") return try: logger.info( "Initializing memory system", extra={ "component": "memory_manager", "operation": "initialize", "collection_name": self.collection_name, "embedding_dimension": self.embedding_dimension, "max_memories_per_user": self.max_memories_per_user, "retention_days": self.memory_retention_days, "importance_threshold": self.importance_threshold } ) # Initialize Qdrant client await self._initialize_qdrant() # Load existing personality profiles await self._load_personality_profiles() # Start background tasks self._memory_consolidation_task = asyncio.create_task( self._memory_consolidation_worker() ) self._personality_update_task = asyncio.create_task( self._personality_update_worker() ) self._context_cleanup_task = asyncio.create_task( self._context_cleanup_worker() ) self._initialized = True logger.info( "Memory system initialized successfully", extra={ "component": "memory_manager", "operation": "initialize", "status": "success", "qdrant_connected": self.qdrant_client is not None, "personality_profiles_loaded": len(self.personality_profiles), "background_tasks_started": 3 } ) except Exception as e: logger.error( "Failed to initialize memory system", extra={ "component": "memory_manager", "operation": "initialize", "error": str(e), "error_type": type(e).__name__ }, exc_info=True ) raise async def _initialize_qdrant(self): """Initialize Qdrant vector database""" try: logger.debug( "Initializing Qdrant connection", extra={ "component": "memory_manager", "operation": "_initialize_qdrant", "collection_name": self.collection_name, "embedding_dimension": self.embedding_dimension } ) if QdrantClient is None: logger.warning( "Qdrant client not available - memory system will use basic functionality" ) return # Get Qdrant connection settings qdrant_url = self.settings.qdrant_url or "http://localhost:6333" qdrant_api_key = self.settings.qdrant_api_key # Create Qdrant client if qdrant_api_key: self.qdrant_client = QdrantClient( url=qdrant_url, api_key=qdrant_api_key ) else: self.qdrant_client = QdrantClient(url=qdrant_url) # Check if collection exists if self.qdrant_client is None: raise ValueError("Qdrant client not initialized") collections = await asyncio.get_event_loop().run_in_executor( None, self.qdrant_client.get_collections ) collection_exists = any( collection.name == self.collection_name for collection in collections.collections ) if not collection_exists: # Create collection if self.qdrant_client is None: raise ValueError("Qdrant client not initialized") await asyncio.get_event_loop().run_in_executor( None, lambda: self.qdrant_client.create_collection( collection_name=self.collection_name, vectors_config=VectorParams( size=self.embedding_dimension, distance=Distance.COSINE ), ), ) logger.info(f"Created Qdrant collection: {self.collection_name}") else: logger.info(f"Using existing Qdrant collection: {self.collection_name}") except Exception as e: logger.error(f"Failed to initialize Qdrant: {e}") self.qdrant_client = None async def store_memory( self, user_id: int, guild_id: int, memory_type: MemoryType, content: str, metadata: Optional[Dict[str, Any]] = None, ) -> str: """ Store a new memory entry Args: user_id: Discord user ID guild_id: Discord guild ID memory_type: Type of memory content: Memory content metadata: Additional metadata Returns: str: Memory ID """ # Generate unique memory ID first memory_id = str(uuid.uuid4()) try: if not self._initialized: await self.initialize() # Generate embedding for content embedding = await self._generate_embedding(content) if embedding is None: logger.warning( f"Failed to generate embedding for memory: {content[:50]}..." ) # Still store the memory without embedding # Calculate importance score importance_score = await self._calculate_importance_score( content, memory_type, metadata or {} ) # Create memory entry memory_entry = MemoryEntry( id=memory_id, user_id=user_id, guild_id=guild_id, memory_type=memory_type, content=content, embedding=embedding, metadata=metadata or {}, relevance_score=1.0, # Initial relevance created_at=datetime.now(timezone.utc), last_accessed=datetime.now(timezone.utc), access_count=0, importance_score=importance_score, ) # Store in Qdrant if available and embedding generated if self.qdrant_client and embedding is not None: await self._store_in_qdrant(memory_entry) # Store metadata in PostgreSQL for efficient filtering await self._store_memory_metadata(memory_entry) # Update personality profile if relevant if memory_type in [MemoryType.PERSONALITY, MemoryType.USER_INTERACTION]: await self._update_personality_profile(user_id, memory_entry) self.total_memories += 1 logger.debug(f"Stored memory {memory_id} for user {user_id}") return memory_id except Exception as e: logger.error(f"Failed to store memory: {e}") return memory_id async def retrieve_memories( self, user_id: int, query: str, memory_types: Optional[List[MemoryType]] = None, limit: int = 10, relevance_threshold: float = 0.7, ) -> List[MemoryEntry]: """ Retrieve relevant memories for a user and query Args: user_id: Discord user ID query: Query text for semantic search memory_types: Optional filter by memory types limit: Maximum number of memories to return relevance_threshold: Minimum relevance score Returns: List[MemoryEntry]: Retrieved memories """ try: if not self._initialized: await self.initialize() # If no Qdrant client or models, fall back to database-only search if not self.qdrant_client or not models or not Filter or not FieldCondition: return await self._retrieve_memories_fallback( user_id, query, memory_types, limit ) # Generate query embedding query_embedding = await self._generate_embedding(query) if query_embedding is None: logger.warning("Failed to generate query embedding") return await self._retrieve_memories_fallback( user_id, query, memory_types, limit ) # Build filter conditions filter_conditions = [ FieldCondition(key="user_id", match=models.MatchValue(value=user_id)) ] if memory_types: type_values = [mt.value for mt in memory_types] filter_conditions.append( FieldCondition( key="memory_type", match=models.MatchAny(any=type_values) ) ) # Search in Qdrant search_results = await asyncio.get_event_loop().run_in_executor( None, lambda: self.qdrant_client.search( collection_name=self.collection_name, query_vector=query_embedding.tolist(), query_filter=Filter(must=filter_conditions), limit=limit * 2, # Get more results for filtering score_threshold=relevance_threshold, ), ) # Convert results to memory entries memories = [] for result in search_results: try: memory_entry = await self._reconstruct_memory_entry(result) if memory_entry: # Update access tracking memory_entry.last_accessed = datetime.now(timezone.utc) memory_entry.access_count += 1 memory_entry.relevance_score = result.score memories.append(memory_entry) # Update access in database await self._update_memory_access(memory_entry.id) except Exception as e: logger.warning(f"Failed to reconstruct memory entry: {e}") continue # Sort by relevance and importance memories.sort( key=lambda m: (m.relevance_score * 0.7 + m.importance_score * 0.3), reverse=True, ) # Limit results memories = memories[:limit] self.total_retrievals += 1 logger.debug(f"Retrieved {len(memories)} memories for user {user_id}") return memories except Exception as e: logger.error(f"Failed to retrieve memories: {e}") return [] async def _retrieve_memories_fallback( self, user_id: int, query: str, memory_types: Optional[List[MemoryType]] = None, limit: int = 10, ) -> List[MemoryEntry]: """Fallback method for retrieving memories without vector search""" try: # Build SQL query with text search type_filter = "" params = [user_id] if memory_types: type_values = [mt.value for mt in memory_types] type_filter = "AND memory_type = ANY($2)" params.append(type_values) limit_param = "$3" else: limit_param = "$2" params.append(limit) # Simple text search query query_sql = f""" SELECT * FROM memory_entries WHERE user_id = $1 {type_filter} AND (content ILIKE '%' || ${'$3' if memory_types else '$2'} || '%') ORDER BY importance_score DESC, created_at DESC LIMIT {limit_param} """ if memory_types: search_params = params[:-1] + [f"%{query}%"] + [params[-1]] else: search_params = params[:-1] + [f"%{query}%"] + [params[-1]] results = await self.db_manager.execute_query( query_sql, *search_params, fetch_all=True ) memories = [] for result in results: memory_entry = MemoryEntry( id=result["id"], user_id=result["user_id"], guild_id=result["guild_id"], memory_type=MemoryType(result["memory_type"]), content=result["content"], embedding=None, metadata=result.get("metadata", {}), relevance_score=0.8, # Default relevance for text match created_at=result["created_at"], last_accessed=result["last_accessed"], access_count=result["access_count"], importance_score=result["importance_score"], ) memories.append(memory_entry) return memories except Exception as e: logger.error(f"Failed to retrieve memories with fallback: {e}") return [] async def get_personality_profile( self, user_id: int ) -> Optional[PersonalityProfile]: """Get personality profile for a user""" try: if user_id in self.personality_profiles: return self.personality_profiles[user_id] # Load from database profile_data = await self.db_manager.execute_query( """ SELECT * FROM personality_profiles WHERE user_id = $1 """, user_id, fetch_one=True, ) if profile_data: profile = PersonalityProfile( user_id=user_id, humor_preferences=json.loads(profile_data["humor_preferences"]), communication_style=json.loads(profile_data["communication_style"]), interaction_patterns=json.loads( profile_data["interaction_patterns"] ), topic_interests=json.loads(profile_data["topic_interests"]), activity_periods=json.loads(profile_data["activity_periods"]), personality_keywords=json.loads( profile_data["personality_keywords"] ), last_updated=profile_data["last_updated"], ) self.personality_profiles[user_id] = profile return profile return None except Exception as e: logger.error(f"Failed to get personality profile: {e}") return None async def update_conversation_context( self, guild_id: int, channel_id: int, participants: List[int], content: str ): """Update ongoing conversation context""" try: context_key = f"{guild_id}_{channel_id}" if context_key not in self.active_conversations: # Create new conversation context self.active_conversations[context_key] = ConversationContext( guild_id=guild_id, channel_id=channel_id, participants=participants, topic_keywords=[], emotional_tone="neutral", start_time=datetime.now(timezone.utc), last_activity=datetime.now(timezone.utc), message_count=0, ) context = self.active_conversations[context_key] context.last_activity = datetime.now(timezone.utc) context.message_count += 1 # Update participants for participant in participants: if participant not in context.participants: context.participants.append(participant) # Extract keywords and emotional tone keywords = await self._extract_keywords(content) context.topic_keywords.extend(keywords) # Keep only recent keywords if len(context.topic_keywords) > 20: context.topic_keywords = context.topic_keywords[-20:] # Detect emotional tone emotional_tone = await self._detect_emotional_tone(content) if emotional_tone != "neutral": context.emotional_tone = emotional_tone except Exception as e: logger.error(f"Failed to update conversation context: {e}") async def get_contextual_insights( self, user_id: int, recent_quotes: List[str] ) -> Dict[str, Any]: """Get contextual insights for quote analysis""" try: insights = { "personality_match": 0.0, "humor_consistency": 0.0, "interaction_patterns": {}, "topic_relevance": 0.0, "behavioral_prediction": {}, } # Get personality profile profile = await self.get_personality_profile(user_id) if not profile: return insights # Analyze quote consistency with personality for quote in recent_quotes: # Get relevant memories memories = await self.retrieve_memories( user_id, quote, [MemoryType.PERSONALITY, MemoryType.HUMOR_PREFERENCE], limit=5, ) if memories: # Calculate personality match score personality_scores = [] for memory in memories: if "humor_score" in memory.metadata: personality_scores.append(memory.metadata["humor_score"]) if personality_scores and np: insights["personality_match"] = np.mean(personality_scores) # Analyze humor consistency humor_preferences = profile.humor_preferences if humor_preferences: consistency_scores = [] for quote in recent_quotes: # This would analyze quote humor against preferences # Simplified implementation consistency_scores.append(0.8) # Placeholder insights["humor_consistency"] = ( np.mean(consistency_scores) if consistency_scores and np else 0.0 ) # Add interaction patterns insights["interaction_patterns"] = profile.interaction_patterns return insights except Exception as e: logger.error(f"Failed to get contextual insights: {e}") return {} async def _generate_embedding(self, text: str) -> Optional[np.ndarray]: """Generate embedding for text content""" try: if np is None: logger.warning("NumPy not available - cannot generate embeddings") return None # Use AI manager to generate embeddings result = await self.ai_manager.generate_embedding(text) if result: # Handle different result formats if hasattr(result, "embedding"): embedding_data = result.embedding elif isinstance(result, list): embedding_data = result elif isinstance(result, dict) and "embedding" in result: embedding_data = result["embedding"] else: logger.warning( f"Unexpected embedding result format: {type(result)}" ) return None if not np: logger.warning("NumPy not available, cannot process embeddings") return None embedding = np.array(embedding_data, dtype=np.float32) # Ensure correct dimension if len(embedding) != self.embedding_dimension: # Pad or truncate as needed if len(embedding) < self.embedding_dimension: padding = np.zeros(self.embedding_dimension - len(embedding)) embedding = np.concatenate([embedding, padding]) else: embedding = embedding[: self.embedding_dimension] # Normalize norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm self.embedding_generations += 1 return embedding return None except Exception as e: logger.error(f"Failed to generate embedding: {e}") return None async def _calculate_importance_score( self, content: str, memory_type: MemoryType, metadata: Dict[str, Any] ) -> float: """Calculate importance score for memory entry""" try: base_scores = { MemoryType.CONVERSATION: 0.5, MemoryType.PERSONALITY: 0.8, MemoryType.QUOTE_CONTEXT: 0.7, MemoryType.USER_INTERACTION: 0.6, MemoryType.BEHAVIORAL_PATTERN: 0.9, MemoryType.HUMOR_PREFERENCE: 0.7, } base_score = base_scores.get(memory_type, 0.5) # Adjust based on content length and quality content_factor = min( 1.0, len(content.split()) / 10 ) # Longer content = more important # Adjust based on metadata indicators metadata_factor = 1.0 if "humor_score" in metadata and metadata["humor_score"] > 7: metadata_factor += 0.2 if "laughter_duration" in metadata and metadata["laughter_duration"] > 2: metadata_factor += 0.1 if "engagement_level" in metadata: metadata_factor += metadata["engagement_level"] * 0.1 importance_score = base_score * content_factor * metadata_factor return min(1.0, importance_score) except Exception as e: logger.error(f"Failed to calculate importance score: {e}") return 0.5 async def _store_in_qdrant(self, memory_entry: MemoryEntry): """Store memory entry in Qdrant vector database""" try: if ( memory_entry.embedding is None or not self.qdrant_client or not PointStruct ): return # Prepare payload payload = { "user_id": memory_entry.user_id, "guild_id": memory_entry.guild_id, "memory_type": memory_entry.memory_type.value, "content": memory_entry.content, "metadata": memory_entry.metadata, "importance_score": memory_entry.importance_score, "created_at": memory_entry.created_at.isoformat(), "access_count": memory_entry.access_count, } # Create point point = PointStruct( id=memory_entry.id, vector=memory_entry.embedding.tolist(), payload=payload, ) # Upsert to Qdrant await asyncio.get_event_loop().run_in_executor( None, lambda: self.qdrant_client.upsert( collection_name=self.collection_name, points=[point] ), ) except Exception as e: logger.error(f"Failed to store in Qdrant: {e}") async def _store_memory_metadata(self, memory_entry: MemoryEntry): """Store memory metadata in PostgreSQL for efficient filtering""" try: await self.db_manager.execute_query( """ INSERT INTO memory_entries (id, user_id, guild_id, memory_type, content, metadata, importance_score, created_at, last_accessed, access_count) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (id) DO UPDATE SET last_accessed = EXCLUDED.last_accessed, access_count = EXCLUDED.access_count """, memory_entry.id, memory_entry.user_id, memory_entry.guild_id, memory_entry.memory_type.value, memory_entry.content, json.dumps(memory_entry.metadata), memory_entry.importance_score, memory_entry.created_at, memory_entry.last_accessed, memory_entry.access_count, ) except Exception as e: logger.error(f"Failed to store memory metadata: {e}") async def _reconstruct_memory_entry(self, qdrant_result) -> Optional[MemoryEntry]: """Reconstruct memory entry from Qdrant search result""" try: payload = qdrant_result.payload memory_entry = MemoryEntry( id=qdrant_result.id, user_id=payload["user_id"], guild_id=payload["guild_id"], memory_type=MemoryType(payload["memory_type"]), content=payload["content"], embedding=None, # Not needed for retrieval metadata=payload.get("metadata", {}), relevance_score=qdrant_result.score, created_at=datetime.fromisoformat(payload["created_at"]), last_accessed=datetime.now(timezone.utc), access_count=payload.get("access_count", 0), importance_score=payload.get("importance_score", 0.5), ) return memory_entry except Exception as e: logger.error(f"Failed to reconstruct memory entry: {e}") return None async def _update_memory_access(self, memory_id: str): """Update memory access tracking""" try: await self.db_manager.execute_query( """ UPDATE memory_entries SET last_accessed = NOW(), access_count = access_count + 1 WHERE id = $1 """, memory_id, ) except Exception as e: logger.error(f"Failed to update memory access: {e}") async def _update_personality_profile( self, user_id: int, memory_entry: MemoryEntry ): """Update personality profile based on new memory""" try: profile = await self.get_personality_profile(user_id) if not profile: # Create new profile profile = PersonalityProfile( user_id=user_id, humor_preferences={}, communication_style={}, interaction_patterns={}, topic_interests=[], activity_periods=[], personality_keywords=[], last_updated=datetime.now(timezone.utc), ) # Update humor preferences if available if "humor_scores" in memory_entry.metadata: humor_scores = memory_entry.metadata["humor_scores"] for humor_type, score in humor_scores.items(): if humor_type in profile.humor_preferences: # Average with existing preference profile.humor_preferences[humor_type] = ( profile.humor_preferences[humor_type] + score ) / 2 else: profile.humor_preferences[humor_type] = score # Extract keywords from content keywords = await self._extract_keywords(memory_entry.content) profile.personality_keywords.extend(keywords) # Keep only recent keywords if len(profile.personality_keywords) > 50: profile.personality_keywords = profile.personality_keywords[-50:] # Update activity pattern current_hour = memory_entry.created_at.hour activity_pattern = { "hour": current_hour, "day_of_week": memory_entry.created_at.weekday(), "activity_type": memory_entry.memory_type.value, } profile.activity_periods.append(activity_pattern) # Keep only recent activity if len(profile.activity_periods) > 100: profile.activity_periods = profile.activity_periods[-100:] profile.last_updated = datetime.now(timezone.utc) # Store updated profile await self._store_personality_profile(profile) self.personality_profiles[user_id] = profile except Exception as e: logger.error(f"Failed to update personality profile: {e}") async def _store_personality_profile(self, profile: PersonalityProfile): """Store personality profile in database""" try: await self.db_manager.execute_query( """ INSERT INTO personality_profiles (user_id, humor_preferences, communication_style, interaction_patterns, topic_interests, activity_periods, personality_keywords, last_updated) VALUES ($1, $2, $3, $4, $5, $6, $7, $8) ON CONFLICT (user_id) DO UPDATE SET humor_preferences = EXCLUDED.humor_preferences, communication_style = EXCLUDED.communication_style, interaction_patterns = EXCLUDED.interaction_patterns, topic_interests = EXCLUDED.topic_interests, activity_periods = EXCLUDED.activity_periods, personality_keywords = EXCLUDED.personality_keywords, last_updated = EXCLUDED.last_updated """, profile.user_id, json.dumps(profile.humor_preferences), json.dumps(profile.communication_style), json.dumps(profile.interaction_patterns), json.dumps(profile.topic_interests), json.dumps(profile.activity_periods), json.dumps(profile.personality_keywords), profile.last_updated, ) except Exception as e: logger.error(f"Failed to store personality profile: {e}") async def _load_personality_profiles(self): """Load existing personality profiles from database""" try: profiles_data = await self.db_manager.execute_query( """ SELECT * FROM personality_profiles """, fetch_all=True, ) for profile_data in profiles_data: profile = PersonalityProfile( user_id=profile_data["user_id"], humor_preferences=json.loads(profile_data["humor_preferences"]), communication_style=json.loads(profile_data["communication_style"]), interaction_patterns=json.loads( profile_data["interaction_patterns"] ), topic_interests=json.loads(profile_data["topic_interests"]), activity_periods=json.loads(profile_data["activity_periods"]), personality_keywords=json.loads( profile_data["personality_keywords"] ), last_updated=profile_data["last_updated"], ) self.personality_profiles[profile.user_id] = profile logger.info(f"Loaded {len(self.personality_profiles)} personality profiles") except Exception as e: logger.error(f"Failed to load personality profiles: {e}") async def _extract_keywords(self, text: str) -> List[str]: """Extract keywords from text content""" try: # Simple keyword extraction (can be enhanced with NLP libraries) words = text.lower().split() # Filter common words stop_words = { "the", "a", "an", "and", "or", "but", "in", "on", "at", "to", "for", "of", "with", "by", "from", "as", "is", "was", "are", "were", "be", "been", "being", "have", "has", "had", "do", "does", "did", "will", "would", "could", "should", "may", "might", "must", "can", "this", "that", "these", "those", "i", "you", "he", "she", "it", "we", "they", } keywords = [] for word in words: # Clean word word = "".join(c for c in word if c.isalnum()) if len(word) > 3 and word not in stop_words: keywords.append(word) # Return unique keywords return list(set(keywords))[:10] # Limit to 10 keywords except Exception as e: logger.error(f"Failed to extract keywords: {e}") return [] async def _detect_emotional_tone(self, text: str) -> str: """Detect emotional tone of text""" try: text_lower = text.lower() # Simple rule-based emotion detection positive_words = [ "happy", "joy", "love", "great", "awesome", "amazing", "wonderful", ] negative_words = [ "sad", "angry", "hate", "terrible", "awful", "bad", "annoying", ] humorous_words = [ "funny", "hilarious", "lol", "haha", "joke", "comedy", "laugh", ] sarcastic_words = ["obviously", "totally", "definitely", "sure", "right"] positive_count = sum(1 for word in positive_words if word in text_lower) negative_count = sum(1 for word in negative_words if word in text_lower) humorous_count = sum(1 for word in humorous_words if word in text_lower) sarcastic_count = sum(1 for word in sarcastic_words if word in text_lower) if humorous_count > 0: return "humorous" elif sarcastic_count > 1: return "sarcastic" elif positive_count > negative_count: return "positive" elif negative_count > positive_count: return "negative" else: return "neutral" except Exception as e: logger.error(f"Failed to detect emotional tone: {e}") return "neutral" async def _memory_consolidation_worker(self): """Background worker for memory consolidation and cleanup""" while True: try: logger.info("Starting memory consolidation...") # Clean up old memories cutoff_date = datetime.now(timezone.utc) - timedelta( days=self.memory_retention_days ) # Get old, low-importance memories old_memories = await self.db_manager.execute_query( """ SELECT id FROM memory_entries WHERE created_at < $1 AND importance_score < $2 """, cutoff_date, self.importance_threshold, fetch_all=True, ) if old_memories: memory_ids = [m["id"] for m in old_memories] # Delete from Qdrant if self.qdrant_client and models: await asyncio.get_event_loop().run_in_executor( None, lambda: self.qdrant_client.delete( collection_name=self.collection_name, points_selector=models.PointIdsList(points=memory_ids), ), ) # Delete from PostgreSQL await self.db_manager.execute_query( """ DELETE FROM memory_entries WHERE id = ANY($1) """, memory_ids, ) logger.info(f"Consolidated {len(memory_ids)} old memories") # Sleep for 24 hours await asyncio.sleep(86400) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in memory consolidation worker: {e}") await asyncio.sleep(86400) async def _personality_update_worker(self): """Background worker for updating personality profiles""" while True: try: # Update personality profiles based on recent activity cutoff_date = datetime.now(timezone.utc) - timedelta(hours=6) # Find users with recent activity active_users = await self.db_manager.execute_query( """ SELECT DISTINCT user_id FROM memory_entries WHERE created_at > $1 """, cutoff_date, fetch_all=True, ) for user_data in active_users: user_id = user_data["user_id"] # Get recent memories for this user recent_memories = await self.retrieve_memories( user_id, "personality humor behavior", # General query [MemoryType.PERSONALITY, MemoryType.USER_INTERACTION], limit=20, relevance_threshold=0.3, ) if recent_memories: # Update personality based on recent memories await self._analyze_personality_trends(user_id, recent_memories) # Sleep for 6 hours await asyncio.sleep(21600) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in personality update worker: {e}") await asyncio.sleep(21600) async def _analyze_personality_trends( self, user_id: int, memories: List[MemoryEntry] ): """Analyze personality trends from recent memories""" try: profile = await self.get_personality_profile(user_id) if not profile: return # Analyze humor preference trends humor_scores = [] for memory in memories: if "humor_scores" in memory.metadata: humor_scores.append(memory.metadata["humor_scores"]) if humor_scores: # Calculate trending humor preferences for humor_type in ["funny", "dark", "silly", "suspicious", "asinine"]: scores = [ h.get(humor_type, 0) for h in humor_scores if humor_type in h ] if scores: avg_score = sum(scores) / len(scores) # Update with weighted average (recent activity weighted more) if humor_type in profile.humor_preferences: profile.humor_preferences[humor_type] = ( profile.humor_preferences[humor_type] * 0.7 + avg_score * 0.3 ) else: profile.humor_preferences[humor_type] = avg_score # Update communication style content_analysis = await self._analyze_communication_style(memories) for style, score in content_analysis.items(): if style in profile.communication_style: profile.communication_style[style] = ( profile.communication_style[style] * 0.8 + score * 0.2 ) else: profile.communication_style[style] = score profile.last_updated = datetime.now(timezone.utc) await self._store_personality_profile(profile) except Exception as e: logger.error(f"Failed to analyze personality trends: {e}") async def _analyze_communication_style( self, memories: List[MemoryEntry] ) -> Dict[str, float]: """Analyze communication style from memories""" try: style_scores = { "formal": 0.0, "casual": 0.0, "sarcastic": 0.0, "enthusiastic": 0.0, "direct": 0.0, } total_content = " ".join([m.content for m in memories]) content_lower = total_content.lower() # Simple style detection based on word patterns formal_indicators = [ "please", "thank you", "would you", "could you", "sir", "madam", ] casual_indicators = ["yeah", "yep", "nah", "gonna", "wanna", "kinda"] sarcastic_indicators = ["obviously", "totally", "sure thing", "right"] enthusiastic_indicators = ["!", "awesome", "amazing", "love it", "so good"] direct_indicators = ["no", "yes", "exactly", "wrong", "correct"] formal_count = sum( 1 for indicator in formal_indicators if indicator in content_lower ) casual_count = sum( 1 for indicator in casual_indicators if indicator in content_lower ) sarcastic_count = sum( 1 for indicator in sarcastic_indicators if indicator in content_lower ) enthusiastic_count = sum( 1 for indicator in enthusiastic_indicators if indicator in content_lower ) direct_count = sum( 1 for indicator in direct_indicators if indicator in content_lower ) total_indicators = ( formal_count + casual_count + sarcastic_count + enthusiastic_count + direct_count ) if total_indicators > 0: style_scores["formal"] = formal_count / total_indicators style_scores["casual"] = casual_count / total_indicators style_scores["sarcastic"] = sarcastic_count / total_indicators style_scores["enthusiastic"] = enthusiastic_count / total_indicators style_scores["direct"] = direct_count / total_indicators return style_scores except Exception as e: logger.error(f"Failed to analyze communication style: {e}") return {} async def _context_cleanup_worker(self): """Background worker to clean up inactive conversation contexts""" while True: try: current_time = datetime.now(timezone.utc) inactive_contexts = [] for context_key, context in self.active_conversations.items(): # Remove contexts inactive for more than 30 minutes if current_time - context.last_activity > timedelta(minutes=30): inactive_contexts.append(context_key) for context_key in inactive_contexts: del self.active_conversations[context_key] if inactive_contexts: logger.debug( f"Cleaned up {len(inactive_contexts)} inactive conversation contexts" ) # Sleep for 10 minutes await asyncio.sleep(600) except asyncio.CancelledError: break except Exception as e: logger.error(f"Error in context cleanup worker: {e}") await asyncio.sleep(600) async def get_memory_stats(self) -> Dict[str, Any]: """Get memory system statistics""" try: # Get total memories count total_memories_db = await self.db_manager.execute_query( """ SELECT COUNT(*) as count FROM memory_entries """, fetch_one=True, ) total_memories = total_memories_db["count"] if total_memories_db else 0 # Get memory type distribution type_distribution = await self.db_manager.execute_query( """ SELECT memory_type, COUNT(*) as count FROM memory_entries GROUP BY memory_type """, fetch_all=True, ) type_dist_dict = { row["memory_type"]: row["count"] for row in type_distribution } cache_hit_rate = self.cache_hits / max(self.total_retrievals, 1) return { "total_memories": total_memories, "total_retrievals": self.total_retrievals, "cache_hit_rate": cache_hit_rate, "embedding_generations": self.embedding_generations, "active_conversations": len(self.active_conversations), "personality_profiles": len(self.personality_profiles), "memory_type_distribution": type_dist_dict, } except Exception as e: logger.error(f"Failed to get memory stats: {e}") return {} async def check_health(self) -> Dict[str, Any]: """Check health of memory system""" try: # Test Qdrant connection qdrant_healthy = False if self.qdrant_client: try: await asyncio.get_event_loop().run_in_executor( None, self.qdrant_client.get_collections ) qdrant_healthy = True except Exception: pass return { "initialized": self._initialized, "qdrant_healthy": qdrant_healthy, "total_memories": self.total_memories, "total_retrievals": self.total_retrievals, "personality_profiles": len(self.personality_profiles), } except Exception as e: return {"error": str(e), "healthy": False} async def close(self): """Close memory system""" try: logger.info("Closing memory system...") # Cancel background tasks tasks = [ self._memory_consolidation_task, self._personality_update_task, self._context_cleanup_task, ] for task in tasks: if task: task.cancel() # Wait for tasks to complete await asyncio.gather(*[t for t in tasks if t], return_exceptions=True) # Close Qdrant client if self.qdrant_client: self.qdrant_client.close() # Clear caches self.active_conversations.clear() self.personality_profiles.clear() logger.info("Memory system closed") except Exception as e: logger.error(f"Error closing memory system: {e}")