KI-Konzil/backend/api/run_store.py
Claude 797f02c74d
Implement Phase 1: LangGraph backend MVP
Sets up the full backend foundation for CouncilOS:

- CouncilState TypedDict with all required fields and LangGraph reducers
- Three agent nodes: master_agent (drafts), critic_agent (scores + routes),
  writer_agent (final polish)
- LangGraph graph with cyclic rework loop: Master → Critic → (score < 8:
  back to Master | score ≥ 8: Writer → END)
- Safety valve: MAX_ITERATIONS=5 prevents infinite loops
- FastAPI app with REST endpoints (POST /api/councils/run, GET /api/councils/run/{id})
  and WebSocket endpoint (/ws/council/{run_id}) for real-time agent status events
- In-memory RunStore for Phase 1 (PostgreSQL-backed in Phase 3)
- pytest test suite: state, routing logic, critic parser, agent nodes, API endpoints
- .env.example, .gitignore, docker-compose.yml, Dockerfile
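
The routing rule in the list above (score < 8 goes back to Master, score ≥ 8 goes on to Writer, capped by MAX_ITERATIONS=5) could be sketched roughly as follows. This is an illustration only, not the code from this commit: `RoutingState`, `SCORE_THRESHOLD` and `route_after_critic` are hypothetical names, and handing off to the writer once the cap is reached is an assumption about how the safety valve behaves.

```python
from typing import TypedDict

MAX_ITERATIONS = 5    # safety valve named in the commit message
SCORE_THRESHOLD = 8   # score >= 8 proceeds to the writer


class RoutingState(TypedDict):
    """Hypothetical subset of CouncilState, reduced to what routing needs."""
    critic_score: int
    iteration_count: int


def route_after_critic(state: RoutingState) -> str:
    """Return the name of the node that should run after the critic."""
    if state["critic_score"] >= SCORE_THRESHOLD:
        return "writer_agent"
    if state["iteration_count"] >= MAX_ITERATIONS:
        # Assumption: when the cap is hit, stop reworking and hand off
        # to the writer instead of looping back to the master.
        return "writer_agent"
    return "master_agent"
```

A function of this shape is the kind of callable a conditional edge would consult after the critic node runs.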

https://claude.ai/code/session_01RfMpt3TbMjZEtK3CAyP5iQ
2026-02-20 16:33:39 +00:00


"""
In-memory run store for Phase 1.
Tracks the status and results of council runs by run_id. This is intentionally
simple for Phase 1. Phase 3+ will replace this with a PostgreSQL-backed store.
"""
from typing import Any, Dict, Optional
import threading


class RunStore:
    """Thread-safe in-memory store for council run state."""

    def __init__(self) -> None:
        # Maps run_id to that run's record; every access goes through _lock.
        self._store: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()

    def create(self, run_id: str, input_topic: str) -> None:
        """Register a new run in the "pending" state, replacing any existing record."""
        with self._lock:
            self._store[run_id] = {
                "run_id": run_id,
                "input_topic": input_topic,
                "status": "pending",
                "final_draft": None,
                "critic_score": None,
                "iteration_count": None,
                "active_node": None,
                "error": None,
            }

    def get(self, run_id: str) -> Optional[Dict[str, Any]]:
        """Return the record for run_id, or None if the run is unknown.

        The returned dict is the stored object itself, not a copy.
        """
        with self._lock:
            return self._store.get(run_id)

    def update(self, run_id: str, updates: Dict[str, Any]) -> None:
        """Merge updates into an existing record; unknown run_ids are ignored."""
        with self._lock:
            if run_id in self._store:
                self._store[run_id].update(updates)

    def delete(self, run_id: str) -> None:
        """Remove the record for run_id; a missing run_id is not an error."""
        with self._lock:
            self._store.pop(run_id, None)


# Singleton instance shared across the application
run_store = RunStore()
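
A minimal sketch of how a run might move through the store, from creation to deletion. The class is repeated in condensed form only so the snippet runs on its own. The run ID, the topic and the status strings "running" and "completed" are assumptions for illustration; the source defines only "pending".

```python
import threading
from typing import Any, Dict, Optional


class RunStore:
    """Condensed copy of the store above so this snippet is self-contained."""

    def __init__(self) -> None:
        self._store: Dict[str, Dict[str, Any]] = {}
        self._lock = threading.Lock()

    def create(self, run_id: str, input_topic: str) -> None:
        with self._lock:
            self._store[run_id] = {
                "run_id": run_id,
                "input_topic": input_topic,
                "status": "pending",
                "final_draft": None,
                "critic_score": None,
            }

    def get(self, run_id: str) -> Optional[Dict[str, Any]]:
        with self._lock:
            return self._store.get(run_id)

    def update(self, run_id: str, updates: Dict[str, Any]) -> None:
        with self._lock:
            if run_id in self._store:
                self._store[run_id].update(updates)

    def delete(self, run_id: str) -> None:
        with self._lock:
            self._store.pop(run_id, None)


store = RunStore()
store.create("run-1", "Example topic")           # hypothetical ID and topic
store.update("run-1", {"status": "running"})     # assumed status value
store.update("run-1", {
    "status": "completed",                       # assumed status value
    "final_draft": "Final text",
    "critic_score": 9,
})
store.update("missing-run", {"status": "failed"})  # unknown ID: no-op

snapshot = store.get("run-1")   # the stored dict itself, not a copy
store.delete("run-1")
```

Because `get` returns the stored dict rather than a copy, `snapshot` remains readable after `delete`, and any caller that mutates it does so outside the lock. Returning `dict(record)` from `get` would be one way to avoid that, at the cost of a shallow copy per read.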