"""
Orchestrator: the coordinator that routes work between agents.

This module is intentionally thin — it only handles:
1. Reading shared memory (JSON files)
2. Deciding which phase to run next
3. Dispatching to the appropriate agent
4. Writing results back to shared memory

No domain logic lives here. Each agent is self-contained.
"""
|
|
|
import json |
|
import subprocess |
|
from pathlib import Path |
|
from datetime import datetime, timedelta |
|
from enum import Enum |
|
from typing import Any |
|
|
|
# In production, replace with your actual API client |
|
# from anthropic import Anthropic |
|
|
|
|
|
class Phase(Enum):
    """Workflow phases, in the order the orchestrator normally visits them.

    Each value doubles as the prompt file stem (``prompts/<value>.txt``), and
    the non-terminal values match the CLI's ``--phase`` choices. ``DONE`` is
    the terminal state.
    """

    CONTEXT = "context"  # gather codebase context for the task
    PLAN = "plan"  # turn the context into a plan
    IMPLEMENT = "implement"  # execute the plan's tasks
    REVIEW = "review"  # review the implementation
    DONE = "done"  # nothing left to run
|
|
|
|
|
class ModelTier(Enum):
    """Model IDs grouped by capability tier.

    The member value is the model identifier passed through ``call_agent``.
    """

    # Fast tier: selected in this module for context gathering and implementation.
    FAST = "claude-3-5-haiku-20241022"
    # Middle tier: intended for implementation that needs more reasoning
    # (no phase in this module currently selects it).
    BALANCED = "claude-sonnet-4-20250514"
    # Most capable tier: selected for planning and review.
    POWERFUL = "claude-opus-4-20250514"
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Configuration |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
# Shared memory lives in one directory, resolved against the current working
# directory (not against this module's location).
MEMORY_DIR = Path(".agent_memory")

# One JSON file per phase output.
CONTEXT_FILE = MEMORY_DIR.joinpath("context.json")
PLAN_FILE = MEMORY_DIR.joinpath("plan.json")
IMPL_FILE = MEMORY_DIR.joinpath("impl_report.json")
REVIEW_FILE = MEMORY_DIR.joinpath("review.json")

# Context whose timestamp is older than this many hours is gathered again.
CONTEXT_STALENESS_HOURS = 4
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Memory Management |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
def ensure_memory_dir():
    """Create the shared-memory directory if it does not already exist.

    ``parents=True`` keeps this working if MEMORY_DIR is configured as a nested
    path whose parent directories do not exist yet.
    """
    MEMORY_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
def read_memory(file: Path) -> dict | None: |
|
if not file.exists(): |
|
return None |
|
return json.loads(file.read_text()) |
|
|
|
|
|
def write_memory(file: Path, data: dict):
    """Persist *data* as pretty-printed JSON at *file*, replacing it in one step.

    The JSON is first written to a sibling ``<name>.tmp`` file and then moved
    over *file* (``Path.replace``). An interrupted run therefore never leaves
    a half-written JSON file for ``read_memory`` to fail on.

    Note: ``default=str`` stringifies any value that json cannot encode
    natively instead of raising.
    """
    ensure_memory_dir()
    # Serialize before touching the filesystem so an encoding error writes nothing.
    payload = json.dumps(data, indent=2, default=str)
    tmp_file = file.with_name(file.name + ".tmp")
    try:
        tmp_file.write_text(payload, encoding="utf-8")
        tmp_file.replace(file)
    except OSError:
        # Don't leave a stray temp file behind; surface the original error.
        tmp_file.unlink(missing_ok=True)
        raise
|
|
|
|
|
def is_stale(file: Path, hours: int = CONTEXT_STALENESS_HOURS) -> bool:
    """Return True if the data in *file* should be regenerated.

    The data counts as stale when any of these holds:
    - the file is missing or empty;
    - it is not valid JSON;
    - it has no ``"timestamp"`` key;
    - its timestamp cannot be parsed by ``datetime.fromisoformat``;
    - it is more than *hours* hours old.

    Args:
        file: JSON memory file carrying an ISO 8601 ``"timestamp"`` value.
        hours: Maximum age, in hours, before the data is considered stale.
    """
    try:
        data = read_memory(file)  # None when the file does not exist
    except json.JSONDecodeError:
        # A corrupt file is as good as missing: regenerate it.
        return True
    if not data or "timestamp" not in data:
        return True
    try:
        ts = datetime.fromisoformat(data["timestamp"])
    except (TypeError, ValueError):
        # Non-string or malformed timestamp: safest to regenerate.
        return True
    # datetime.now(None) is naive; for an aware timestamp, take "now" in the
    # same zone so the subtraction never mixes naive and aware datetimes.
    return datetime.now(ts.tzinfo) - ts > timedelta(hours=hours)
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Agent Dispatch |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
def call_agent(
    model: ModelTier,
    system_prompt: str,
    user_message: str,
    tools: list[dict] | None = None
) -> str:
    """Send one request to an agent running on *model* and return its reply text.

    Currently a stub: it logs the dispatch (model ID and prompt sizes) and
    always returns the empty JSON object ``"{}"``. *tools* is accepted but not
    used yet. A production version would look roughly like::

        client = Anthropic()
        response = client.messages.create(
            model=model.value,
            max_tokens=8192,
            system=system_prompt,
            messages=[{"role": "user", "content": user_message}],
            tools=tools or [],
        )
        return response.content[0].text
    """
    for log_line in (
        f"[Orchestrator] Dispatching to {model.value}",
        f"[Orchestrator] System prompt length: {len(system_prompt)} chars",
        f"[Orchestrator] User message length: {len(user_message)} chars",
    ):
        print(log_line)

    stub_reply = "{}"
    return stub_reply
|
|
|
|
|
def load_prompt(phase: Phase) -> str:
    """Return the system prompt for *phase*.

    Reads ``prompts/<phase.value>.txt`` from the directory containing this
    module. Falls back to a generic one-line prompt when that file does not
    exist.
    """
    prompt_file = Path(__file__).parent / "prompts" / f"{phase.value}.txt"
    try:
        # Explicit UTF-8: the platform default encoding (e.g. cp1252 on
        # Windows) would mis-decode prompts containing non-ASCII text.
        return prompt_file.read_text(encoding="utf-8")
    except FileNotFoundError:
        return f"You are the {phase.value} agent."
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Phase Handlers |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
def run_context_phase(task: str) -> dict:
    """Phase 1: gather codebase context for *task* using the fast model tier.

    The context agent explores the codebase and produces a structured summary
    that subsequent agents can consume without re-exploring.

    Side effects:
        - Writes CONTEXT_FILE. The orchestrator adds its own ``"timestamp"``,
          which ``is_stale`` reads.
        - Deletes PLAN_FILE, IMPL_FILE and REVIEW_FILE, because they were
          derived from the previous context.

    Returns:
        The context document that was written.

    Raises:
        json.JSONDecodeError: If the agent reply is not valid JSON.
        ValueError: If the reply is JSON but not an object.
    """
    system_prompt = load_prompt(Phase.CONTEXT)

    user_message = f"""
Task: {task}

Explore the codebase and produce a context.json with:
- Relevant files and their summaries
- Dependencies
- Existing patterns/conventions
- Technical constraints
- Scope boundaries

Output ONLY valid JSON matching the ContextOutput schema.
"""

    result = call_agent(
        model=ModelTier.FAST,
        system_prompt=system_prompt,
        user_message=user_message,
    )

    data = json.loads(result)
    if not isinstance(data, dict):
        raise ValueError(
            f"Context agent returned {type(data).__name__}, expected a JSON object"
        )
    # Stamp the gather time ourselves. Without a "timestamp" key, is_stale()
    # reports the context as stale forever, and the orchestration loop keeps
    # re-running this phase.
    data["timestamp"] = datetime.now().isoformat()
    write_memory(CONTEXT_FILE, data)

    # A fresh context means starting fresh: any plan, implementation report or
    # review built on the previous context no longer applies.
    for derived_file in (PLAN_FILE, IMPL_FILE, REVIEW_FILE):
        derived_file.unlink(missing_ok=True)

    return data
|
|
|
|
|
def run_plan_phase() -> dict:
    """Phase 2: create a detailed plan from the stored context (powerful tier).

    The planner reads the context and produces an actionable plan with clear
    task boundaries for parallel execution. The plan is written to PLAN_FILE,
    and a best-effort git checkpoint is attempted afterwards.

    Returns:
        The plan document that was written.

    Raises:
        RuntimeError: If no context file exists yet.
        json.JSONDecodeError: If the agent reply is not valid JSON.
        ValueError: If the reply is JSON but not an object.
    """
    context = read_memory(CONTEXT_FILE)
    if context is None:
        raise RuntimeError("No context found — run context phase first")

    system_prompt = load_prompt(Phase.PLAN)

    user_message = f"""
Context:
{json.dumps(context, indent=2)}

Create a detailed implementation plan with:
- Ordered tasks with clear boundaries
- Acceptance criteria for each task
- Parallel execution groups (tasks that can run simultaneously)
- Risk assessment

Output ONLY valid JSON matching the PlanOutput schema.
"""

    result = call_agent(
        model=ModelTier.POWERFUL,
        system_prompt=system_prompt,
        user_message=user_message,
    )

    data = json.loads(result)
    if not isinstance(data, dict):
        # The implement phase calls plan.get(...); fail here with a clear
        # message instead of an AttributeError there.
        raise ValueError(
            f"Plan agent returned {type(data).__name__}, expected a JSON object"
        )
    write_memory(PLAN_FILE, data)

    # Rollback point. Despite its label, it is taken right after planning,
    # before any human approval step.
    git_checkpoint("plan-approved")

    return data
|
|
|
|
|
def _build_task_message(context: dict | None, task: dict) -> str:
    """Build the user message for the agent implementing one plan task."""
    return f"""
Context:
{json.dumps(context, indent=2)}

Your assigned task:
{json.dumps(task, indent=2)}

Implement this task. You may ONLY modify these files:
{task.get('files_to_modify', [])}

You may create these new files:
{task.get('files_to_create', [])}

Output ONLY valid JSON matching the TaskResult schema.
"""


def run_implement_phase() -> list[dict]:
    """Phase 3: execute the plan's tasks with the fast model tier.

    Tasks run group by group, following the plan's ``parallel_groups`` (a list
    of lists of task ids). Tasks within a group are meant to have
    non-overlapping file scopes so they could run concurrently; for now they
    run one after another. If the plan has no ``parallel_groups``, each task
    runs in its own group, in plan order.

    Side effects:
        Writes IMPL_FILE with a timestamp, every task result, and the SHA from
        a best-effort git checkpoint (None if it failed).

    Returns:
        The decoded reply of every task agent, in execution order.

    Raises:
        RuntimeError: If no plan exists, or a group references a task id the
            plan does not define. Ids are checked before any agent is called.
    """
    plan = read_memory(PLAN_FILE)
    if plan is None:
        raise RuntimeError("No plan found — run plan phase first")

    context = read_memory(CONTEXT_FILE)
    system_prompt = load_prompt(Phase.IMPLEMENT)

    # Index tasks once. The first definition of an id wins, as with a linear search.
    tasks_by_id: dict = {}
    for task in plan.get("tasks", []):
        tasks_by_id.setdefault(task["id"], task)

    groups = plan.get("parallel_groups")
    if groups is None:
        # Without explicit groups, run every task on its own rather than
        # silently implementing nothing.
        groups = [[task_id] for task_id in tasks_by_id]

    unknown = [tid for group in groups for tid in group if tid not in tasks_by_id]
    if unknown:
        raise RuntimeError(f"Plan groups reference unknown task ids: {unknown}")

    scheduled = {tid for group in groups for tid in group}
    unscheduled = [tid for tid in tasks_by_id if tid not in scheduled]
    if unscheduled:
        print(f"[Orchestrator] Warning: tasks in no parallel group will not run: {unscheduled}")

    all_results = []
    for group_idx, task_ids in enumerate(groups):
        print(f"[Orchestrator] Running parallel group {group_idx + 1}: {task_ids}")

        # TODO: run the tasks of a group concurrently (asyncio, threads, etc.).
        for task_id in task_ids:
            result = call_agent(
                model=ModelTier.FAST,
                system_prompt=system_prompt,
                user_message=_build_task_message(context, tasks_by_id[task_id]),
            )
            all_results.append(json.loads(result))

    # Aggregate results
    impl_report = {
        "timestamp": datetime.now().isoformat(),
        "results": all_results,
        "git_commit_sha": git_checkpoint("implementation-complete"),
    }
    write_memory(IMPL_FILE, impl_report)

    return all_results
|
|
|
|
|
def run_review_phase() -> dict:
    """Phase 4: review all changes with the powerful model tier.

    The reviewer checks for:
    - Correctness and edge cases
    - Plan adherence (did we drift?)
    - Code quality issues

    The verdict is written to REVIEW_FILE; determine_next_phase() acts on its
    ``recommended_action``.

    Returns:
        The review document that was written.

    Raises:
        RuntimeError: If the plan, implementation report or context file is
            missing (the message names which).
        json.JSONDecodeError: If the agent reply is not valid JSON.
        ValueError: If the reply is JSON but not an object.
    """
    plan = read_memory(PLAN_FILE)
    impl_report = read_memory(IMPL_FILE)
    context = read_memory(CONTEXT_FILE)

    # Only absent files are errors; an empty-but-present output is still reviewable.
    missing = [
        name
        for name, value in (
            ("plan", plan),
            ("implementation report", impl_report),
            ("context", context),
        )
        if value is None
    ]
    if missing:
        raise RuntimeError(f"Missing prior phase outputs: {', '.join(missing)}")

    system_prompt = load_prompt(Phase.REVIEW)

    user_message = f"""
Original Context:
{json.dumps(context, indent=2)}

Approved Plan:
{json.dumps(plan, indent=2)}

Implementation Report:
{json.dumps(impl_report, indent=2)}

Review all changes and check:
1. Do the changes satisfy the acceptance criteria?
2. Are there any bugs, edge cases, or security issues?
3. Did the implementation drift from the plan?
4. Is the code quality acceptable?

Output ONLY valid JSON matching the ReviewOutput schema.
"""

    result = call_agent(
        model=ModelTier.POWERFUL,
        system_prompt=system_prompt,
        user_message=user_message,
    )

    data = json.loads(result)
    if not isinstance(data, dict):
        # determine_next_phase() calls review.get(...); reject other shapes here.
        raise ValueError(
            f"Review agent returned {type(data).__name__}, expected a JSON object"
        )
    write_memory(REVIEW_FILE, data)

    return data
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Git Checkpoints |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
def git_checkpoint(name: str) -> str | None: |
|
"""Create a git commit as a rollback checkpoint.""" |
|
try: |
|
subprocess.run(["git", "add", "-A"], check=True, capture_output=True) |
|
subprocess.run( |
|
["git", "commit", "-m", f"[agent-checkpoint] {name}"], |
|
check=True, |
|
capture_output=True |
|
) |
|
result = subprocess.run( |
|
["git", "rev-parse", "HEAD"], |
|
check=True, |
|
capture_output=True, |
|
text=True |
|
) |
|
return result.stdout.strip() |
|
except subprocess.CalledProcessError: |
|
return None |
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# Main Orchestration Loop |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
def determine_next_phase() -> Phase:
    """Decide which phase to run next from the files in shared memory.

    Checks, in order:
    1. Stale or missing context → CONTEXT.
    2. Missing plan → PLAN.
    3. Missing implementation report → IMPLEMENT.
    4. Missing review → REVIEW.

    Once a review exists, its ``recommended_action`` decides:
    - ``"merge"`` → DONE.
    - ``"fix_and_re_review"`` → deletes the implementation report and review,
      then returns IMPLEMENT.
    - ``"re_plan"`` → deletes the plan, report and review, then returns PLAN.
    - Any other value, or a review without one → prints a warning and
      returns DONE.

    Note: deleting those files is a side effect of calling this function.
    """
    # No context or stale? Start fresh.
    if is_stale(CONTEXT_FILE):
        return Phase.CONTEXT

    if not PLAN_FILE.exists():
        return Phase.PLAN

    if not IMPL_FILE.exists():
        return Phase.IMPLEMENT

    if not REVIEW_FILE.exists():
        return Phase.REVIEW

    review = read_memory(REVIEW_FILE)
    action = review.get("recommended_action") if isinstance(review, dict) else None

    if action == "merge":
        return Phase.DONE

    if action == "fix_and_re_review":
        # NOTE(review): the review is deleted here, and run_implement_phase()
        # reads only the plan and context, so the implementer never sees the
        # reviewer's feedback.
        for artifact in (IMPL_FILE, REVIEW_FILE):
            artifact.unlink(missing_ok=True)
        return Phase.IMPLEMENT

    if action == "re_plan":
        # Clear everything after context
        for artifact in (PLAN_FILE, IMPL_FILE, REVIEW_FILE):
            artifact.unlink(missing_ok=True)
        return Phase.PLAN

    # Don't let an unexpected verdict masquerade silently as a clean finish.
    print(
        f"[Orchestrator] Warning: unrecognized review action {action!r}; "
        "treating the workflow as done"
    )
    return Phase.DONE
|
|
|
|
|
def run(task: str | None = None, max_phases: int | None = None):
    """Main entry point: run phases until determine_next_phase() returns DONE.

    Args:
        task: Task description. Required whenever the context phase must run.
        max_phases: Optional cap on how many phases to execute. Review verdicts
            can send the loop back to implementation or planning repeatedly,
            so a cap bounds the number of agent calls one run can make. None
            (the default) means no limit.

    Raises:
        ValueError: If the context phase is needed but *task* is empty.
        RuntimeError: If *max_phases* phases ran without reaching DONE.
    """
    ensure_memory_dir()

    phases_run = 0
    while True:
        phase = determine_next_phase()
        print(f"\n{'='*60}")
        print(f"[Orchestrator] Current phase: {phase.value}")
        print(f"{'='*60}\n")

        if phase == Phase.DONE:
            print("[Orchestrator] Workflow complete!")
            break

        if max_phases is not None and phases_run >= max_phases:
            raise RuntimeError(
                f"Stopped after {max_phases} phases without finishing; "
                f"next phase would have been {phase.value!r}"
            )

        if phase == Phase.CONTEXT:
            if not task:
                raise ValueError("Task required for context phase")
            run_context_phase(task)

        elif phase == Phase.PLAN:
            run_plan_phase()
            # Optional: pause here for human approval
            # input("Press Enter to approve the plan and continue...")

        elif phase == Phase.IMPLEMENT:
            run_implement_phase()

        elif phase == Phase.REVIEW:
            run_review_phase()

        phases_run += 1
|
|
|
|
|
# ───────────────────────────────────────────────────────────────── |
|
# CLI |
|
# ───────────────────────────────────────────────────────────────── |
|
|
|
if __name__ == "__main__":
    import argparse
    import shutil

    parser = argparse.ArgumentParser(description="Multi-agent orchestrator")
    parser.add_argument("--task", help="Task description for the agents")
    parser.add_argument(
        "--phase",
        choices=["context", "plan", "implement", "review", "auto"],
        default="auto",
        help="Run a specific phase or auto-detect"
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Clear all memory and start fresh"
    )

    args = parser.parse_args()

    # Validate before --reset wipes anything. The context phase needs a task,
    # and after a reset, auto mode always starts with the context phase.
    needs_task = args.phase == "context" or (args.phase == "auto" and args.reset)
    if needs_task and not args.task:
        parser.error("--task is required when the context phase will run")

    if args.reset:
        shutil.rmtree(MEMORY_DIR, ignore_errors=True)
        print("[Orchestrator] Memory cleared")

    if args.phase == "auto":
        run(args.task)
    else:
        phase = Phase(args.phase)
        if phase == Phase.CONTEXT:
            run_context_phase(args.task)
        elif phase == Phase.PLAN:
            run_plan_phase()
        elif phase == Phase.IMPLEMENT:
            run_implement_phase()
        elif phase == Phase.REVIEW:
            run_review_phase()