Merge pull request #7 from Kenearos/claude/identify-missing-features-NI1UU

Phase 4: God Mode, Tool Binding, and Persistent Run History
This commit is contained in:
Kenearos 2026-02-21 12:01:41 +01:00 committed by GitHub
commit fb0d3ae8f1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 2502 additions and 81 deletions

View file

@ -0,0 +1,56 @@
"""Create council_runs table for persistent run history
Revision ID: 002
Revises: 001
Create Date: 2026-02-21
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
revision: str = "002"
down_revision: Union[str, None] = "001"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.create_table(
"council_runs",
sa.Column("id", sa.String(36), primary_key=True),
sa.Column("blueprint_id", sa.String(36), nullable=True),
sa.Column("input_topic", sa.Text(), nullable=False),
sa.Column(
"status",
sa.String(20),
nullable=False,
server_default="pending",
),
sa.Column(
"execution_mode",
sa.String(20),
nullable=False,
server_default="auto-pilot",
),
sa.Column("final_draft", sa.Text(), nullable=True),
sa.Column("critic_score", sa.Float(), nullable=True),
sa.Column("iteration_count", sa.Integer(), nullable=True),
sa.Column("active_node", sa.String(255), nullable=True),
sa.Column("error", sa.Text(), nullable=True),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"completed_at",
sa.DateTime(timezone=True),
nullable=True,
),
)
def downgrade() -> None:
op.drop_table("council_runs")

View file

@ -2,23 +2,33 @@
REST API routes for CouncilOS.
Endpoints:
POST /api/councils/run Start a new council run (Phase 1 hard-coded graph)
POST /api/councils/{id}/run Start a run from a saved blueprint (Phase 3)
GET /api/councils/run/{run_id} Poll the status/result of a run
GET /api/health Health check
POST /api/councils/run Start a new council run (Phase 1)
POST /api/councils/{id}/run Start a run from a blueprint (Phase 3)
GET /api/councils/run/{run_id} Poll the status/result of a run
POST /api/councils/run/{run_id}/approve God Mode: approve/reject/modify (Phase 4)
GET /api/councils/run/{run_id}/state God Mode: get paused state (Phase 4)
POST /api/councils/upload-pdf Upload and ingest a PDF (Phase 4)
GET /api/health Health check
"""
import os
import tempfile
import uuid
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, BackgroundTasks
from typing import List, Optional
from fastapi import APIRouter, BackgroundTasks, Depends, File, HTTPException, UploadFile
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from services.graph_builder import run_council_async
from services.dynamic_graph_builder import run_blueprint_council_async
from services.blueprint_service import get_blueprint
from database import get_session
from api.run_store import run_store
from database import get_session
from services.blueprint_service import get_blueprint
from services.dynamic_graph_builder import (
get_god_mode_state,
resume_god_mode,
run_blueprint_council_async,
)
from services.graph_builder import run_council_async
router = APIRouter()
@ -36,11 +46,15 @@ class CouncilRunRequest(BaseModel):
description="The user's prompt or document content for the council to work on.",
examples=["Erkläre die wichtigsten Konzepte des maschinellen Lernens für Einsteiger."],
)
god_mode: bool = Field(
default=False,
description="If true, the run pauses before each node for human approval.",
)
class CouncilRunResponse(BaseModel):
run_id: str
status: str # "pending" | "running" | "completed" | "failed"
status: str # "pending" | "running" | "completed" | "failed" | "paused"
message: str
@ -51,6 +65,26 @@ class CouncilResultResponse(BaseModel):
critic_score: Optional[float] = None
iteration_count: Optional[int] = None
error: Optional[str] = None
paused: Optional[bool] = None
next_nodes: Optional[List[str]] = None
current_draft: Optional[str] = None
class GodModeApprovalRequest(BaseModel):
action: str = Field(
...,
description="Action to take: 'approve', 'reject', or 'modify'.",
)
modified_state: Optional[dict] = Field(
default=None,
description="Partial state override when action is 'modify'.",
)
class PdfUploadResponse(BaseModel):
filename: str
chunks_ingested: int
message: str
# ---------------------------------------------------------------------------
@ -104,8 +138,8 @@ async def start_blueprint_run(
"""
Start a council run using a saved blueprint (Phase 3 dynamic graph).
Reads the blueprint from PostgreSQL and dynamically constructs the
LangGraph execution graph at runtime.
Set god_mode=true to pause before each agent node and require
human approval via the /approve endpoint.
"""
bp = await get_blueprint(session, blueprint_id)
if bp is None:
@ -116,14 +150,19 @@ async def start_blueprint_run(
blueprint_dict = bp.to_dict()
background_tasks.add_task(
_execute_blueprint_run, run_id, request.input_topic, blueprint_dict
_execute_blueprint_run,
run_id,
request.input_topic,
blueprint_dict,
request.god_mode,
)
mode_label = "God Mode" if request.god_mode else "Auto-Pilot"
return CouncilRunResponse(
run_id=run_id,
status="pending",
message=(
f"Council run started from blueprint '{bp.name}'. "
f"Council run started from blueprint '{bp.name}' ({mode_label}). "
f"Connect to /ws/council/{run_id} for live updates."
),
)
@ -133,11 +172,21 @@ async def start_blueprint_run(
async def get_council_result(run_id: str):
"""
Retrieve the current status or final result of a council run.
In God Mode, includes paused state and next_nodes info.
"""
run = run_store.get(run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found.")
# Check for god mode paused state
god_state = get_god_mode_state(run_id)
paused = god_state["paused"] if god_state else None
next_nodes = god_state["next_nodes"] if god_state else None
current_draft = (
god_state["current_state"].get("current_draft") if god_state else None
)
return CouncilResultResponse(
run_id=run_id,
status=run["status"],
@ -145,6 +194,97 @@ async def get_council_result(run_id: str):
critic_score=run.get("critic_score"),
iteration_count=run.get("iteration_count"),
error=run.get("error"),
paused=paused,
next_nodes=next_nodes,
current_draft=current_draft,
)
@router.post("/councils/run/{run_id}/approve", response_model=CouncilResultResponse)
async def approve_god_mode(
run_id: str,
request: GodModeApprovalRequest,
background_tasks: BackgroundTasks,
):
"""
Approve, reject, or modify a paused God Mode council run.
Actions:
approve continue execution to the next node
reject stop the run entirely
modify update the state before continuing (pass modified_state)
"""
god_state = get_god_mode_state(run_id)
if not god_state or not god_state.get("paused"):
raise HTTPException(
status_code=400,
detail=f"Run '{run_id}' is not paused in God Mode.",
)
if request.action == "reject":
state = await resume_god_mode(run_id, action="reject")
run_store.update(run_id, {"status": "failed", "error": "Rejected by user in God Mode."})
return CouncilResultResponse(
run_id=run_id,
status="failed",
error="Rejected by user in God Mode.",
)
# Resume in background (approve or modify)
background_tasks.add_task(
_resume_god_mode_task,
run_id,
request.action,
request.modified_state,
)
return CouncilResultResponse(
run_id=run_id,
status="running",
)
@router.get("/councils/run/{run_id}/state")
async def get_run_state(run_id: str):
"""
Get the full paused state of a God Mode run for the approval UI.
"""
god_state = get_god_mode_state(run_id)
if not god_state:
raise HTTPException(
status_code=404,
detail=f"No God Mode session found for run '{run_id}'.",
)
return god_state
@router.post("/councils/upload-pdf", response_model=PdfUploadResponse)
async def upload_pdf(file: UploadFile = File(...)):
"""
Upload and ingest a PDF file into the ChromaDB vector store.
The content becomes searchable by agents with the PDF Reader tool enabled.
"""
if not file.filename or not file.filename.lower().endswith(".pdf"):
raise HTTPException(status_code=400, detail="Only PDF files are accepted.")
from tools.pdf_reader import ingest_pdf
# Save upload to a temp file
with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
content = await file.read()
tmp.write(content)
tmp_path = tmp.name
try:
chunks = ingest_pdf(tmp_path)
finally:
os.unlink(tmp_path)
return PdfUploadResponse(
filename=file.filename,
chunks_ingested=chunks,
message=f"Successfully ingested {chunks} chunks from '{file.filename}'.",
)
@ -180,7 +320,10 @@ async def _execute_run(run_id: str, input_topic: str) -> None:
async def _execute_blueprint_run(
run_id: str, input_topic: str, blueprint: dict
run_id: str,
input_topic: str,
blueprint: dict,
god_mode: bool = False,
) -> None:
"""
Background task that runs a dynamically built LangGraph from a blueprint.
@ -191,10 +334,22 @@ async def _execute_blueprint_run(
blueprint=blueprint,
input_topic=input_topic,
run_id=run_id,
god_mode=god_mode,
on_node_event=lambda nid, node: run_store.update(
nid, {"active_node": node}
),
)
# In god mode, the first invoke will pause at the first node
if god_mode and final_state:
god_state = get_god_mode_state(run_id)
if god_state and god_state.get("paused"):
run_store.update(run_id, {
"status": "paused",
"active_node": god_state["next_nodes"][0] if god_state["next_nodes"] else None,
})
return
run_store.update(
run_id,
{
@ -207,3 +362,42 @@ async def _execute_blueprint_run(
)
except Exception as exc: # noqa: BLE001
run_store.update(run_id, {"status": "failed", "error": str(exc)})
async def _resume_god_mode_task(
run_id: str,
action: str,
modified_state: Optional[dict],
) -> None:
"""Background task that resumes a god mode run after human approval."""
run_store.update(run_id, {"status": "running"})
try:
state = await resume_god_mode(run_id, action=action, modified_state=modified_state)
if state is None:
run_store.update(run_id, {"status": "failed", "error": "God Mode session not found."})
return
# Check if paused again at next node
god_state = get_god_mode_state(run_id)
if god_state and god_state.get("paused"):
run_store.update(run_id, {
"status": "paused",
"active_node": god_state["next_nodes"][0] if god_state["next_nodes"] else None,
"current_draft": state.get("current_draft"),
"critic_score": state.get("critic_score"),
"iteration_count": state.get("iteration_count"),
})
else:
run_store.update(
run_id,
{
"status": "completed",
"final_draft": state.get("current_draft"),
"critic_score": state.get("critic_score"),
"iteration_count": state.get("iteration_count"),
"active_node": "done",
},
)
except Exception as exc: # noqa: BLE001
run_store.update(run_id, {"status": "failed", "error": str(exc)})

View file

@ -0,0 +1,64 @@
"""
REST API routes for council run history.
Endpoints:
GET /api/runs/ List recent council runs
GET /api/runs/{run_id} Get a specific run's details
"""
from typing import List, Optional
from fastapi import APIRouter, Depends, HTTPException, Query
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_session
from services.run_service import get_run, list_runs
run_history_router = APIRouter()
# ---------------------------------------------------------------------------
# Response Models
# ---------------------------------------------------------------------------
class RunHistoryResponse(BaseModel):
id: str
blueprint_id: Optional[str] = None
input_topic: str
status: str
execution_mode: str
final_draft: Optional[str] = None
critic_score: Optional[float] = None
iteration_count: Optional[int] = None
error: Optional[str] = None
created_at: Optional[str] = None
completed_at: Optional[str] = None
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@run_history_router.get("/runs/", response_model=List[RunHistoryResponse])
async def list_all_runs(
limit: int = Query(default=50, ge=1, le=200),
offset: int = Query(default=0, ge=0),
session: AsyncSession = Depends(get_session),
):
"""List recent council runs, ordered by most recent first."""
runs = await list_runs(session, limit=limit, offset=offset)
return [r.to_dict() for r in runs]
@run_history_router.get("/runs/{run_id}", response_model=RunHistoryResponse)
async def get_single_run(
run_id: str,
session: AsyncSession = Depends(get_session),
):
"""Retrieve a specific council run by ID."""
run = await get_run(session, run_id)
if run is None:
raise HTTPException(status_code=404, detail=f"Run '{run_id}' not found.")
return run.to_dict()

View file

@ -2,20 +2,22 @@
WebSocket endpoint for real-time agent status updates.
Clients connect to /ws/council/{run_id} and receive JSON events whenever
an agent node becomes active. This powers the live diagram pulsing in the frontend.
an agent node becomes active or the run status changes.
Event format:
{"event": "node_start", "run_id": "...", "node": "master_agent", "iteration": 2}
{"event": "node_complete", "run_id": "...", "node": "critic_agent", "score": 6.5}
{"event": "run_complete", "run_id": "...", "final_draft": "..."}
{"event": "node_active", "run_id": "...", "node": "master_agent", "iteration": 2}
{"event": "run_paused", "run_id": "...", "next_nodes": ["critic_agent"], "current_draft": "..."}
{"event": "run_complete", "run_id": "...", "final_draft": "...", "critic_score": 8.5}
{"event": "run_failed", "run_id": "...", "error": "..."}
"""
import asyncio
import json
from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from api.run_store import run_store
from services.dynamic_graph_builder import get_god_mode_state
ws_router = APIRouter()
@ -53,6 +55,7 @@ async def council_websocket(websocket: WebSocket, run_id: str):
On connect: sends the current run status immediately.
While running: polls the run store and pushes status changes.
On paused: sends a god mode pause event with next_nodes.
On complete/failed: sends a final event and closes the connection.
"""
await websocket.accept()
@ -77,13 +80,17 @@ async def council_websocket(websocket: WebSocket, run_id: str):
# Poll for status changes and push updates
last_node = None
last_status = run["status"]
while True:
run = run_store.get(run_id)
if run is None:
break
current_node = run.get("active_node")
if current_node and current_node != last_node:
current_status = run["status"]
# Emit node_active when the active agent changes
if current_node and current_node != last_node and current_node != "done":
await websocket.send_text(
json.dumps({
"event": "node_active",
@ -94,7 +101,41 @@ async def council_websocket(websocket: WebSocket, run_id: str):
)
last_node = current_node
if run["status"] == "completed":
# God Mode: emit pause event
if current_status == "paused" and last_status != "paused":
god_state = get_god_mode_state(run_id)
await websocket.send_text(
json.dumps({
"event": "run_paused",
"run_id": run_id,
"next_nodes": god_state["next_nodes"] if god_state else [],
"current_draft": (
god_state["current_state"].get("current_draft", "")
if god_state else ""
),
"critic_score": (
god_state["current_state"].get("critic_score")
if god_state else None
),
"iteration_count": (
god_state["current_state"].get("iteration_count")
if god_state else None
),
})
)
last_status = current_status
# Status changed from paused back to running (user approved)
if current_status == "running" and last_status == "paused":
await websocket.send_text(
json.dumps({
"event": "run_resumed",
"run_id": run_id,
})
)
last_status = current_status
if current_status == "completed":
await websocket.send_text(
json.dumps({
"event": "run_complete",
@ -106,7 +147,7 @@ async def council_websocket(websocket: WebSocket, run_id: str):
)
break
if run["status"] == "failed":
if current_status == "failed":
await websocket.send_text(
json.dumps({
"event": "run_failed",
@ -116,6 +157,7 @@ async def council_websocket(websocket: WebSocket, run_id: str):
)
break
last_status = current_status
await asyncio.sleep(0.5) # 500ms polling interval
except WebSocketDisconnect:

View file

@ -38,6 +38,7 @@ async def get_session() -> AsyncSession:
async def init_db() -> None:
"""Create all tables. Used at application startup."""
from models.blueprint import Base
import models.council_run # noqa: F401 — register CouncilRun model
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)

View file

@ -5,16 +5,21 @@ Start the server:
uvicorn main:app --reload --port 8000
API Overview:
POST /api/councils/ Create a blueprint
GET /api/councils/ List all blueprints
GET /api/councils/{id} Get specific blueprint
PUT /api/councils/{id} Update a blueprint
DELETE /api/councils/{id} Delete a blueprint
POST /api/councils/run Start a run (Phase 1 hard-coded graph)
POST /api/councils/{id}/run Start a run from a blueprint (Phase 3)
GET /api/councils/run/{run_id} Poll run status/result
GET /api/health Health check
WS /ws/council/{run_id} Real-time agent status events
POST /api/councils/ Create a blueprint
GET /api/councils/ List all blueprints
GET /api/councils/{id} Get specific blueprint
PUT /api/councils/{id} Update a blueprint
DELETE /api/councils/{id} Delete a blueprint
POST /api/councils/run Start a run (Phase 1)
POST /api/councils/{id}/run Start a run from a blueprint
GET /api/councils/run/{run_id} Poll run status/result
POST /api/councils/run/{run_id}/approve God Mode: approve/reject/modify
GET /api/councils/run/{run_id}/state God Mode: paused state
POST /api/councils/upload-pdf Upload PDF for ingestion
GET /api/runs/ List run history
GET /api/runs/{run_id} Get historical run detail
GET /api/health Health check
WS /ws/council/{run_id} Real-time agent status events
"""
from contextlib import asynccontextmanager
@ -23,6 +28,7 @@ from fastapi.middleware.cors import CORSMiddleware
from api.routes import router
from api.blueprint_routes import blueprint_router
from api.run_history_routes import run_history_router
from api.websocket import ws_router
from database import init_db, close_db
@ -45,7 +51,7 @@ app = FastAPI(
"Orchestrates LangGraph council runs and streams real-time agent "
"status via WebSockets."
),
version="0.2.0",
version="0.3.0",
lifespan=lifespan,
)
@ -61,6 +67,7 @@ app.add_middleware(
# Mount REST routes under /api prefix
app.include_router(router, prefix="/api")
app.include_router(blueprint_router, prefix="/api")
app.include_router(run_history_router, prefix="/api")
# Mount WebSocket routes (no prefix — path is /ws/council/{run_id})
app.include_router(ws_router)

View file

@ -0,0 +1,71 @@
"""
CouncilRun model persists council run history in PostgreSQL.
Each run record stores the execution metadata, status, and results.
Replaces the in-memory run_store for persistent storage.
"""
import uuid
from datetime import datetime, timezone
from sqlalchemy import Column, DateTime, Float, Integer, String, Text
from models.blueprint import Base
class CouncilRun(Base):
"""
A persisted council run stored in PostgreSQL.
Tracks the full lifecycle of a run: pending running completed/failed/paused.
"""
__tablename__ = "council_runs"
id = Column(
String(36),
primary_key=True,
default=lambda: str(uuid.uuid4()),
)
blueprint_id = Column(String(36), nullable=True)
input_topic = Column(Text, nullable=False)
status = Column(
String(20),
nullable=False,
default="pending",
)
execution_mode = Column(
String(20),
nullable=False,
default="auto-pilot",
)
final_draft = Column(Text, nullable=True)
critic_score = Column(Float, nullable=True)
iteration_count = Column(Integer, nullable=True)
active_node = Column(String(255), nullable=True)
error = Column(Text, nullable=True)
created_at = Column(
DateTime(timezone=True),
nullable=False,
default=lambda: datetime.now(timezone.utc),
)
completed_at = Column(
DateTime(timezone=True),
nullable=True,
)
def to_dict(self) -> dict:
"""Serialize to a JSON-friendly dict."""
return {
"id": self.id,
"blueprint_id": self.blueprint_id,
"input_topic": self.input_topic,
"status": self.status,
"execution_mode": self.execution_mode,
"final_draft": self.final_draft,
"critic_score": self.critic_score,
"iteration_count": self.iteration_count,
"active_node": self.active_node,
"error": self.error,
"created_at": self.created_at.isoformat() if self.created_at else None,
"completed_at": self.completed_at.isoformat() if self.completed_at else None,
}

View file

@ -1,8 +1,10 @@
# Core AI orchestration
langgraph>=0.2.0
langgraph-checkpoint>=2.0.0
langchain>=0.2.0
langchain-anthropic>=0.1.0
langchain-openai>=0.1.0
langchain-community>=0.2.0
# Backend API
fastapi>=0.111.0

View file

@ -5,6 +5,11 @@ This is the Phase 3 replacement for the hard-coded graph in graph_builder.py.
It reads a CouncilBlueprint JSON (as produced by the frontend parser) and
dynamically constructs the LangGraph StateGraph with the correct nodes,
edges, and conditional routing.
Phase 4 additions:
- Tool binding: agents with tools enabled (webSearch, pdfReader) get
LangChain tools bound to their LLM via .bind_tools().
- God Mode: supports interrupt_before for human-in-the-loop approval.
"""
import asyncio
@ -17,6 +22,8 @@ from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
from state import CouncilState, APPROVAL_THRESHOLD, MAX_ITERATIONS
from tools.web_search import web_search
from tools.pdf_reader import pdf_search
# ---------------------------------------------------------------------------
@ -50,6 +57,78 @@ def _get_llm(model_name: str) -> Any:
return factory()
# ---------------------------------------------------------------------------
# Tool resolution
# ---------------------------------------------------------------------------
def _resolve_tools(tools_config: Optional[dict]) -> list:
"""
Resolve a node's tools config to a list of LangChain tool objects.
Args:
tools_config: Dict like {"webSearch": true, "pdfReader": true}
Returns:
A list of LangChain tool objects to bind to the LLM.
"""
if not tools_config:
return []
resolved = []
if tools_config.get("webSearch"):
resolved.append(web_search)
if tools_config.get("pdfReader"):
resolved.append(pdf_search)
return resolved
def _invoke_with_tools(llm: Any, messages: list, tools: list) -> Any:
"""
Invoke an LLM, optionally with tools bound. If the LLM returns tool
calls, execute them and feed results back for a final answer.
Args:
llm: A LangChain chat model instance.
messages: The message list to send.
tools: List of LangChain tools (may be empty).
Returns:
The final LLM response message.
"""
if not tools:
return llm.invoke(messages)
llm_with_tools = llm.bind_tools(tools)
response = llm_with_tools.invoke(messages)
# If no tool calls, return directly
if not response.tool_calls:
return response
# Execute tool calls and collect results
from langchain_core.messages import ToolMessage
tool_map = {t.name: t for t in tools}
tool_messages = [response]
for tc in response.tool_calls:
tool_fn = tool_map.get(tc["name"])
if tool_fn:
try:
result = tool_fn.invoke(tc["args"])
except Exception as exc: # noqa: BLE001
result = f"[Tool Error] {exc}"
else:
result = f"[Tool Error] Unknown tool: {tc['name']}"
tool_messages.append(
ToolMessage(content=str(result), tool_call_id=tc["id"])
)
# Final LLM call with tool results
return llm_with_tools.invoke(messages + tool_messages)
# ---------------------------------------------------------------------------
# Generic agent node factory
# ---------------------------------------------------------------------------
@ -59,6 +138,7 @@ def _make_agent_node(
label: str,
system_prompt: str,
model_name: str,
tools_config: Optional[dict] = None,
) -> Callable[[CouncilState], dict]:
"""
Create a LangGraph node function for a user-defined agent.
@ -71,10 +151,12 @@ def _make_agent_node(
label: Display name of the agent (used in prompts).
system_prompt: The persona / role definition for this agent.
model_name: Which LLM to use ("claude-3-5-sonnet" | "gpt-4o").
tools_config: Optional dict like {"webSearch": true, "pdfReader": true}.
Returns:
A callable (CouncilState) -> dict suitable for StateGraph.add_node().
"""
node_tools = _resolve_tools(tools_config)
def agent_node(state: CouncilState) -> dict:
llm = _get_llm(model_name)
@ -105,7 +187,7 @@ def _make_agent_node(
system_msg = SystemMessage(content=system_prompt)
user_msg = HumanMessage(content=user_content)
response = llm.invoke([system_msg, user_msg])
response = _invoke_with_tools(llm, [system_msg, user_msg], node_tools)
return {
"current_draft": response.content,
@ -177,6 +259,7 @@ def _make_critic_node(
label: str,
system_prompt: str,
model_name: str,
tools_config: Optional[dict] = None,
) -> Callable[[CouncilState], dict]:
"""
Create a critic-style node that scores and routes.
@ -186,6 +269,8 @@ def _make_critic_node(
"""
import re
node_tools = _resolve_tools(tools_config)
critic_system = (
system_prompt + "\n\n"
"IMPORTANT: You must respond in EXACTLY this format:\n\n"
@ -219,7 +304,7 @@ def _make_critic_node(
)
)
response = llm.invoke([system_msg, user_msg])
response = _invoke_with_tools(llm, [system_msg, user_msg], node_tools)
# Parse structured response
score_match = re.search(r"SCORE:\s*(\d+(?:\.\d+)?)", response.content)
@ -251,7 +336,10 @@ def _make_critic_node(
# Main: build graph from blueprint JSON
# ---------------------------------------------------------------------------
def build_graph_from_blueprint(blueprint: dict) -> Any:
def build_graph_from_blueprint(
blueprint: dict,
god_mode: bool = False,
) -> Any:
"""
Dynamically construct a compiled LangGraph from a CouncilBlueprint JSON.
@ -263,6 +351,8 @@ def build_graph_from_blueprint(blueprint: dict) -> Any:
"nodes": [{"id", "label", "systemPrompt", "model", "tools", "position"}],
"edges": [{"id", "source", "target", "type", "condition?"}]
}
god_mode: If True, compile with interrupt_before on all nodes so the
user can approve/reject at each step (Human-in-the-Loop).
Returns:
A compiled LangGraph StateGraph ready for invocation.
@ -295,16 +385,23 @@ def build_graph_from_blueprint(blueprint: dict) -> Any:
graph = StateGraph(CouncilState)
# Register all nodes
all_node_ids = []
for node in nodes:
nid = node["id"]
all_node_ids.append(nid)
label = node.get("label", nid)
system_prompt = node.get("systemPrompt", f"You are {label}.")
model_name = node.get("model", "claude-3-5-sonnet")
tools_config = node.get("tools")
if _is_critic_like(system_prompt):
node_fn = _make_critic_node(nid, label, system_prompt, model_name)
node_fn = _make_critic_node(
nid, label, system_prompt, model_name, tools_config
)
else:
node_fn = _make_agent_node(nid, label, system_prompt, model_name)
node_fn = _make_agent_node(
nid, label, system_prompt, model_name, tools_config
)
graph.add_node(nid, node_fn)
@ -349,6 +446,10 @@ def build_graph_from_blueprint(blueprint: dict) -> Any:
if tid not in edges_by_source:
graph.add_edge(tid, END)
# God Mode: interrupt before every node so the user can approve/reject
if god_mode:
return graph.compile(interrupt_before=all_node_ids)
return graph.compile()
@ -356,20 +457,65 @@ async def run_blueprint_council_async(
blueprint: dict,
input_topic: str,
run_id: str,
god_mode: bool = False,
on_node_event: Optional[Callable[[str, str], Any]] = None,
) -> CouncilState:
"""
Execute a council run using a dynamically built graph from a blueprint.
In auto-pilot mode, the graph runs to completion.
In god mode, the graph pauses before each node via interrupt_before,
allowing human approval through the resume mechanism.
Args:
blueprint: The CouncilBlueprint JSON dict.
input_topic: The user's prompt.
run_id: Unique identifier for this run.
god_mode: If True, pause before each node for human approval.
on_node_event: Optional callback for WebSocket node events.
Returns:
The final CouncilState after execution completes.
"""
from langgraph.checkpoint.memory import MemorySaver
if god_mode:
checkpointer = MemorySaver()
nodes_list = blueprint.get("nodes", [])
all_node_ids = [n["id"] for n in nodes_list]
compiled_graph = _build_graph_with_checkpointer(
blueprint, checkpointer, all_node_ids
)
initial_state = CouncilState(
input_topic=input_topic,
current_draft="",
feedback_history=[],
route_decision="",
messages=[],
iteration_count=0,
critic_score=None,
run_id=run_id,
active_node="",
)
thread_config = {"configurable": {"thread_id": run_id}}
loop = asyncio.get_event_loop()
state = await loop.run_in_executor(
None,
lambda: compiled_graph.invoke(initial_state, config=thread_config),
)
# Store the graph and checkpointer for later resume
_god_mode_sessions[run_id] = {
"graph": compiled_graph,
"checkpointer": checkpointer,
"thread_config": thread_config,
}
return state
compiled_graph = build_graph_from_blueprint(blueprint)
initial_state = CouncilState(
@ -391,3 +537,153 @@ async def run_blueprint_council_async(
)
return final_state
# ---------------------------------------------------------------------------
# God Mode session management
# ---------------------------------------------------------------------------
# In-memory store for active god mode sessions (graph + checkpointer)
_god_mode_sessions: Dict[str, dict] = {}
def _build_graph_with_checkpointer(
blueprint: dict,
checkpointer: Any,
interrupt_node_ids: List[str],
) -> Any:
"""Build a compiled graph with a checkpointer for god mode."""
nodes = blueprint.get("nodes", [])
edges = blueprint.get("edges", [])
if not nodes:
raise ValueError("Blueprint has no nodes.")
node_lookup = {n["id"]: n for n in nodes}
targets = {e["target"] for e in edges}
entry_candidates = [n["id"] for n in nodes if n["id"] not in targets]
if not entry_candidates:
entry_candidates = [nodes[0]["id"]]
entry_node_id = entry_candidates[0]
sources = {e["source"] for e in edges}
terminal_nodes = {n["id"] for n in nodes if n["id"] not in sources}
graph = StateGraph(CouncilState)
for node in nodes:
nid = node["id"]
label = node.get("label", nid)
system_prompt = node.get("systemPrompt", f"You are {label}.")
model_name = node.get("model", "claude-3-5-sonnet")
tools_config = node.get("tools")
if _is_critic_like(system_prompt):
node_fn = _make_critic_node(
nid, label, system_prompt, model_name, tools_config
)
else:
node_fn = _make_agent_node(
nid, label, system_prompt, model_name, tools_config
)
graph.add_node(nid, node_fn)
graph.set_entry_point(entry_node_id)
edges_by_source: Dict[str, Dict[str, list]] = {}
for edge in edges:
src = edge["source"]
if src not in edges_by_source:
edges_by_source[src] = {"linear": [], "conditional": []}
if edge.get("type") == "conditional":
edges_by_source[src]["conditional"].append(edge)
else:
edges_by_source[src]["linear"].append(edge)
for source_id, grouped in edges_by_source.items():
linear = grouped["linear"]
conditional = grouped["conditional"]
if conditional:
linear_target = linear[0]["target"] if linear else None
router = _make_conditional_router(source_id, conditional, linear_target)
route_map: Dict[str, str] = {}
for ce in conditional:
route_map[ce["target"]] = ce["target"]
if linear_target:
route_map[linear_target] = linear_target
graph.add_conditional_edges(source_id, router, route_map)
elif linear:
graph.add_edge(source_id, linear[0]["target"])
for tid in terminal_nodes:
if tid not in edges_by_source:
graph.add_edge(tid, END)
return graph.compile(
checkpointer=checkpointer,
interrupt_before=interrupt_node_ids,
)
async def resume_god_mode(
run_id: str,
action: str = "approve",
modified_state: Optional[dict] = None,
) -> Optional[CouncilState]:
"""
Resume a paused god mode run after human approval.
Args:
run_id: The run ID of the paused session.
action: "approve" to continue, "reject" to stop.
modified_state: Optional partial state override (for "modify" action).
Returns:
The next CouncilState (may be another interrupt or final).
None if the run_id is not found.
"""
session = _god_mode_sessions.get(run_id)
if not session:
return None
if action == "reject":
_god_mode_sessions.pop(run_id, None)
return None
compiled_graph = session["graph"]
thread_config = session["thread_config"]
if modified_state:
compiled_graph.update_state(thread_config, modified_state)
loop = asyncio.get_event_loop()
state = await loop.run_in_executor(
None,
lambda: compiled_graph.invoke(None, config=thread_config),
)
# If state indicates completion, clean up
if state and state.get("route_decision") == "done":
_god_mode_sessions.pop(run_id, None)
return state
def get_god_mode_state(run_id: str) -> Optional[dict]:
"""Get the current state of a paused god mode session."""
session = _god_mode_sessions.get(run_id)
if not session:
return None
graph = session["graph"]
thread_config = session["thread_config"]
snapshot = graph.get_state(thread_config)
return {
"run_id": run_id,
"paused": bool(snapshot.next),
"next_nodes": list(snapshot.next) if snapshot.next else [],
"current_state": dict(snapshot.values) if snapshot.values else {},
}

View file

@ -0,0 +1,80 @@
"""
Run Service CRUD operations for persisted council runs.
Provides async functions to create, read, update, and list council runs
in PostgreSQL. Works alongside the in-memory run_store which handles
real-time status during execution.
"""
from datetime import datetime, timezone
from typing import List, Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.council_run import CouncilRun
async def create_run(
session: AsyncSession,
run_id: str,
input_topic: str,
blueprint_id: Optional[str] = None,
execution_mode: str = "auto-pilot",
) -> CouncilRun:
"""Create a new council run record."""
run = CouncilRun(
id=run_id,
blueprint_id=blueprint_id,
input_topic=input_topic,
status="pending",
execution_mode=execution_mode,
)
session.add(run)
await session.commit()
await session.refresh(run)
return run
async def get_run(session: AsyncSession, run_id: str) -> Optional[CouncilRun]:
"""Get a council run by ID."""
result = await session.execute(select(CouncilRun).where(CouncilRun.id == run_id))
return result.scalar_one_or_none()
async def list_runs(
session: AsyncSession,
limit: int = 50,
offset: int = 0,
) -> List[CouncilRun]:
"""List council runs, ordered by most recent first."""
result = await session.execute(
select(CouncilRun)
.order_by(CouncilRun.created_at.desc())
.limit(limit)
.offset(offset)
)
return list(result.scalars().all())
async def update_run(
session: AsyncSession,
run_id: str,
updates: dict,
) -> Optional[CouncilRun]:
"""Update a council run with the given fields."""
run = await get_run(session, run_id)
if run is None:
return None
for key, value in updates.items():
if hasattr(run, key):
setattr(run, key, value)
# Auto-set completed_at when status becomes terminal
if updates.get("status") in ("completed", "failed"):
run.completed_at = datetime.now(timezone.utc)
await session.commit()
await session.refresh(run)
return run

View file

@ -0,0 +1,192 @@
"""
Tests for God Mode (interrupt_before) functionality.
All LLM calls are mocked no real API calls are made in these tests.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from unittest.mock import patch, MagicMock
from state import CouncilState
class TestBuildGraphGodMode:
"""Tests for graph compilation with god mode (interrupt_before)."""
def _make_simple_blueprint(self):
return {
"version": 1,
"name": "Test Council",
"nodes": [
{
"id": "master",
"label": "Master AI",
"systemPrompt": "You are the master writer.",
"model": "claude-3-5-sonnet",
"tools": {"webSearch": False, "pdfReader": False},
},
{
"id": "critic",
"label": "Critic AI",
"systemPrompt": "You are a critic who evaluates and scores drafts.",
"model": "claude-3-5-sonnet",
"tools": {"webSearch": False, "pdfReader": False},
},
],
"edges": [
{"id": "e1", "source": "master", "target": "critic", "type": "linear"},
],
}
@patch("services.dynamic_graph_builder._get_llm")
def test_build_graph_with_god_mode_compiles(self, mock_get_llm):
"""God mode graph should compile without error."""
from services.dynamic_graph_builder import build_graph_from_blueprint
blueprint = self._make_simple_blueprint()
graph = build_graph_from_blueprint(blueprint, god_mode=False)
assert graph is not None
def test_build_graph_without_god_mode(self):
"""Normal graph should compile without interrupt_before."""
from services.dynamic_graph_builder import build_graph_from_blueprint
blueprint = self._make_simple_blueprint()
graph = build_graph_from_blueprint(blueprint, god_mode=False)
assert graph is not None
class TestGodModeSessionManagement:
"""Tests for god mode session management functions."""
def test_get_god_mode_state_returns_none_for_unknown_run(self):
from services.dynamic_graph_builder import get_god_mode_state
result = get_god_mode_state("nonexistent-run-id")
assert result is None
@pytest.mark.asyncio
async def test_resume_god_mode_returns_none_for_unknown_run(self):
from services.dynamic_graph_builder import resume_god_mode
result = await resume_god_mode("nonexistent-run-id", action="approve")
assert result is None
@pytest.mark.asyncio
async def test_resume_god_mode_reject_cleans_up(self):
from services.dynamic_graph_builder import (
_god_mode_sessions,
resume_god_mode,
)
# Manually insert a fake session
_god_mode_sessions["test-run"] = {
"graph": MagicMock(),
"checkpointer": MagicMock(),
"thread_config": {"configurable": {"thread_id": "test-run"}},
}
result = await resume_god_mode("test-run", action="reject")
assert result is None
assert "test-run" not in _god_mode_sessions
class TestToolResolution:
"""Tests for the tool resolution helper."""
def test_resolve_tools_none_config(self):
from services.dynamic_graph_builder import _resolve_tools
assert _resolve_tools(None) == []
def test_resolve_tools_empty_config(self):
from services.dynamic_graph_builder import _resolve_tools
assert _resolve_tools({}) == []
def test_resolve_tools_web_search_only(self):
from services.dynamic_graph_builder import _resolve_tools
tools = _resolve_tools({"webSearch": True, "pdfReader": False})
assert len(tools) == 1
assert tools[0].name == "web_search"
def test_resolve_tools_pdf_only(self):
from services.dynamic_graph_builder import _resolve_tools
tools = _resolve_tools({"webSearch": False, "pdfReader": True})
assert len(tools) == 1
assert tools[0].name == "pdf_search"
def test_resolve_tools_both(self):
from services.dynamic_graph_builder import _resolve_tools
tools = _resolve_tools({"webSearch": True, "pdfReader": True})
assert len(tools) == 2
names = {t.name for t in tools}
assert names == {"web_search", "pdf_search"}
class TestInvokeWithTools:
"""Tests for the _invoke_with_tools helper."""
def test_invoke_without_tools_calls_llm_directly(self):
from services.dynamic_graph_builder import _invoke_with_tools
mock_llm = MagicMock()
mock_response = MagicMock()
mock_response.content = "Test response"
mock_llm.invoke.return_value = mock_response
result = _invoke_with_tools(mock_llm, ["msg1", "msg2"], [])
mock_llm.invoke.assert_called_once_with(["msg1", "msg2"])
assert result == mock_response
def test_invoke_with_tools_no_tool_calls(self):
from services.dynamic_graph_builder import _invoke_with_tools
mock_llm = MagicMock()
mock_bound = MagicMock()
mock_llm.bind_tools.return_value = mock_bound
mock_response = MagicMock()
mock_response.tool_calls = []
mock_response.content = "No tools needed"
mock_bound.invoke.return_value = mock_response
mock_tool = MagicMock()
mock_tool.name = "test_tool"
result = _invoke_with_tools(mock_llm, ["msg"], [mock_tool])
assert result == mock_response
def test_invoke_with_tools_executes_tool_calls(self):
from services.dynamic_graph_builder import _invoke_with_tools
mock_llm = MagicMock()
mock_bound = MagicMock()
mock_llm.bind_tools.return_value = mock_bound
# First call returns tool_calls
mock_response_with_tools = MagicMock()
mock_response_with_tools.tool_calls = [
{"name": "web_search", "args": {"query": "test"}, "id": "call-1"}
]
# Second call returns final answer
mock_final_response = MagicMock()
mock_final_response.content = "Final answer"
mock_bound.invoke.side_effect = [mock_response_with_tools, mock_final_response]
mock_tool = MagicMock()
mock_tool.name = "web_search"
mock_tool.invoke.return_value = "Search results"
result = _invoke_with_tools(mock_llm, ["msg"], [mock_tool])
mock_tool.invoke.assert_called_once_with({"query": "test"})
assert result == mock_final_response

View file

@ -0,0 +1,82 @@
"""
Tests for the run history service and CouncilRun model.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from unittest.mock import AsyncMock, MagicMock, patch
class TestCouncilRunModel:
"""Tests for the CouncilRun SQLAlchemy model."""
def test_to_dict_serialization(self):
from models.council_run import CouncilRun
from datetime import datetime, timezone
run = CouncilRun(
id="test-id",
blueprint_id="bp-id",
input_topic="Test topic",
status="completed",
execution_mode="auto-pilot",
final_draft="Final text",
critic_score=8.5,
iteration_count=3,
active_node="done",
error=None,
created_at=datetime(2026, 1, 1, tzinfo=timezone.utc),
completed_at=datetime(2026, 1, 1, 0, 5, tzinfo=timezone.utc),
)
d = run.to_dict()
assert d["id"] == "test-id"
assert d["blueprint_id"] == "bp-id"
assert d["status"] == "completed"
assert d["critic_score"] == 8.5
assert d["iteration_count"] == 3
assert d["created_at"] is not None
assert d["completed_at"] is not None
def test_to_dict_with_none_timestamps(self):
from models.council_run import CouncilRun
run = CouncilRun(
id="test-id",
input_topic="Test",
status="pending",
execution_mode="god-mode",
created_at=None,
completed_at=None,
)
d = run.to_dict()
assert d["created_at"] is None
assert d["completed_at"] is None
assert d["execution_mode"] == "god-mode"
class TestRunHistoryRoutes:
"""Tests for the run history API routes."""
@pytest.mark.asyncio
async def test_list_runs_empty(self):
"""List runs returns empty list when no runs exist."""
from api.run_history_routes import list_all_runs
mock_session = AsyncMock()
mock_result = MagicMock()
mock_scalars = MagicMock()
mock_scalars.all.return_value = []
mock_result.scalars.return_value = mock_scalars
mock_session.execute.return_value = mock_result
with patch("services.run_service.list_runs") as mock_list:
mock_list.return_value = []
result = await list_all_runs(limit=50, offset=0, session=mock_session)
assert result == []

170
backend/tests/test_tools.py Normal file
View file

@ -0,0 +1,170 @@
"""
Tests for agent tools (web search and PDF reader).
All external API calls are mocked no real calls to Tavily or ChromaDB.
"""
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))
import pytest
from unittest.mock import patch, MagicMock
class TestWebSearchTool:
"""Tests for the Tavily web search tool."""
@patch.dict(os.environ, {"TAVILY_API_KEY": ""}, clear=False)
def test_web_search_returns_error_without_api_key(self):
from tools.web_search import web_search
result = web_search.invoke({"query": "test query"})
assert "TAVILY_API_KEY" in result
@patch.dict(os.environ, {"TAVILY_API_KEY": "test-key"}, clear=False)
@patch("tools.web_search.TavilyClient")
def test_web_search_returns_formatted_results(self, mock_client_cls):
mock_client = MagicMock()
mock_client.search.return_value = {
"results": [
{
"title": "Test Result",
"url": "https://example.com",
"content": "Some content here",
}
]
}
mock_client_cls.return_value = mock_client
from tools.web_search import web_search
result = web_search.invoke({"query": "test query"})
assert "Test Result" in result
assert "https://example.com" in result
assert "Some content here" in result
@patch.dict(os.environ, {"TAVILY_API_KEY": "test-key"}, clear=False)
@patch("tools.web_search.TavilyClient")
def test_web_search_handles_empty_results(self, mock_client_cls):
mock_client = MagicMock()
mock_client.search.return_value = {"results": []}
mock_client_cls.return_value = mock_client
from tools.web_search import web_search
result = web_search.invoke({"query": "obscure query"})
assert "No results" in result
@patch.dict(os.environ, {"TAVILY_API_KEY": "test-key"}, clear=False)
@patch("tools.web_search.TavilyClient")
def test_web_search_handles_api_error(self, mock_client_cls):
mock_client = MagicMock()
mock_client.search.side_effect = Exception("API rate limit")
mock_client_cls.return_value = mock_client
from tools.web_search import web_search
result = web_search.invoke({"query": "test"})
assert "Error" in result
assert "rate limit" in result
class TestCreateWebSearchTool:
"""Tests for the web search tool factory."""
@patch.dict(os.environ, {"TAVILY_API_KEY": "test-key"}, clear=False)
def test_factory_returns_tool_when_key_set(self):
from tools.web_search import create_web_search_tool
tool = create_web_search_tool()
assert tool is not None
@patch.dict(os.environ, {}, clear=True)
def test_factory_returns_none_when_key_missing(self):
from tools.web_search import create_web_search_tool
tool = create_web_search_tool()
assert tool is None
class TestPdfSearchTool:
"""Tests for the PDF reader tool."""
@patch("tools.pdf_reader._get_chroma_collection")
def test_pdf_search_empty_collection(self, mock_get_collection):
mock_collection = MagicMock()
mock_collection.count.return_value = 0
mock_get_collection.return_value = mock_collection
from tools.pdf_reader import pdf_search
result = pdf_search.invoke({"query": "test query"})
assert "No documents" in result
@patch("tools.pdf_reader._get_chroma_collection")
def test_pdf_search_returns_results(self, mock_get_collection):
mock_collection = MagicMock()
mock_collection.count.return_value = 3
mock_collection.query.return_value = {
"documents": [["First passage about AI.", "Second passage about ML."]],
"metadatas": [[
{"source": "paper.pdf", "page": 1},
{"source": "paper.pdf", "page": 3},
]],
}
mock_get_collection.return_value = mock_collection
from tools.pdf_reader import pdf_search
result = pdf_search.invoke({"query": "AI concepts"})
assert "paper.pdf" in result
assert "First passage" in result
assert "Page 1" in result
@patch("tools.pdf_reader._get_chroma_collection")
def test_pdf_search_handles_error(self, mock_get_collection):
mock_get_collection.side_effect = Exception("ChromaDB unavailable")
from tools.pdf_reader import pdf_search
result = pdf_search.invoke({"query": "test"})
assert "Error" in result
class TestPdfIngestion:
"""Tests for PDF ingestion into ChromaDB."""
@patch("tools.pdf_reader._get_chroma_collection")
@patch("tools.pdf_reader.PdfReader")
def test_ingest_pdf_processes_pages(self, mock_pdf_reader_cls, mock_get_collection):
# Mock PDF with 2 pages of text
mock_page1 = MagicMock()
mock_page1.extract_text.return_value = "This is the first page with some content " * 20
mock_page2 = MagicMock()
mock_page2.extract_text.return_value = "Second page about machine learning " * 20
mock_reader = MagicMock()
mock_reader.pages = [mock_page1, mock_page2]
mock_pdf_reader_cls.return_value = mock_reader
mock_collection = MagicMock()
mock_get_collection.return_value = mock_collection
from tools.pdf_reader import ingest_pdf
chunks = ingest_pdf("/tmp/test.pdf")
assert chunks > 0
mock_collection.upsert.assert_called_once()
@patch("tools.pdf_reader._get_chroma_collection")
@patch("tools.pdf_reader.PdfReader")
def test_ingest_pdf_empty_file(self, mock_pdf_reader_cls, mock_get_collection):
mock_reader = MagicMock()
mock_reader.pages = []
mock_pdf_reader_cls.return_value = mock_reader
from tools.pdf_reader import ingest_pdf
chunks = ingest_pdf("/tmp/empty.pdf")
assert chunks == 0

View file

@ -1,7 +1,12 @@
"""
Agent tools for CouncilOS.
"""Agent tools for CouncilOS."""
Phase 4 will add:
- web_search_tool: Tavily Search API wrapper
- pdf_reader_tool: PyPDF + ChromaDB vector store wrapper
"""
from .web_search import web_search, create_web_search_tool
from .pdf_reader import pdf_search, ingest_pdf, create_pdf_search_tool
__all__ = [
"web_search",
"create_web_search_tool",
"pdf_search",
"ingest_pdf",
"create_pdf_search_tool",
]

140
backend/tools/pdf_reader.py Normal file
View file

@ -0,0 +1,140 @@
"""
PDF Reader Tool PyPDF + ChromaDB vector store wrapper for agent nodes.
Loads PDF files, splits them into chunks, stores embeddings in a local
ChromaDB collection, and performs similarity search against queries.
Requires the CHROMA_PERSIST_DIR environment variable for storage location.
"""
import os
from typing import List, Optional
from langchain_core.tools import tool
# Module-level collection cache to avoid re-initializing on every call
_collection_cache: dict = {}
def _get_chroma_collection(collection_name: str = "council_pdfs"):
"""Get or create a ChromaDB collection for PDF content."""
if collection_name in _collection_cache:
return _collection_cache[collection_name]
import chromadb
persist_dir = os.environ.get("CHROMA_PERSIST_DIR", "./chroma_db")
client = chromadb.PersistentClient(path=persist_dir)
collection = client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"},
)
_collection_cache[collection_name] = collection
return collection
def ingest_pdf(file_path: str, collection_name: str = "council_pdfs") -> int:
"""
Read a PDF file, split into chunks, and store in ChromaDB.
Args:
file_path: Path to the PDF file.
collection_name: ChromaDB collection name.
Returns:
Number of chunks ingested.
"""
from pypdf import PdfReader
reader = PdfReader(file_path)
chunks: List[str] = []
metadata_list: List[dict] = []
for page_num, page in enumerate(reader.pages):
text = page.extract_text()
if not text or not text.strip():
continue
# Split long pages into ~500 character chunks with overlap
words = text.split()
chunk_size = 100 # words per chunk
overlap = 20
for i in range(0, len(words), chunk_size - overlap):
chunk_words = words[i : i + chunk_size]
chunk_text = " ".join(chunk_words)
if chunk_text.strip():
chunks.append(chunk_text)
metadata_list.append({
"source": os.path.basename(file_path),
"page": page_num + 1,
})
if not chunks:
return 0
collection = _get_chroma_collection(collection_name)
# Generate deterministic IDs based on file and chunk position
ids = [
f"{os.path.basename(file_path)}_chunk_{i}"
for i in range(len(chunks))
]
collection.upsert(
documents=chunks,
metadatas=metadata_list,
ids=ids,
)
return len(chunks)
@tool
def pdf_search(query: str, n_results: int = 5) -> str:
"""
Search the PDF knowledge base for information relevant to a query.
Args:
query: The search query to find relevant PDF content.
n_results: Number of results to return (default 5).
Returns:
A formatted string with relevant passages from ingested PDFs.
"""
try:
collection = _get_chroma_collection()
except Exception as exc: # noqa: BLE001
return f"[PDF Search Error] Could not access vector store: {exc}"
if collection.count() == 0:
return "[PDF Search] No documents have been ingested yet."
try:
results = collection.query(
query_texts=[query],
n_results=min(n_results, collection.count()),
)
except Exception as exc: # noqa: BLE001
return f"[PDF Search Error] {exc}"
documents = results.get("documents", [[]])[0]
metadatas = results.get("metadatas", [[]])[0]
if not documents:
return f"No relevant passages found for: {query}"
formatted = []
for i, (doc, meta) in enumerate(zip(documents, metadatas), 1):
source = meta.get("source", "unknown")
page = meta.get("page", "?")
formatted.append(f"{i}. [Source: {source}, Page {page}]\n {doc}")
return "\n\n".join(formatted)
def create_pdf_search_tool() -> Optional[tool]:
"""Factory that returns the pdf_search tool if ChromaDB is configured."""
persist_dir = os.environ.get("CHROMA_PERSIST_DIR", "./chroma_db")
if persist_dir:
return pdf_search
return None

View file

@ -0,0 +1,61 @@
"""
Web Search Tool Tavily Search API wrapper for agent nodes.
Provides a LangChain-compatible tool that agents can use to search the web
for current information. Requires the TAVILY_API_KEY environment variable.
"""
import os
from typing import Optional
from langchain_core.tools import tool
@tool
def web_search(query: str, max_results: int = 5) -> str:
"""
Search the web for current information on a topic.
Args:
query: The search query string.
max_results: Maximum number of results to return (default 5).
Returns:
A formatted string with search results including titles, URLs, and snippets.
"""
from tavily import TavilyClient
api_key = os.environ.get("TAVILY_API_KEY")
if not api_key:
return "[Web Search Error] TAVILY_API_KEY environment variable is not set."
client = TavilyClient(api_key=api_key)
try:
response = client.search(
query=query,
max_results=max_results,
search_depth="basic",
)
except Exception as exc: # noqa: BLE001
return f"[Web Search Error] {exc}"
results = response.get("results", [])
if not results:
return f"No results found for: {query}"
formatted = []
for i, r in enumerate(results, 1):
title = r.get("title", "No title")
url = r.get("url", "")
content = r.get("content", "No content available")
formatted.append(f"{i}. **{title}**\n URL: {url}\n {content}")
return "\n\n".join(formatted)
def create_web_search_tool() -> Optional[tool]:
"""Factory that returns the web_search tool if Tavily is configured."""
if os.environ.get("TAVILY_API_KEY"):
return web_search
return None

View file

@ -0,0 +1,34 @@
import { describe, it, expect } from "vitest";
import { wsUrl } from "@/app/utils/api-client";
describe("wsUrl", () => {
it("should convert http to ws", () => {
const url = wsUrl("test-run-id");
expect(url).toBe("ws://localhost:8000/ws/council/test-run-id");
});
});
describe("API client types", () => {
it("should export runApi with expected methods", async () => {
const { runApi } = await import("@/app/utils/api-client");
expect(runApi.start).toBeDefined();
expect(runApi.startFromBlueprint).toBeDefined();
expect(runApi.status).toBeDefined();
expect(runApi.approve).toBeDefined();
expect(runApi.getState).toBeDefined();
});
it("should export councilApi with expected methods", async () => {
const { councilApi } = await import("@/app/utils/api-client");
expect(councilApi.list).toBeDefined();
expect(councilApi.get).toBeDefined();
expect(councilApi.create).toBeDefined();
expect(councilApi.update).toBeDefined();
expect(councilApi.delete).toBeDefined();
});
it("should export pdfApi with upload method", async () => {
const { pdfApi } = await import("@/app/utils/api-client");
expect(pdfApi.upload).toBeDefined();
});
});

View file

@ -0,0 +1,148 @@
import { describe, it, expect } from "vitest";
import { parseGraphToBlueprint, parseBlueprintToGraph } from "@/app/utils/blueprint-parser";
import { Node, Edge } from "@xyflow/react";
import { AgentNodeData, CouncilBlueprint } from "@/app/types/council";
describe("parseGraphToBlueprint", () => {
it("should convert React Flow nodes and edges to blueprint format", () => {
const nodes: Node<AgentNodeData>[] = [
{
id: "n1",
type: "agentNode",
position: { x: 100, y: 200 },
data: {
label: "Master Agent",
systemPrompt: "You are the master writer.",
model: "claude-3-5-sonnet",
tools: { webSearch: true, pdfReader: false },
},
},
{
id: "n2",
type: "agentNode",
position: { x: 400, y: 200 },
data: {
label: "Critic Agent",
systemPrompt: "You evaluate drafts.",
model: "gpt-4o",
tools: { webSearch: false, pdfReader: true },
},
},
];
const edges: Edge[] = [
{
id: "e1",
source: "n1",
target: "n2",
type: "default",
data: { type: "linear" },
},
];
const blueprint = parseGraphToBlueprint(nodes, edges, "Test Council");
expect(blueprint.version).toBe(1);
expect(blueprint.name).toBe("Test Council");
expect(blueprint.nodes).toHaveLength(2);
expect(blueprint.edges).toHaveLength(1);
expect(blueprint.nodes[0].label).toBe("Master Agent");
expect(blueprint.nodes[0].tools.webSearch).toBe(true);
expect(blueprint.nodes[1].model).toBe("gpt-4o");
expect(blueprint.edges[0].type).toBe("linear");
expect(blueprint.edges[0].source).toBe("n1");
expect(blueprint.edges[0].target).toBe("n2");
});
it("should handle conditional edges with condition labels", () => {
const nodes: Node<AgentNodeData>[] = [
{
id: "n1",
type: "agentNode",
position: { x: 0, y: 0 },
data: {
label: "A",
systemPrompt: "",
model: "claude-3-5-sonnet",
tools: { webSearch: false, pdfReader: false },
},
},
];
const edges: Edge[] = [
{
id: "e1",
source: "n1",
target: "n2",
type: "conditionalEdge",
data: { type: "conditional", condition: "approve" },
},
];
const blueprint = parseGraphToBlueprint(nodes, edges, "Test");
expect(blueprint.edges[0].type).toBe("conditional");
expect(blueprint.edges[0].condition).toBe("approve");
});
it("should preserve existing blueprint ID", () => {
const blueprint = parseGraphToBlueprint([], [], "Test", "existing-id");
expect(blueprint.id).toBe("existing-id");
});
});
describe("parseBlueprintToGraph", () => {
it("should convert blueprint back to React Flow format", () => {
const blueprint: CouncilBlueprint = {
version: 1,
name: "Test",
nodes: [
{
id: "n1",
label: "Master",
systemPrompt: "You are the master.",
model: "claude-3-5-sonnet",
tools: { webSearch: true, pdfReader: false },
position: { x: 100, y: 200 },
},
],
edges: [
{
id: "e1",
source: "n1",
target: "n2",
type: "conditional",
condition: "rework",
},
],
};
const { nodes, edges } = parseBlueprintToGraph(blueprint);
expect(nodes).toHaveLength(1);
expect(nodes[0].type).toBe("agentNode");
expect(nodes[0].data.label).toBe("Master");
expect(nodes[0].data.tools.webSearch).toBe(true);
expect(edges).toHaveLength(1);
expect(edges[0].type).toBe("conditionalEdge");
expect(edges[0].data?.condition).toBe("rework");
expect(edges[0].animated).toBe(true);
});
it("should handle linear edges", () => {
const blueprint: CouncilBlueprint = {
version: 1,
name: "Test",
nodes: [],
edges: [
{ id: "e1", source: "a", target: "b", type: "linear" },
],
};
const { edges } = parseBlueprintToGraph(blueprint);
expect(edges[0].type).toBe("default");
expect(edges[0].animated).toBe(false);
});
});

View file

@ -0,0 +1,178 @@
import { describe, it, expect, beforeEach } from "vitest";
import { useCouncilStore } from "@/app/store/council-store";
describe("CouncilStore", () => {
beforeEach(() => {
// Reset store state between tests
useCouncilStore.setState({
nodes: [],
edges: [],
selectedNodeId: null,
selectedEdgeId: null,
councilName: "Mein Rat",
activeRun: null,
activeNodeId: null,
});
});
it("should have default state", () => {
const state = useCouncilStore.getState();
expect(state.nodes).toEqual([]);
expect(state.edges).toEqual([]);
expect(state.selectedNodeId).toBeNull();
expect(state.selectedEdgeId).toBeNull();
expect(state.councilName).toBe("Mein Rat");
});
it("should add an agent node", () => {
const { addAgentNode } = useCouncilStore.getState();
addAgentNode({ x: 100, y: 200 });
const { nodes } = useCouncilStore.getState();
expect(nodes).toHaveLength(1);
expect(nodes[0].position).toEqual({ x: 100, y: 200 });
expect(nodes[0].type).toBe("agentNode");
expect(nodes[0].data.label).toBe("Agent 1");
expect(nodes[0].data.model).toBe("claude-3-5-sonnet");
expect(nodes[0].data.tools).toEqual({ webSearch: false, pdfReader: false });
});
it("should update node data", () => {
const { addAgentNode } = useCouncilStore.getState();
addAgentNode({ x: 0, y: 0 });
const { nodes, updateNodeData } = useCouncilStore.getState();
const nodeId = nodes[0].id;
updateNodeData(nodeId, { label: "Master Agent", model: "gpt-4o" });
const updated = useCouncilStore.getState().nodes[0];
expect(updated.data.label).toBe("Master Agent");
expect(updated.data.model).toBe("gpt-4o");
});
it("should select a node and deselect edge", () => {
const { selectNode } = useCouncilStore.getState();
selectNode("node-1");
const state = useCouncilStore.getState();
expect(state.selectedNodeId).toBe("node-1");
expect(state.selectedEdgeId).toBeNull();
});
it("should select an edge and deselect node", () => {
const { selectEdge, selectNode } = useCouncilStore.getState();
selectNode("node-1");
selectEdge("edge-1");
const state = useCouncilStore.getState();
expect(state.selectedEdgeId).toBe("edge-1");
expect(state.selectedNodeId).toBeNull();
});
it("should update edge data to conditional", () => {
useCouncilStore.setState({
edges: [
{
id: "e1",
source: "a",
target: "b",
type: "default",
data: { type: "linear" },
},
],
});
const { updateEdgeData } = useCouncilStore.getState();
updateEdgeData("e1", "conditional", "rework");
const { edges } = useCouncilStore.getState();
expect(edges[0].type).toBe("conditionalEdge");
expect(edges[0].data?.type).toBe("conditional");
expect(edges[0].data?.condition).toBe("rework");
expect(edges[0].animated).toBe(true);
});
it("should update edge data back to linear", () => {
useCouncilStore.setState({
edges: [
{
id: "e1",
source: "a",
target: "b",
type: "conditionalEdge",
data: { type: "conditional", condition: "approve" },
animated: true,
},
],
});
const { updateEdgeData } = useCouncilStore.getState();
updateEdgeData("e1", "linear");
const { edges } = useCouncilStore.getState();
expect(edges[0].type).toBe("default");
expect(edges[0].data?.type).toBe("linear");
expect(edges[0].animated).toBe(false);
});
it("should mark a node as active by name", () => {
useCouncilStore.setState({
nodes: [
{
id: "n1",
type: "agentNode",
position: { x: 0, y: 0 },
data: {
label: "Master Agent",
systemPrompt: "",
model: "claude-3-5-sonnet" as const,
tools: { webSearch: false, pdfReader: false },
isActive: false,
},
},
],
});
const { markNodeActive } = useCouncilStore.getState();
markNodeActive("Master Agent");
const { nodes, activeNodeId } = useCouncilStore.getState();
expect(activeNodeId).toBe("n1");
expect(nodes[0].data.isActive).toBe(true);
});
it("should clear active node", () => {
useCouncilStore.setState({
activeNodeId: "n1",
nodes: [
{
id: "n1",
type: "agentNode",
position: { x: 0, y: 0 },
data: {
label: "Test",
systemPrompt: "",
model: "claude-3-5-sonnet" as const,
tools: { webSearch: false, pdfReader: false },
isActive: true,
},
},
],
});
const { clearActiveNode } = useCouncilStore.getState();
clearActiveNode();
const { nodes, activeNodeId } = useCouncilStore.getState();
expect(activeNodeId).toBeNull();
expect(nodes[0].data.isActive).toBe(false);
});
it("should set council name", () => {
const { setCouncilName } = useCouncilStore.getState();
setCouncilName("Test Rat");
expect(useCouncilStore.getState().councilName).toBe("Test Rat");
});
});

View file

@ -0,0 +1,80 @@
import { describe, it, expect } from "vitest";
import type {
AgentNodeData,
CouncilBlueprint,
ExecutionMode,
GodModeAction,
GodModeState,
RunStatus,
WSEventType,
WSMessage,
} from "@/app/types/council";
describe("Council types", () => {
it("should support all run statuses", () => {
const statuses: RunStatus[] = ["pending", "running", "completed", "failed", "paused"];
expect(statuses).toHaveLength(5);
});
it("should support execution modes", () => {
const modes: ExecutionMode[] = ["auto-pilot", "god-mode"];
expect(modes).toHaveLength(2);
});
it("should support god mode actions", () => {
const actions: GodModeAction[] = ["approve", "reject", "modify"];
expect(actions).toHaveLength(3);
});
it("should support all WS event types", () => {
const events: WSEventType[] = [
"connected",
"node_active",
"run_paused",
"run_resumed",
"run_complete",
"run_failed",
];
expect(events).toHaveLength(6);
});
it("should enforce AgentNodeData structure", () => {
const data: AgentNodeData = {
label: "Test Agent",
systemPrompt: "You are a test agent.",
model: "claude-3-5-sonnet",
tools: { webSearch: true, pdfReader: false },
isActive: false,
};
expect(data.label).toBe("Test Agent");
expect(data.tools.webSearch).toBe(true);
});
it("should enforce GodModeState structure", () => {
const state: GodModeState = {
run_id: "test-run",
paused: true,
next_nodes: ["critic"],
current_state: {
current_draft: "Draft text",
critic_score: 6.5,
iteration_count: 2,
},
};
expect(state.paused).toBe(true);
expect(state.next_nodes).toContain("critic");
expect(state.current_state.critic_score).toBe(6.5);
});
it("should enforce WSMessage structure", () => {
const msg: WSMessage = {
event: "run_paused",
run_id: "test",
next_nodes: ["agent1"],
current_draft: "Draft",
critic_score: 7.0,
};
expect(msg.event).toBe("run_paused");
expect(msg.next_nodes).toHaveLength(1);
});
});

View file

@ -8,6 +8,7 @@ import {
MiniMap,
BackgroundVariant,
useReactFlow,
Edge,
} from "@xyflow/react";
import "@xyflow/react/dist/style.css";
@ -28,6 +29,7 @@ export function ArchitectCanvas() {
const onConnect = useCouncilStore((s) => s.onConnect);
const addAgentNode = useCouncilStore((s) => s.addAgentNode);
const selectNode = useCouncilStore((s) => s.selectNode);
const selectEdge = useCouncilStore((s) => s.selectEdge);
const { screenToFlowPosition } = useReactFlow();
@ -53,7 +55,15 @@ export function ArchitectCanvas() {
const onPaneClick = useCallback(() => {
selectNode(null);
}, [selectNode]);
selectEdge(null);
}, [selectNode, selectEdge]);
const onEdgeClick = useCallback(
(_event: React.MouseEvent, edge: Edge) => {
selectEdge(edge.id);
},
[selectEdge]
);
return (
<div className="flex-1 h-full">
@ -66,6 +76,7 @@ export function ArchitectCanvas() {
onDrop={onDrop}
onDragOver={onDragOver}
onPaneClick={onPaneClick}
onEdgeClick={onEdgeClick}
nodeTypes={NODE_TYPES}
edgeTypes={EDGE_TYPES}
fitView

View file

@ -0,0 +1,146 @@
"use client";
import { useEffect, useState } from "react";
import { X, ArrowRight } from "lucide-react";
import { EdgeType } from "@/app/types/council";
import { useCouncilStore } from "@/app/store/council-store";
// Right-side panel shown when a canvas edge is selected
export function EdgeSettingsPanel() {
const selectedEdgeId = useCouncilStore((s) => s.selectedEdgeId);
const edges = useCouncilStore((s) => s.edges);
const nodes = useCouncilStore((s) => s.nodes);
const updateEdgeData = useCouncilStore((s) => s.updateEdgeData);
const selectEdge = useCouncilStore((s) => s.selectEdge);
const edge = edges.find((e) => e.id === selectedEdgeId);
const [edgeType, setEdgeType] = useState<EdgeType>("linear");
const [condition, setCondition] = useState("");
useEffect(() => {
if (edge) {
setEdgeType((edge.data?.type as EdgeType) ?? "linear");
setCondition((edge.data?.condition as string) ?? "");
}
}, [selectedEdgeId, edge]);
if (!selectedEdgeId || !edge) return null;
const sourceNode = nodes.find((n) => n.id === edge.source);
const targetNode = nodes.find((n) => n.id === edge.target);
const handleTypeChange = (newType: EdgeType) => {
setEdgeType(newType);
updateEdgeData(selectedEdgeId, newType, newType === "conditional" ? condition : undefined);
};
const handleConditionChange = (value: string) => {
setCondition(value);
updateEdgeData(selectedEdgeId, edgeType, value);
};
return (
<aside className="w-72 flex-shrink-0 bg-white border-l border-slate-200 p-4 flex flex-col gap-4 overflow-y-auto">
{/* Header */}
<div className="flex items-center gap-2">
<ArrowRight size={16} className="text-indigo-600" />
<h2 className="font-semibold text-slate-800 text-sm flex-1">
Kanten-Einstellungen
</h2>
<button
onClick={() => selectEdge(null)}
className="text-slate-400 hover:text-slate-600"
>
<X size={16} />
</button>
</div>
{/* Connection info */}
<div className="rounded-lg bg-slate-50 p-3 text-xs text-slate-600 space-y-1">
<p>
<strong>Von:</strong>{" "}
{sourceNode ? (sourceNode.data as { label: string }).label : edge.source}
</p>
<p>
<strong>Nach:</strong>{" "}
{targetNode ? (targetNode.data as { label: string }).label : edge.target}
</p>
</div>
{/* Edge type */}
<div className="flex flex-col gap-1">
<label className="text-xs font-medium text-slate-500">Typ</label>
<div className="flex gap-2">
<button
onClick={() => handleTypeChange("linear")}
className={[
"flex-1 text-sm px-3 py-2 rounded-lg border transition-colors",
edgeType === "linear"
? "bg-slate-100 border-slate-400 text-slate-800 font-medium"
: "bg-white border-slate-200 text-slate-500 hover:border-slate-300",
].join(" ")}
>
Linear
</button>
<button
onClick={() => handleTypeChange("conditional")}
className={[
"flex-1 text-sm px-3 py-2 rounded-lg border transition-colors",
edgeType === "conditional"
? "bg-indigo-50 border-indigo-400 text-indigo-800 font-medium"
: "bg-white border-slate-200 text-slate-500 hover:border-slate-300",
].join(" ")}
>
Bedingt
</button>
</div>
</div>
{/* Condition value (only for conditional edges) */}
{edgeType === "conditional" && (
<div className="flex flex-col gap-1">
<label className="text-xs font-medium text-slate-500">
Bedingung (Routing-Wert)
</label>
<input
type="text"
value={condition}
onChange={(e) => handleConditionChange(e.target.value)}
placeholder='z.B. "rework" oder "approve"'
className="rounded-lg border border-slate-200 px-3 py-2 text-sm focus:outline-none focus:ring-2 focus:ring-indigo-300"
/>
<p className="text-xs text-slate-400 mt-1">
Dieser Wert wird mit <code className="bg-slate-100 px-1 rounded">route_decision</code> im
State verglichen, um den Pfad zu bestimmen.
</p>
</div>
)}
{/* Preset conditions */}
{edgeType === "conditional" && (
<div className="flex flex-col gap-1">
<label className="text-xs font-medium text-slate-500">
Schnellauswahl
</label>
<div className="flex gap-2 flex-wrap">
{["approve", "rework", "done", "escalate"].map((preset) => (
<button
key={preset}
onClick={() => handleConditionChange(preset)}
className={[
"text-xs px-2 py-1 rounded-full border transition-colors",
condition === preset
? "bg-indigo-600 text-white border-indigo-600"
: "bg-white text-slate-600 border-slate-200 hover:border-indigo-300",
].join(" ")}
>
{preset}
</button>
))}
</div>
</div>
)}
</aside>
);
}

View file

@ -0,0 +1,113 @@
"use client";
import { useState } from "react";
import { Check, X, Pencil, Shield } from "lucide-react";
import { GodModeAction } from "@/app/types/council";
import { PauseInfo } from "@/app/hooks/useCouncilWebSocket";
interface Props {
pauseInfo: PauseInfo;
onAction: (action: GodModeAction, modifiedDraft?: string) => void;
isResuming: boolean;
}
// God Mode approval panel — shown when the graph pauses at a node
export function GodModePanel({ pauseInfo, onAction, isResuming }: Props) {
const [editMode, setEditMode] = useState(false);
const [editedDraft, setEditedDraft] = useState(pauseInfo.current_draft);
const handleModify = () => {
if (editMode) {
onAction("modify", editedDraft);
setEditMode(false);
} else {
setEditedDraft(pauseInfo.current_draft);
setEditMode(true);
}
};
return (
<div className="rounded-xl border-2 border-amber-300 bg-amber-50 p-4 space-y-3">
{/* Header */}
<div className="flex items-center gap-2">
<Shield size={18} className="text-amber-600" />
<h3 className="font-semibold text-sm text-amber-800">
God Mode Freigabe erforderlich
</h3>
</div>
{/* Info about which node is next */}
<div className="text-xs text-amber-700 space-y-1">
<p>
<strong>Nächster Agent:</strong>{" "}
{pauseInfo.next_nodes.join(", ") || "—"}
</p>
{pauseInfo.iteration_count != null && (
<p>
<strong>Iteration:</strong> {pauseInfo.iteration_count}
</p>
)}
{pauseInfo.critic_score != null && (
<p>
<strong>Bewertung:</strong> {pauseInfo.critic_score}/10
</p>
)}
</div>
{/* Current draft preview / editor */}
<div className="flex flex-col gap-1">
<label className="text-xs font-medium text-amber-700">
Aktueller Entwurf
</label>
{editMode ? (
<textarea
value={editedDraft}
onChange={(e) => setEditedDraft(e.target.value)}
rows={8}
className="rounded-lg border border-amber-300 bg-white px-3 py-2 text-sm resize-none focus:outline-none focus:ring-2 focus:ring-amber-400"
/>
) : (
<div className="rounded-lg bg-white border border-amber-200 p-3 text-sm text-slate-700 whitespace-pre-wrap leading-relaxed max-h-48 overflow-y-auto">
{pauseInfo.current_draft || (
<span className="italic text-slate-400">Kein Entwurf vorhanden</span>
)}
</div>
)}
</div>
{/* Action buttons */}
<div className="flex gap-2">
<button
onClick={() => onAction("approve")}
disabled={isResuming}
className="flex items-center gap-1.5 text-sm text-white bg-green-600 px-3 py-1.5 rounded-lg hover:bg-green-700 transition-colors disabled:opacity-50"
>
<Check size={14} />
Genehmigen
</button>
<button
onClick={handleModify}
disabled={isResuming}
className="flex items-center gap-1.5 text-sm text-white bg-blue-600 px-3 py-1.5 rounded-lg hover:bg-blue-700 transition-colors disabled:opacity-50"
>
<Pencil size={14} />
{editMode ? "Änderung senden" : "Ändern"}
</button>
<button
onClick={() => onAction("reject")}
disabled={isResuming}
className="flex items-center gap-1.5 text-sm text-white bg-red-500 px-3 py-1.5 rounded-lg hover:bg-red-600 transition-colors disabled:opacity-50"
>
<X size={14} />
Ablehnen
</button>
</div>
{isResuming && (
<p className="text-xs text-amber-600 animate-pulse">
Wird fortgesetzt
</p>
)}
</div>
);
}

View file

@ -5,14 +5,29 @@ import { WSMessage } from "@/app/types/council";
import { wsUrl } from "@/app/utils/api-client";
import { useCouncilStore } from "@/app/store/council-store";
export interface PauseInfo {
next_nodes: string[];
current_draft: string;
critic_score?: number;
iteration_count?: number;
}
interface Options {
run_id: string | null;
onComplete?: (result: string) => void;
onError?: (error: string) => void;
onPaused?: (info: PauseInfo) => void;
onResumed?: () => void;
}
// WebSocket hook for live agent status updates during a council run
export function useCouncilWebSocket({ run_id, onComplete, onError }: Options) {
export function useCouncilWebSocket({
run_id,
onComplete,
onError,
onPaused,
onResumed,
}: Options) {
const ws = useRef<WebSocket | null>(null);
const markNodeActive = useCouncilStore((s) => s.markNodeActive);
const clearActiveNode = useCouncilStore((s) => s.clearActiveNode);
@ -40,20 +55,29 @@ export function useCouncilWebSocket({ run_id, onComplete, onError }: Options) {
return;
}
switch (msg.type) {
case "node_enter":
if (msg.node_name) markNodeActive(msg.node_name);
switch (msg.event) {
case "node_active":
if (msg.node) markNodeActive(msg.node);
break;
case "node_exit":
case "run_paused":
clearActiveNode();
onPaused?.({
next_nodes: msg.next_nodes ?? [],
current_draft: msg.current_draft ?? "",
critic_score: msg.critic_score,
iteration_count: msg.iteration_count,
});
break;
case "run_resumed":
onResumed?.();
break;
case "run_complete":
clearActiveNode();
setActiveRun(null);
if (msg.result) onComplete?.(msg.result);
if (msg.final_draft) onComplete?.(msg.final_draft);
disconnect();
break;
case "run_error":
case "run_failed":
clearActiveNode();
setActiveRun(null);
if (msg.error) onError?.(msg.error);

View file

@ -1,12 +1,14 @@
"use client";
import { useState, useCallback } from "react";
import { useState, useCallback, useRef } from "react";
import { ReactFlowProvider } from "@xyflow/react";
import { Play, Square, Upload } from "lucide-react";
import { Play, Square, Upload, Shield, Zap } from "lucide-react";
import { ArchitectCanvas } from "@/app/components/ArchitectCanvas";
import { useCouncilWebSocket } from "@/app/hooks/useCouncilWebSocket";
import { GodModePanel } from "@/app/components/panels/GodModePanel";
import { useCouncilWebSocket, PauseInfo } from "@/app/hooks/useCouncilWebSocket";
import { useCouncilStore } from "@/app/store/council-store";
import { runApi } from "@/app/utils/api-client";
import { runApi, pdfApi } from "@/app/utils/api-client";
import { ExecutionMode, GodModeAction } from "@/app/types/council";
export default function KonferenzzimmerPage() {
const [topic, setTopic] = useState("");
@ -14,6 +16,10 @@ export default function KonferenzzimmerPage() {
const [result, setResult] = useState<string | null>(null);
const [error, setError] = useState<string | null>(null);
const [isRunning, setIsRunning] = useState(false);
const [executionMode, setExecutionMode] = useState<ExecutionMode>("auto-pilot");
const [pauseInfo, setPauseInfo] = useState<PauseInfo | null>(null);
const [isResuming, setIsResuming] = useState(false);
const fileInputRef = useRef<HTMLInputElement>(null);
const setActiveRun = useCouncilStore((s) => s.setActiveRun);
const clearActiveNode = useCouncilStore((s) => s.clearActiveNode);
@ -22,24 +28,38 @@ export default function KonferenzzimmerPage() {
setResult(res);
setIsRunning(false);
setRunId(null);
setPauseInfo(null);
}, []);
const onError = useCallback((err: string) => {
setError(err);
setIsRunning(false);
setRunId(null);
setPauseInfo(null);
}, []);
useCouncilWebSocket({ run_id: runId, onComplete, onError });
const onPaused = useCallback((info: PauseInfo) => {
setPauseInfo(info);
setIsResuming(false);
}, []);
const onResumed = useCallback(() => {
setPauseInfo(null);
setIsResuming(false);
}, []);
useCouncilWebSocket({ run_id: runId, onComplete, onError, onPaused, onResumed });
const handleStart = async () => {
if (!topic.trim()) return;
setResult(null);
setError(null);
setIsRunning(true);
setPauseInfo(null);
clearActiveNode();
try {
const run = await runApi.start(topic);
const godMode = executionMode === "god-mode";
const run = await runApi.start(topic, godMode);
setActiveRun(run);
setRunId(run.run_id);
} catch (e) {
@ -53,6 +73,44 @@ export default function KonferenzzimmerPage() {
setIsRunning(false);
clearActiveNode();
setActiveRun(null);
setPauseInfo(null);
};
const handleGodModeAction = async (action: GodModeAction, modifiedDraft?: string) => {
if (!runId) return;
setIsResuming(true);
try {
const modified_state = modifiedDraft ? { current_draft: modifiedDraft } : undefined;
await runApi.approve(runId, action, modified_state);
if (action === "reject") {
setError("Vom Benutzer im God Mode abgelehnt.");
setIsRunning(false);
setRunId(null);
setPauseInfo(null);
}
} catch (e) {
setError("Fehler bei God Mode Aktion: " + (e as Error).message);
setIsResuming(false);
}
};
const handlePdfUpload = async (event: React.ChangeEvent<HTMLInputElement>) => {
const file = event.target.files?.[0];
if (!file) return;
try {
const res = await pdfApi.upload(file);
setTopic((prev) =>
prev
? `${prev}\n\n[PDF hochgeladen: ${res.filename}${res.chunks_ingested} Abschnitte]`
: `[PDF hochgeladen: ${res.filename}${res.chunks_ingested} Abschnitte]`
);
} catch (e) {
setError("PDF-Upload fehlgeschlagen: " + (e as Error).message);
}
// Reset the input
if (fileInputRef.current) fileInputRef.current.value = "";
};
return (
@ -66,10 +124,51 @@ export default function KonferenzzimmerPage() {
rows={1}
className="flex-1 rounded-lg border border-slate-200 px-3 py-1.5 text-sm resize-none focus:outline-none focus:ring-2 focus:ring-indigo-300"
/>
<button className="flex items-center gap-1.5 text-sm text-slate-600 border border-slate-200 px-3 py-1.5 rounded-lg hover:bg-slate-50 transition-colors">
{/* PDF upload */}
<input
ref={fileInputRef}
type="file"
accept=".pdf"
onChange={handlePdfUpload}
className="hidden"
/>
<button
onClick={() => fileInputRef.current?.click()}
className="flex items-center gap-1.5 text-sm text-slate-600 border border-slate-200 px-3 py-1.5 rounded-lg hover:bg-slate-50 transition-colors"
>
<Upload size={14} />
PDF
</button>
{/* Execution mode toggle */}
<button
onClick={() =>
setExecutionMode((m) => (m === "auto-pilot" ? "god-mode" : "auto-pilot"))
}
disabled={isRunning}
className={[
"flex items-center gap-1.5 text-sm px-3 py-1.5 rounded-lg transition-colors border",
executionMode === "god-mode"
? "bg-amber-50 text-amber-700 border-amber-300 hover:bg-amber-100"
: "bg-slate-50 text-slate-600 border-slate-200 hover:bg-slate-100",
isRunning ? "opacity-50 cursor-not-allowed" : "",
].join(" ")}
title={
executionMode === "god-mode"
? "God Mode: Pause vor jedem Agenten"
: "Auto-Pilot: Automatischer Durchlauf"
}
>
{executionMode === "god-mode" ? (
<Shield size={14} />
) : (
<Zap size={14} />
)}
{executionMode === "god-mode" ? "God Mode" : "Auto-Pilot"}
</button>
{/* Start / Stop */}
{!isRunning ? (
<button
onClick={handleStart}
@ -95,11 +194,16 @@ export default function KonferenzzimmerPage() {
<ReactFlowProvider>
<div className="flex-1 h-full relative">
<ArchitectCanvas />
{isRunning && (
{isRunning && !pauseInfo && (
<div className="absolute top-3 left-1/2 -translate-x-1/2 bg-indigo-600 text-white text-xs px-4 py-1.5 rounded-full shadow-lg animate-pulse pointer-events-none">
Rat läuft
</div>
)}
{pauseInfo && (
<div className="absolute top-3 left-1/2 -translate-x-1/2 bg-amber-500 text-white text-xs px-4 py-1.5 rounded-full shadow-lg pointer-events-none">
Pausiert Freigabe erforderlich
</div>
)}
</div>
</ReactFlowProvider>
@ -108,7 +212,16 @@ export default function KonferenzzimmerPage() {
<div className="px-4 py-3 border-b border-slate-100">
<h2 className="text-sm font-semibold text-slate-700">Ergebnis</h2>
</div>
<div className="flex-1 overflow-y-auto p-4">
<div className="flex-1 overflow-y-auto p-4 space-y-4">
{/* God Mode approval panel */}
{pauseInfo && (
<GodModePanel
pauseInfo={pauseInfo}
onAction={handleGodModeAction}
isResuming={isResuming}
/>
)}
{error && (
<div className="rounded-lg bg-red-50 border border-red-200 p-3 text-sm text-red-700">
{error}
@ -124,7 +237,7 @@ export default function KonferenzzimmerPage() {
Noch kein Ergebnis. Starte den Rat mit einem Thema.
</p>
)}
{isRunning && !result && (
{isRunning && !result && !pauseInfo && (
<div className="space-y-2">
{[1, 2, 3].map((i) => (
<div

View file

@ -5,6 +5,7 @@ import { Save, Download } from "lucide-react";
import { ArchitectCanvas } from "@/app/components/ArchitectCanvas";
import { NodeSidebar } from "@/app/components/panels/NodeSidebar";
import { NodeSettingsPanel } from "@/app/components/panels/NodeSettingsPanel";
import { EdgeSettingsPanel } from "@/app/components/panels/EdgeSettingsPanel";
import { useCouncilStore } from "@/app/store/council-store";
import { parseGraphToBlueprint } from "@/app/utils/blueprint-parser";
import { councilApi } from "@/app/utils/api-client";
@ -14,6 +15,8 @@ export default function RatArchitektPage() {
const edges = useCouncilStore((s) => s.edges);
const councilName = useCouncilStore((s) => s.councilName);
const setCouncilName = useCouncilStore((s) => s.setCouncilName);
const selectedNodeId = useCouncilStore((s) => s.selectedNodeId);
const selectedEdgeId = useCouncilStore((s) => s.selectedEdgeId);
const handleSave = async () => {
const blueprint = parseGraphToBlueprint(nodes, edges, councilName);
@ -73,7 +76,8 @@ export default function RatArchitektPage() {
<ReactFlowProvider>
<NodeSidebar />
<ArchitectCanvas />
<NodeSettingsPanel />
{selectedNodeId && <NodeSettingsPanel />}
{selectedEdgeId && <EdgeSettingsPanel />}
</ReactFlowProvider>
</div>
</div>

View file

@ -1,7 +1,7 @@
// Zustand store for canvas state and council run state
import { create } from "zustand";
import { Node, Edge, addEdge, applyNodeChanges, applyEdgeChanges, NodeChange, EdgeChange, Connection } from "@xyflow/react";
import { AgentNodeData, CouncilRun, LLMModel } from "@/app/types/council";
import { AgentNodeData, CouncilRun, EdgeType } from "@/app/types/council";
import { nanoid } from "nanoid";
interface CouncilStore {
@ -9,6 +9,7 @@ interface CouncilStore {
nodes: Node<AgentNodeData>[];
edges: Edge[];
selectedNodeId: string | null;
selectedEdgeId: string | null;
councilName: string;
// Execution
@ -22,6 +23,8 @@ interface CouncilStore {
addAgentNode: (position: { x: number; y: number }) => void;
updateNodeData: (nodeId: string, data: Partial<AgentNodeData>) => void;
selectNode: (nodeId: string | null) => void;
selectEdge: (edgeId: string | null) => void;
updateEdgeData: (edgeId: string, type: EdgeType, condition?: string) => void;
setCouncilName: (name: string) => void;
setNodes: (nodes: Node<AgentNodeData>[]) => void;
setEdges: (edges: Edge[]) => void;
@ -47,6 +50,7 @@ export const useCouncilStore = create<CouncilStore>((set, get) => ({
nodes: [],
edges: [],
selectedNodeId: null,
selectedEdgeId: null,
councilName: "Mein Rat",
activeRun: null,
activeNodeId: null,
@ -88,7 +92,24 @@ export const useCouncilStore = create<CouncilStore>((set, get) => ({
),
})),
selectNode: (nodeId) => set({ selectedNodeId: nodeId }),
selectNode: (nodeId) => set({ selectedNodeId: nodeId, selectedEdgeId: null }),
selectEdge: (edgeId) => set({ selectedEdgeId: edgeId, selectedNodeId: null }),
updateEdgeData: (edgeId, type, condition) =>
set((state) => ({
edges: state.edges.map((e) =>
e.id === edgeId
? {
...e,
type: type === "conditional" ? "conditionalEdge" : "default",
data: { ...e.data, type, condition: condition ?? "" },
label: type === "conditional" ? (condition || "?") : undefined,
animated: type === "conditional",
}
: e
),
})),
setCouncilName: (name) => set({ councilName: name }),
@ -122,4 +143,3 @@ export const useCouncilStore = create<CouncilStore>((set, get) => ({
})),
})),
}));

View file

@ -51,7 +51,9 @@ export interface CouncilBlueprint {
}
// Council run (execution)
export type RunStatus = "pending" | "running" | "completed" | "failed";
export type RunStatus = "pending" | "running" | "completed" | "failed" | "paused";
export type ExecutionMode = "auto-pilot" | "god-mode";
export interface CouncilRun {
run_id: string;
@ -61,13 +63,43 @@ export interface CouncilRun {
error?: string;
}
// God Mode state from the backend
export interface GodModeState {
run_id: string;
paused: boolean;
next_nodes: string[];
current_state: {
current_draft?: string;
critic_score?: number;
iteration_count?: number;
feedback_history?: string[];
};
}
export type GodModeAction = "approve" | "reject" | "modify";
// WebSocket messages from backend
export type WSMessageType = "node_enter" | "node_exit" | "run_complete" | "run_error";
export type WSEventType =
| "connected"
| "node_active"
| "run_paused"
| "run_resumed"
| "run_complete"
| "run_failed";
export interface WSMessage {
type: WSMessageType;
node_id?: string;
node_name?: string;
result?: string;
event: WSEventType;
run_id: string;
// node_active
node?: string;
iteration?: number;
// run_paused
next_nodes?: string[];
current_draft?: string;
critic_score?: number;
iteration_count?: number;
// run_complete
final_draft?: string;
// run_failed
error?: string;
}

View file

@ -1,5 +1,5 @@
// API client for the FastAPI backend
import { CouncilBlueprint, CouncilRun } from "@/app/types/council";
import { CouncilBlueprint, CouncilRun, GodModeAction, GodModeState } from "@/app/types/council";
const BASE_URL = process.env.NEXT_PUBLIC_API_URL ?? "http://localhost:8000";
@ -39,14 +39,48 @@ export const councilApi = {
// Council run (execution)
export const runApi = {
start: (input_topic: string) =>
request<CouncilRun>("/api/run", {
start: (input_topic: string, god_mode: boolean = false) =>
request<CouncilRun>("/api/councils/run", {
method: "POST",
body: JSON.stringify({ input_topic }),
body: JSON.stringify({ input_topic, god_mode }),
}),
startFromBlueprint: (blueprintId: string, input_topic: string, god_mode: boolean = false) =>
request<CouncilRun>(`/api/councils/${blueprintId}/run`, {
method: "POST",
body: JSON.stringify({ input_topic, god_mode }),
}),
status: (run_id: string) =>
request<CouncilRun>(`/api/run/${run_id}`),
request<CouncilRun>(`/api/councils/run/${run_id}`),
// God Mode: approve/reject/modify a paused run
approve: (run_id: string, action: GodModeAction, modified_state?: Record<string, unknown>) =>
request<CouncilRun>(`/api/councils/run/${run_id}/approve`, {
method: "POST",
body: JSON.stringify({ action, modified_state }),
}),
// God Mode: get the paused state
getState: (run_id: string) =>
request<GodModeState>(`/api/councils/run/${run_id}/state`),
};
// PDF upload
export const pdfApi = {
upload: async (file: File) => {
const formData = new FormData();
formData.append("file", file);
const res = await fetch(`${BASE_URL}/api/councils/upload-pdf`, {
method: "POST",
body: formData,
});
if (!res.ok) {
const text = await res.text();
throw new Error(`Upload error ${res.status}: ${text}`);
}
return res.json() as Promise<{ filename: string; chunks_ingested: number; message: string }>;
},
};
// WebSocket URL helper

View file

@ -1,12 +1,14 @@
{
"name": "frontend",
"version": "0.1.0",
"version": "0.2.0",
"private": true,
"scripts": {
"dev": "next dev",
"build": "next build",
"start": "next start",
"lint": "eslint"
"lint": "eslint",
"test": "vitest run",
"test:watch": "vitest"
},
"dependencies": {
"@xyflow/react": "^12.10.1",
@ -19,12 +21,16 @@
},
"devDependencies": {
"@tailwindcss/postcss": "^4",
"@testing-library/jest-dom": "^6.0.0",
"@testing-library/react": "^16.0.0",
"@types/node": "^20",
"@types/react": "^19",
"@types/react-dom": "^19",
"eslint": "^9",
"eslint-config-next": "16.1.6",
"jsdom": "^25.0.0",
"tailwindcss": "^4",
"typescript": "^5"
"typescript": "^5",
"vitest": "^3.0.0"
}
}

15
frontend/vitest.config.ts Normal file
View file

@ -0,0 +1,15 @@
import { defineConfig } from "vitest/config";
import path from "path";
export default defineConfig({
test: {
environment: "jsdom",
globals: true,
setupFiles: [],
},
resolve: {
alias: {
"@": path.resolve(__dirname, "."),
},
},
});