"""
Memory System for Discord Voice Chat Quote Bot
Implements long-term memory using Qdrant vector database for personality tracking,
conversation context, and intelligent quote analysis with semantic understanding.
"""
import asyncio
import json
import logging
import uuid
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from enum import Enum
from typing import TYPE_CHECKING, Any, Dict, List, Optional
if TYPE_CHECKING:
import numpy as np
else:
try:
import numpy as np
except ImportError:
# Fallback for environments without numpy
np = None
try:
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.models import (Distance, FieldCondition, Filter,
PointStruct, VectorParams)
except ImportError:
# Fallback for environments without qdrant-client
QdrantClient = None
models = None
Distance = None
VectorParams = None
PointStruct = None
Filter = None
FieldCondition = None
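# Without qdrant-client the manager still works: vectors are never written and
# retrieval degrades to the Postgres ILIKE fallback in _retrieve_memories_fallback().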
from config.settings import Settings
from core.ai_manager import AIProviderManager
from core.database import DatabaseManager
logger = logging.getLogger(__name__)
class MemoryType(Enum):
"""Types of memories stored in the system"""
CONVERSATION = "conversation"
PERSONALITY = "personality"
QUOTE_CONTEXT = "quote_context"
USER_INTERACTION = "user_interaction"
BEHAVIORAL_PATTERN = "behavioral_pattern"
HUMOR_PREFERENCE = "humor_preference"
class RelevanceType(Enum):
"""Types of relevance scoring for memory retrieval"""
SEMANTIC = "semantic"
TEMPORAL = "temporal"
CONTEXTUAL = "contextual"
PERSONALITY = "personality"
@dataclass
class MemoryEntry:
"""Individual memory entry"""
id: str
user_id: int
guild_id: int
memory_type: MemoryType
content: str
embedding: Optional["np.ndarray"]
metadata: Dict[str, Any]
relevance_score: float
created_at: datetime
last_accessed: datetime
access_count: int
importance_score: float
@dataclass
class ConversationContext:
"""Context for ongoing conversations"""
guild_id: int
channel_id: int
participants: List[int]
topic_keywords: List[str]
emotional_tone: str
start_time: datetime
last_activity: datetime
message_count: int
@dataclass
class PersonalityProfile:
"""User personality profile based on conversation history"""
user_id: int
humor_preferences: Dict[str, float] # funny, dark, silly, etc.
communication_style: Dict[str, float] # formal, casual, sarcastic, etc.
interaction_patterns: Dict[str, Any]
topic_interests: List[str]
activity_periods: List[Dict[str, Any]]
personality_keywords: List[str]
last_updated: datetime
class MemoryManager:
"""
Long-term memory system using Qdrant vector database
Features:
- Semantic memory storage and retrieval
- Personality profiling and tracking
- Conversation context management
- Intelligent quote contextualization
- Behavioral pattern recognition
- Memory importance scoring and pruning
- Multi-dimensional similarity search
"""
def __init__(
self,
ai_manager: AIProviderManager,
db_manager: DatabaseManager,
settings: Settings,
):
self.ai_manager = ai_manager
self.db_manager = db_manager
self.settings = settings
# Qdrant client
self.qdrant_client: Optional["QdrantClient"] = None
self.collection_name = "quote_bot_memories"
# Memory configuration
self.embedding_dimension = 384 # Standard sentence transformer dimension
self.max_memories_per_user = 10000 # Memory limit per user
self.memory_retention_days = 90 # Days to retain memories
self.importance_threshold = 0.3 # Minimum importance to retain
# Context tracking
self.active_conversations: Dict[str, ConversationContext] = {}
self.personality_profiles: Dict[int, PersonalityProfile] = {}
# Background tasks
self._memory_consolidation_task: Optional[asyncio.Task[None]] = None
self._personality_update_task: Optional[asyncio.Task[None]] = None
self._context_cleanup_task: Optional[asyncio.Task[None]] = None
# Statistics
self.total_memories = 0
self.total_retrievals = 0
self.cache_hits = 0
self.embedding_generations = 0
self._initialized = False
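        # Note: store_memory() and retrieve_memories() lazily call initialize(),
        # so explicit initialization is optional but recommended at startup.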
async def initialize(self):
"""Initialize the memory system"""
if self._initialized:
logger.debug("Memory system already initialized, skipping")
return
try:
logger.info(
"Initializing memory system",
extra={
"component": "memory_manager",
"operation": "initialize",
"collection_name": self.collection_name,
"embedding_dimension": self.embedding_dimension,
"max_memories_per_user": self.max_memories_per_user,
"retention_days": self.memory_retention_days,
"importance_threshold": self.importance_threshold
}
)
# Initialize Qdrant client
await self._initialize_qdrant()
# Load existing personality profiles
await self._load_personality_profiles()
# Start background tasks
self._memory_consolidation_task = asyncio.create_task(
self._memory_consolidation_worker()
)
self._personality_update_task = asyncio.create_task(
self._personality_update_worker()
)
self._context_cleanup_task = asyncio.create_task(
self._context_cleanup_worker()
)
self._initialized = True
logger.info(
"Memory system initialized successfully",
extra={
"component": "memory_manager",
"operation": "initialize",
"status": "success",
"qdrant_connected": self.qdrant_client is not None,
"personality_profiles_loaded": len(self.personality_profiles),
"background_tasks_started": 3
}
)
except Exception as e:
logger.error(
"Failed to initialize memory system",
extra={
"component": "memory_manager",
"operation": "initialize",
"error": str(e),
"error_type": type(e).__name__
},
exc_info=True
)
raise
async def _initialize_qdrant(self):
"""Initialize Qdrant vector database"""
try:
logger.debug(
"Initializing Qdrant connection",
extra={
"component": "memory_manager",
"operation": "_initialize_qdrant",
"collection_name": self.collection_name,
"embedding_dimension": self.embedding_dimension
}
)
if QdrantClient is None:
logger.warning(
"Qdrant client not available - memory system will use basic functionality"
)
return
# Get Qdrant connection settings
qdrant_url = self.settings.qdrant_url or "http://localhost:6333"
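            # Assumes a reachable Qdrant instance; a local one can typically be
            # started with, e.g., `docker run -p 6333:6333 qdrant/qdrant`.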
qdrant_api_key = self.settings.qdrant_api_key
# Create Qdrant client
if qdrant_api_key:
self.qdrant_client = QdrantClient(
url=qdrant_url, api_key=qdrant_api_key
)
else:
self.qdrant_client = QdrantClient(url=qdrant_url)
# Check if collection exists
if self.qdrant_client is None:
raise ValueError("Qdrant client not initialized")
collections = await asyncio.get_event_loop().run_in_executor(
None, self.qdrant_client.get_collections
)
collection_exists = any(
collection.name == self.collection_name
for collection in collections.collections
)
if not collection_exists:
# Create collection
if self.qdrant_client is None:
raise ValueError("Qdrant client not initialized")
await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.qdrant_client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(
size=self.embedding_dimension, distance=Distance.COSINE
),
),
)
logger.info(f"Created Qdrant collection: {self.collection_name}")
else:
logger.info(f"Using existing Qdrant collection: {self.collection_name}")
except Exception as e:
logger.error(f"Failed to initialize Qdrant: {e}")
self.qdrant_client = None
async def store_memory(
self,
user_id: int,
guild_id: int,
memory_type: MemoryType,
content: str,
metadata: Optional[Dict[str, Any]] = None,
) -> str:
"""
Store a new memory entry
Args:
user_id: Discord user ID
guild_id: Discord guild ID
memory_type: Type of memory
content: Memory content
metadata: Additional metadata
Returns:
str: Memory ID
"""
        # Generate the memory ID up front so callers always receive a stable ID,
        # even if storage fails partway through
memory_id = str(uuid.uuid4())
try:
if not self._initialized:
await self.initialize()
# Generate embedding for content
embedding = await self._generate_embedding(content)
if embedding is None:
logger.warning(
f"Failed to generate embedding for memory: {content[:50]}..."
)
# Still store the memory without embedding
# Calculate importance score
importance_score = await self._calculate_importance_score(
content, memory_type, metadata or {}
)
# Create memory entry
memory_entry = MemoryEntry(
id=memory_id,
user_id=user_id,
guild_id=guild_id,
memory_type=memory_type,
content=content,
embedding=embedding,
metadata=metadata or {},
relevance_score=1.0, # Initial relevance
created_at=datetime.now(timezone.utc),
last_accessed=datetime.now(timezone.utc),
access_count=0,
importance_score=importance_score,
)
# Store in Qdrant if available and embedding generated
if self.qdrant_client and embedding is not None:
await self._store_in_qdrant(memory_entry)
# Store metadata in PostgreSQL for efficient filtering
await self._store_memory_metadata(memory_entry)
# Update personality profile if relevant
if memory_type in [MemoryType.PERSONALITY, MemoryType.USER_INTERACTION]:
await self._update_personality_profile(user_id, memory_entry)
self.total_memories += 1
logger.debug(f"Stored memory {memory_id} for user {user_id}")
return memory_id
except Exception as e:
logger.error(f"Failed to store memory: {e}")
return memory_id
async def retrieve_memories(
self,
user_id: int,
query: str,
memory_types: Optional[List[MemoryType]] = None,
limit: int = 10,
relevance_threshold: float = 0.7,
) -> List[MemoryEntry]:
"""
Retrieve relevant memories for a user and query
Args:
user_id: Discord user ID
query: Query text for semantic search
memory_types: Optional filter by memory types
limit: Maximum number of memories to return
relevance_threshold: Minimum relevance score
Returns:
List[MemoryEntry]: Retrieved memories
"""
try:
if not self._initialized:
await self.initialize()
# If no Qdrant client or models, fall back to database-only search
if not self.qdrant_client or not models or not Filter or not FieldCondition:
return await self._retrieve_memories_fallback(
user_id, query, memory_types, limit
)
# Generate query embedding
query_embedding = await self._generate_embedding(query)
if query_embedding is None:
logger.warning("Failed to generate query embedding")
return await self._retrieve_memories_fallback(
user_id, query, memory_types, limit
)
# Build filter conditions
filter_conditions = [
FieldCondition(key="user_id", match=models.MatchValue(value=user_id))
]
if memory_types:
type_values = [mt.value for mt in memory_types]
filter_conditions.append(
FieldCondition(
key="memory_type", match=models.MatchAny(any=type_values)
)
)
# Search in Qdrant
search_results = await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.qdrant_client.search(
collection_name=self.collection_name,
query_vector=query_embedding.tolist(),
query_filter=Filter(must=filter_conditions),
limit=limit * 2, # Get more results for filtering
score_threshold=relevance_threshold,
),
)
# Convert results to memory entries
memories = []
for result in search_results:
try:
memory_entry = await self._reconstruct_memory_entry(result)
if memory_entry:
# Update access tracking
memory_entry.last_accessed = datetime.now(timezone.utc)
memory_entry.access_count += 1
memory_entry.relevance_score = result.score
memories.append(memory_entry)
# Update access in database
await self._update_memory_access(memory_entry.id)
except Exception as e:
logger.warning(f"Failed to reconstruct memory entry: {e}")
continue
# Sort by relevance and importance
memories.sort(
key=lambda m: (m.relevance_score * 0.7 + m.importance_score * 0.3),
reverse=True,
)
# Limit results
memories = memories[:limit]
self.total_retrievals += 1
logger.debug(f"Retrieved {len(memories)} memories for user {user_id}")
return memories
except Exception as e:
logger.error(f"Failed to retrieve memories: {e}")
return []
async def _retrieve_memories_fallback(
self,
user_id: int,
query: str,
memory_types: Optional[List[MemoryType]] = None,
limit: int = 10,
) -> List[MemoryEntry]:
"""Fallback method for retrieving memories without vector search"""
try:
            # Build the SQL query with explicit positional parameters.
            # With a type filter:    $1 = user_id, $2 = memory types, $3 = ILIKE pattern, $4 = limit
            # Without a type filter: $1 = user_id, $2 = ILIKE pattern, $3 = limit
            if memory_types:
                type_values = [mt.value for mt in memory_types]
                query_sql = """
                    SELECT * FROM memory_entries
                    WHERE user_id = $1
                      AND memory_type = ANY($2)
                      AND content ILIKE $3
                    ORDER BY importance_score DESC, created_at DESC
                    LIMIT $4
                """
                search_params = [user_id, type_values, f"%{query}%", limit]
            else:
                query_sql = """
                    SELECT * FROM memory_entries
                    WHERE user_id = $1
                      AND content ILIKE $2
                    ORDER BY importance_score DESC, created_at DESC
                    LIMIT $3
                """
                search_params = [user_id, f"%{query}%", limit]
            results = await self.db_manager.execute_query(
                query_sql, *search_params, fetch_all=True
            )
memories = []
for result in results:
memory_entry = MemoryEntry(
id=result["id"],
user_id=result["user_id"],
guild_id=result["guild_id"],
memory_type=MemoryType(result["memory_type"]),
content=result["content"],
embedding=None,
                    metadata=(
                        json.loads(result["metadata"])
                        if isinstance(result.get("metadata"), str)
                        else (result.get("metadata") or {})
                    ),
relevance_score=0.8, # Default relevance for text match
created_at=result["created_at"],
last_accessed=result["last_accessed"],
access_count=result["access_count"],
importance_score=result["importance_score"],
)
memories.append(memory_entry)
return memories
except Exception as e:
logger.error(f"Failed to retrieve memories with fallback: {e}")
return []
async def get_personality_profile(
self, user_id: int
) -> Optional[PersonalityProfile]:
"""Get personality profile for a user"""
try:
if user_id in self.personality_profiles:
return self.personality_profiles[user_id]
# Load from database
profile_data = await self.db_manager.execute_query(
"""
SELECT * FROM personality_profiles WHERE user_id = $1
""",
user_id,
fetch_one=True,
)
if profile_data:
profile = PersonalityProfile(
user_id=user_id,
humor_preferences=json.loads(profile_data["humor_preferences"]),
communication_style=json.loads(profile_data["communication_style"]),
interaction_patterns=json.loads(
profile_data["interaction_patterns"]
),
topic_interests=json.loads(profile_data["topic_interests"]),
activity_periods=json.loads(profile_data["activity_periods"]),
personality_keywords=json.loads(
profile_data["personality_keywords"]
),
last_updated=profile_data["last_updated"],
)
self.personality_profiles[user_id] = profile
return profile
return None
except Exception as e:
logger.error(f"Failed to get personality profile: {e}")
return None
async def update_conversation_context(
self, guild_id: int, channel_id: int, participants: List[int], content: str
):
"""Update ongoing conversation context"""
try:
context_key = f"{guild_id}_{channel_id}"
if context_key not in self.active_conversations:
# Create new conversation context
self.active_conversations[context_key] = ConversationContext(
guild_id=guild_id,
channel_id=channel_id,
participants=participants,
topic_keywords=[],
emotional_tone="neutral",
start_time=datetime.now(timezone.utc),
last_activity=datetime.now(timezone.utc),
message_count=0,
)
context = self.active_conversations[context_key]
context.last_activity = datetime.now(timezone.utc)
context.message_count += 1
# Update participants
for participant in participants:
if participant not in context.participants:
context.participants.append(participant)
# Extract keywords and emotional tone
keywords = await self._extract_keywords(content)
context.topic_keywords.extend(keywords)
# Keep only recent keywords
if len(context.topic_keywords) > 20:
context.topic_keywords = context.topic_keywords[-20:]
# Detect emotional tone
emotional_tone = await self._detect_emotional_tone(content)
if emotional_tone != "neutral":
context.emotional_tone = emotional_tone
except Exception as e:
logger.error(f"Failed to update conversation context: {e}")
async def get_contextual_insights(
self, user_id: int, recent_quotes: List[str]
) -> Dict[str, Any]:
"""Get contextual insights for quote analysis"""
try:
insights = {
"personality_match": 0.0,
"humor_consistency": 0.0,
"interaction_patterns": {},
"topic_relevance": 0.0,
"behavioral_prediction": {},
}
# Get personality profile
profile = await self.get_personality_profile(user_id)
if not profile:
return insights
            # Analyze quote consistency with personality, pooling scores across
            # all recent quotes so every quote contributes to the match score
            personality_scores = []
            for quote in recent_quotes:
                # Get relevant memories
                memories = await self.retrieve_memories(
                    user_id,
                    quote,
                    [MemoryType.PERSONALITY, MemoryType.HUMOR_PREFERENCE],
                    limit=5,
                )
                for memory in memories:
                    if "humor_score" in memory.metadata:
                        personality_scores.append(memory.metadata["humor_score"])
            if personality_scores and np:
                insights["personality_match"] = float(np.mean(personality_scores))
# Analyze humor consistency
humor_preferences = profile.humor_preferences
if humor_preferences:
consistency_scores = []
for quote in recent_quotes:
# This would analyze quote humor against preferences
# Simplified implementation
consistency_scores.append(0.8) # Placeholder
insights["humor_consistency"] = (
np.mean(consistency_scores) if consistency_scores and np else 0.0
)
# Add interaction patterns
insights["interaction_patterns"] = profile.interaction_patterns
return insights
except Exception as e:
logger.error(f"Failed to get contextual insights: {e}")
return {}
    async def _generate_embedding(self, text: str) -> Optional["np.ndarray"]:
"""Generate embedding for text content"""
try:
if np is None:
logger.warning("NumPy not available - cannot generate embeddings")
return None
# Use AI manager to generate embeddings
result = await self.ai_manager.generate_embedding(text)
if result:
# Handle different result formats
if hasattr(result, "embedding"):
embedding_data = result.embedding
elif isinstance(result, list):
embedding_data = result
elif isinstance(result, dict) and "embedding" in result:
embedding_data = result["embedding"]
else:
logger.warning(
f"Unexpected embedding result format: {type(result)}"
)
return None
                # np is guaranteed non-None here (checked at the top of the method)
                embedding = np.array(embedding_data, dtype=np.float32)
# Ensure correct dimension
if len(embedding) != self.embedding_dimension:
# Pad or truncate as needed
if len(embedding) < self.embedding_dimension:
padding = np.zeros(self.embedding_dimension - len(embedding))
embedding = np.concatenate([embedding, padding])
else:
embedding = embedding[: self.embedding_dimension]
                # Normalize to unit length so stored vectors pair cleanly with
                # the Distance.COSINE collection configuration
norm = np.linalg.norm(embedding)
if norm > 0:
embedding = embedding / norm
self.embedding_generations += 1
return embedding
return None
except Exception as e:
logger.error(f"Failed to generate embedding: {e}")
return None
async def _calculate_importance_score(
self, content: str, memory_type: MemoryType, metadata: Dict[str, Any]
) -> float:
"""Calculate importance score for memory entry"""
try:
base_scores = {
MemoryType.CONVERSATION: 0.5,
MemoryType.PERSONALITY: 0.8,
MemoryType.QUOTE_CONTEXT: 0.7,
MemoryType.USER_INTERACTION: 0.6,
MemoryType.BEHAVIORAL_PATTERN: 0.9,
MemoryType.HUMOR_PREFERENCE: 0.7,
}
base_score = base_scores.get(memory_type, 0.5)
# Adjust based on content length and quality
content_factor = min(
1.0, len(content.split()) / 10
) # Longer content = more important
# Adjust based on metadata indicators
metadata_factor = 1.0
if "humor_score" in metadata and metadata["humor_score"] > 7:
metadata_factor += 0.2
if "laughter_duration" in metadata and metadata["laughter_duration"] > 2:
metadata_factor += 0.1
if "engagement_level" in metadata:
metadata_factor += metadata["engagement_level"] * 0.1
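            # Worked example: a 12-word QUOTE_CONTEXT memory with humor_score=9
            # scores 0.7 (base) * 1.0 (content) * 1.2 (metadata) = 0.84.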
importance_score = base_score * content_factor * metadata_factor
return min(1.0, importance_score)
except Exception as e:
logger.error(f"Failed to calculate importance score: {e}")
return 0.5
async def _store_in_qdrant(self, memory_entry: MemoryEntry):
"""Store memory entry in Qdrant vector database"""
try:
if (
memory_entry.embedding is None
or not self.qdrant_client
or not PointStruct
):
return
# Prepare payload
payload = {
"user_id": memory_entry.user_id,
"guild_id": memory_entry.guild_id,
"memory_type": memory_entry.memory_type.value,
"content": memory_entry.content,
"metadata": memory_entry.metadata,
"importance_score": memory_entry.importance_score,
"created_at": memory_entry.created_at.isoformat(),
"access_count": memory_entry.access_count,
}
# Create point
point = PointStruct(
id=memory_entry.id,
vector=memory_entry.embedding.tolist(),
payload=payload,
)
# Upsert to Qdrant
await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.qdrant_client.upsert(
collection_name=self.collection_name, points=[point]
),
)
except Exception as e:
logger.error(f"Failed to store in Qdrant: {e}")
async def _store_memory_metadata(self, memory_entry: MemoryEntry):
"""Store memory metadata in PostgreSQL for efficient filtering"""
try:
await self.db_manager.execute_query(
"""
INSERT INTO memory_entries
(id, user_id, guild_id, memory_type, content, metadata,
importance_score, created_at, last_accessed, access_count)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
ON CONFLICT (id) DO UPDATE SET
last_accessed = EXCLUDED.last_accessed,
access_count = EXCLUDED.access_count
""",
memory_entry.id,
memory_entry.user_id,
memory_entry.guild_id,
memory_entry.memory_type.value,
memory_entry.content,
json.dumps(memory_entry.metadata),
memory_entry.importance_score,
memory_entry.created_at,
memory_entry.last_accessed,
memory_entry.access_count,
)
except Exception as e:
logger.error(f"Failed to store memory metadata: {e}")
async def _reconstruct_memory_entry(self, qdrant_result) -> Optional[MemoryEntry]:
"""Reconstruct memory entry from Qdrant search result"""
try:
payload = qdrant_result.payload
memory_entry = MemoryEntry(
id=qdrant_result.id,
user_id=payload["user_id"],
guild_id=payload["guild_id"],
memory_type=MemoryType(payload["memory_type"]),
content=payload["content"],
embedding=None, # Not needed for retrieval
metadata=payload.get("metadata", {}),
relevance_score=qdrant_result.score,
created_at=datetime.fromisoformat(payload["created_at"]),
last_accessed=datetime.now(timezone.utc),
access_count=payload.get("access_count", 0),
importance_score=payload.get("importance_score", 0.5),
)
return memory_entry
except Exception as e:
logger.error(f"Failed to reconstruct memory entry: {e}")
return None
async def _update_memory_access(self, memory_id: str):
"""Update memory access tracking"""
try:
await self.db_manager.execute_query(
"""
UPDATE memory_entries
SET last_accessed = NOW(), access_count = access_count + 1
WHERE id = $1
""",
memory_id,
)
except Exception as e:
logger.error(f"Failed to update memory access: {e}")
async def _update_personality_profile(
self, user_id: int, memory_entry: MemoryEntry
):
"""Update personality profile based on new memory"""
try:
profile = await self.get_personality_profile(user_id)
if not profile:
# Create new profile
profile = PersonalityProfile(
user_id=user_id,
humor_preferences={},
communication_style={},
interaction_patterns={},
topic_interests=[],
activity_periods=[],
personality_keywords=[],
last_updated=datetime.now(timezone.utc),
)
# Update humor preferences if available
if "humor_scores" in memory_entry.metadata:
humor_scores = memory_entry.metadata["humor_scores"]
for humor_type, score in humor_scores.items():
if humor_type in profile.humor_preferences:
# Average with existing preference
profile.humor_preferences[humor_type] = (
profile.humor_preferences[humor_type] + score
) / 2
else:
profile.humor_preferences[humor_type] = score
# Extract keywords from content
keywords = await self._extract_keywords(memory_entry.content)
profile.personality_keywords.extend(keywords)
# Keep only recent keywords
if len(profile.personality_keywords) > 50:
profile.personality_keywords = profile.personality_keywords[-50:]
# Update activity pattern
current_hour = memory_entry.created_at.hour
activity_pattern = {
"hour": current_hour,
"day_of_week": memory_entry.created_at.weekday(),
"activity_type": memory_entry.memory_type.value,
}
profile.activity_periods.append(activity_pattern)
# Keep only recent activity
if len(profile.activity_periods) > 100:
profile.activity_periods = profile.activity_periods[-100:]
profile.last_updated = datetime.now(timezone.utc)
# Store updated profile
await self._store_personality_profile(profile)
self.personality_profiles[user_id] = profile
except Exception as e:
logger.error(f"Failed to update personality profile: {e}")
async def _store_personality_profile(self, profile: PersonalityProfile):
"""Store personality profile in database"""
try:
await self.db_manager.execute_query(
"""
INSERT INTO personality_profiles
(user_id, humor_preferences, communication_style, interaction_patterns,
topic_interests, activity_periods, personality_keywords, last_updated)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
ON CONFLICT (user_id) DO UPDATE SET
humor_preferences = EXCLUDED.humor_preferences,
communication_style = EXCLUDED.communication_style,
interaction_patterns = EXCLUDED.interaction_patterns,
topic_interests = EXCLUDED.topic_interests,
activity_periods = EXCLUDED.activity_periods,
personality_keywords = EXCLUDED.personality_keywords,
last_updated = EXCLUDED.last_updated
""",
profile.user_id,
json.dumps(profile.humor_preferences),
json.dumps(profile.communication_style),
json.dumps(profile.interaction_patterns),
json.dumps(profile.topic_interests),
json.dumps(profile.activity_periods),
json.dumps(profile.personality_keywords),
profile.last_updated,
)
except Exception as e:
logger.error(f"Failed to store personality profile: {e}")
async def _load_personality_profiles(self):
"""Load existing personality profiles from database"""
try:
profiles_data = await self.db_manager.execute_query(
"""
SELECT * FROM personality_profiles
""",
fetch_all=True,
)
for profile_data in profiles_data:
profile = PersonalityProfile(
user_id=profile_data["user_id"],
humor_preferences=json.loads(profile_data["humor_preferences"]),
communication_style=json.loads(profile_data["communication_style"]),
interaction_patterns=json.loads(
profile_data["interaction_patterns"]
),
topic_interests=json.loads(profile_data["topic_interests"]),
activity_periods=json.loads(profile_data["activity_periods"]),
personality_keywords=json.loads(
profile_data["personality_keywords"]
),
last_updated=profile_data["last_updated"],
)
self.personality_profiles[profile.user_id] = profile
logger.info(f"Loaded {len(self.personality_profiles)} personality profiles")
except Exception as e:
logger.error(f"Failed to load personality profiles: {e}")
async def _extract_keywords(self, text: str) -> List[str]:
"""Extract keywords from text content"""
try:
# Simple keyword extraction (can be enhanced with NLP libraries)
words = text.lower().split()
# Filter common words
stop_words = {
"the",
"a",
"an",
"and",
"or",
"but",
"in",
"on",
"at",
"to",
"for",
"of",
"with",
"by",
"from",
"as",
"is",
"was",
"are",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"could",
"should",
"may",
"might",
"must",
"can",
"this",
"that",
"these",
"those",
"i",
"you",
"he",
"she",
"it",
"we",
"they",
}
keywords = []
for word in words:
# Clean word
word = "".join(c for c in word if c.isalnum())
if len(word) > 3 and word not in stop_words:
keywords.append(word)
# Return unique keywords
return list(set(keywords))[:10] # Limit to 10 keywords
except Exception as e:
logger.error(f"Failed to extract keywords: {e}")
return []
async def _detect_emotional_tone(self, text: str) -> str:
"""Detect emotional tone of text"""
try:
text_lower = text.lower()
# Simple rule-based emotion detection
positive_words = [
"happy",
"joy",
"love",
"great",
"awesome",
"amazing",
"wonderful",
]
negative_words = [
"sad",
"angry",
"hate",
"terrible",
"awful",
"bad",
"annoying",
]
humorous_words = [
"funny",
"hilarious",
"lol",
"haha",
"joke",
"comedy",
"laugh",
]
sarcastic_words = ["obviously", "totally", "definitely", "sure", "right"]
positive_count = sum(1 for word in positive_words if word in text_lower)
negative_count = sum(1 for word in negative_words if word in text_lower)
humorous_count = sum(1 for word in humorous_words if word in text_lower)
sarcastic_count = sum(1 for word in sarcastic_words if word in text_lower)
if humorous_count > 0:
return "humorous"
elif sarcastic_count > 1:
return "sarcastic"
elif positive_count > negative_count:
return "positive"
elif negative_count > positive_count:
return "negative"
else:
return "neutral"
except Exception as e:
logger.error(f"Failed to detect emotional tone: {e}")
return "neutral"
async def _memory_consolidation_worker(self):
"""Background worker for memory consolidation and cleanup"""
while True:
try:
logger.info("Starting memory consolidation...")
# Clean up old memories
cutoff_date = datetime.now(timezone.utc) - timedelta(
days=self.memory_retention_days
)
# Get old, low-importance memories
old_memories = await self.db_manager.execute_query(
"""
SELECT id FROM memory_entries
WHERE created_at < $1 AND importance_score < $2
""",
cutoff_date,
self.importance_threshold,
fetch_all=True,
)
if old_memories:
memory_ids = [m["id"] for m in old_memories]
# Delete from Qdrant
if self.qdrant_client and models:
await asyncio.get_event_loop().run_in_executor(
None,
lambda: self.qdrant_client.delete(
collection_name=self.collection_name,
points_selector=models.PointIdsList(points=memory_ids),
),
)
# Delete from PostgreSQL
await self.db_manager.execute_query(
"""
DELETE FROM memory_entries WHERE id = ANY($1)
""",
memory_ids,
)
logger.info(f"Consolidated {len(memory_ids)} old memories")
# Sleep for 24 hours
await asyncio.sleep(86400)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in memory consolidation worker: {e}")
await asyncio.sleep(86400)
async def _personality_update_worker(self):
"""Background worker for updating personality profiles"""
while True:
try:
# Update personality profiles based on recent activity
cutoff_date = datetime.now(timezone.utc) - timedelta(hours=6)
# Find users with recent activity
active_users = await self.db_manager.execute_query(
"""
SELECT DISTINCT user_id FROM memory_entries
WHERE created_at > $1
""",
cutoff_date,
fetch_all=True,
)
for user_data in active_users:
user_id = user_data["user_id"]
# Get recent memories for this user
recent_memories = await self.retrieve_memories(
user_id,
"personality humor behavior", # General query
[MemoryType.PERSONALITY, MemoryType.USER_INTERACTION],
limit=20,
relevance_threshold=0.3,
)
if recent_memories:
# Update personality based on recent memories
await self._analyze_personality_trends(user_id, recent_memories)
# Sleep for 6 hours
await asyncio.sleep(21600)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in personality update worker: {e}")
await asyncio.sleep(21600)
async def _analyze_personality_trends(
self, user_id: int, memories: List[MemoryEntry]
):
"""Analyze personality trends from recent memories"""
try:
profile = await self.get_personality_profile(user_id)
if not profile:
return
# Analyze humor preference trends
humor_scores = []
for memory in memories:
if "humor_scores" in memory.metadata:
humor_scores.append(memory.metadata["humor_scores"])
if humor_scores:
# Calculate trending humor preferences
for humor_type in ["funny", "dark", "silly", "suspicious", "asinine"]:
scores = [
h.get(humor_type, 0) for h in humor_scores if humor_type in h
]
if scores:
avg_score = sum(scores) / len(scores)
                        # Exponentially weighted update: keep 70% of the stored
                        # preference and blend in 30% of the recent average
                        # (e.g. stored 0.5 with recent 0.9 becomes 0.62)
if humor_type in profile.humor_preferences:
profile.humor_preferences[humor_type] = (
profile.humor_preferences[humor_type] * 0.7
+ avg_score * 0.3
)
else:
profile.humor_preferences[humor_type] = avg_score
# Update communication style
content_analysis = await self._analyze_communication_style(memories)
for style, score in content_analysis.items():
if style in profile.communication_style:
profile.communication_style[style] = (
profile.communication_style[style] * 0.8 + score * 0.2
)
else:
profile.communication_style[style] = score
profile.last_updated = datetime.now(timezone.utc)
await self._store_personality_profile(profile)
except Exception as e:
logger.error(f"Failed to analyze personality trends: {e}")
async def _analyze_communication_style(
self, memories: List[MemoryEntry]
) -> Dict[str, float]:
"""Analyze communication style from memories"""
try:
style_scores = {
"formal": 0.0,
"casual": 0.0,
"sarcastic": 0.0,
"enthusiastic": 0.0,
"direct": 0.0,
}
total_content = " ".join([m.content for m in memories])
content_lower = total_content.lower()
# Simple style detection based on word patterns
formal_indicators = [
"please",
"thank you",
"would you",
"could you",
"sir",
"madam",
]
casual_indicators = ["yeah", "yep", "nah", "gonna", "wanna", "kinda"]
sarcastic_indicators = ["obviously", "totally", "sure thing", "right"]
enthusiastic_indicators = ["!", "awesome", "amazing", "love it", "so good"]
direct_indicators = ["no", "yes", "exactly", "wrong", "correct"]
formal_count = sum(
1 for indicator in formal_indicators if indicator in content_lower
)
casual_count = sum(
1 for indicator in casual_indicators if indicator in content_lower
)
sarcastic_count = sum(
1 for indicator in sarcastic_indicators if indicator in content_lower
)
enthusiastic_count = sum(
1 for indicator in enthusiastic_indicators if indicator in content_lower
)
direct_count = sum(
1 for indicator in direct_indicators if indicator in content_lower
)
total_indicators = (
formal_count
+ casual_count
+ sarcastic_count
+ enthusiastic_count
+ direct_count
)
if total_indicators > 0:
style_scores["formal"] = formal_count / total_indicators
style_scores["casual"] = casual_count / total_indicators
style_scores["sarcastic"] = sarcastic_count / total_indicators
style_scores["enthusiastic"] = enthusiastic_count / total_indicators
style_scores["direct"] = direct_count / total_indicators
return style_scores
except Exception as e:
logger.error(f"Failed to analyze communication style: {e}")
return {}
async def _context_cleanup_worker(self):
"""Background worker to clean up inactive conversation contexts"""
while True:
try:
current_time = datetime.now(timezone.utc)
inactive_contexts = []
for context_key, context in self.active_conversations.items():
# Remove contexts inactive for more than 30 minutes
if current_time - context.last_activity > timedelta(minutes=30):
inactive_contexts.append(context_key)
for context_key in inactive_contexts:
del self.active_conversations[context_key]
if inactive_contexts:
logger.debug(
f"Cleaned up {len(inactive_contexts)} inactive conversation contexts"
)
# Sleep for 10 minutes
await asyncio.sleep(600)
except asyncio.CancelledError:
break
except Exception as e:
logger.error(f"Error in context cleanup worker: {e}")
await asyncio.sleep(600)
async def get_memory_stats(self) -> Dict[str, Any]:
"""Get memory system statistics"""
try:
# Get total memories count
total_memories_db = await self.db_manager.execute_query(
"""
SELECT COUNT(*) as count FROM memory_entries
""",
fetch_one=True,
)
total_memories = total_memories_db["count"] if total_memories_db else 0
# Get memory type distribution
type_distribution = await self.db_manager.execute_query(
"""
SELECT memory_type, COUNT(*) as count
FROM memory_entries
GROUP BY memory_type
""",
fetch_all=True,
)
type_dist_dict = {
row["memory_type"]: row["count"] for row in type_distribution
}
cache_hit_rate = self.cache_hits / max(self.total_retrievals, 1)
return {
"total_memories": total_memories,
"total_retrievals": self.total_retrievals,
"cache_hit_rate": cache_hit_rate,
"embedding_generations": self.embedding_generations,
"active_conversations": len(self.active_conversations),
"personality_profiles": len(self.personality_profiles),
"memory_type_distribution": type_dist_dict,
}
except Exception as e:
logger.error(f"Failed to get memory stats: {e}")
return {}
async def check_health(self) -> Dict[str, Any]:
"""Check health of memory system"""
try:
# Test Qdrant connection
qdrant_healthy = False
if self.qdrant_client:
try:
await asyncio.get_event_loop().run_in_executor(
None, self.qdrant_client.get_collections
)
qdrant_healthy = True
except Exception:
pass
return {
"initialized": self._initialized,
"qdrant_healthy": qdrant_healthy,
"total_memories": self.total_memories,
"total_retrievals": self.total_retrievals,
"personality_profiles": len(self.personality_profiles),
}
except Exception as e:
return {"error": str(e), "healthy": False}
async def close(self):
"""Close memory system"""
try:
logger.info("Closing memory system...")
# Cancel background tasks
tasks = [
self._memory_consolidation_task,
self._personality_update_task,
self._context_cleanup_task,
]
for task in tasks:
if task:
task.cancel()
# Wait for tasks to complete
await asyncio.gather(*[t for t in tasks if t], return_exceptions=True)
# Close Qdrant client
if self.qdrant_client:
self.qdrant_client.close()
# Clear caches
self.active_conversations.clear()
self.personality_profiles.clear()
logger.info("Memory system closed")
except Exception as e:
logger.error(f"Error closing memory system: {e}")