- Deleted final.md, fix_async_fixtures.py, fix_cog_tests.py, fix_fixture_scoping.py, and run_race_condition_tests.sh as they are no longer needed. - Updated main.py to enhance logging and error handling. - Refactored various components for improved performance and maintainability. - Introduced new logging utilities for better context and performance monitoring. - Ensured all datetime operations are timezone-aware and consistent across the codebase.
1473 lines
53 KiB
Python
1473 lines
53 KiB
Python
"""
|
|
Memory System for Discord Voice Chat Quote Bot
|
|
|
|
Implements long-term memory using Qdrant vector database for personality tracking,
|
|
conversation context, and intelligent quote analysis with semantic understanding.
|
|
"""
|
|
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import uuid
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timedelta, timezone
|
|
from enum import Enum
|
|
from typing import TYPE_CHECKING, Any, Dict, List, Optional
|
|
|
|
if TYPE_CHECKING:
|
|
import numpy as np
|
|
else:
|
|
try:
|
|
import numpy as np
|
|
except ImportError:
|
|
# Fallback for environments without numpy
|
|
np = None
|
|
|
|
try:
|
|
from qdrant_client import QdrantClient
|
|
from qdrant_client.http import models
|
|
from qdrant_client.http.models import (Distance, FieldCondition, Filter,
|
|
PointStruct, VectorParams)
|
|
except ImportError:
|
|
# Fallback for environments without qdrant-client
|
|
QdrantClient = None
|
|
models = None
|
|
Distance = None
|
|
VectorParams = None
|
|
PointStruct = None
|
|
Filter = None
|
|
FieldCondition = None
|
|
|
|
from config.settings import Settings
|
|
from core.ai_manager import AIProviderManager
|
|
from core.database import DatabaseManager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MemoryType(Enum):
    """Categories a stored memory can belong to.

    The string values are what is written to storage (the Qdrant payload and
    the ``memory_entries`` table both receive ``memory_type.value``), so they
    must stay stable.
    """

    # General conversation content.
    CONVERSATION = "conversation"
    # Personality-related memories; storing one also refreshes the user's profile.
    PERSONALITY = "personality"
    # Context attached to a quote.
    QUOTE_CONTEXT = "quote_context"
    # User interaction memories; storing one also refreshes the user's profile.
    USER_INTERACTION = "user_interaction"
    # Behavioral patterns.
    BEHAVIORAL_PATTERN = "behavioral_pattern"
    # Humor preferences.
    HUMOR_PREFERENCE = "humor_preference"
|
|
|
|
|
|
class RelevanceType(Enum):
    """Kinds of relevance scoring that memory retrieval can apply."""

    # Similarity of meaning.
    SEMANTIC = "semantic"
    # Time-based relevance.
    TEMPORAL = "temporal"
    # Relevance to the surrounding context.
    CONTEXTUAL = "contextual"
    # Relevance to a personality.
    PERSONALITY = "personality"
|
|
|
|
|
|
@dataclass
class MemoryEntry:
    """A single stored memory together with its bookkeeping fields."""

    # Unique identifier of the memory.
    id: str
    # Discord user the memory belongs to.
    user_id: int
    # Discord guild the memory was recorded in.
    guild_id: int
    # Category of the memory.
    memory_type: MemoryType
    # Text of the memory.
    content: str
    # Embedding vector for the content; None when no embedding is attached.
    embedding: Optional["np.ndarray"]
    # Free-form additional data about the memory.
    metadata: Dict[str, Any]
    # Relevance of the entry to the query that produced it.
    relevance_score: float
    # Creation timestamp.
    created_at: datetime
    # Timestamp of the most recent access.
    last_accessed: datetime
    # Number of times the entry has been accessed.
    access_count: int
    # Importance score of the entry.
    importance_score: float
|
|
|
|
|
|
@dataclass
class ConversationContext:
    """Rolling state of a conversation taking place in one channel."""

    # Guild the conversation belongs to.
    guild_id: int
    # Channel the conversation takes place in.
    channel_id: int
    # IDs of the users seen in the conversation so far.
    participants: List[int]
    # Keywords collected from the conversation's messages.
    topic_keywords: List[str]
    # Emotional tone label of the conversation.
    emotional_tone: str
    # When the conversation context was created.
    start_time: datetime
    # When the conversation was last updated.
    last_activity: datetime
    # Number of messages recorded for the conversation.
    message_count: int
|
|
|
|
|
|
@dataclass
class PersonalityProfile:
    """Personality profile of a user, derived from conversation history."""

    # Discord user the profile describes.
    user_id: int
    # Score per humor type (funny, dark, silly, etc.).
    humor_preferences: Dict[str, float]
    # Score per communication style (formal, casual, sarcastic, etc.).
    communication_style: Dict[str, float]
    # Free-form data about how the user interacts.
    interaction_patterns: Dict[str, Any]
    # Topics the user is interested in.
    topic_interests: List[str]
    # Recorded activity samples, one dict per sample.
    activity_periods: List[Dict[str, Any]]
    # Keywords associated with the user's personality.
    personality_keywords: List[str]
    # When the profile was last modified.
    last_updated: datetime
|
|
|
|
|
|
class MemoryManager:
|
|
"""
|
|
Long-term memory system using Qdrant vector database
|
|
|
|
Features:
|
|
- Semantic memory storage and retrieval
|
|
- Personality profiling and tracking
|
|
- Conversation context management
|
|
- Intelligent quote contextualization
|
|
- Behavioral pattern recognition
|
|
- Memory importance scoring and pruning
|
|
- Multi-dimensional similarity search
|
|
"""
|
|
|
|
def __init__(
    self,
    ai_manager: AIProviderManager,
    db_manager: DatabaseManager,
    settings: Settings,
):
    """Store collaborators and set defaults; no I/O happens here.

    Args:
        ai_manager: Provider used to generate embeddings.
        db_manager: Database access used for memory rows and profiles.
        settings: Application settings (Qdrant URL and API key are read later).
    """
    # Nothing is connected or started until initialize() runs.
    self._initialized = False

    # Collaborators
    self.ai_manager = ai_manager
    self.db_manager = db_manager
    self.settings = settings

    # Qdrant client (created lazily by _initialize_qdrant)
    self.collection_name = "quote_bot_memories"
    self.qdrant_client: Optional["QdrantClient"] = None

    # Memory configuration
    self.embedding_dimension = 384  # Standard sentence transformer dimension
    self.max_memories_per_user = 10000  # Memory limit per user
    self.memory_retention_days = 90  # Days to retain memories
    self.importance_threshold = 0.3  # Minimum importance to retain

    # Context tracking: conversations keyed by "<guild_id>_<channel_id>",
    # profiles keyed by user ID.
    self.active_conversations: Dict[str, ConversationContext] = {}
    self.personality_profiles: Dict[int, PersonalityProfile] = {}

    # Background task handles, populated by initialize()
    self._memory_consolidation_task: Optional[asyncio.Task[None]] = None
    self._personality_update_task: Optional[asyncio.Task[None]] = None
    self._context_cleanup_task: Optional[asyncio.Task[None]] = None

    # Statistics counters
    self.total_memories = 0
    self.total_retrievals = 0
    self.cache_hits = 0
    self.embedding_generations = 0
|
|
|
|
async def initialize(self):
    """Bring the memory system up; does nothing if it is already initialized.

    Connects to Qdrant, loads stored personality profiles and starts the
    three background workers (memory consolidation, personality update and
    context cleanup).

    Raises:
        Exception: any error raised during start-up, re-raised after logging.
    """
    if self._initialized:
        logger.debug("Memory system already initialized, skipping")
        return

    # Fields shared by every structured log record emitted below.
    log_base = {"component": "memory_manager", "operation": "initialize"}

    try:
        logger.info(
            "Initializing memory system",
            extra={
                **log_base,
                "collection_name": self.collection_name,
                "embedding_dimension": self.embedding_dimension,
                "max_memories_per_user": self.max_memories_per_user,
                "retention_days": self.memory_retention_days,
                "importance_threshold": self.importance_threshold,
            },
        )

        # Vector store first, then the cached profiles.
        await self._initialize_qdrant()
        await self._load_personality_profiles()

        # Start the background workers.
        consolidation = asyncio.create_task(self._memory_consolidation_worker())
        self._memory_consolidation_task = consolidation
        personality = asyncio.create_task(self._personality_update_worker())
        self._personality_update_task = personality
        cleanup = asyncio.create_task(self._context_cleanup_worker())
        self._context_cleanup_task = cleanup

        self._initialized = True
        logger.info(
            "Memory system initialized successfully",
            extra={
                **log_base,
                "status": "success",
                "qdrant_connected": self.qdrant_client is not None,
                "personality_profiles_loaded": len(self.personality_profiles),
                "background_tasks_started": 3,
            },
        )
    except Exception as e:
        logger.error(
            "Failed to initialize memory system",
            extra={
                **log_base,
                "error": str(e),
                "error_type": type(e).__name__,
            },
            exc_info=True,
        )
        raise
|
|
|
|
async def _initialize_qdrant(self):
    """Connect to Qdrant and make sure the memory collection exists.

    Best-effort: when the ``qdrant_client`` package is missing, or when any
    step fails, ``self.qdrant_client`` is left as ``None`` and the error is
    logged instead of raised.
    """
    try:
        logger.debug(
            "Initializing Qdrant connection",
            extra={
                "component": "memory_manager",
                "operation": "_initialize_qdrant",
                "collection_name": self.collection_name,
                "embedding_dimension": self.embedding_dimension,
            },
        )
        if QdrantClient is None:
            logger.warning(
                "Qdrant client not available - memory system will use basic functionality"
            )
            return

        # Connection settings; the API key is only passed when configured.
        client_kwargs = {"url": self.settings.qdrant_url or "http://localhost:6333"}
        qdrant_api_key = self.settings.qdrant_api_key
        if qdrant_api_key:
            client_kwargs["api_key"] = qdrant_api_key

        client = QdrantClient(**client_kwargs)
        self.qdrant_client = client

        # The client calls are synchronous, so they run in the default
        # executor. get_running_loop() is the documented way to obtain the
        # loop from inside a coroutine.
        loop = asyncio.get_running_loop()
        collections = await loop.run_in_executor(None, client.get_collections)

        collection_exists = any(
            collection.name == self.collection_name
            for collection in collections.collections
        )

        if collection_exists:
            logger.info(f"Using existing Qdrant collection: {self.collection_name}")
            return

        await loop.run_in_executor(
            None,
            lambda: client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.embedding_dimension, distance=Distance.COSINE
                ),
            ),
        )
        logger.info(f"Created Qdrant collection: {self.collection_name}")

    except Exception as e:
        # Keep the traceback: this is the only record of why Qdrant is disabled.
        logger.error(f"Failed to initialize Qdrant: {e}", exc_info=True)
        self.qdrant_client = None
|
|
|
|
async def store_memory(
    self,
    user_id: int,
    guild_id: int,
    memory_type: MemoryType,
    content: str,
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """
    Persist a new memory and return its ID.

    Args:
        user_id: Discord user ID
        guild_id: Discord guild ID
        memory_type: Type of memory
        content: Memory content
        metadata: Additional metadata

    Returns:
        str: Memory ID. The ID is returned even when storing failed; the
        failure is only logged.
    """
    # Minted up front so the error path can return it as well.
    memory_id = str(uuid.uuid4())

    try:
        if not self._initialized:
            await self.initialize()

        vector = await self._generate_embedding(content)
        if vector is None:
            # Not fatal: the entry is still stored, just without a vector.
            logger.warning(
                f"Failed to generate embedding for memory: {content[:50]}..."
            )

        importance = await self._calculate_importance_score(
            content, memory_type, metadata or {}
        )

        entry = MemoryEntry(
            id=memory_id,
            user_id=user_id,
            guild_id=guild_id,
            memory_type=memory_type,
            content=content,
            embedding=vector,
            metadata=metadata or {},
            relevance_score=1.0,  # Initial relevance
            created_at=datetime.now(timezone.utc),
            last_accessed=datetime.now(timezone.utc),
            access_count=0,
            importance_score=importance,
        )

        # Vector store: only when a client exists and a vector was produced.
        if self.qdrant_client and vector is not None:
            await self._store_in_qdrant(entry)

        # Relational row used for filtering.
        await self._store_memory_metadata(entry)

        # Some memory types also feed the user's personality profile.
        if memory_type in (MemoryType.PERSONALITY, MemoryType.USER_INTERACTION):
            await self._update_personality_profile(user_id, entry)

        self.total_memories += 1
        logger.debug(f"Stored memory {memory_id} for user {user_id}")
    except Exception as e:
        logger.error(f"Failed to store memory: {e}")

    return memory_id
|
|
|
|
async def retrieve_memories(
    self,
    user_id: int,
    query: str,
    memory_types: Optional[List[MemoryType]] = None,
    limit: int = 10,
    relevance_threshold: float = 0.7,
) -> List[MemoryEntry]:
    """
    Retrieve the memories of a user that best match a query.

    Uses Qdrant vector search when available and falls back to the
    database text search otherwise (no client, or no query embedding).

    Args:
        user_id: Discord user ID
        query: Query text for semantic search
        memory_types: Optional filter by memory types
        limit: Maximum number of memories to return
        relevance_threshold: Minimum relevance score

    Returns:
        List[MemoryEntry]: Retrieved memories, best first; an empty list
        when retrieval fails (the error is logged).
    """
    try:
        if not self._initialized:
            await self.initialize()

        # If no Qdrant client or models, fall back to database-only search
        if not self.qdrant_client or not models or not Filter or not FieldCondition:
            return await self._retrieve_memories_fallback(
                user_id, query, memory_types, limit
            )

        query_embedding = await self._generate_embedding(query)
        if query_embedding is None:
            logger.warning("Failed to generate query embedding")
            return await self._retrieve_memories_fallback(
                user_id, query, memory_types, limit
            )

        # Always restrict to the user; optionally restrict to memory types.
        filter_conditions = [
            FieldCondition(key="user_id", match=models.MatchValue(value=user_id))
        ]
        if memory_types:
            filter_conditions.append(
                FieldCondition(
                    key="memory_type",
                    match=models.MatchAny(any=[mt.value for mt in memory_types]),
                )
            )

        # The client call is synchronous, so it runs in the default executor.
        client = self.qdrant_client
        search_results = await asyncio.get_running_loop().run_in_executor(
            None,
            lambda: client.search(
                collection_name=self.collection_name,
                query_vector=query_embedding.tolist(),
                query_filter=Filter(must=filter_conditions),
                limit=limit * 2,  # Over-fetch; re-ranked and trimmed below
                score_threshold=relevance_threshold,
            ),
        )

        # Convert results to memory entries
        candidates = []
        for result in search_results:
            try:
                memory_entry = await self._reconstruct_memory_entry(result)
                if memory_entry:
                    memory_entry.relevance_score = result.score
                    candidates.append(memory_entry)
            except Exception as e:
                logger.warning(f"Failed to reconstruct memory entry: {e}")
                continue

        # Rank by a blend of relevance and importance, then trim.
        candidates.sort(
            key=lambda m: (m.relevance_score * 0.7 + m.importance_score * 0.3),
            reverse=True,
        )
        memories = candidates[:limit]

        # Record the access only for entries that are actually returned;
        # over-fetched candidates dropped by the trim were never delivered
        # to the caller and must not have their access counters bumped.
        for memory_entry in memories:
            memory_entry.last_accessed = datetime.now(timezone.utc)
            memory_entry.access_count += 1
            await self._update_memory_access(memory_entry.id)

        self.total_retrievals += 1

        logger.debug(f"Retrieved {len(memories)} memories for user {user_id}")
        return memories

    except Exception as e:
        logger.error(f"Failed to retrieve memories: {e}")
        return []
|
|
|
|
async def _retrieve_memories_fallback(
    self,
    user_id: int,
    query: str,
    memory_types: Optional[List["MemoryType"]] = None,
    limit: int = 10,
) -> List["MemoryEntry"]:
    """Retrieve memories with a plain text search, without vector search.

    Runs a case-insensitive substring match (``ILIKE``) on ``content`` in
    the ``memory_entries`` table, ordered by importance and recency.

    Args:
        user_id: Discord user ID whose memories are searched.
        query: Text that must occur in the memory content.
        memory_types: Optional filter by memory types.
        limit: Maximum number of memories to return.

    Returns:
        Matching entries (without embeddings, with a fixed relevance of
        0.8); an empty list when the query fails (the error is logged).
    """
    try:
        # Placeholders are numbered from the argument list itself so the
        # SQL text and the positional arguments can never drift apart.
        params: List[Any] = [user_id]
        conditions = ["user_id = $1"]

        if memory_types:
            params.append([mt.value for mt in memory_types])
            conditions.append(f"memory_type = ANY(${len(params)})")

        # The raw query is passed; the SQL adds the surrounding wildcards.
        params.append(query)
        conditions.append(f"content ILIKE '%' || ${len(params)} || '%'")

        params.append(limit)
        limit_placeholder = f"${len(params)}"

        where_sql = " AND ".join(conditions)
        query_sql = f"""
            SELECT * FROM memory_entries
            WHERE {where_sql}
            ORDER BY importance_score DESC, created_at DESC
            LIMIT {limit_placeholder}
        """

        results = await self.db_manager.execute_query(
            query_sql, *params, fetch_all=True
        )

        memories = []
        for result in results or []:
            # Metadata is written with json.dumps, so it may come back as
            # JSON text — decode it so callers always get a dict.
            metadata = result.get("metadata", {})
            if isinstance(metadata, str):
                metadata = json.loads(metadata)
            elif metadata is None:
                metadata = {}

            memories.append(
                MemoryEntry(
                    id=result["id"],
                    user_id=result["user_id"],
                    guild_id=result["guild_id"],
                    memory_type=MemoryType(result["memory_type"]),
                    content=result["content"],
                    embedding=None,
                    metadata=metadata,
                    relevance_score=0.8,  # Default relevance for text match
                    created_at=result["created_at"],
                    last_accessed=result["last_accessed"],
                    access_count=result["access_count"],
                    importance_score=result["importance_score"],
                )
            )

        return memories

    except Exception as e:
        logger.error(f"Failed to retrieve memories with fallback: {e}")
        return []
|
|
|
|
async def get_personality_profile(
    self, user_id: int
) -> Optional[PersonalityProfile]:
    """Return the personality profile of a user, or None.

    The in-memory cache is consulted first; on a miss the profile is read
    from the ``personality_profiles`` table and cached. None is returned
    when no profile exists or when loading fails (the error is logged).
    """
    try:
        # Cache hit: nothing else to do.
        try:
            return self.personality_profiles[user_id]
        except KeyError:
            pass

        # Cache miss: read the row from the database.
        row = await self.db_manager.execute_query(
            """
            SELECT * FROM personality_profiles WHERE user_id = $1
            """,
            user_id,
            fetch_one=True,
        )
        if not row:
            return None

        # Every structured column is read as JSON text.
        decode = json.loads
        loaded = PersonalityProfile(
            user_id=user_id,
            humor_preferences=decode(row["humor_preferences"]),
            communication_style=decode(row["communication_style"]),
            interaction_patterns=decode(row["interaction_patterns"]),
            topic_interests=decode(row["topic_interests"]),
            activity_periods=decode(row["activity_periods"]),
            personality_keywords=decode(row["personality_keywords"]),
            last_updated=row["last_updated"],
        )
        self.personality_profiles[user_id] = loaded
        return loaded

    except Exception as e:
        logger.error(f"Failed to get personality profile: {e}")
        return None
|
|
|
|
async def update_conversation_context(
    self, guild_id: int, channel_id: int, participants: List[int], content: str
):
    """Fold one message into the conversation context of its channel.

    Creates the context on first use, then updates activity time, message
    count, participants, topic keywords (the 20 most recent are kept) and
    the emotional tone (only replaced by a non-neutral detection). Errors
    are logged, never raised.

    Args:
        guild_id: Discord guild ID.
        channel_id: Discord channel ID.
        participants: User IDs involved in this message.
        content: Message text.
    """
    try:
        context_key = f"{guild_id}_{channel_id}"

        if context_key not in self.active_conversations:
            # Create new conversation context
            now = datetime.now(timezone.utc)
            self.active_conversations[context_key] = ConversationContext(
                guild_id=guild_id,
                channel_id=channel_id,
                # Copy: the context appends to this list on later calls and
                # must not mutate the list object owned by the first caller.
                participants=list(participants),
                topic_keywords=[],
                emotional_tone="neutral",
                start_time=now,
                last_activity=now,
                message_count=0,
            )

        context = self.active_conversations[context_key]
        context.last_activity = datetime.now(timezone.utc)
        context.message_count += 1

        # Add participants not seen before, preserving first-seen order.
        for participant in participants:
            if participant not in context.participants:
                context.participants.append(participant)

        # Append the new keywords and keep only the 20 most recent.
        keywords = await self._extract_keywords(content)
        context.topic_keywords.extend(keywords)
        if len(context.topic_keywords) > 20:
            context.topic_keywords = context.topic_keywords[-20:]

        # A neutral message does not reset a previously detected tone.
        emotional_tone = await self._detect_emotional_tone(content)
        if emotional_tone != "neutral":
            context.emotional_tone = emotional_tone

    except Exception as e:
        logger.error(f"Failed to update conversation context: {e}")
|
|
|
|
async def get_contextual_insights(
    self, user_id: int, recent_quotes: List[str]
) -> Dict[str, Any]:
    """Get contextual insights for quote analysis.

    Args:
        user_id: Discord user ID.
        recent_quotes: Recent quote texts of the user.

    Returns:
        Dict with the keys ``personality_match``, ``humor_consistency``,
        ``interaction_patterns``, ``topic_relevance`` and
        ``behavioral_prediction``. The defaults are returned when the user
        has no profile; an empty dict is returned when an error occurs
        (the error is logged).
    """
    try:
        insights: Dict[str, Any] = {
            "personality_match": 0.0,
            "humor_consistency": 0.0,
            "interaction_patterns": {},
            "topic_relevance": 0.0,
            "behavioral_prediction": {},
        }

        profile = await self.get_personality_profile(user_id)
        if not profile:
            return insights

        # Collect the humor scores of the memories related to every quote.
        # They are averaged once after the loop, so each quote contributes
        # instead of the last quote overwriting the earlier ones.
        humor_scores: List[float] = []
        for quote in recent_quotes:
            memories = await self.retrieve_memories(
                user_id,
                quote,
                [MemoryType.PERSONALITY, MemoryType.HUMOR_PREFERENCE],
                limit=5,
            )
            for memory in memories:
                metadata = memory.metadata
                if isinstance(metadata, dict) and "humor_score" in metadata:
                    humor_scores.append(metadata["humor_score"])

        # Plain-Python mean: works without NumPy and yields a built-in float.
        if humor_scores:
            insights["personality_match"] = float(
                sum(humor_scores) / len(humor_scores)
            )

        # Analyze humor consistency
        if profile.humor_preferences:
            # Placeholder: every quote is scored 0.8 until quotes are
            # actually compared against the humor preferences.
            consistency_scores = [0.8 for _ in recent_quotes]
            insights["humor_consistency"] = (
                sum(consistency_scores) / len(consistency_scores)
                if consistency_scores
                else 0.0
            )

        # Add interaction patterns
        insights["interaction_patterns"] = profile.interaction_patterns

        return insights

    except Exception as e:
        logger.error(f"Failed to get contextual insights: {e}")
        return {}
|
|
|
|
async def _generate_embedding(self, text: str) -> Optional["np.ndarray"]:
    """Generate a unit-length float32 embedding for text content.

    The AI manager result may be an object with an ``embedding`` attribute,
    a list, or a dict with an ``embedding`` key. The vector is zero-padded
    or truncated to ``self.embedding_dimension`` and L2-normalized.

    Args:
        text: Text to embed.

    Returns:
        The embedding, or None when NumPy is unavailable, the provider
        returns nothing usable, or an error occurs (the error is logged).
    """
    # The return annotation is quoted so that defining this method does not
    # touch ``np.ndarray`` when the optional NumPy import fell back to None.
    try:
        if np is None:
            logger.warning("NumPy not available - cannot generate embeddings")
            return None

        # Use AI manager to generate embeddings
        result = await self.ai_manager.generate_embedding(text)
        if not result:
            return None

        # Handle different result formats
        if hasattr(result, "embedding"):
            embedding_data = result.embedding
        elif isinstance(result, list):
            embedding_data = result
        elif isinstance(result, dict) and "embedding" in result:
            embedding_data = result["embedding"]
        else:
            logger.warning(f"Unexpected embedding result format: {type(result)}")
            return None

        embedding = np.array(embedding_data, dtype=np.float32)
        if embedding.size == 0:
            # Padding an empty vector would yield an all-zero embedding,
            # which carries no direction for cosine similarity.
            logger.warning("Embedding provider returned an empty vector")
            return None

        # Ensure correct dimension: pad with zeros or truncate.
        target = self.embedding_dimension
        current = len(embedding)
        if current < target:
            # float32 padding keeps the dtype; a default float64 zeros array
            # would silently upcast every padded embedding.
            padding = np.zeros(target - current, dtype=np.float32)
            embedding = np.concatenate([embedding, padding])
        elif current > target:
            embedding = embedding[:target]

        # Normalize (skipped for an all-zero vector to avoid dividing by 0).
        norm = np.linalg.norm(embedding)
        if norm > 0:
            embedding = embedding / norm

        self.embedding_generations += 1
        return embedding

    except Exception as e:
        logger.error(f"Failed to generate embedding: {e}")
        return None
|
|
|
|
async def _calculate_importance_score(
    self, content: str, memory_type: MemoryType, metadata: Dict[str, Any]
) -> float:
    """Score how important a memory is.

    The score is ``type weight * length factor * metadata bonus``, capped
    at 1.0. The length factor reaches 1.0 at ten words. 0.5 is returned
    when the calculation fails (the error is logged).
    """
    try:
        # Weight per memory type; unknown types fall back to 0.5.
        type_weight = {
            MemoryType.CONVERSATION: 0.5,
            MemoryType.PERSONALITY: 0.8,
            MemoryType.QUOTE_CONTEXT: 0.7,
            MemoryType.USER_INTERACTION: 0.6,
            MemoryType.BEHAVIORAL_PATTERN: 0.9,
            MemoryType.HUMOR_PREFERENCE: 0.7,
        }.get(memory_type, 0.5)

        # Longer content = more important, saturating at ten words.
        word_count = len(content.split())
        length_factor = min(1.0, word_count / 10)

        # Metadata indicators raise the score.
        bonus = 1.0
        if "humor_score" in metadata and metadata["humor_score"] > 7:
            bonus += 0.2
        if "laughter_duration" in metadata and metadata["laughter_duration"] > 2:
            bonus += 0.1
        if "engagement_level" in metadata:
            bonus += metadata["engagement_level"] * 0.1

        return min(1.0, type_weight * length_factor * bonus)

    except Exception as e:
        logger.error(f"Failed to calculate importance score: {e}")
        return 0.5
|
|
|
|
async def _store_in_qdrant(self, memory_entry: MemoryEntry):
    """Upsert a memory entry into the Qdrant collection.

    Does nothing when the entry has no embedding, no client is connected
    or the Qdrant models are unavailable. Errors are logged, never raised.
    """
    try:
        can_store = (
            memory_entry.embedding is not None
            and self.qdrant_client
            and PointStruct
        )
        if not can_store:
            return

        # The payload carries everything needed to rebuild the entry from
        # a search hit.
        point = PointStruct(
            id=memory_entry.id,
            vector=memory_entry.embedding.tolist(),
            payload={
                "user_id": memory_entry.user_id,
                "guild_id": memory_entry.guild_id,
                "memory_type": memory_entry.memory_type.value,
                "content": memory_entry.content,
                "metadata": memory_entry.metadata,
                "importance_score": memory_entry.importance_score,
                "created_at": memory_entry.created_at.isoformat(),
                "access_count": memory_entry.access_count,
            },
        )

        def upsert_point():
            # Synchronous client call, executed in the default executor.
            return self.qdrant_client.upsert(
                collection_name=self.collection_name, points=[point]
            )

        await asyncio.get_event_loop().run_in_executor(None, upsert_point)

    except Exception as e:
        logger.error(f"Failed to store in Qdrant: {e}")
|
|
|
|
async def _store_memory_metadata(self, memory_entry: MemoryEntry):
    """Insert the memory's row into the ``memory_entries`` table.

    When a row with the same ID already exists, only ``last_accessed`` and
    ``access_count`` are refreshed. Errors are logged, never raised.
    """
    upsert_sql = """
        INSERT INTO memory_entries
        (id, user_id, guild_id, memory_type, content, metadata,
        importance_score, created_at, last_accessed, access_count)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
        ON CONFLICT (id) DO UPDATE SET
        last_accessed = EXCLUDED.last_accessed,
        access_count = EXCLUDED.access_count
        """
    try:
        # Positional values, in the column order listed above.
        row = (
            memory_entry.id,
            memory_entry.user_id,
            memory_entry.guild_id,
            memory_entry.memory_type.value,
            memory_entry.content,
            json.dumps(memory_entry.metadata),
            memory_entry.importance_score,
            memory_entry.created_at,
            memory_entry.last_accessed,
            memory_entry.access_count,
        )
        await self.db_manager.execute_query(upsert_sql, *row)

    except Exception as e:
        logger.error(f"Failed to store memory metadata: {e}")
|
|
|
|
async def _reconstruct_memory_entry(self, qdrant_result) -> Optional[MemoryEntry]:
    """Build a MemoryEntry from a Qdrant search hit.

    Reads ``id``, ``score`` and ``payload`` from the hit. Returns None when
    the payload cannot be converted (the error is logged).
    """
    try:
        data = qdrant_result.payload

        fields = {
            "id": qdrant_result.id,
            "user_id": data["user_id"],
            "guild_id": data["guild_id"],
            "memory_type": MemoryType(data["memory_type"]),
            "content": data["content"],
            "embedding": None,  # Not needed for retrieval
            "metadata": data.get("metadata", {}),
            "relevance_score": qdrant_result.score,
            # Stored as an ISO-8601 string in the payload.
            "created_at": datetime.fromisoformat(data["created_at"]),
            "last_accessed": datetime.now(timezone.utc),
            "access_count": data.get("access_count", 0),
            "importance_score": data.get("importance_score", 0.5),
        }
        return MemoryEntry(**fields)

    except Exception as e:
        logger.error(f"Failed to reconstruct memory entry: {e}")
        return None
|
|
|
|
async def _update_memory_access(self, memory_id: str):
    """Record one access of a memory in the ``memory_entries`` table.

    Sets ``last_accessed`` to the database's current time and increments
    ``access_count``. Errors are logged, never raised.
    """
    touch_sql = """
        UPDATE memory_entries
        SET last_accessed = NOW(), access_count = access_count + 1
        WHERE id = $1
        """
    try:
        await self.db_manager.execute_query(touch_sql, memory_id)
    except Exception as e:
        logger.error(f"Failed to update memory access: {e}")
|
|
|
|
async def _update_personality_profile(
    self, user_id: int, memory_entry: MemoryEntry
):
    """Fold a new memory into the user's personality profile.

    Creates the profile if none exists, merges humor scores from the
    memory's metadata, appends keywords (last 50 kept) and an activity
    sample (last 100 kept), then stores and caches the profile. Errors are
    logged, never raised.
    """
    try:
        profile = await self.get_personality_profile(user_id)
        if not profile:
            # First memory for this user: start from an empty profile.
            profile = PersonalityProfile(
                user_id=user_id,
                humor_preferences={},
                communication_style={},
                interaction_patterns={},
                topic_interests=[],
                activity_periods=[],
                personality_keywords=[],
                last_updated=datetime.now(timezone.utc),
            )

        # Humor preferences: a known type is averaged with the new score,
        # an unknown type takes the new score as is.
        entry_metadata = memory_entry.metadata
        if "humor_scores" in entry_metadata:
            preferences = profile.humor_preferences
            for humor_type, score in entry_metadata["humor_scores"].items():
                if humor_type not in preferences:
                    preferences[humor_type] = score
                else:
                    preferences[humor_type] = (preferences[humor_type] + score) / 2

        # Keywords from the content; only the 50 most recent are kept.
        new_keywords = await self._extract_keywords(memory_entry.content)
        profile.personality_keywords.extend(new_keywords)
        if len(profile.personality_keywords) > 50:
            profile.personality_keywords = profile.personality_keywords[-50:]

        # Activity sample taken from the memory's creation time; only the
        # 100 most recent are kept.
        created = memory_entry.created_at
        profile.activity_periods.append(
            {
                "hour": created.hour,
                "day_of_week": created.weekday(),
                "activity_type": memory_entry.memory_type.value,
            }
        )
        if len(profile.activity_periods) > 100:
            profile.activity_periods = profile.activity_periods[-100:]

        profile.last_updated = datetime.now(timezone.utc)

        # Persist first, then refresh the cache.
        await self._store_personality_profile(profile)
        self.personality_profiles[user_id] = profile

    except Exception as e:
        logger.error(f"Failed to update personality profile: {e}")
|
|
|
|
async def _store_personality_profile(self, profile: PersonalityProfile):
    """Insert or overwrite the profile's row in ``personality_profiles``.

    The structured fields are serialized with ``json.dumps``. Errors are
    logged, never raised.
    """
    upsert_sql = """
        INSERT INTO personality_profiles
        (user_id, humor_preferences, communication_style, interaction_patterns,
        topic_interests, activity_periods, personality_keywords, last_updated)
        VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
        ON CONFLICT (user_id) DO UPDATE SET
        humor_preferences = EXCLUDED.humor_preferences,
        communication_style = EXCLUDED.communication_style,
        interaction_patterns = EXCLUDED.interaction_patterns,
        topic_interests = EXCLUDED.topic_interests,
        activity_periods = EXCLUDED.activity_periods,
        personality_keywords = EXCLUDED.personality_keywords,
        last_updated = EXCLUDED.last_updated
        """
    try:
        # JSON-encoded columns, in the order of placeholders $2..$7.
        encoded = [
            json.dumps(value)
            for value in (
                profile.humor_preferences,
                profile.communication_style,
                profile.interaction_patterns,
                profile.topic_interests,
                profile.activity_periods,
                profile.personality_keywords,
            )
        ]
        await self.db_manager.execute_query(
            upsert_sql, profile.user_id, *encoded, profile.last_updated
        )

    except Exception as e:
        logger.error(f"Failed to store personality profile: {e}")
|
|
|
|
async def _load_personality_profiles(self):
    """Load every row of ``personality_profiles`` into the in-memory cache.

    Errors are logged, never raised; profiles cached before the failure
    stay in the cache.
    """
    try:
        rows = await self.db_manager.execute_query(
            """
            SELECT * FROM personality_profiles
            """,
            fetch_all=True,
        )

        # Every structured column is read as JSON text.
        decode = json.loads
        for row in rows:
            loaded = PersonalityProfile(
                user_id=row["user_id"],
                humor_preferences=decode(row["humor_preferences"]),
                communication_style=decode(row["communication_style"]),
                interaction_patterns=decode(row["interaction_patterns"]),
                topic_interests=decode(row["topic_interests"]),
                activity_periods=decode(row["activity_periods"]),
                personality_keywords=decode(row["personality_keywords"]),
                last_updated=row["last_updated"],
            )
            self.personality_profiles[loaded.user_id] = loaded

        logger.info(f"Loaded {len(self.personality_profiles)} personality profiles")

    except Exception as e:
        logger.error(f"Failed to load personality profiles: {e}")
|
|
|
|
async def _extract_keywords(self, text: str) -> List[str]:
    """Extract up to ten distinct keywords from text, in order of appearance.

    The text is lower-cased and split on whitespace; each word is stripped
    of non-alphanumeric characters and kept when it is longer than three
    characters and not a stop word.

    Args:
        text: Text to extract keywords from.

    Returns:
        At most ten unique keywords, ordered by first occurrence; an empty
        list when extraction fails (the error is logged).
    """
    try:
        # Simple keyword extraction (can be enhanced with NLP libraries)
        stop_words = frozenset(
            {
                "the", "a", "an", "and", "or", "but", "in", "on", "at", "to",
                "for", "of", "with", "by", "from", "as", "is", "was", "are",
                "were", "be", "been", "being", "have", "has", "had", "do",
                "does", "did", "will", "would", "could", "should", "may",
                "might", "must", "can", "this", "that", "these", "those",
                "i", "you", "he", "she", "it", "we", "they",
            }
        )

        cleaned_words = (
            "".join(ch for ch in raw_word if ch.isalnum())
            for raw_word in text.lower().split()
        )

        # dict.fromkeys de-duplicates while keeping first-occurrence order.
        # A set would make both the order and the ten survivors depend on
        # string hashing, which varies between interpreter runs, and callers
        # rely on order when they keep only the most recent keywords.
        unique_keywords = dict.fromkeys(
            word
            for word in cleaned_words
            if len(word) > 3 and word not in stop_words
        )

        return list(unique_keywords)[:10]  # Limit to 10 keywords

    except Exception as e:
        logger.error(f"Failed to extract keywords: {e}")
        return []
|
|
|
|
async def _detect_emotional_tone(self, text: str) -> str:
|
|
"""Detect emotional tone of text"""
|
|
try:
|
|
text_lower = text.lower()
|
|
|
|
# Simple rule-based emotion detection
|
|
positive_words = [
|
|
"happy",
|
|
"joy",
|
|
"love",
|
|
"great",
|
|
"awesome",
|
|
"amazing",
|
|
"wonderful",
|
|
]
|
|
negative_words = [
|
|
"sad",
|
|
"angry",
|
|
"hate",
|
|
"terrible",
|
|
"awful",
|
|
"bad",
|
|
"annoying",
|
|
]
|
|
humorous_words = [
|
|
"funny",
|
|
"hilarious",
|
|
"lol",
|
|
"haha",
|
|
"joke",
|
|
"comedy",
|
|
"laugh",
|
|
]
|
|
sarcastic_words = ["obviously", "totally", "definitely", "sure", "right"]
|
|
|
|
positive_count = sum(1 for word in positive_words if word in text_lower)
|
|
negative_count = sum(1 for word in negative_words if word in text_lower)
|
|
humorous_count = sum(1 for word in humorous_words if word in text_lower)
|
|
sarcastic_count = sum(1 for word in sarcastic_words if word in text_lower)
|
|
|
|
if humorous_count > 0:
|
|
return "humorous"
|
|
elif sarcastic_count > 1:
|
|
return "sarcastic"
|
|
elif positive_count > negative_count:
|
|
return "positive"
|
|
elif negative_count > positive_count:
|
|
return "negative"
|
|
else:
|
|
return "neutral"
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to detect emotional tone: {e}")
|
|
return "neutral"
|
|
|
|
async def _memory_consolidation_worker(self):
    """Background worker for memory consolidation and cleanup.

    Loops forever. On each pass it selects the ids of `memory_entries` rows
    that are older than `self.memory_retention_days` and whose
    `importance_score` is below `self.importance_threshold`, removes those
    points from the Qdrant collection (when a client and the Qdrant models
    are available) and then deletes the rows from the database. It then
    sleeps for 24 hours.

    The loop exits when the task is cancelled. Any other error is logged
    and the worker waits 24 hours before the next attempt.
    """
    while True:
        try:
            logger.info("Starting memory consolidation...")

            # Clean up old memories
            cutoff_date = datetime.now(timezone.utc) - timedelta(
                days=self.memory_retention_days
            )

            # Get old, low-importance memories
            old_memories = await self.db_manager.execute_query(
                """
                SELECT id FROM memory_entries
                WHERE created_at < $1 AND importance_score < $2
                """,
                cutoff_date,
                self.importance_threshold,
                fetch_all=True,
            )

            if old_memories:
                memory_ids = [m["id"] for m in old_memories]

                # Delete from Qdrant first. If this raises, the database
                # rows are left in place and retried on the next pass.
                if self.qdrant_client and models:
                    # The Qdrant client call is synchronous, so it runs in
                    # the default executor. get_running_loop() is the
                    # preferred way to obtain the loop inside a coroutine.
                    await asyncio.get_running_loop().run_in_executor(
                        None,
                        lambda: self.qdrant_client.delete(
                            collection_name=self.collection_name,
                            points_selector=models.PointIdsList(points=memory_ids),
                        ),
                    )

                # Delete from PostgreSQL
                await self.db_manager.execute_query(
                    """
                    DELETE FROM memory_entries WHERE id = ANY($1)
                    """,
                    memory_ids,
                )

                logger.info(f"Consolidated {len(memory_ids)} old memories")

            # Sleep for 24 hours
            await asyncio.sleep(86400)

        except asyncio.CancelledError:
            # Task was cancelled: leave the loop
            break
        except Exception as e:
            logger.error(f"Error in memory consolidation worker: {e}")
            await asyncio.sleep(86400)
|
|
|
|
async def _personality_update_worker(self):
    """Background worker for updating personality profiles.

    Loops forever. On each pass it finds the distinct `user_id` values with
    `memory_entries` rows created in the last 6 hours, retrieves up to 20
    personality / user-interaction memories for each of them and, when any
    are found, passes them to `_analyze_personality_trends`. It then sleeps
    for 6 hours.

    The loop exits when the task is cancelled. Any other error is logged
    and the worker waits 6 hours before the next attempt.
    """
    while True:
        try:
            # Update personality profiles based on recent activity
            cutoff_date = datetime.now(timezone.utc) - timedelta(hours=6)

            # Find users with recent activity
            active_users = await self.db_manager.execute_query(
                """
                SELECT DISTINCT user_id FROM memory_entries
                WHERE created_at > $1
                """,
                cutoff_date,
                fetch_all=True,
            )

            # Treat a missing result the same as "no active users" instead
            # of failing the whole pass with a TypeError.
            for user_data in active_users or []:
                user_id = user_data["user_id"]

                # Get recent memories for this user
                recent_memories = await self.retrieve_memories(
                    user_id,
                    "personality humor behavior",  # General query
                    [MemoryType.PERSONALITY, MemoryType.USER_INTERACTION],
                    limit=20,
                    relevance_threshold=0.3,
                )

                if recent_memories:
                    # Update personality based on recent memories
                    await self._analyze_personality_trends(user_id, recent_memories)

            # Sleep for 6 hours
            await asyncio.sleep(21600)

        except asyncio.CancelledError:
            # Task was cancelled: leave the loop
            break
        except Exception as e:
            logger.error(f"Error in personality update worker: {e}")
            await asyncio.sleep(21600)
|
|
|
|
async def _analyze_personality_trends(
    self, user_id: int, memories: List[MemoryEntry]
):
    """Blend signals from recent memories into a user's personality profile.

    Does nothing when no profile is found for the user. Otherwise:

    * humor preferences are updated per humor type from the
      "humor_scores" metadata of the memories (70% existing value,
      30% average of the new scores; a new type takes the average as-is);
    * communication style is updated from `_analyze_communication_style`
      (80% existing value, 20% new score; a new style takes the score
      as-is);
    * `last_updated` is set to the current UTC time and the profile is
      stored.

    Errors are logged and not propagated.

    Args:
        user_id: User whose profile is updated.
        memories: Memories to analyse.
    """
    try:
        profile = await self.get_personality_profile(user_id)
        if not profile:
            return

        # Collect the humor score mappings attached to the memories
        humor_samples = [
            entry.metadata["humor_scores"]
            for entry in memories
            if "humor_scores" in entry.metadata
        ]

        if humor_samples:
            # Calculate trending humor preferences
            for humor_type in ("funny", "dark", "silly", "suspicious", "asinine"):
                values = [
                    sample.get(humor_type, 0)
                    for sample in humor_samples
                    if humor_type in sample
                ]
                if not values:
                    continue

                avg_score = sum(values) / len(values)
                # Weighted blend when a preference already exists,
                # otherwise seed it with the average.
                profile.humor_preferences[humor_type] = (
                    profile.humor_preferences[humor_type] * 0.7 + avg_score * 0.3
                    if humor_type in profile.humor_preferences
                    else avg_score
                )

        # Update communication style
        style_scores = await self._analyze_communication_style(memories)
        for style_name, new_score in style_scores.items():
            profile.communication_style[style_name] = (
                profile.communication_style[style_name] * 0.8 + new_score * 0.2
                if style_name in profile.communication_style
                else new_score
            )

        profile.last_updated = datetime.now(timezone.utc)
        await self._store_personality_profile(profile)

    except Exception as e:
        logger.error(f"Failed to analyze personality trends: {e}")
|
|
|
|
async def _analyze_communication_style(
|
|
self, memories: List[MemoryEntry]
|
|
) -> Dict[str, float]:
|
|
"""Analyze communication style from memories"""
|
|
try:
|
|
style_scores = {
|
|
"formal": 0.0,
|
|
"casual": 0.0,
|
|
"sarcastic": 0.0,
|
|
"enthusiastic": 0.0,
|
|
"direct": 0.0,
|
|
}
|
|
|
|
total_content = " ".join([m.content for m in memories])
|
|
content_lower = total_content.lower()
|
|
|
|
# Simple style detection based on word patterns
|
|
formal_indicators = [
|
|
"please",
|
|
"thank you",
|
|
"would you",
|
|
"could you",
|
|
"sir",
|
|
"madam",
|
|
]
|
|
casual_indicators = ["yeah", "yep", "nah", "gonna", "wanna", "kinda"]
|
|
sarcastic_indicators = ["obviously", "totally", "sure thing", "right"]
|
|
enthusiastic_indicators = ["!", "awesome", "amazing", "love it", "so good"]
|
|
direct_indicators = ["no", "yes", "exactly", "wrong", "correct"]
|
|
|
|
formal_count = sum(
|
|
1 for indicator in formal_indicators if indicator in content_lower
|
|
)
|
|
casual_count = sum(
|
|
1 for indicator in casual_indicators if indicator in content_lower
|
|
)
|
|
sarcastic_count = sum(
|
|
1 for indicator in sarcastic_indicators if indicator in content_lower
|
|
)
|
|
enthusiastic_count = sum(
|
|
1 for indicator in enthusiastic_indicators if indicator in content_lower
|
|
)
|
|
direct_count = sum(
|
|
1 for indicator in direct_indicators if indicator in content_lower
|
|
)
|
|
|
|
total_indicators = (
|
|
formal_count
|
|
+ casual_count
|
|
+ sarcastic_count
|
|
+ enthusiastic_count
|
|
+ direct_count
|
|
)
|
|
|
|
if total_indicators > 0:
|
|
style_scores["formal"] = formal_count / total_indicators
|
|
style_scores["casual"] = casual_count / total_indicators
|
|
style_scores["sarcastic"] = sarcastic_count / total_indicators
|
|
style_scores["enthusiastic"] = enthusiastic_count / total_indicators
|
|
style_scores["direct"] = direct_count / total_indicators
|
|
|
|
return style_scores
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to analyze communication style: {e}")
|
|
return {}
|
|
|
|
async def _context_cleanup_worker(self):
    """Background worker that evicts idle conversation contexts.

    Loops forever. Every 10 minutes it removes each entry of
    `self.active_conversations` whose `last_activity` is more than
    30 minutes before the current UTC time.

    The loop exits when the task is cancelled. Any other error is logged
    and the worker waits 10 minutes before the next attempt.
    """
    while True:
        try:
            now = datetime.now(timezone.utc)

            # Collect the keys first so the dict is not mutated while it
            # is being iterated.
            stale_keys = [
                key
                for key, conversation in self.active_conversations.items()
                # Remove contexts inactive for more than 30 minutes
                if now - conversation.last_activity > timedelta(minutes=30)
            ]

            for key in stale_keys:
                del self.active_conversations[key]

            if stale_keys:
                logger.debug(
                    f"Cleaned up {len(stale_keys)} inactive conversation contexts"
                )

            # Sleep for 10 minutes
            await asyncio.sleep(600)

        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.error(f"Error in context cleanup worker: {e}")
            await asyncio.sleep(600)
|
|
|
|
async def get_memory_stats(self) -> Dict[str, Any]:
    """Get memory system statistics.

    Combines two database queries on `memory_entries` (total row count and
    row count per `memory_type`) with the in-process counters kept on this
    object.

    Returns:
        Dict with the keys "total_memories", "total_retrievals",
        "cache_hit_rate", "embedding_generations", "active_conversations",
        "personality_profiles" and "memory_type_distribution". An empty
        dict is returned if gathering the statistics fails (the error is
        logged).
    """
    try:
        # Get total memories count
        total_memories_db = await self.db_manager.execute_query(
            """
            SELECT COUNT(*) as count FROM memory_entries
            """,
            fetch_one=True,
        )

        total_memories = total_memories_db["count"] if total_memories_db else 0

        # Get memory type distribution
        type_distribution = await self.db_manager.execute_query(
            """
            SELECT memory_type, COUNT(*) as count
            FROM memory_entries
            GROUP BY memory_type
            """,
            fetch_all=True,
        )

        # A missing result means "no rows". Without this guard a None
        # result would raise here and discard every other statistic.
        type_dist_dict = {
            row["memory_type"]: row["count"] for row in type_distribution or []
        }

        # max(..., 1) avoids a division by zero before the first retrieval
        cache_hit_rate = self.cache_hits / max(self.total_retrievals, 1)

        return {
            "total_memories": total_memories,
            "total_retrievals": self.total_retrievals,
            "cache_hit_rate": cache_hit_rate,
            "embedding_generations": self.embedding_generations,
            "active_conversations": len(self.active_conversations),
            "personality_profiles": len(self.personality_profiles),
            "memory_type_distribution": type_dist_dict,
        }

    except Exception as e:
        logger.error(f"Failed to get memory stats: {e}")
        return {}
|
|
|
|
async def check_health(self) -> Dict[str, Any]:
    """Check health of memory system.

    Probes Qdrant by calling the client's `get_collections` in the default
    executor. A failed probe is logged and reported as
    `"qdrant_healthy": False` rather than raised.

    Returns:
        Dict with the keys "initialized", "qdrant_healthy",
        "total_memories", "total_retrievals" and "personality_profiles".
        If building the report itself fails, a dict with the keys "error"
        and "healthy" (False) is returned instead.
    """
    try:
        # Test Qdrant connection
        qdrant_healthy = False
        if self.qdrant_client:
            try:
                # The client call is synchronous, so run it off the event
                # loop. get_running_loop() is the preferred way to obtain
                # the loop inside a coroutine.
                await asyncio.get_running_loop().run_in_executor(
                    None, self.qdrant_client.get_collections
                )
                qdrant_healthy = True
            except Exception as e:
                # Still best-effort, but leave a trace of why the probe
                # failed instead of swallowing it silently.
                logger.warning(f"Qdrant health check failed: {e}")

        return {
            "initialized": self._initialized,
            "qdrant_healthy": qdrant_healthy,
            "total_memories": self.total_memories,
            "total_retrievals": self.total_retrievals,
            "personality_profiles": len(self.personality_profiles),
        }

    except Exception as e:
        return {"error": str(e), "healthy": False}
|
|
|
|
async def close(self):
    """Shut the memory system down.

    Cancels the consolidation, personality-update and context-cleanup
    background tasks that exist, waits for them to finish, closes the
    Qdrant client if there is one and empties the in-memory conversation
    and personality-profile caches. Errors are logged and not propagated.
    """
    try:
        logger.info("Closing memory system...")

        # Only the background tasks that were actually created
        running_tasks = [
            task
            for task in (
                self._memory_consolidation_task,
                self._personality_update_task,
                self._context_cleanup_task,
            )
            if task
        ]

        # Request cancellation of every task before waiting on any of them
        for task in running_tasks:
            task.cancel()

        # return_exceptions=True keeps a cancelled or failed task from
        # aborting the rest of the shutdown sequence.
        await asyncio.gather(*running_tasks, return_exceptions=True)

        # Close Qdrant client
        if self.qdrant_client:
            self.qdrant_client.close()

        # Clear caches
        self.active_conversations.clear()
        self.personality_profiles.clear()

        logger.info("Memory system closed")

    except Exception as e:
        logger.error(f"Error closing memory system: {e}")
|