Merge pull request #4 from Kenearos/claude/start-implementation-2XWxH

Implement Phase 1: LangGraph backend MVP
This commit is contained in:
Kenearos 2026-02-20 17:51:01 +01:00 committed by GitHub
commit 06aec41a8a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
24 changed files with 1472 additions and 0 deletions

45
.env.example Normal file
View file

@ -0,0 +1,45 @@
# CouncilOS — Environment Variables Template
# Copy this file to .env and fill in your actual values.
# NEVER commit the .env file to version control.
# =============================================================================
# LLM API Keys
# =============================================================================
# Anthropic Claude API key (required)
ANTHROPIC_API_KEY=
# OpenAI GPT-4o API key (optional for Phase 1, required from Phase 3)
OPENAI_API_KEY=
# Tavily Search API key (required for Phase 4 web-search tool)
TAVILY_API_KEY=
# =============================================================================
# Database
# =============================================================================
# PostgreSQL connection string (required from Phase 2)
DATABASE_URL=postgresql+asyncpg://user:password@localhost:5432/councilOS
# =============================================================================
# Vector Database (ChromaDB)
# =============================================================================
# Local directory to persist ChromaDB embeddings (required for Phase 4 PDF tool)
CHROMA_PERSIST_DIR=./chroma_db
# =============================================================================
# Application Settings
# =============================================================================
# FastAPI server host and port
HOST=0.0.0.0
PORT=8000
# Log level: DEBUG | INFO | WARNING | ERROR
LOG_LEVEL=INFO
# CORS: comma-separated list of allowed frontend origins in production
# Example: https://my-app.vercel.app,https://www.my-domain.com
CORS_ORIGINS=http://localhost:3000

72
.gitignore vendored Normal file
View file

@ -0,0 +1,72 @@
# =============================================================================
# Environment & Secrets — NEVER commit these
# =============================================================================
.env
.env.local
.env.*.local
*.pem
*.key
secrets/
# =============================================================================
# Python
# =============================================================================
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
.venv/
venv/
env/
ENV/
*.egg-info/
dist/
build/
.eggs/
pip-wheel-metadata/
.mypy_cache/
.ruff_cache/
.pytest_cache/
htmlcov/
.coverage
coverage.xml
*.cover
# =============================================================================
# Node / Frontend
# =============================================================================
node_modules/
.next/
out/
.nuxt/
dist/
.cache/
*.log
npm-debug.log*
yarn-debug.log*
yarn-error.log*
.pnpm-debug.log*
# =============================================================================
# Database & Vector Store
# =============================================================================
chroma_db/
*.sqlite3
*.db
postgres_data/
# =============================================================================
# IDE & OS
# =============================================================================
.idea/
.vscode/
*.swp
*.swo
.DS_Store
Thumbs.db
# =============================================================================
# Docker
# =============================================================================
.docker/

19
backend/Dockerfile Normal file
View file

@ -0,0 +1,19 @@
FROM python:3.11-slim
WORKDIR /app
# Install system dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential \
&& rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

View file

@ -0,0 +1,7 @@
"""Agent node functions for CouncilOS."""
from .master_agent import master_agent_node
from .critic_agent import critic_agent_node
from .writer_agent import writer_agent_node
__all__ = ["master_agent_node", "critic_agent_node", "writer_agent_node"]

View file

@ -0,0 +1,127 @@
"""
Critic Agent Node evaluates the current draft and decides whether to approve or rework.
The critic scores the draft from 010 and returns structured feedback.
If the score meets APPROVAL_THRESHOLD, route_decision is set to "approve".
Otherwise it is set to "rework" and the feedback is appended to feedback_history.
"""
import os
import re
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS
_SYSTEM_PROMPT = """You are the Critic AI in a council of expert AIs.
Your job is to rigorously evaluate the quality of a draft document.
You must respond in EXACTLY this format no deviations:
SCORE: <integer 0-10>
VERDICT: <"approve" if score >= 8, otherwise "rework">
FEEDBACK:
<detailed, actionable feedback explaining what must be improved>
Scoring criteria:
- 03: Poor structure, major factual gaps, incoherent
- 46: Adequate but needs significant improvement
- 7: Good but has notable weaknesses
- 89: High quality, minor improvements possible
- 10: Exceptional, publication-ready
Be strict. Only award 8+ if the document genuinely meets high quality standards."""
def _parse_critic_response(content: str) -> tuple[float, str, str]:
"""
Parse the structured critic response.
Returns:
(score, verdict, feedback) tuple.
Falls back to ("rework", full content) on parse failure.
"""
score_match = re.search(r"SCORE:\s*(\d+(?:\.\d+)?)", content)
verdict_match = re.search(r"VERDICT:\s*(approve|rework)", content, re.IGNORECASE)
feedback_match = re.search(r"FEEDBACK:\s*(.*)", content, re.DOTALL)
score = float(score_match.group(1)) if score_match else 0.0
verdict = verdict_match.group(1).lower() if verdict_match else "rework"
feedback = feedback_match.group(1).strip() if feedback_match else content.strip()
# Clamp score to 010
score = max(0.0, min(10.0, score))
return score, verdict, feedback
def critic_agent_node(state: CouncilState) -> dict:
"""
LangGraph node function for the Critic Agent.
Reads current_draft from state, evaluates it, and returns:
- route_decision: "approve" or "rework"
- critic_score: numeric score
- feedback_history: appended with new feedback (if rework)
- active_node: "critic_agent"
Safety valve: if iteration_count >= MAX_ITERATIONS, force approval
to prevent infinite loops.
Args:
state: The current CouncilState.
Returns:
A dict with updated state fields.
"""
# Safety valve: prevent infinite loops
if state.get("iteration_count", 0) >= MAX_ITERATIONS:
return {
"route_decision": "approve",
"critic_score": APPROVAL_THRESHOLD,
"feedback_history": [
f"[Auto-approved after {MAX_ITERATIONS} iterations]"
],
"messages": [],
"active_node": "critic_agent",
}
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
api_key=os.environ.get("ANTHROPIC_API_KEY"),
temperature=0.2, # Low temperature for consistent evaluation
max_tokens=1024,
)
system_msg = SystemMessage(content=_SYSTEM_PROMPT)
user_msg = HumanMessage(
content=(
f"Please evaluate this draft on the topic '{state['input_topic']}':\n\n"
f"{state['current_draft']}"
)
)
response = llm.invoke([system_msg, user_msg])
score, verdict, feedback = _parse_critic_response(response.content)
# Override verdict based on threshold to ensure consistency
if score >= APPROVAL_THRESHOLD:
route_decision = "approve"
else:
route_decision = "rework"
result: dict = {
"critic_score": score,
"route_decision": route_decision,
"messages": [system_msg, user_msg, response],
"active_node": "critic_agent",
}
# Only append feedback if we're sending back for rework
if route_decision == "rework":
result["feedback_history"] = [
f"Score: {score}/10\n{feedback}"
]
return result

View file

@ -0,0 +1,75 @@
"""
Master Agent Node creates and refines drafts based on critic feedback.
This agent is the primary content creator. On the first iteration it produces
an initial draft. On subsequent iterations it incorporates all feedback from
the feedback_history to improve the draft.
"""
import os
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from state import CouncilState
_SYSTEM_PROMPT = """You are the Master AI in a council of expert AIs.
Your job is to write high-quality content on the given topic.
When you receive critic feedback, carefully incorporate ALL feedback points
and produce an improved draft. Be thorough and precise."""
def _build_master_prompt(state: CouncilState) -> str:
"""Build the user-facing prompt for the master agent based on current state."""
if not state["feedback_history"]:
return (
f"Please write a comprehensive, well-structured document on the following topic:\n\n"
f"{state['input_topic']}"
)
feedback_block = "\n\n---\n".join(
f"Feedback round {i + 1}:\n{fb}"
for i, fb in enumerate(state["feedback_history"])
)
return (
f"Topic: {state['input_topic']}\n\n"
f"Your current draft:\n{state['current_draft']}\n\n"
f"The critic has provided the following feedback across {len(state['feedback_history'])} round(s):\n\n"
f"{feedback_block}\n\n"
f"Please produce an improved draft that fully addresses ALL feedback points above."
)
def master_agent_node(state: CouncilState) -> dict:
"""
LangGraph node function for the Master Agent.
Reads input_topic and feedback_history from state, calls the LLM,
and returns an updated current_draft.
Args:
state: The current CouncilState.
Returns:
A dict with updated state fields: current_draft, messages, active_node.
"""
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
api_key=os.environ.get("ANTHROPIC_API_KEY"),
temperature=0.7,
max_tokens=2048,
)
system_msg = SystemMessage(content=_SYSTEM_PROMPT)
user_msg = HumanMessage(content=_build_master_prompt(state))
response = llm.invoke([system_msg, user_msg])
draft = response.content
return {
"current_draft": draft,
"messages": [system_msg, user_msg, response],
"active_node": "master_agent",
"iteration_count": state.get("iteration_count", 0) + 1,
}

View file

@ -0,0 +1,63 @@
"""
Writer Agent Node final polishing of an approved draft.
This agent receives a critic-approved draft and produces the final,
publication-ready version with polished formatting and language.
"""
import os
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage
from state import CouncilState
_SYSTEM_PROMPT = """You are the Writer AI in a council of expert AIs.
You receive a draft that has already been approved for quality by the Critic AI.
Your job is to give it a final professional polish:
- Improve sentence flow and readability
- Ensure consistent formatting (headers, bullet points, paragraphs)
- Fix any grammatical or stylistic issues
- Do NOT change the factual content or overall structure
- Preserve all key information from the draft
Return only the polished document no meta-commentary."""
def writer_agent_node(state: CouncilState) -> dict:
"""
LangGraph node function for the Writer Agent.
Receives the approved current_draft and returns a polished final version.
Args:
state: The current CouncilState.
Returns:
A dict with the final polished current_draft and updated messages.
"""
llm = ChatAnthropic(
model="claude-3-5-sonnet-20241022",
api_key=os.environ.get("ANTHROPIC_API_KEY"),
temperature=0.4,
max_tokens=4096,
)
system_msg = SystemMessage(content=_SYSTEM_PROMPT)
user_msg = HumanMessage(
content=(
f"Please give a final professional polish to this approved document "
f"on the topic '{state['input_topic']}':\n\n"
f"{state['current_draft']}"
)
)
response = llm.invoke([system_msg, user_msg])
return {
"current_draft": response.content,
"messages": [system_msg, user_msg, response],
"active_node": "writer_agent",
"route_decision": "done",
}

1
backend/api/__init__.py Normal file
View file

@ -0,0 +1 @@
"""API route definitions for CouncilOS."""

135
backend/api/routes.py Normal file
View file

@ -0,0 +1,135 @@
"""
REST API routes for CouncilOS.
Endpoints:
POST /api/councils/run Start a new council run (async, returns run_id)
GET /api/councils/run/{run_id} Poll the status/result of a run
GET /api/health Health check
"""
import uuid
from typing import Optional
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from services.graph_builder import run_council_async
from api.run_store import run_store
router = APIRouter()
# ---------------------------------------------------------------------------
# Request / Response Models
# ---------------------------------------------------------------------------
class CouncilRunRequest(BaseModel):
input_topic: str = Field(
...,
min_length=1,
max_length=10_000,
description="The user's prompt or document content for the council to work on.",
examples=["Erkläre die wichtigsten Konzepte des maschinellen Lernens für Einsteiger."],
)
class CouncilRunResponse(BaseModel):
run_id: str
status: str # "pending" | "running" | "completed" | "failed"
message: str
class CouncilResultResponse(BaseModel):
run_id: str
status: str
final_draft: Optional[str] = None
critic_score: Optional[float] = None
iteration_count: Optional[int] = None
error: Optional[str] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@router.get("/health")
async def health_check():
"""Health check endpoint."""
return {"status": "ok", "service": "CouncilOS API"}
@router.post("/councils/run", response_model=CouncilRunResponse, status_code=202)
async def start_council_run(
request: CouncilRunRequest,
background_tasks: BackgroundTasks,
):
"""
Start a new council run.
The run executes asynchronously in the background. Poll
GET /api/councils/run/{run_id} for the result, or connect to the
WebSocket at /ws/council/{run_id} for real-time updates.
"""
run_id = str(uuid.uuid4())
# Register the run as pending in the in-memory store
run_store.create(run_id, request.input_topic)
# Schedule the graph execution as a background task
background_tasks.add_task(_execute_run, run_id, request.input_topic)
return CouncilRunResponse(
run_id=run_id,
status="pending",
message=f"Council run started. Connect to /ws/council/{run_id} for live updates.",
)
@router.get("/councils/run/{run_id}", response_model=CouncilResultResponse)
async def get_council_result(run_id: str):
"""
Retrieve the current status or final result of a council run.
"""
run = run_store.get(run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found.")
return CouncilResultResponse(
run_id=run_id,
status=run["status"],
final_draft=run.get("final_draft"),
critic_score=run.get("critic_score"),
iteration_count=run.get("iteration_count"),
error=run.get("error"),
)
# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------
async def _execute_run(run_id: str, input_topic: str) -> None:
"""
Background task that runs the LangGraph council and updates the run store.
"""
run_store.update(run_id, {"status": "running"})
try:
final_state = await run_council_async(
input_topic=input_topic,
run_id=run_id,
on_node_event=lambda nid, node: run_store.update(
nid, {"active_node": node}
),
)
run_store.update(
run_id,
{
"status": "completed",
"final_draft": final_state.get("current_draft"),
"critic_score": final_state.get("critic_score"),
"iteration_count": final_state.get("iteration_count"),
"active_node": "done",
},
)
except Exception as exc: # noqa: BLE001
run_store.update(run_id, {"status": "failed", "error": str(exc)})

47
backend/api/run_store.py Normal file
View file

@ -0,0 +1,47 @@
"""
In-memory run store for Phase 1.
Tracks the status and results of council runs by run_id. This is intentionally
simple for Phase 1. Phase 3+ will replace this with a PostgreSQL-backed store.
"""
from typing import Any, Dict, Optional
import threading
class RunStore:
"""Thread-safe in-memory store for council run state."""
def __init__(self) -> None:
self._store: Dict[str, Dict[str, Any]] = {}
self._lock = threading.Lock()
def create(self, run_id: str, input_topic: str) -> None:
with self._lock:
self._store[run_id] = {
"run_id": run_id,
"input_topic": input_topic,
"status": "pending",
"final_draft": None,
"critic_score": None,
"iteration_count": None,
"active_node": None,
"error": None,
}
def get(self, run_id: str) -> Optional[Dict[str, Any]]:
with self._lock:
return self._store.get(run_id)
def update(self, run_id: str, updates: Dict[str, Any]) -> None:
with self._lock:
if run_id in self._store:
self._store[run_id].update(updates)
def delete(self, run_id: str) -> None:
with self._lock:
self._store.pop(run_id, None)
# Singleton instance shared across the application
run_store = RunStore()

128
backend/api/websocket.py Normal file
View file

@ -0,0 +1,128 @@
"""
WebSocket endpoint for real-time agent status updates.
Clients connect to /ws/council/{run_id} and receive JSON events whenever
an agent node becomes active. This powers the live diagram pulsing in the frontend.
Event format:
{"event": "node_start", "run_id": "...", "node": "master_agent", "iteration": 2}
{"event": "node_complete", "run_id": "...", "node": "critic_agent", "score": 6.5}
{"event": "run_complete", "run_id": "...", "final_draft": "..."}
{"event": "run_failed", "run_id": "...", "error": "..."}
"""
import asyncio
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from api.run_store import run_store
ws_router = APIRouter()
# Active WebSocket connections keyed by run_id
_connections: dict[str, list[WebSocket]] = {}
async def broadcast_event(run_id: str, event: dict) -> None:
"""
Send an event to all WebSocket clients subscribed to a run_id.
Args:
run_id: The council run identifier.
event: The event dict to serialize and broadcast.
"""
clients = _connections.get(run_id, [])
disconnected = []
for ws in clients:
try:
await ws.send_text(json.dumps(event))
except Exception: # noqa: BLE001
disconnected.append(ws)
# Clean up dead connections
for ws in disconnected:
clients.remove(ws)
@ws_router.websocket("/ws/council/{run_id}")
async def council_websocket(websocket: WebSocket, run_id: str):
"""
WebSocket endpoint for live council run updates.
On connect: sends the current run status immediately.
While running: polls the run store and pushes status changes.
On complete/failed: sends a final event and closes the connection.
"""
await websocket.accept()
# Register this client
if run_id not in _connections:
_connections[run_id] = []
_connections[run_id].append(websocket)
try:
# Send current state immediately on connect
run = run_store.get(run_id)
if run is None:
await websocket.send_text(
json.dumps({"event": "error", "message": f"Run '{run_id}' not found."})
)
return
await websocket.send_text(
json.dumps({"event": "connected", "run_id": run_id, "status": run["status"]})
)
# Poll for status changes and push updates
last_node = None
while True:
run = run_store.get(run_id)
if run is None:
break
current_node = run.get("active_node")
if current_node and current_node != last_node:
await websocket.send_text(
json.dumps({
"event": "node_active",
"run_id": run_id,
"node": current_node,
"iteration": run.get("iteration_count"),
})
)
last_node = current_node
if run["status"] == "completed":
await websocket.send_text(
json.dumps({
"event": "run_complete",
"run_id": run_id,
"final_draft": run.get("final_draft"),
"critic_score": run.get("critic_score"),
"iteration_count": run.get("iteration_count"),
})
)
break
if run["status"] == "failed":
await websocket.send_text(
json.dumps({
"event": "run_failed",
"run_id": run_id,
"error": run.get("error"),
})
)
break
await asyncio.sleep(0.5) # 500ms polling interval
except WebSocketDisconnect:
pass
finally:
if run_id in _connections:
try:
_connections[run_id].remove(websocket)
except ValueError:
pass

59
backend/main.py Normal file
View file

@ -0,0 +1,59 @@
"""
CouncilOS FastAPI application entrypoint.
Start the server:
uvicorn main:app --reload --port 8000
API Overview:
POST /api/councils/run Start a council run
GET /api/councils/run/{run_id} Poll run status/result
GET /api/health Health check
WS /ws/council/{run_id} Real-time agent status events
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.routes import router
from api.websocket import ws_router
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan: startup and shutdown logic."""
print("CouncilOS API starting up...")
yield
print("CouncilOS API shutting down...")
app = FastAPI(
title="CouncilOS API",
description=(
"Backend for the CouncilOS multi-agent AI pipeline platform. "
"Orchestrates LangGraph council runs and streams real-time agent "
"status via WebSockets."
),
version="0.1.0",
lifespan=lifespan,
)
# CORS — allow all origins in development; tighten in production
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Mount REST routes under /api prefix
app.include_router(router, prefix="/api")
# Mount WebSocket routes (no prefix — path is /ws/council/{run_id})
app.include_router(ws_router)
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

6
backend/pytest.ini Normal file
View file

@ -0,0 +1,6 @@
[pytest]
testpaths = tests
asyncio_mode = auto
python_files = test_*.py
python_classes = Test*
python_functions = test_*

38
backend/requirements.txt Normal file
View file

@ -0,0 +1,38 @@
# Core AI orchestration
langgraph>=0.2.0
langchain>=0.2.0
langchain-anthropic>=0.1.0
langchain-openai>=0.1.0
# Backend API
fastapi>=0.111.0
uvicorn[standard]>=0.30.0
websockets>=12.0
python-multipart>=0.0.9
# Database
asyncpg>=0.29.0
sqlalchemy[asyncio]>=2.0.0
alembic>=1.13.0
# Vector DB (PDF tool)
chromadb>=0.5.0
pypdf>=4.0.0
# Search tool
tavily-python>=0.3.0
# Utilities
python-dotenv>=1.0.0
pydantic>=2.0.0
pydantic-settings>=2.0.0
# Linting and formatting
ruff>=0.4.0
black>=24.0.0
# Testing
pytest>=8.0.0
pytest-asyncio>=0.23.0
pytest-mock>=3.14.0
httpx>=0.27.0

View file

@ -0,0 +1 @@
"""Service modules for CouncilOS backend."""

View file

@ -0,0 +1,131 @@
"""
Graph Builder constructs the LangGraph execution graph for council runs.
Phase 1: Hard-coded test graph:
User Input Master Agent Critic Agent (score < 8: back to Master | score 8: Writer Agent)
Phase 3 (future): This module will be extended to build graphs dynamically
from JSON blueprints stored in PostgreSQL.
"""
import asyncio
from typing import Any, Callable, Optional
from langgraph.graph import StateGraph, END
from state import CouncilState
from agents import master_agent_node, critic_agent_node, writer_agent_node
def route_after_critic(state: CouncilState) -> str:
"""
Conditional edge function: determines next node after the critic.
Returns:
"master_agent" if the critic wants rework.
"writer_agent" if the critic approves the draft.
"""
decision = state.get("route_decision", "rework")
if decision == "approve":
return "writer_agent"
return "master_agent"
def build_council_graph(
on_node_start: Optional[Callable[[str, str], Any]] = None,
) -> StateGraph:
"""
Build and compile the Phase 1 hard-coded council graph.
Graph topology:
master_agent critic_agent (conditional) master_agent | writer_agent END
Args:
on_node_start: Optional async callback invoked when a node begins execution.
Signature: (run_id: str, node_name: str) -> Any
Used to emit WebSocket events for real-time UI updates.
Returns:
A compiled LangGraph StateGraph ready for invocation.
"""
graph = StateGraph(CouncilState)
# Register agent nodes
graph.add_node("master_agent", master_agent_node)
graph.add_node("critic_agent", critic_agent_node)
graph.add_node("writer_agent", writer_agent_node)
# Define edges
graph.set_entry_point("master_agent")
graph.add_edge("master_agent", "critic_agent")
# Conditional edge: critic decides whether to rework or approve
graph.add_conditional_edges(
"critic_agent",
route_after_critic,
{
"master_agent": "master_agent",
"writer_agent": "writer_agent",
},
)
# Writer is the terminal node
graph.add_edge("writer_agent", END)
return graph.compile()
def create_initial_state(
input_topic: str,
run_id: str,
) -> CouncilState:
"""
Create a fresh CouncilState for a new council run.
Args:
input_topic: The user's prompt or document content.
run_id: Unique identifier for this run (used in WebSocket events).
Returns:
An initialized CouncilState dict.
"""
return CouncilState(
input_topic=input_topic,
current_draft="",
feedback_history=[],
route_decision="",
messages=[],
iteration_count=0,
critic_score=None,
run_id=run_id,
active_node="",
)
async def run_council_async(
input_topic: str,
run_id: str,
on_node_event: Optional[Callable[[str, str], Any]] = None,
) -> CouncilState:
"""
Execute a full council run asynchronously.
Args:
input_topic: The user's prompt.
run_id: Unique identifier for this run.
on_node_event: Optional callback for WebSocket node events.
Returns:
The final CouncilState after the writer agent completes.
"""
graph = build_council_graph(on_node_start=on_node_event)
initial_state = create_initial_state(input_topic, run_id)
# LangGraph's invoke is synchronous — run it in a thread pool to avoid
# blocking the FastAPI event loop
loop = asyncio.get_event_loop()
final_state = await loop.run_in_executor(
None,
lambda: graph.invoke(initial_state),
)
return final_state

47
backend/state.py Normal file
View file

@ -0,0 +1,47 @@
"""
CouncilState the central data structure passed between all agents in LangGraph.
All agents must read from and write to this TypedDict. Agents must not store
state internally; everything passes through CouncilState.
"""
from typing import Annotated, List, Optional
import operator
from typing_extensions import TypedDict
class CouncilState(TypedDict):
"""
The global state shared across all agents in a council run.
Fields:
input_topic: The user's original prompt or uploaded PDF content.
current_draft: The document currently being worked on.
feedback_history: All critic feedback accumulated across loop iterations.
Agents append here never overwrite.
route_decision: Routing signal used by conditional edges.
Values: "rework" | "approve" | custom strings.
messages: LLM message history (system prompts + responses).
Uses operator.add reducer so messages accumulate.
iteration_count: Tracks how many rework loops have occurred.
critic_score: The numeric score (010) assigned by the critic agent.
run_id: Unique identifier for this council run (for WebSocket events).
active_node: Name of the currently executing agent node (for UI updates).
"""
input_topic: str
current_draft: str
feedback_history: Annotated[List[str], operator.add]
route_decision: str
messages: Annotated[list, operator.add]
iteration_count: int
critic_score: Optional[float]
run_id: str
active_node: str
# Approval threshold: critic score must reach this value to exit the loop
APPROVAL_THRESHOLD = 8.0
# Safety limit: maximum number of rework iterations before forcing approval
MAX_ITERATIONS = 5

View file

@ -0,0 +1 @@
"""Pytest test suite for CouncilOS backend."""

99
backend/tests/test_api.py Normal file
View file

@ -0,0 +1,99 @@
"""
Integration tests for the FastAPI REST endpoints.
Uses httpx.AsyncClient with the TestClient pattern no real LLM calls.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from unittest.mock import AsyncMock, patch
from fastapi.testclient import TestClient
from main import app
from api.run_store import run_store
@pytest.fixture(autouse=True)
def clean_run_store():
"""Reset the run store before each test."""
run_store._store.clear()
yield
run_store._store.clear()
client = TestClient(app)
class TestHealthEndpoint:
def test_health_check_returns_ok(self):
response = client.get("/api/health")
assert response.status_code == 200
assert response.json()["status"] == "ok"
class TestStartCouncilRun:
def test_start_run_returns_202_with_run_id(self):
with patch("api.routes._execute_run", new_callable=AsyncMock):
response = client.post(
"/api/councils/run",
json={"input_topic": "Erkläre maschinelles Lernen"},
)
assert response.status_code == 202
data = response.json()
assert "run_id" in data
assert data["status"] == "pending"
assert len(data["run_id"]) == 36 # UUID format
def test_start_run_rejects_empty_topic(self):
response = client.post("/api/councils/run", json={"input_topic": ""})
assert response.status_code == 422 # Pydantic validation error
def test_start_run_rejects_missing_topic(self):
response = client.post("/api/councils/run", json={})
assert response.status_code == 422
class TestGetCouncilResult:
def test_get_pending_run(self):
run_store.create("test-run-id", "Test topic")
response = client.get("/api/councils/run/test-run-id")
assert response.status_code == 200
data = response.json()
assert data["run_id"] == "test-run-id"
assert data["status"] == "pending"
def test_get_completed_run(self):
run_store.create("completed-run", "Topic")
run_store.update("completed-run", {
"status": "completed",
"final_draft": "Final polished document.",
"critic_score": 9.0,
"iteration_count": 2,
})
response = client.get("/api/councils/run/completed-run")
assert response.status_code == 200
data = response.json()
assert data["status"] == "completed"
assert data["final_draft"] == "Final polished document."
assert data["critic_score"] == 9.0
assert data["iteration_count"] == 2
def test_get_nonexistent_run_returns_404(self):
response = client.get("/api/councils/run/does-not-exist")
assert response.status_code == 404
def test_get_failed_run(self):
run_store.create("failed-run", "Topic")
run_store.update("failed-run", {
"status": "failed",
"error": "API connection timeout",
})
response = client.get("/api/councils/run/failed-run")
assert response.status_code == 200
data = response.json()
assert data["status"] == "failed"
assert "timeout" in data["error"]

View file

@ -0,0 +1,211 @@
"""
Tests for the LangGraph routing logic.
All LLM calls are mocked no real API calls are made in these tests.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from unittest.mock import patch, MagicMock
from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS
from services.graph_builder import route_after_critic, create_initial_state
class TestRouteAfterCritic:
"""Unit tests for the conditional edge routing function."""
def _make_state(self, route_decision: str, iteration_count: int = 1) -> CouncilState:
state = create_initial_state("test topic", "test-run")
state["route_decision"] = route_decision
state["iteration_count"] = iteration_count
return state
def test_approve_routes_to_writer(self):
state = self._make_state("approve")
assert route_after_critic(state) == "writer_agent"
def test_rework_routes_to_master(self):
state = self._make_state("rework")
assert route_after_critic(state) == "master_agent"
def test_empty_decision_defaults_to_rework(self):
state = self._make_state("")
assert route_after_critic(state) == "master_agent"
def test_unknown_decision_defaults_to_rework(self):
state = self._make_state("unknown_value")
assert route_after_critic(state) == "master_agent"
class TestCriticAgentParsing:
"""Unit tests for the critic agent's response parser."""
def test_parse_valid_approve_response(self):
from agents.critic_agent import _parse_critic_response
content = "SCORE: 9\nVERDICT: approve\nFEEDBACK:\nExcellent work."
score, verdict, feedback = _parse_critic_response(content)
assert score == 9.0
assert verdict == "approve"
assert "Excellent" in feedback
def test_parse_valid_rework_response(self):
from agents.critic_agent import _parse_critic_response
content = "SCORE: 5\nVERDICT: rework\nFEEDBACK:\nNeeds more detail."
score, verdict, feedback = _parse_critic_response(content)
assert score == 5.0
assert verdict == "rework"
assert "detail" in feedback
def test_parse_score_clamped_to_0_10(self):
from agents.critic_agent import _parse_critic_response
content = "SCORE: 15\nVERDICT: approve\nFEEDBACK:\nToo high score."
score, verdict, feedback = _parse_critic_response(content)
assert score == 10.0
def test_parse_missing_score_defaults_to_0(self):
from agents.critic_agent import _parse_critic_response
content = "No structured response at all."
score, verdict, feedback = _parse_critic_response(content)
assert score == 0.0
assert verdict == "rework"
def test_threshold_boundary_exactly_8_approves(self):
from agents.critic_agent import _parse_critic_response
content = f"SCORE: {APPROVAL_THRESHOLD}\nVERDICT: approve\nFEEDBACK:\nGood."
score, verdict, _ = _parse_critic_response(content)
assert score == APPROVAL_THRESHOLD
assert verdict == "approve"
class TestMasterAgentPromptBuilding:
"""Unit tests for the master agent's prompt construction."""
def test_first_iteration_prompt_has_no_feedback_block(self):
from agents.master_agent import _build_master_prompt
state = create_initial_state("Test topic", "run-1")
prompt = _build_master_prompt(state)
assert "Test topic" in prompt
assert "feedback" not in prompt.lower() or "Feedback" not in prompt
def test_rework_prompt_includes_feedback(self):
from agents.master_agent import _build_master_prompt
state = create_initial_state("Test topic", "run-1")
state["current_draft"] = "My draft"
state["feedback_history"] = ["Score: 5/10\nNeeds more structure."]
prompt = _build_master_prompt(state)
assert "My draft" in prompt
assert "Needs more structure" in prompt
def test_rework_prompt_includes_all_feedback_rounds(self):
from agents.master_agent import _build_master_prompt
state = create_initial_state("Topic", "run-2")
state["current_draft"] = "Draft v2"
state["feedback_history"] = ["First feedback", "Second feedback"]
prompt = _build_master_prompt(state)
assert "First feedback" in prompt
assert "Second feedback" in prompt
assert "2 round" in prompt
class TestCriticSafetyValve:
"""Tests for the MAX_ITERATIONS safety valve in the critic agent."""
def test_safety_valve_forces_approve_at_max_iterations(self):
from agents.critic_agent import critic_agent_node
state = create_initial_state("topic", "run-safety")
state["iteration_count"] = MAX_ITERATIONS
state["current_draft"] = "Some draft"
result = critic_agent_node(state)
assert result["route_decision"] == "approve"
assert result["critic_score"] == APPROVAL_THRESHOLD
def test_safety_valve_not_triggered_below_max(self):
"""Below MAX_ITERATIONS the real LLM call would happen — mock it."""
from agents.critic_agent import critic_agent_node
mock_response = MagicMock()
mock_response.content = "SCORE: 4\nVERDICT: rework\nFEEDBACK:\nNeeds work."
with patch("agents.critic_agent.ChatAnthropic") as MockLLM:
MockLLM.return_value.invoke.return_value = mock_response
state = create_initial_state("topic", "run-below-max")
state["iteration_count"] = MAX_ITERATIONS - 1
state["current_draft"] = "Draft"
result = critic_agent_node(state)
assert result["route_decision"] == "rework"
assert result["critic_score"] == 4.0
class TestMasterAgentNode:
"""Integration-style tests for master_agent_node with mocked LLM."""
def test_master_agent_returns_draft(self):
from agents.master_agent import master_agent_node
mock_response = MagicMock()
mock_response.content = "This is a generated draft about AI."
with patch("agents.master_agent.ChatAnthropic") as MockLLM:
MockLLM.return_value.invoke.return_value = mock_response
state = create_initial_state("AI basics", "run-master-1")
result = master_agent_node(state)
assert result["current_draft"] == "This is a generated draft about AI."
assert result["active_node"] == "master_agent"
assert result["iteration_count"] == 1
def test_master_agent_increments_iteration_count(self):
from agents.master_agent import master_agent_node
mock_response = MagicMock()
mock_response.content = "Draft"
with patch("agents.master_agent.ChatAnthropic") as MockLLM:
MockLLM.return_value.invoke.return_value = mock_response
state = create_initial_state("topic", "run-master-2")
state["iteration_count"] = 3
result = master_agent_node(state)
assert result["iteration_count"] == 4
class TestWriterAgentNode:
"""Tests for writer_agent_node with mocked LLM."""
def test_writer_returns_polished_draft(self):
from agents.writer_agent import writer_agent_node
mock_response = MagicMock()
mock_response.content = "Polished and professional document."
with patch("agents.writer_agent.ChatAnthropic") as MockLLM:
MockLLM.return_value.invoke.return_value = mock_response
state = create_initial_state("Machine Learning", "run-writer-1")
state["current_draft"] = "Raw draft content"
result = writer_agent_node(state)
assert result["current_draft"] == "Polished and professional document."
assert result["active_node"] == "writer_agent"
assert result["route_decision"] == "done"

View file

@ -0,0 +1,55 @@
"""Tests for the in-memory RunStore."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from api.run_store import RunStore
class TestRunStore:
def setup_method(self):
self.store = RunStore()
def test_create_and_get(self):
self.store.create("run-1", "Test topic")
run = self.store.get("run-1")
assert run is not None
assert run["run_id"] == "run-1"
assert run["input_topic"] == "Test topic"
assert run["status"] == "pending"
def test_get_nonexistent_returns_none(self):
assert self.store.get("nonexistent") is None
def test_update_status(self):
self.store.create("run-2", "Topic")
self.store.update("run-2", {"status": "running"})
assert self.store.get("run-2")["status"] == "running"
def test_update_nonexistent_is_noop(self):
"""Updating a non-existent run should not raise."""
self.store.update("ghost-run", {"status": "running"})
def test_delete(self):
self.store.create("run-3", "Topic")
self.store.delete("run-3")
assert self.store.get("run-3") is None
def test_delete_nonexistent_is_noop(self):
self.store.delete("ghost-run")
def test_update_partial_fields(self):
self.store.create("run-4", "Topic")
self.store.update("run-4", {"status": "completed", "final_draft": "Result text"})
run = self.store.get("run-4")
assert run["status"] == "completed"
assert run["final_draft"] == "Result text"
assert run["input_topic"] == "Topic" # original field preserved
def test_multiple_runs_independent(self):
self.store.create("run-a", "Topic A")
self.store.create("run-b", "Topic B")
self.store.update("run-a", {"status": "running"})
assert self.store.get("run-b")["status"] == "pending"

View file

@ -0,0 +1,44 @@
"""Tests for CouncilState structure and graph_builder helpers."""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS
from services.graph_builder import create_initial_state
class TestCouncilState:
def test_initial_state_fields(self):
state = create_initial_state("Test topic", "run-001")
assert state["input_topic"] == "Test topic"
assert state["current_draft"] == ""
assert state["feedback_history"] == []
assert state["route_decision"] == ""
assert state["messages"] == []
assert state["iteration_count"] == 0
assert state["critic_score"] is None
assert state["run_id"] == "run-001"
assert state["active_node"] == ""
def test_approval_threshold_value(self):
assert APPROVAL_THRESHOLD == 8.0
def test_max_iterations_value(self):
assert MAX_ITERATIONS == 5
def test_state_is_typed_dict(self):
"""CouncilState should be instantiable as a plain dict."""
state: CouncilState = {
"input_topic": "AI",
"current_draft": "draft",
"feedback_history": ["fb1"],
"route_decision": "rework",
"messages": [],
"iteration_count": 1,
"critic_score": 6.0,
"run_id": "x",
"active_node": "critic_agent",
}
assert state["critic_score"] == 6.0

View file

@ -0,0 +1,7 @@
"""
Agent tools for CouncilOS.
Phase 4 will add:
- web_search_tool: Tavily Search API wrapper
- pdf_reader_tool: PyPDF + ChromaDB vector store wrapper
"""

54
docker-compose.yml Normal file
View file

@ -0,0 +1,54 @@
version: "3.9"
# CouncilOS — local development environment
# Usage:
# docker compose up -d # Start all services
# docker compose down # Stop all services
# docker compose logs -f api # Follow API logs
services:
# ---------------------------------------------------------------------------
# PostgreSQL — stores council blueprints (used from Phase 2)
# ---------------------------------------------------------------------------
db:
image: postgres:16-alpine
restart: unless-stopped
environment:
POSTGRES_DB: councilOS
POSTGRES_USER: user
POSTGRES_PASSWORD: password
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U user -d councilOS"]
interval: 5s
timeout: 5s
retries: 5
# ---------------------------------------------------------------------------
# CouncilOS API — FastAPI + LangGraph backend
# ---------------------------------------------------------------------------
api:
build:
context: ./backend
dockerfile: Dockerfile
restart: unless-stopped
ports:
- "8000:8000"
env_file:
- .env
environment:
DATABASE_URL: postgresql+asyncpg://user:password@db:5432/councilOS
volumes:
- ./backend:/app
- chroma_data:/app/chroma_db
depends_on:
db:
condition: service_healthy
command: uvicorn main:app --host 0.0.0.0 --port 8000 --reload
volumes:
postgres_data:
chroma_data: