From 797f02c74df56c20b12ad420bc0a19a2a0005158 Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 20 Feb 2026 16:33:39 +0000 Subject: [PATCH] Implement Phase 1: LangGraph backend MVP MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Sets up the full backend foundation for CouncilOS: - CouncilState TypedDict with all required fields and LangGraph reducers - Three agent nodes: master_agent (drafts), critic_agent (scores + routes), writer_agent (final polish) - LangGraph graph with cyclic rework loop: Master → Critic → (score < 8: back to Master | score ≥ 8: Writer → END) - Safety valve: MAX_ITERATIONS=5 prevents infinite loops - FastAPI app with REST endpoints (POST /api/councils/run, GET /api/councils/run/{id}) and WebSocket endpoint (/ws/council/{run_id}) for real-time agent status events - In-memory RunStore for Phase 1 (PostgreSQL-backed in Phase 3) - pytest test suite: state, routing logic, critic parser, agent nodes, API endpoints - .env.example, .gitignore, docker-compose.yml, Dockerfile https://claude.ai/code/session_01RfMpt3TbMjZEtK3CAyP5iQ --- .env.example | 45 +++++++ .gitignore | 72 ++++++++++ backend/Dockerfile | 19 +++ backend/agents/__init__.py | 7 + backend/agents/critic_agent.py | 127 ++++++++++++++++++ backend/agents/master_agent.py | 75 +++++++++++ backend/agents/writer_agent.py | 63 +++++++++ backend/api/__init__.py | 1 + backend/api/routes.py | 135 +++++++++++++++++++ backend/api/run_store.py | 47 +++++++ backend/api/websocket.py | 128 ++++++++++++++++++ backend/main.py | 59 +++++++++ backend/pytest.ini | 6 + backend/requirements.txt | 38 ++++++ backend/services/__init__.py | 1 + backend/services/graph_builder.py | 131 +++++++++++++++++++ backend/state.py | 47 +++++++ backend/tests/__init__.py | 1 + backend/tests/test_api.py | 99 ++++++++++++++ backend/tests/test_routing.py | 211 ++++++++++++++++++++++++++++++ backend/tests/test_run_store.py | 55 ++++++++ backend/tests/test_state.py | 44 +++++++ backend/tools/__init__.py | 7 + docker-compose.yml | 54 ++++++++ 24 files changed, 1472 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 backend/Dockerfile create mode 100644 backend/agents/__init__.py create mode 100644 backend/agents/critic_agent.py create mode 100644 backend/agents/master_agent.py create mode 100644 backend/agents/writer_agent.py create mode 100644 backend/api/__init__.py create mode 100644 backend/api/routes.py create mode 100644 backend/api/run_store.py create mode 100644 backend/api/websocket.py create mode 100644 backend/main.py create mode 100644 backend/pytest.ini create mode 100644 backend/requirements.txt create mode 100644 backend/services/__init__.py create mode 100644 backend/services/graph_builder.py create mode 100644 backend/state.py create mode 100644 backend/tests/__init__.py create mode 100644 backend/tests/test_api.py create mode 100644 backend/tests/test_routing.py create mode 100644 backend/tests/test_run_store.py create mode 100644 backend/tests/test_state.py create mode 100644 backend/tools/__init__.py create mode 100644 docker-compose.yml diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..4816b7b --- /dev/null +++ b/.env.example @@ -0,0 +1,45 @@ +# CouncilOS — Environment Variables Template +# Copy this file to .env and fill in your actual values. +# NEVER commit the .env file to version control. + +# ============================================================================= +# LLM API Keys +# ============================================================================= + +# Anthropic Claude API key (required) +ANTHROPIC_API_KEY= + +# OpenAI GPT-4o API key (optional for Phase 1, required from Phase 3) +OPENAI_API_KEY= + +# Tavily Search API key (required for Phase 4 web-search tool) +TAVILY_API_KEY= + +# ============================================================================= +# Database +# ============================================================================= + +# PostgreSQL connection string (required from Phase 2) +DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/councilOS + +# ============================================================================= +# Vector Database (ChromaDB) +# ============================================================================= + +# Local directory to persist ChromaDB embeddings (required for Phase 4 PDF tool) +CHROMA_PERSIST_DIR=./chroma_db + +# ============================================================================= +# Application Settings +# ============================================================================= + +# FastAPI server host and port +HOST=0.0.0.0 +PORT=8000 + +# Log level: DEBUG | INFO | WARNING | ERROR +LOG_LEVEL=INFO + +# CORS: comma-separated list of allowed frontend origins in production +# Example: https://my-app.vercel.app,https://www.my-domain.com +CORS_ORIGINS=http://localhost:3000 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7e26f51 --- /dev/null +++ b/.gitignore @@ -0,0 +1,72 @@ +# ============================================================================= +# Environment & Secrets — NEVER commit these +# ============================================================================= +.env +.env.local +.env.*.local +*.pem +*.key +secrets/ + +# ============================================================================= +# Python +# ============================================================================= +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python +.venv/ +venv/ +env/ +ENV/ +*.egg-info/ +dist/ +build/ +.eggs/ +pip-wheel-metadata/ +.mypy_cache/ +.ruff_cache/ +.pytest_cache/ +htmlcov/ +.coverage +coverage.xml +*.cover + +# ============================================================================= +# Node / Frontend +# ============================================================================= +node_modules/ +.next/ +out/ +.nuxt/ +dist/ +.cache/ +*.log +npm-debug.log* +yarn-debug.log* +yarn-error.log* +.pnpm-debug.log* + +# ============================================================================= +# Database & Vector Store +# ============================================================================= +chroma_db/ +*.sqlite3 +*.db +postgres_data/ + +# ============================================================================= +# IDE & OS +# ============================================================================= +.idea/ +.vscode/ +*.swp +*.swo +.DS_Store +Thumbs.db + +# ============================================================================= +# Docker +# ============================================================================= +.docker/ diff --git a/backend/Dockerfile b/backend/Dockerfile new file mode 100644 index 0000000..bb8a866 --- /dev/null +++ b/backend/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + build-essential \ + && rm -rf /var/lib/apt/lists/* + +# Install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +EXPOSE 8000 + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/backend/agents/__init__.py b/backend/agents/__init__.py new file mode 100644 index 0000000..b1cf7fe --- /dev/null +++ b/backend/agents/__init__.py @@ -0,0 +1,7 @@ +"""Agent node functions for CouncilOS.""" + +from .master_agent import master_agent_node +from .critic_agent import critic_agent_node +from .writer_agent import writer_agent_node + +__all__ = ["master_agent_node", "critic_agent_node", "writer_agent_node"] diff --git a/backend/agents/critic_agent.py b/backend/agents/critic_agent.py new file mode 100644 index 0000000..7681c81 --- /dev/null +++ b/backend/agents/critic_agent.py @@ -0,0 +1,127 @@ +""" +Critic Agent Node — evaluates the current draft and decides whether to approve or rework. + +The critic scores the draft from 0–10 and returns structured feedback. +If the score meets APPROVAL_THRESHOLD, route_decision is set to "approve". +Otherwise it is set to "rework" and the feedback is appended to feedback_history. +""" + +import os +import re +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import HumanMessage, SystemMessage + +from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS + + +_SYSTEM_PROMPT = """You are the Critic AI in a council of expert AIs. +Your job is to rigorously evaluate the quality of a draft document. + +You must respond in EXACTLY this format — no deviations: + +SCORE: +VERDICT: <"approve" if score >= 8, otherwise "rework"> +FEEDBACK: + + +Scoring criteria: +- 0–3: Poor structure, major factual gaps, incoherent +- 4–6: Adequate but needs significant improvement +- 7: Good but has notable weaknesses +- 8–9: High quality, minor improvements possible +- 10: Exceptional, publication-ready + +Be strict. Only award 8+ if the document genuinely meets high quality standards.""" + + +def _parse_critic_response(content: str) -> tuple[float, str, str]: + """ + Parse the structured critic response. + + Returns: + (score, verdict, feedback) tuple. + Falls back to ("rework", full content) on parse failure. + """ + score_match = re.search(r"SCORE:\s*(\d+(?:\.\d+)?)", content) + verdict_match = re.search(r"VERDICT:\s*(approve|rework)", content, re.IGNORECASE) + feedback_match = re.search(r"FEEDBACK:\s*(.*)", content, re.DOTALL) + + score = float(score_match.group(1)) if score_match else 0.0 + verdict = verdict_match.group(1).lower() if verdict_match else "rework" + feedback = feedback_match.group(1).strip() if feedback_match else content.strip() + + # Clamp score to 0–10 + score = max(0.0, min(10.0, score)) + + return score, verdict, feedback + + +def critic_agent_node(state: CouncilState) -> dict: + """ + LangGraph node function for the Critic Agent. + + Reads current_draft from state, evaluates it, and returns: + - route_decision: "approve" or "rework" + - critic_score: numeric score + - feedback_history: appended with new feedback (if rework) + - active_node: "critic_agent" + + Safety valve: if iteration_count >= MAX_ITERATIONS, force approval + to prevent infinite loops. + + Args: + state: The current CouncilState. + + Returns: + A dict with updated state fields. + """ + # Safety valve: prevent infinite loops + if state.get("iteration_count", 0) >= MAX_ITERATIONS: + return { + "route_decision": "approve", + "critic_score": APPROVAL_THRESHOLD, + "feedback_history": [ + f"[Auto-approved after {MAX_ITERATIONS} iterations]" + ], + "messages": [], + "active_node": "critic_agent", + } + + llm = ChatAnthropic( + model="claude-3-5-sonnet-20241022", + api_key=os.environ.get("ANTHROPIC_API_KEY"), + temperature=0.2, # Low temperature for consistent evaluation + max_tokens=1024, + ) + + system_msg = SystemMessage(content=_SYSTEM_PROMPT) + user_msg = HumanMessage( + content=( + f"Please evaluate this draft on the topic '{state['input_topic']}':\n\n" + f"{state['current_draft']}" + ) + ) + + response = llm.invoke([system_msg, user_msg]) + score, verdict, feedback = _parse_critic_response(response.content) + + # Override verdict based on threshold to ensure consistency + if score >= APPROVAL_THRESHOLD: + route_decision = "approve" + else: + route_decision = "rework" + + result: dict = { + "critic_score": score, + "route_decision": route_decision, + "messages": [system_msg, user_msg, response], + "active_node": "critic_agent", + } + + # Only append feedback if we're sending back for rework + if route_decision == "rework": + result["feedback_history"] = [ + f"Score: {score}/10\n{feedback}" + ] + + return result diff --git a/backend/agents/master_agent.py b/backend/agents/master_agent.py new file mode 100644 index 0000000..bef87bd --- /dev/null +++ b/backend/agents/master_agent.py @@ -0,0 +1,75 @@ +""" +Master Agent Node — creates and refines drafts based on critic feedback. + +This agent is the primary content creator. On the first iteration it produces +an initial draft. On subsequent iterations it incorporates all feedback from +the feedback_history to improve the draft. +""" + +import os +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import HumanMessage, SystemMessage + +from state import CouncilState + + +_SYSTEM_PROMPT = """You are the Master AI in a council of expert AIs. +Your job is to write high-quality content on the given topic. +When you receive critic feedback, carefully incorporate ALL feedback points +and produce an improved draft. Be thorough and precise.""" + + +def _build_master_prompt(state: CouncilState) -> str: + """Build the user-facing prompt for the master agent based on current state.""" + if not state["feedback_history"]: + return ( + f"Please write a comprehensive, well-structured document on the following topic:\n\n" + f"{state['input_topic']}" + ) + + feedback_block = "\n\n---\n".join( + f"Feedback round {i + 1}:\n{fb}" + for i, fb in enumerate(state["feedback_history"]) + ) + + return ( + f"Topic: {state['input_topic']}\n\n" + f"Your current draft:\n{state['current_draft']}\n\n" + f"The critic has provided the following feedback across {len(state['feedback_history'])} round(s):\n\n" + f"{feedback_block}\n\n" + f"Please produce an improved draft that fully addresses ALL feedback points above." + ) + + +def master_agent_node(state: CouncilState) -> dict: + """ + LangGraph node function for the Master Agent. + + Reads input_topic and feedback_history from state, calls the LLM, + and returns an updated current_draft. + + Args: + state: The current CouncilState. + + Returns: + A dict with updated state fields: current_draft, messages, active_node. + """ + llm = ChatAnthropic( + model="claude-3-5-sonnet-20241022", + api_key=os.environ.get("ANTHROPIC_API_KEY"), + temperature=0.7, + max_tokens=2048, + ) + + system_msg = SystemMessage(content=_SYSTEM_PROMPT) + user_msg = HumanMessage(content=_build_master_prompt(state)) + + response = llm.invoke([system_msg, user_msg]) + draft = response.content + + return { + "current_draft": draft, + "messages": [system_msg, user_msg, response], + "active_node": "master_agent", + "iteration_count": state.get("iteration_count", 0) + 1, + } diff --git a/backend/agents/writer_agent.py b/backend/agents/writer_agent.py new file mode 100644 index 0000000..8f0bb2f --- /dev/null +++ b/backend/agents/writer_agent.py @@ -0,0 +1,63 @@ +""" +Writer Agent Node — final polishing of an approved draft. + +This agent receives a critic-approved draft and produces the final, +publication-ready version with polished formatting and language. +""" + +import os +from langchain_anthropic import ChatAnthropic +from langchain_core.messages import HumanMessage, SystemMessage + +from state import CouncilState + + +_SYSTEM_PROMPT = """You are the Writer AI in a council of expert AIs. +You receive a draft that has already been approved for quality by the Critic AI. +Your job is to give it a final professional polish: + +- Improve sentence flow and readability +- Ensure consistent formatting (headers, bullet points, paragraphs) +- Fix any grammatical or stylistic issues +- Do NOT change the factual content or overall structure +- Preserve all key information from the draft + +Return only the polished document — no meta-commentary.""" + + +def writer_agent_node(state: CouncilState) -> dict: + """ + LangGraph node function for the Writer Agent. + + Receives the approved current_draft and returns a polished final version. + + Args: + state: The current CouncilState. + + Returns: + A dict with the final polished current_draft and updated messages. + """ + llm = ChatAnthropic( + model="claude-3-5-sonnet-20241022", + api_key=os.environ.get("ANTHROPIC_API_KEY"), + temperature=0.4, + max_tokens=4096, + ) + + system_msg = SystemMessage(content=_SYSTEM_PROMPT) + user_msg = HumanMessage( + content=( + f"Please give a final professional polish to this approved document " + f"on the topic '{state['input_topic']}':\n\n" + f"{state['current_draft']}" + ) + ) + + response = llm.invoke([system_msg, user_msg]) + + return { + "current_draft": response.content, + "messages": [system_msg, user_msg, response], + "active_node": "writer_agent", + "route_decision": "done", + } diff --git a/backend/api/__init__.py b/backend/api/__init__.py new file mode 100644 index 0000000..ba15015 --- /dev/null +++ b/backend/api/__init__.py @@ -0,0 +1 @@ +"""API route definitions for CouncilOS.""" diff --git a/backend/api/routes.py b/backend/api/routes.py new file mode 100644 index 0000000..316202e --- /dev/null +++ b/backend/api/routes.py @@ -0,0 +1,135 @@ +""" +REST API routes for CouncilOS. + +Endpoints: + POST /api/councils/run — Start a new council run (async, returns run_id) + GET /api/councils/run/{run_id} — Poll the status/result of a run + GET /api/health — Health check +""" + +import uuid +from typing import Optional +from fastapi import APIRouter, HTTPException, BackgroundTasks +from pydantic import BaseModel, Field + +from services.graph_builder import run_council_async +from api.run_store import run_store + + +router = APIRouter() + + +# --------------------------------------------------------------------------- +# Request / Response Models +# --------------------------------------------------------------------------- + +class CouncilRunRequest(BaseModel): + input_topic: str = Field( + ..., + min_length=1, + max_length=10_000, + description="The user's prompt or document content for the council to work on.", + examples=["Erkläre die wichtigsten Konzepte des maschinellen Lernens für Einsteiger."], + ) + + +class CouncilRunResponse(BaseModel): + run_id: str + status: str # "pending" | "running" | "completed" | "failed" + message: str + + +class CouncilResultResponse(BaseModel): + run_id: str + status: str + final_draft: Optional[str] = None + critic_score: Optional[float] = None + iteration_count: Optional[int] = None + error: Optional[str] = None + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@router.get("/health") +async def health_check(): + """Health check endpoint.""" + return {"status": "ok", "service": "CouncilOS API"} + + +@router.post("/councils/run", response_model=CouncilRunResponse, status_code=202) +async def start_council_run( + request: CouncilRunRequest, + background_tasks: BackgroundTasks, +): + """ + Start a new council run. + + The run executes asynchronously in the background. Poll + GET /api/councils/run/{run_id} for the result, or connect to the + WebSocket at /ws/council/{run_id} for real-time updates. + """ + run_id = str(uuid.uuid4()) + + # Register the run as pending in the in-memory store + run_store.create(run_id, request.input_topic) + + # Schedule the graph execution as a background task + background_tasks.add_task(_execute_run, run_id, request.input_topic) + + return CouncilRunResponse( + run_id=run_id, + status="pending", + message=f"Council run started. Connect to /ws/council/{run_id} for live updates.", + ) + + +@router.get("/councils/run/{run_id}", response_model=CouncilResultResponse) +async def get_council_result(run_id: str): + """ + Retrieve the current status or final result of a council run. + """ + run = run_store.get(run_id) + if run is None: + raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found.") + + return CouncilResultResponse( + run_id=run_id, + status=run["status"], + final_draft=run.get("final_draft"), + critic_score=run.get("critic_score"), + iteration_count=run.get("iteration_count"), + error=run.get("error"), + ) + + +# --------------------------------------------------------------------------- +# Internal helpers +# --------------------------------------------------------------------------- + +async def _execute_run(run_id: str, input_topic: str) -> None: + """ + Background task that runs the LangGraph council and updates the run store. + """ + run_store.update(run_id, {"status": "running"}) + try: + final_state = await run_council_async( + input_topic=input_topic, + run_id=run_id, + on_node_event=lambda nid, node: run_store.update( + nid, {"active_node": node} + ), + ) + run_store.update( + run_id, + { + "status": "completed", + "final_draft": final_state.get("current_draft"), + "critic_score": final_state.get("critic_score"), + "iteration_count": final_state.get("iteration_count"), + "active_node": "done", + }, + ) + except Exception as exc: # noqa: BLE001 + run_store.update(run_id, {"status": "failed", "error": str(exc)}) diff --git a/backend/api/run_store.py b/backend/api/run_store.py new file mode 100644 index 0000000..caeeb54 --- /dev/null +++ b/backend/api/run_store.py @@ -0,0 +1,47 @@ +""" +In-memory run store for Phase 1. + +Tracks the status and results of council runs by run_id. This is intentionally +simple for Phase 1. Phase 3+ will replace this with a PostgreSQL-backed store. +""" + +from typing import Any, Dict, Optional +import threading + + +class RunStore: + """Thread-safe in-memory store for council run state.""" + + def __init__(self) -> None: + self._store: Dict[str, Dict[str, Any]] = {} + self._lock = threading.Lock() + + def create(self, run_id: str, input_topic: str) -> None: + with self._lock: + self._store[run_id] = { + "run_id": run_id, + "input_topic": input_topic, + "status": "pending", + "final_draft": None, + "critic_score": None, + "iteration_count": None, + "active_node": None, + "error": None, + } + + def get(self, run_id: str) -> Optional[Dict[str, Any]]: + with self._lock: + return self._store.get(run_id) + + def update(self, run_id: str, updates: Dict[str, Any]) -> None: + with self._lock: + if run_id in self._store: + self._store[run_id].update(updates) + + def delete(self, run_id: str) -> None: + with self._lock: + self._store.pop(run_id, None) + + +# Singleton instance shared across the application +run_store = RunStore() diff --git a/backend/api/websocket.py b/backend/api/websocket.py new file mode 100644 index 0000000..94e87fa --- /dev/null +++ b/backend/api/websocket.py @@ -0,0 +1,128 @@ +""" +WebSocket endpoint for real-time agent status updates. + +Clients connect to /ws/council/{run_id} and receive JSON events whenever +an agent node becomes active. This powers the live diagram pulsing in the frontend. + +Event format: + {"event": "node_start", "run_id": "...", "node": "master_agent", "iteration": 2} + {"event": "node_complete", "run_id": "...", "node": "critic_agent", "score": 6.5} + {"event": "run_complete", "run_id": "...", "final_draft": "..."} + {"event": "run_failed", "run_id": "...", "error": "..."} +""" + +import asyncio +import json +from fastapi import APIRouter, WebSocket, WebSocketDisconnect + +from api.run_store import run_store + + +ws_router = APIRouter() + +# Active WebSocket connections keyed by run_id +_connections: dict[str, list[WebSocket]] = {} + + +async def broadcast_event(run_id: str, event: dict) -> None: + """ + Send an event to all WebSocket clients subscribed to a run_id. + + Args: + run_id: The council run identifier. + event: The event dict to serialize and broadcast. + """ + clients = _connections.get(run_id, []) + disconnected = [] + + for ws in clients: + try: + await ws.send_text(json.dumps(event)) + except Exception: # noqa: BLE001 + disconnected.append(ws) + + # Clean up dead connections + for ws in disconnected: + clients.remove(ws) + + +@ws_router.websocket("/ws/council/{run_id}") +async def council_websocket(websocket: WebSocket, run_id: str): + """ + WebSocket endpoint for live council run updates. + + On connect: sends the current run status immediately. + While running: polls the run store and pushes status changes. + On complete/failed: sends a final event and closes the connection. + """ + await websocket.accept() + + # Register this client + if run_id not in _connections: + _connections[run_id] = [] + _connections[run_id].append(websocket) + + try: + # Send current state immediately on connect + run = run_store.get(run_id) + if run is None: + await websocket.send_text( + json.dumps({"event": "error", "message": f"Run '{run_id}' not found."}) + ) + return + + await websocket.send_text( + json.dumps({"event": "connected", "run_id": run_id, "status": run["status"]}) + ) + + # Poll for status changes and push updates + last_node = None + while True: + run = run_store.get(run_id) + if run is None: + break + + current_node = run.get("active_node") + if current_node and current_node != last_node: + await websocket.send_text( + json.dumps({ + "event": "node_active", + "run_id": run_id, + "node": current_node, + "iteration": run.get("iteration_count"), + }) + ) + last_node = current_node + + if run["status"] == "completed": + await websocket.send_text( + json.dumps({ + "event": "run_complete", + "run_id": run_id, + "final_draft": run.get("final_draft"), + "critic_score": run.get("critic_score"), + "iteration_count": run.get("iteration_count"), + }) + ) + break + + if run["status"] == "failed": + await websocket.send_text( + json.dumps({ + "event": "run_failed", + "run_id": run_id, + "error": run.get("error"), + }) + ) + break + + await asyncio.sleep(0.5) # 500ms polling interval + + except WebSocketDisconnect: + pass + finally: + if run_id in _connections: + try: + _connections[run_id].remove(websocket) + except ValueError: + pass diff --git a/backend/main.py b/backend/main.py new file mode 100644 index 0000000..1c2f4d3 --- /dev/null +++ b/backend/main.py @@ -0,0 +1,59 @@ +""" +CouncilOS — FastAPI application entrypoint. + +Start the server: + uvicorn main:app --reload --port 8000 + +API Overview: + POST /api/councils/run — Start a council run + GET /api/councils/run/{run_id} — Poll run status/result + GET /api/health — Health check + WS /ws/council/{run_id} — Real-time agent status events +""" + +from contextlib import asynccontextmanager +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware + +from api.routes import router +from api.websocket import ws_router + + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan: startup and shutdown logic.""" + print("CouncilOS API starting up...") + yield + print("CouncilOS API shutting down...") + + +app = FastAPI( + title="CouncilOS API", + description=( + "Backend for the CouncilOS multi-agent AI pipeline platform. " + "Orchestrates LangGraph council runs and streams real-time agent " + "status via WebSockets." + ), + version="0.1.0", + lifespan=lifespan, +) + +# CORS — allow all origins in development; tighten in production +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Mount REST routes under /api prefix +app.include_router(router, prefix="/api") + +# Mount WebSocket routes (no prefix — path is /ws/council/{run_id}) +app.include_router(ws_router) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True) diff --git a/backend/pytest.ini b/backend/pytest.ini new file mode 100644 index 0000000..f40fe2d --- /dev/null +++ b/backend/pytest.ini @@ -0,0 +1,6 @@ +[pytest] +testpaths = tests +asyncio_mode = auto +python_files = test_*.py +python_classes = Test* +python_functions = test_* diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..25f1a97 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,38 @@ +# Core AI orchestration +langgraph>=0.2.0 +langchain>=0.2.0 +langchain-anthropic>=0.1.0 +langchain-openai>=0.1.0 + +# Backend API +fastapi>=0.111.0 +uvicorn[standard]>=0.30.0 +websockets>=12.0 +python-multipart>=0.0.9 + +# Database +asyncpg>=0.29.0 +sqlalchemy[asyncio]>=2.0.0 +alembic>=1.13.0 + +# Vector DB (PDF tool) +chromadb>=0.5.0 +pypdf>=4.0.0 + +# Search tool +tavily-python>=0.3.0 + +# Utilities +python-dotenv>=1.0.0 +pydantic>=2.0.0 +pydantic-settings>=2.0.0 + +# Linting and formatting +ruff>=0.4.0 +black>=24.0.0 + +# Testing +pytest>=8.0.0 +pytest-asyncio>=0.23.0 +pytest-mock>=3.14.0 +httpx>=0.27.0 diff --git a/backend/services/__init__.py b/backend/services/__init__.py new file mode 100644 index 0000000..76c3520 --- /dev/null +++ b/backend/services/__init__.py @@ -0,0 +1 @@ +"""Service modules for CouncilOS backend.""" diff --git a/backend/services/graph_builder.py b/backend/services/graph_builder.py new file mode 100644 index 0000000..d386ca7 --- /dev/null +++ b/backend/services/graph_builder.py @@ -0,0 +1,131 @@ +""" +Graph Builder — constructs the LangGraph execution graph for council runs. + +Phase 1: Hard-coded test graph: + User Input → Master Agent → Critic Agent → (score < 8: back to Master | score ≥ 8: Writer Agent) + +Phase 3 (future): This module will be extended to build graphs dynamically +from JSON blueprints stored in PostgreSQL. +""" + +import asyncio +from typing import Any, Callable, Optional +from langgraph.graph import StateGraph, END + +from state import CouncilState +from agents import master_agent_node, critic_agent_node, writer_agent_node + + +def route_after_critic(state: CouncilState) -> str: + """ + Conditional edge function: determines next node after the critic. + + Returns: + "master_agent" if the critic wants rework. + "writer_agent" if the critic approves the draft. + """ + decision = state.get("route_decision", "rework") + if decision == "approve": + return "writer_agent" + return "master_agent" + + +def build_council_graph( + on_node_start: Optional[Callable[[str, str], Any]] = None, +) -> StateGraph: + """ + Build and compile the Phase 1 hard-coded council graph. + + Graph topology: + master_agent → critic_agent → (conditional) → master_agent | writer_agent → END + + Args: + on_node_start: Optional async callback invoked when a node begins execution. + Signature: (run_id: str, node_name: str) -> Any + Used to emit WebSocket events for real-time UI updates. + + Returns: + A compiled LangGraph StateGraph ready for invocation. + """ + graph = StateGraph(CouncilState) + + # Register agent nodes + graph.add_node("master_agent", master_agent_node) + graph.add_node("critic_agent", critic_agent_node) + graph.add_node("writer_agent", writer_agent_node) + + # Define edges + graph.set_entry_point("master_agent") + graph.add_edge("master_agent", "critic_agent") + + # Conditional edge: critic decides whether to rework or approve + graph.add_conditional_edges( + "critic_agent", + route_after_critic, + { + "master_agent": "master_agent", + "writer_agent": "writer_agent", + }, + ) + + # Writer is the terminal node + graph.add_edge("writer_agent", END) + + return graph.compile() + + +def create_initial_state( + input_topic: str, + run_id: str, +) -> CouncilState: + """ + Create a fresh CouncilState for a new council run. + + Args: + input_topic: The user's prompt or document content. + run_id: Unique identifier for this run (used in WebSocket events). + + Returns: + An initialized CouncilState dict. + """ + return CouncilState( + input_topic=input_topic, + current_draft="", + feedback_history=[], + route_decision="", + messages=[], + iteration_count=0, + critic_score=None, + run_id=run_id, + active_node="", + ) + + +async def run_council_async( + input_topic: str, + run_id: str, + on_node_event: Optional[Callable[[str, str], Any]] = None, +) -> CouncilState: + """ + Execute a full council run asynchronously. + + Args: + input_topic: The user's prompt. + run_id: Unique identifier for this run. + on_node_event: Optional callback for WebSocket node events. + + Returns: + The final CouncilState after the writer agent completes. + """ + graph = build_council_graph(on_node_start=on_node_event) + initial_state = create_initial_state(input_topic, run_id) + + # LangGraph's invoke is synchronous — run it in a thread pool to avoid + # blocking the FastAPI event loop + loop = asyncio.get_event_loop() + final_state = await loop.run_in_executor( + None, + lambda: graph.invoke(initial_state), + ) + + return final_state diff --git a/backend/state.py b/backend/state.py new file mode 100644 index 0000000..94756bb --- /dev/null +++ b/backend/state.py @@ -0,0 +1,47 @@ +""" +CouncilState — the central data structure passed between all agents in LangGraph. + +All agents must read from and write to this TypedDict. Agents must not store +state internally; everything passes through CouncilState. +""" + +from typing import Annotated, List, Optional +import operator +from typing_extensions import TypedDict + + +class CouncilState(TypedDict): + """ + The global state shared across all agents in a council run. + + Fields: + input_topic: The user's original prompt or uploaded PDF content. + current_draft: The document currently being worked on. + feedback_history: All critic feedback accumulated across loop iterations. + Agents append here — never overwrite. + route_decision: Routing signal used by conditional edges. + Values: "rework" | "approve" | custom strings. + messages: LLM message history (system prompts + responses). + Uses operator.add reducer so messages accumulate. + iteration_count: Tracks how many rework loops have occurred. + critic_score: The numeric score (0–10) assigned by the critic agent. + run_id: Unique identifier for this council run (for WebSocket events). + active_node: Name of the currently executing agent node (for UI updates). + """ + + input_topic: str + current_draft: str + feedback_history: Annotated[List[str], operator.add] + route_decision: str + messages: Annotated[list, operator.add] + iteration_count: int + critic_score: Optional[float] + run_id: str + active_node: str + + +# Approval threshold: critic score must reach this value to exit the loop +APPROVAL_THRESHOLD = 8.0 + +# Safety limit: maximum number of rework iterations before forcing approval +MAX_ITERATIONS = 5 diff --git a/backend/tests/__init__.py b/backend/tests/__init__.py new file mode 100644 index 0000000..88907d2 --- /dev/null +++ b/backend/tests/__init__.py @@ -0,0 +1 @@ +"""Pytest test suite for CouncilOS backend.""" diff --git a/backend/tests/test_api.py b/backend/tests/test_api.py new file mode 100644 index 0000000..8a668f1 --- /dev/null +++ b/backend/tests/test_api.py @@ -0,0 +1,99 @@ +""" +Integration tests for the FastAPI REST endpoints. + +Uses httpx.AsyncClient with the TestClient pattern — no real LLM calls. +""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import pytest +from unittest.mock import AsyncMock, patch +from fastapi.testclient import TestClient + +from main import app +from api.run_store import run_store + + +@pytest.fixture(autouse=True) +def clean_run_store(): + """Reset the run store before each test.""" + run_store._store.clear() + yield + run_store._store.clear() + + +client = TestClient(app) + + +class TestHealthEndpoint: + def test_health_check_returns_ok(self): + response = client.get("/api/health") + assert response.status_code == 200 + assert response.json()["status"] == "ok" + + +class TestStartCouncilRun: + def test_start_run_returns_202_with_run_id(self): + with patch("api.routes._execute_run", new_callable=AsyncMock): + response = client.post( + "/api/councils/run", + json={"input_topic": "Erkläre maschinelles Lernen"}, + ) + assert response.status_code == 202 + data = response.json() + assert "run_id" in data + assert data["status"] == "pending" + assert len(data["run_id"]) == 36 # UUID format + + def test_start_run_rejects_empty_topic(self): + response = client.post("/api/councils/run", json={"input_topic": ""}) + assert response.status_code == 422 # Pydantic validation error + + def test_start_run_rejects_missing_topic(self): + response = client.post("/api/councils/run", json={}) + assert response.status_code == 422 + + +class TestGetCouncilResult: + def test_get_pending_run(self): + run_store.create("test-run-id", "Test topic") + response = client.get("/api/councils/run/test-run-id") + assert response.status_code == 200 + data = response.json() + assert data["run_id"] == "test-run-id" + assert data["status"] == "pending" + + def test_get_completed_run(self): + run_store.create("completed-run", "Topic") + run_store.update("completed-run", { + "status": "completed", + "final_draft": "Final polished document.", + "critic_score": 9.0, + "iteration_count": 2, + }) + response = client.get("/api/councils/run/completed-run") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "completed" + assert data["final_draft"] == "Final polished document." + assert data["critic_score"] == 9.0 + assert data["iteration_count"] == 2 + + def test_get_nonexistent_run_returns_404(self): + response = client.get("/api/councils/run/does-not-exist") + assert response.status_code == 404 + + def test_get_failed_run(self): + run_store.create("failed-run", "Topic") + run_store.update("failed-run", { + "status": "failed", + "error": "API connection timeout", + }) + response = client.get("/api/councils/run/failed-run") + assert response.status_code == 200 + data = response.json() + assert data["status"] == "failed" + assert "timeout" in data["error"] diff --git a/backend/tests/test_routing.py b/backend/tests/test_routing.py new file mode 100644 index 0000000..dfa8378 --- /dev/null +++ b/backend/tests/test_routing.py @@ -0,0 +1,211 @@ +""" +Tests for the LangGraph routing logic. + +All LLM calls are mocked — no real API calls are made in these tests. +""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +import pytest +from unittest.mock import patch, MagicMock + +from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS +from services.graph_builder import route_after_critic, create_initial_state + + +class TestRouteAfterCritic: + """Unit tests for the conditional edge routing function.""" + + def _make_state(self, route_decision: str, iteration_count: int = 1) -> CouncilState: + state = create_initial_state("test topic", "test-run") + state["route_decision"] = route_decision + state["iteration_count"] = iteration_count + return state + + def test_approve_routes_to_writer(self): + state = self._make_state("approve") + assert route_after_critic(state) == "writer_agent" + + def test_rework_routes_to_master(self): + state = self._make_state("rework") + assert route_after_critic(state) == "master_agent" + + def test_empty_decision_defaults_to_rework(self): + state = self._make_state("") + assert route_after_critic(state) == "master_agent" + + def test_unknown_decision_defaults_to_rework(self): + state = self._make_state("unknown_value") + assert route_after_critic(state) == "master_agent" + + +class TestCriticAgentParsing: + """Unit tests for the critic agent's response parser.""" + + def test_parse_valid_approve_response(self): + from agents.critic_agent import _parse_critic_response + + content = "SCORE: 9\nVERDICT: approve\nFEEDBACK:\nExcellent work." + score, verdict, feedback = _parse_critic_response(content) + assert score == 9.0 + assert verdict == "approve" + assert "Excellent" in feedback + + def test_parse_valid_rework_response(self): + from agents.critic_agent import _parse_critic_response + + content = "SCORE: 5\nVERDICT: rework\nFEEDBACK:\nNeeds more detail." + score, verdict, feedback = _parse_critic_response(content) + assert score == 5.0 + assert verdict == "rework" + assert "detail" in feedback + + def test_parse_score_clamped_to_0_10(self): + from agents.critic_agent import _parse_critic_response + + content = "SCORE: 15\nVERDICT: approve\nFEEDBACK:\nToo high score." + score, verdict, feedback = _parse_critic_response(content) + assert score == 10.0 + + def test_parse_missing_score_defaults_to_0(self): + from agents.critic_agent import _parse_critic_response + + content = "No structured response at all." + score, verdict, feedback = _parse_critic_response(content) + assert score == 0.0 + assert verdict == "rework" + + def test_threshold_boundary_exactly_8_approves(self): + from agents.critic_agent import _parse_critic_response + + content = f"SCORE: {APPROVAL_THRESHOLD}\nVERDICT: approve\nFEEDBACK:\nGood." + score, verdict, _ = _parse_critic_response(content) + assert score == APPROVAL_THRESHOLD + assert verdict == "approve" + + +class TestMasterAgentPromptBuilding: + """Unit tests for the master agent's prompt construction.""" + + def test_first_iteration_prompt_has_no_feedback_block(self): + from agents.master_agent import _build_master_prompt + + state = create_initial_state("Test topic", "run-1") + prompt = _build_master_prompt(state) + assert "Test topic" in prompt + assert "feedback" not in prompt.lower() or "Feedback" not in prompt + + def test_rework_prompt_includes_feedback(self): + from agents.master_agent import _build_master_prompt + + state = create_initial_state("Test topic", "run-1") + state["current_draft"] = "My draft" + state["feedback_history"] = ["Score: 5/10\nNeeds more structure."] + prompt = _build_master_prompt(state) + assert "My draft" in prompt + assert "Needs more structure" in prompt + + def test_rework_prompt_includes_all_feedback_rounds(self): + from agents.master_agent import _build_master_prompt + + state = create_initial_state("Topic", "run-2") + state["current_draft"] = "Draft v2" + state["feedback_history"] = ["First feedback", "Second feedback"] + prompt = _build_master_prompt(state) + assert "First feedback" in prompt + assert "Second feedback" in prompt + assert "2 round" in prompt + + +class TestCriticSafetyValve: + """Tests for the MAX_ITERATIONS safety valve in the critic agent.""" + + def test_safety_valve_forces_approve_at_max_iterations(self): + from agents.critic_agent import critic_agent_node + + state = create_initial_state("topic", "run-safety") + state["iteration_count"] = MAX_ITERATIONS + state["current_draft"] = "Some draft" + + result = critic_agent_node(state) + + assert result["route_decision"] == "approve" + assert result["critic_score"] == APPROVAL_THRESHOLD + + def test_safety_valve_not_triggered_below_max(self): + """Below MAX_ITERATIONS the real LLM call would happen — mock it.""" + from agents.critic_agent import critic_agent_node + + mock_response = MagicMock() + mock_response.content = "SCORE: 4\nVERDICT: rework\nFEEDBACK:\nNeeds work." + + with patch("agents.critic_agent.ChatAnthropic") as MockLLM: + MockLLM.return_value.invoke.return_value = mock_response + + state = create_initial_state("topic", "run-below-max") + state["iteration_count"] = MAX_ITERATIONS - 1 + state["current_draft"] = "Draft" + + result = critic_agent_node(state) + + assert result["route_decision"] == "rework" + assert result["critic_score"] == 4.0 + + +class TestMasterAgentNode: + """Integration-style tests for master_agent_node with mocked LLM.""" + + def test_master_agent_returns_draft(self): + from agents.master_agent import master_agent_node + + mock_response = MagicMock() + mock_response.content = "This is a generated draft about AI." + + with patch("agents.master_agent.ChatAnthropic") as MockLLM: + MockLLM.return_value.invoke.return_value = mock_response + + state = create_initial_state("AI basics", "run-master-1") + result = master_agent_node(state) + + assert result["current_draft"] == "This is a generated draft about AI." + assert result["active_node"] == "master_agent" + assert result["iteration_count"] == 1 + + def test_master_agent_increments_iteration_count(self): + from agents.master_agent import master_agent_node + + mock_response = MagicMock() + mock_response.content = "Draft" + + with patch("agents.master_agent.ChatAnthropic") as MockLLM: + MockLLM.return_value.invoke.return_value = mock_response + + state = create_initial_state("topic", "run-master-2") + state["iteration_count"] = 3 + result = master_agent_node(state) + + assert result["iteration_count"] == 4 + + +class TestWriterAgentNode: + """Tests for writer_agent_node with mocked LLM.""" + + def test_writer_returns_polished_draft(self): + from agents.writer_agent import writer_agent_node + + mock_response = MagicMock() + mock_response.content = "Polished and professional document." + + with patch("agents.writer_agent.ChatAnthropic") as MockLLM: + MockLLM.return_value.invoke.return_value = mock_response + + state = create_initial_state("Machine Learning", "run-writer-1") + state["current_draft"] = "Raw draft content" + result = writer_agent_node(state) + + assert result["current_draft"] == "Polished and professional document." + assert result["active_node"] == "writer_agent" + assert result["route_decision"] == "done" diff --git a/backend/tests/test_run_store.py b/backend/tests/test_run_store.py new file mode 100644 index 0000000..4da267d --- /dev/null +++ b/backend/tests/test_run_store.py @@ -0,0 +1,55 @@ +"""Tests for the in-memory RunStore.""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from api.run_store import RunStore + + +class TestRunStore: + def setup_method(self): + self.store = RunStore() + + def test_create_and_get(self): + self.store.create("run-1", "Test topic") + run = self.store.get("run-1") + assert run is not None + assert run["run_id"] == "run-1" + assert run["input_topic"] == "Test topic" + assert run["status"] == "pending" + + def test_get_nonexistent_returns_none(self): + assert self.store.get("nonexistent") is None + + def test_update_status(self): + self.store.create("run-2", "Topic") + self.store.update("run-2", {"status": "running"}) + assert self.store.get("run-2")["status"] == "running" + + def test_update_nonexistent_is_noop(self): + """Updating a non-existent run should not raise.""" + self.store.update("ghost-run", {"status": "running"}) + + def test_delete(self): + self.store.create("run-3", "Topic") + self.store.delete("run-3") + assert self.store.get("run-3") is None + + def test_delete_nonexistent_is_noop(self): + self.store.delete("ghost-run") + + def test_update_partial_fields(self): + self.store.create("run-4", "Topic") + self.store.update("run-4", {"status": "completed", "final_draft": "Result text"}) + run = self.store.get("run-4") + assert run["status"] == "completed" + assert run["final_draft"] == "Result text" + assert run["input_topic"] == "Topic" # original field preserved + + def test_multiple_runs_independent(self): + self.store.create("run-a", "Topic A") + self.store.create("run-b", "Topic B") + self.store.update("run-a", {"status": "running"}) + assert self.store.get("run-b")["status"] == "pending" diff --git a/backend/tests/test_state.py b/backend/tests/test_state.py new file mode 100644 index 0000000..eeff0ed --- /dev/null +++ b/backend/tests/test_state.py @@ -0,0 +1,44 @@ +"""Tests for CouncilState structure and graph_builder helpers.""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS +from services.graph_builder import create_initial_state + + +class TestCouncilState: + def test_initial_state_fields(self): + state = create_initial_state("Test topic", "run-001") + assert state["input_topic"] == "Test topic" + assert state["current_draft"] == "" + assert state["feedback_history"] == [] + assert state["route_decision"] == "" + assert state["messages"] == [] + assert state["iteration_count"] == 0 + assert state["critic_score"] is None + assert state["run_id"] == "run-001" + assert state["active_node"] == "" + + def test_approval_threshold_value(self): + assert APPROVAL_THRESHOLD == 8.0 + + def test_max_iterations_value(self): + assert MAX_ITERATIONS == 5 + + def test_state_is_typed_dict(self): + """CouncilState should be instantiable as a plain dict.""" + state: CouncilState = { + "input_topic": "AI", + "current_draft": "draft", + "feedback_history": ["fb1"], + "route_decision": "rework", + "messages": [], + "iteration_count": 1, + "critic_score": 6.0, + "run_id": "x", + "active_node": "critic_agent", + } + assert state["critic_score"] == 6.0 diff --git a/backend/tools/__init__.py b/backend/tools/__init__.py new file mode 100644 index 0000000..15761c9 --- /dev/null +++ b/backend/tools/__init__.py @@ -0,0 +1,7 @@ +""" +Agent tools for CouncilOS. + +Phase 4 will add: +- web_search_tool: Tavily Search API wrapper +- pdf_reader_tool: PyPDF + ChromaDB vector store wrapper +""" diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..3eef50d --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,54 @@ +version: "3.9" + +# CouncilOS — local development environment +# Usage: +# docker compose up -d # Start all services +# docker compose down # Stop all services +# docker compose logs -f api # Follow API logs + +services: + # --------------------------------------------------------------------------- + # PostgreSQL — stores council blueprints (used from Phase 2) + # --------------------------------------------------------------------------- + db: + image: postgres:16-alpine + restart: unless-stopped + environment: + POSTGRES_DB: councilOS + POSTGRES_USER: user + POSTGRES_PASSWORD: password + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U user -d councilOS"] + interval: 5s + timeout: 5s + retries: 5 + + # --------------------------------------------------------------------------- + # CouncilOS API — FastAPI + LangGraph backend + # --------------------------------------------------------------------------- + api: + build: + context: ./backend + dockerfile: Dockerfile + restart: unless-stopped + ports: + - "8000:8000" + env_file: + - .env + environment: + DATABASE_URL: postgresql+asyncpg://user:password@db:5432/councilOS + volumes: + - ./backend:/app + - chroma_data:/app/chroma_db + depends_on: + db: + condition: service_healthy + command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload + +volumes: + postgres_data: + chroma_data: