Lab 07: Multi-Agent Pipeline Design
Advanced
Due: 2026-04-21
schemas/planner_output.json
schemas/coder_output.json
2.
Section titled “2. base_agent.py — Common Agent Base Class”
base_agent.py
3.
Section titled “3. planner_agent.py — Planner Agent”
planner_agent.py
4.
Section titled “4. coder_agent.py — Coder Agent”
coder_agent.py
pipeline_design.md
Objectives
Section titled “Objectives”- Apply multi-agent system design principles
- Design a 5-stage pipeline architecture (Planner → Researcher → Coder → QA → Reviewer)
- Define JSON schemas for inter-agent communication
- Implement the Planner→Coder 2-stage pipeline in practice
Multi-Agent Pipeline Architecture
Section titled “Multi-Agent Pipeline Architecture”A single agent shows limitations on complex tasks. By separating roles so that each agent has a narrow, well-defined responsibility, quality and traceability improve significantly.
MULTI-AGENT PIPELINE
User Request
↓
Plannergenerates spec.md
↓
Researchergenerates context.json
↓
Codercode changes
↓
QAtest results
↓
Reviewerfinal approve/reject
Implementation Requirements
Section titled “Implementation Requirements”1. Inter-Agent Communication JSON Schemas
Section titled “1. Inter-Agent Communication JSON Schemas”All agents exchange data using a defined schema.
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "PlannerOutput", "type": "object", "required": ["task_id", "objective", "subtasks", "constraints"], "properties": { "task_id": { "type": "string", "pattern": "^task-[0-9]{4}$" }, "objective": { "type": "string", "minLength": 10 }, "subtasks": { "type": "array", "minItems": 1, "items": { "type": "object", "required": ["id", "description", "assignee", "depends_on"], "properties": { "id": { "type": "string" }, "description": { "type": "string" }, "assignee": { "type": "string", "enum": ["researcher", "coder", "qa", "reviewer"] }, "depends_on": { "type": "array", "items": { "type": "string" } } } } }, "constraints": { "type": "object", "properties": { "max_iterations": { "type": "integer", "minimum": 1, "maximum": 20 }, "forbidden_packages": { "type": "array", "items": { "type": "string" } }, "target_files": { "type": "array", "items": { "type": "string" } } } } }}{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "CoderOutput", "type": "object", "required": ["task_id", "changes", "test_command", "status"], "properties": { "task_id": { "type": "string" }, "changes": { "type": "array", "items": { "type": "object", "required": ["file", "action", "description"], "properties": { "file": { "type": "string" }, "action": { "enum": ["create", "modify", "delete"] }, "description": { "type": "string" } } } }, "test_command": { "type": "string" }, "status": { "enum": ["complete", "partial", "blocked"] }, "blocker": { "type": "string" } }}2. base_agent.py — Common Agent Base Class
Section titled “2. base_agent.py — Common Agent Base Class”import anthropicimport jsonfrom abc import ABC, abstractmethodfrom pathlib import Pathfrom jsonschema import validate, ValidationError
class BaseAgent(ABC): """Common base class for all pipeline agents."""
def __init__(self, name: str, schema_path: str | None = None): self.name = name self.client = anthropic.Anthropic() self.schema = self._load_schema(schema_path) if schema_path else None self.messages: list[dict] = []
def _load_schema(self, path: str) -> dict: return json.loads(Path(path).read_text())
def _validate_output(self, data: dict) -> bool: if self.schema is None: return True try: validate(instance=data, schema=self.schema) return True except ValidationError as e: print(f"[{self.name}] Schema validation failed: {e.message}") return False
def _call(self, system: str, user: str) -> str: self.messages.append({"role": "user", "content": user}) response = self.client.messages.create( model="claude-sonnet-4-6", max_tokens=4096, system=system, messages=self.messages, ) text = response.content[0].text self.messages.append({"role": "assistant", "content": text}) return text
def _extract_json(self, text: str) -> dict: """Extracts a JSON block from the response text.""" import re match = re.search(r"```json\s*([\s\S]+?)\s*```", text) if match: return json.loads(match.group(1)) # Pure JSON without a code block return json.loads(text.strip())
@abstractmethod def run(self, input_data: dict) -> dict: pass3. planner_agent.py — Planner Agent
Section titled “3. planner_agent.py — Planner Agent”from base_agent import BaseAgentimport uuid
PLANNER_SYSTEM = """You are a software planning agent. Your job is to decompose a user requestinto a structured plan that other agents can execute.
Output ONLY a valid JSON object matching the PlannerOutput schema.Do not include any explanation outside the JSON block."""
class PlannerAgent(BaseAgent): def __init__(self): super().__init__( name="Planner", schema_path="schemas/planner_output.json" )
def run(self, input_data: dict) -> dict: objective = input_data["objective"] codebase_summary = input_data.get("codebase_summary", "")
user_prompt = f"""Objective: {objective}
Codebase context:{codebase_summary}
Create a task plan. Use task_id format "task-XXXX".Assign subtasks to: researcher, coder, qa, reviewer.""" response_text = self._call(PLANNER_SYSTEM, user_prompt)
try: plan = self._extract_json(response_text) except Exception as e: print(f"[Planner] JSON parsing failed: {e}") # Fallback plan plan = { "task_id": f"task-{uuid.uuid4().hex[:4]}", "objective": objective, "subtasks": [ { "id": "st-01", "description": objective, "assignee": "coder", "depends_on": [] } ], "constraints": {"max_iterations": 5, "forbidden_packages": [], "target_files": []} }
if self._validate_output(plan): print(f"[Planner] Plan created: {len(plan['subtasks'])} subtasks") return plan4. coder_agent.py — Coder Agent
Section titled “4. coder_agent.py — Coder Agent”import osimport subprocessfrom base_agent import BaseAgent
# Command mapping by AI coding CLI toolTOOL_COMMANDS = { "claude": ["claude", "--print", "--no-color", "--dangerously-skip-permissions"], "gemini": ["gemini"], # use pipe mode "codex": ["codex", "--approval-mode", "full-auto"],}
CODER_SYSTEM = """You are a coding agent. You receive a task plan and implement the code changes.After making changes, run the specified test command to verify.
Output a JSON object matching the CoderOutput schema."""
class CoderAgent(BaseAgent): def __init__(self): super().__init__( name="Coder", schema_path="schemas/coder_output.json" ) self.ai_cli = os.environ.get("AI_CLI", "claude")
def run(self, input_data: dict) -> dict: plan = input_data["plan"] coder_tasks = [ t for t in plan["subtasks"] if t["assignee"] == "coder" ]
user_prompt = f"""Task ID: {plan['task_id']}Objective: {plan['objective']}
Your subtasks:{chr(10).join(f"- {t['description']}" for t in coder_tasks)}
Constraints:- Max iterations: {plan['constraints'].get('max_iterations', 5)}- Forbidden packages: {plan['constraints'].get('forbidden_packages', [])}
Implement the changes and report what you did.""" response_text = self._call(CODER_SYSTEM, user_prompt)
# Run AI coding CLI in headless mode to perform actual code changes cmd = TOOL_COMMANDS.get(self.ai_cli, TOOL_COMMANDS["claude"]) result = subprocess.run( cmd + [response_text], capture_output=True, text=True, timeout=300 )
return { "task_id": plan["task_id"], "changes": [], # In practice, parsed from git diff "test_command": "pytest tests/ -q", "status": "complete" if result.returncode == 0 else "partial", "agent_output": result.stdout[:500] }- Create the two schema files in the
schemas/directory - Implement
base_agent.py(pip install jsonschema) - Implement and test
planner_agent.pyin isolation - Implement
coder_agent.py - Write and run
pipeline.pyto connect the two agents
5-Stage Pipeline Design Document
Section titled “5-Stage Pipeline Design Document”Based on the 2 stages you implemented, document the design for the remaining 3 stages.
## Researcher (not yet implemented)- Input: PlannerOutput- Role: Explore the codebase, list relevant files, identify dependencies- Output: context.json (relevant files + summary of key functions)
## QA (implemented in Lab 09)- Input: CoderOutput- Role: Run tests, measure code coverage, detect regressions- Output: qa_report.json
## Reviewer (not yet implemented)- Input: QA report + CoderOutput- Role: Review code quality, make approve/reject decision- Output: review_decision.jsonDeliverables
Section titled “Deliverables”Submit a PR to assignments/lab-07/[student-id]/:
-
schemas/planner_output.json,schemas/coder_output.json -
base_agent.py— Common base class -
planner_agent.py— Fully functional planner -
coder_agent.py— Fully functional coder -
pipeline.py— Script connecting and running Planner→Coder -
pipeline_design.md— Full 5-stage design document -
README.md— Execution results and anticipated challenges when extending the pipeline