219 lines
6.6 KiB
Python
219 lines
6.6 KiB
Python
"""
|
|
Conversation Memory for Eugen Bot
|
|
Stores and retrieves chat history per user with time-based filtering
|
|
"""
|
|
import json
|
|
import logging
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Optional
|
|
|
|
|
|
class ConversationMemory:
    """Manage persistent conversation history for each user.

    Each user's messages are stored as a JSON list in
    ``<data_dir>/<sanitized username>.json``. Loaded histories are also kept
    in an in-memory cache keyed by the same sanitized username, so the cache
    and the files on disk always refer to the same identity.
    """

    def __init__(self, data_dir="data/conversations", max_messages=25, retention_hours=1, logger=None):
        """
        Initialize conversation memory.

        Args:
            data_dir (str): Directory to store conversation JSON files.
                Created (including parents) if it does not exist.
            max_messages (int): Maximum messages to store per user.
            retention_hours (int): How long to keep messages in context.
            logger: Optional logger instance for error reporting. Defaults
                to ``logging.getLogger(__name__)``.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.max_messages = max_messages
        self.retention_hours = retention_hours
        self.logger = logger or logging.getLogger(__name__)
        # In-memory cache to reduce file I/O for frequently accessed users.
        # Keys are sanitized usernames (see _safe_username).
        self._cache: Dict[str, List[Dict]] = {}

    @staticmethod
    def _safe_username(username: str) -> str:
        """Return the lowercase, filesystem-safe form of a username.

        Only alphanumeric characters and ``.``, ``_``, ``-`` are kept. This
        value is used both as the file stem and as the cache key, so two
        spellings that map to the same file also share one cache entry.
        """
        return "".join(c for c in username.lower() if c.isalnum() or c in "._-")

    def _get_user_file(self, username) -> Path:
        """Get the file path for a user's conversation history."""
        return self.data_dir / f"{self._safe_username(username)}.json"

    def _load_user_history(self, username) -> List[Dict]:
        """
        Load user history from cache or file.

        Args:
            username (str): Twitch username

        Returns:
            list: List of message dicts, or an empty list when no usable
            history exists. The returned list is the cached object itself.
        """
        cache_key = self._safe_username(username)

        # Check cache first
        if cache_key in self._cache:
            return self._cache[cache_key]

        file_path = self._get_user_file(username)
        history: List[Dict] = []

        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            # No history yet for this user; not an error.
            pass
        except (OSError, ValueError) as e:
            # ValueError covers both malformed JSON and undecodable bytes.
            self.logger.error(f"Error loading history for {username}: {e}")
        else:
            if isinstance(loaded, list):
                history = loaded
            else:
                # A non-list payload cannot be appended to or sliced as a
                # history, so treat it as unusable rather than caching it.
                self.logger.error(
                    f"Error loading history for {username}: "
                    f"expected a JSON list, got {type(loaded).__name__}"
                )

        self._cache[cache_key] = history
        return history

    def _save_user_history(self, username, history) -> None:
        """
        Save user history to file and update cache.

        The cache is updated first, so the in-memory state stays current
        even if the write fails (the failure is logged, not raised).

        Args:
            username (str): Twitch username
            history (list): List of message dicts
        """
        file_path = self._get_user_file(username)

        # Update cache
        self._cache[self._safe_username(username)] = history

        # Write to a sibling temp file and rename it over the target so an
        # interrupted write never leaves a truncated JSON file behind.
        tmp_path = file_path.with_name(file_path.name + ".tmp")
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(history, f, ensure_ascii=False, indent=2)
            tmp_path.replace(file_path)
        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Error saving history for {username}: {e}")
            try:
                tmp_path.unlink()
            except OSError:
                # Temp file was never created or is already gone.
                pass

    @staticmethod
    def _is_recent(msg, cutoff_time: datetime) -> bool:
        """Return True if ``msg`` has a parseable timestamp newer than the cutoff.

        Entries that are malformed (missing or invalid timestamp, not a
        dict, or a timestamp that cannot be compared with the cutoff) are
        treated as not recent instead of raising.
        """
        try:
            return datetime.fromisoformat(msg['timestamp']) > cutoff_time
        except (KeyError, ValueError, TypeError):
            return False

    def get_user_history(self, username, limit=5) -> List[Dict]:
        """
        Load recent chat history for a user.

        Args:
            username (str): Twitch username
            limit (int): Maximum number of messages to return. ``None``
                returns every message inside the retention window; zero or
                a negative value returns an empty list.

        Returns:
            list: List of message dicts with role, content, timestamp,
            oldest first, restricted to the last ``retention_hours`` hours.
        """
        history = self._load_user_history(username)
        if not history:
            return []

        if limit is not None and limit <= 0:
            # recent[-0:] would return the whole list, so handle explicitly.
            return []

        # Filter by retention time
        cutoff_time = datetime.now() - timedelta(hours=self.retention_hours)
        recent = [msg for msg in history if self._is_recent(msg, cutoff_time)]

        if limit is None:
            return recent

        # Return only the most recent messages up to limit
        return recent[-limit:]

    def add_message(self, username, role, content) -> None:
        """
        Add a message to user's conversation history.

        The message is stamped with the current local time in ISO format,
        the history is trimmed to the newest ``max_messages`` entries, and
        the result is persisted.

        Args:
            username (str): Twitch username
            role (str): 'user' or 'assistant'
            content (str): Message content
        """
        # Load existing history (uses cache if available)
        history = self._load_user_history(username)

        # Add new message
        history.append({
            "role": role,
            "content": content,
            "timestamp": datetime.now().isoformat(),
        })

        # Enforce max message limit
        if len(history) > self.max_messages:
            history = history[-self.max_messages:]

        # Save back to file and update cache
        self._save_user_history(username, history)

    def format_for_prompt(self, history) -> List[Dict]:
        """
        Convert history to format suitable for AI API.

        Args:
            history (list): List of message dicts from get_user_history

        Returns:
            list: List of dicts with only the 'role' and 'content' keys

        Raises:
            KeyError: If a message lacks 'role' or 'content'.
        """
        return [
            {"role": msg['role'], "content": msg['content']}
            for msg in history
        ]

    def clear_user_history(self, username) -> None:
        """
        Clear all history for a specific user.

        Removes the cache entry and deletes the user's JSON file. A missing
        file is not an error; other deletion failures are logged.

        Args:
            username (str): Twitch username
        """
        # Clear from cache
        self._cache.pop(self._safe_username(username), None)

        # Clear from disk
        try:
            self._get_user_file(username).unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            self.logger.error(f"Error clearing history for {username}: {e}")

    def get_all_users(self) -> List[str]:
        """
        Get list of all users with conversation history.

        Returns:
            list: Sanitized usernames, one per ``*.json`` file in the data
            directory.
        """
        return [file_path.stem for file_path in self.data_dir.glob("*.json")]

    def get_user_message_count(self, username) -> int:
        """
        Get total message count for a user.

        Uses the same cache-aware loader as the other methods, so the count
        always agrees with what ``get_user_history`` and ``add_message`` see.

        Args:
            username (str): Twitch username

        Returns:
            int: Number of messages in history (0 if none or unreadable)
        """
        return len(self._load_user_history(username))