Skip to content

Lab 07: Multi-Agent Pipeline Design

Level: Advanced | Due: 2026-04-21
  • Apply multi-agent system design principles
  • Design a 5-stage pipeline architecture (Planner → Researcher → Coder → QA → Reviewer)
  • Define JSON schemas for inter-agent communication
  • Implement the Planner→Coder 2-stage pipeline in practice

A single agent shows limitations on complex tasks. By separating roles so that each agent has a narrow, well-defined responsibility, quality and traceability improve significantly.

MULTI-AGENT PIPELINE
User Request
Planner: generates spec.md
Researcher: generates context.json
Coder: code changes
QA: test results
Reviewer: final approve/reject

All agents exchange data using a defined schema.

schemas/planner_output.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "PlannerOutput",
"type": "object",
"required": ["task_id", "objective", "subtasks", "constraints"],
"properties": {
"task_id": { "type": "string", "pattern": "^task-[0-9]{4}$" },
"objective": { "type": "string", "minLength": 10 },
"subtasks": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["id", "description", "assignee", "depends_on"],
"properties": {
"id": { "type": "string" },
"description": { "type": "string" },
"assignee": {
"type": "string",
"enum": ["researcher", "coder", "qa", "reviewer"]
},
"depends_on": {
"type": "array",
"items": { "type": "string" }
}
}
}
},
"constraints": {
"type": "object",
"properties": {
"max_iterations": { "type": "integer", "minimum": 1, "maximum": 20 },
"forbidden_packages": { "type": "array", "items": { "type": "string" } },
"target_files": { "type": "array", "items": { "type": "string" } }
}
}
}
}
schemas/coder_output.json
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "CoderOutput",
"type": "object",
"required": ["task_id", "changes", "test_command", "status"],
"properties": {
"task_id": { "type": "string" },
"changes": {
"type": "array",
"items": {
"type": "object",
"required": ["file", "action", "description"],
"properties": {
"file": { "type": "string" },
"action": { "enum": ["create", "modify", "delete"] },
"description": { "type": "string" }
}
}
},
"test_command": { "type": "string" },
"status": { "enum": ["complete", "partial", "blocked"] },
"blocker": { "type": "string" }
}
}

2. base_agent.py — Common Agent Base Class

Section titled “2. base_agent.py — Common Agent Base Class”
base_agent.py
import anthropic
import json
from abc import ABC, abstractmethod
from pathlib import Path
from jsonschema import validate, ValidationError
class BaseAgent(ABC):
"""Common base class for all pipeline agents."""
def __init__(self, name: str, schema_path: str | None = None):
self.name = name
self.client = anthropic.Anthropic()
self.schema = self._load_schema(schema_path) if schema_path else None
self.messages: list[dict] = []
def _load_schema(self, path: str) -> dict:
return json.loads(Path(path).read_text())
def _validate_output(self, data: dict) -> bool:
if self.schema is None:
return True
try:
validate(instance=data, schema=self.schema)
return True
except ValidationError as e:
print(f"[{self.name}] Schema validation failed: {e.message}")
return False
def _call(self, system: str, user: str) -> str:
self.messages.append({"role": "user", "content": user})
response = self.client.messages.create(
model="claude-sonnet-4-6",
max_tokens=4096,
system=system,
messages=self.messages,
)
text = response.content[0].text
self.messages.append({"role": "assistant", "content": text})
return text
def _extract_json(self, text: str) -> dict:
"""Extracts a JSON block from the response text."""
import re
match = re.search(r"```json\s*([\s\S]+?)\s*```", text)
if match:
return json.loads(match.group(1))
# Pure JSON without a code block
return json.loads(text.strip())
@abstractmethod
def run(self, input_data: dict) -> dict:
pass
planner_agent.py
from base_agent import BaseAgent
import uuid
# System prompt passed to the model on every Planner call; it asks for a
# JSON-only reply matching the PlannerOutput schema.
PLANNER_SYSTEM: str = """
You are a software planning agent. Your job is to decompose a user request
into a structured plan that other agents can execute.
Output ONLY a valid JSON object matching the PlannerOutput schema.
Do not include any explanation outside the JSON block.
"""
class PlannerAgent(BaseAgent):
    """Agent that decomposes a user objective into a PlannerOutput plan."""

    def __init__(self):
        super().__init__(
            name="Planner",
            schema_path="schemas/planner_output.json",
        )

    def _fallback_plan(self, objective: str) -> dict:
        """Build a minimal plan that hands the whole objective to the coder."""
        return {
            # Four decimal digits: the schema pattern is ^task-[0-9]{4}$,
            # so hex characters from the UUID must not leak into the id.
            "task_id": f"task-{uuid.uuid4().int % 10000:04d}",
            "objective": objective,
            "subtasks": [
                {
                    "id": "st-01",
                    "description": objective,
                    "assignee": "coder",
                    "depends_on": [],
                }
            ],
            "constraints": {
                "max_iterations": 5,
                "forbidden_packages": [],
                "target_files": [],
            },
        }

    def run(self, input_data: dict) -> dict:
        """Ask the model for a task plan and return it as a dict.

        Args:
            input_data: Must contain "objective"; may contain
                "codebase_summary" (defaults to an empty string).

        Returns:
            The plan parsed from the model reply. If the reply cannot be
            parsed as JSON or fails schema validation, a single-subtask
            fallback plan assigned to the coder is returned instead.

        Raises:
            KeyError: If "objective" is missing from *input_data*.
        """
        objective = input_data["objective"]
        codebase_summary = input_data.get("codebase_summary", "")
        user_prompt = f"""
Objective: {objective}
Codebase context:
{codebase_summary}
Create a task plan. Use task_id format "task-XXXX".
Assign subtasks to: researcher, coder, qa, reviewer.
"""
        response_text = self._call(PLANNER_SYSTEM, user_prompt)
        try:
            plan = self._extract_json(response_text)
        except (ValueError, TypeError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            print(f"[Planner] JSON parsing failed: {e}")
            plan = None

        if plan is None or not self._validate_output(plan):
            # Downstream agents index into the plan by schema keys, so an
            # unparsable or invalid plan must not be passed on.
            print("[Planner] Using fallback plan")
            plan = self._fallback_plan(objective)
            # Still logs if the fallback itself is invalid, e.g. an
            # objective shorter than the schema's minLength.
            self._validate_output(plan)

        print(f"[Planner] Plan created: {len(plan['subtasks'])} subtasks")
        return plan
coder_agent.py
import os
import subprocess
from base_agent import BaseAgent
# Base argv for each supported AI coding CLI, keyed by tool name.
TOOL_COMMANDS: dict[str, list[str]] = {
    "claude": [
        "claude",
        "--print",
        "--no-color",
        "--dangerously-skip-permissions",
    ],
    "gemini": ["gemini"],  # use pipe mode
    "codex": ["codex", "--approval-mode", "full-auto"],
}

# System prompt sent to the model on every Coder call.
CODER_SYSTEM: str = """
You are a coding agent. You receive a task plan and implement the code changes.
After making changes, run the specified test command to verify.
Output a JSON object matching the CoderOutput schema.
"""
class CoderAgent(BaseAgent):
    """Agent that turns the coder subtasks of a plan into code changes."""

    def __init__(self):
        super().__init__(
            name="Coder",
            schema_path="schemas/coder_output.json",
        )
        # Which entry of TOOL_COMMANDS to run; defaults to "claude".
        self.ai_cli = os.environ.get("AI_CLI", "claude")

    def run(self, input_data: dict) -> dict:
        """Implement the plan's coder subtasks via the configured AI CLI.

        Args:
            input_data: Must contain "plan", a dict with "task_id",
                "objective" and "subtasks"; "constraints" is read if present.

        Returns:
            A dict with "task_id", "changes", "test_command", "status" and
            "agent_output". Status is "complete" when the CLI exits with 0
            and "partial" otherwise. If the CLI cannot be started or times
            out, status is "blocked" and a "blocker" message is included.

        Raises:
            KeyError: If "plan" or one of its required keys is missing.
        """
        plan = input_data["plan"]
        coder_tasks = [
            t for t in plan["subtasks"] if t["assignee"] == "coder"
        ]
        task_lines = "\n".join(f"- {t['description']}" for t in coder_tasks)
        constraints = plan.get("constraints") or {}
        user_prompt = f"""
Task ID: {plan['task_id']}
Objective: {plan['objective']}
Your subtasks:
{task_lines}
Constraints:
- Max iterations: {constraints.get('max_iterations', 5)}
- Forbidden packages: {constraints.get('forbidden_packages', [])}
Implement the changes and report what you did.
"""
        response_text = self._call(CODER_SYSTEM, user_prompt)

        output = {
            "task_id": plan["task_id"],
            "changes": [],  # In practice, parsed from git diff
            "test_command": "pytest tests/ -q",
            "status": "blocked",
            "agent_output": "",
        }

        # Run AI coding CLI in headless mode to perform actual code changes.
        # NOTE(review): response_text is model output handed to a CLI that may
        # run with permission checks disabled; it is passed as a single argv
        # element (no shell), but run this only in a sandboxed checkout.
        cmd = TOOL_COMMANDS.get(self.ai_cli, TOOL_COMMANDS["claude"])
        try:
            result = subprocess.run(
                cmd + [response_text],
                capture_output=True, text=True, timeout=300
            )
        except subprocess.TimeoutExpired:
            output["blocker"] = f"{cmd[0]} timed out after 300 seconds"
        except OSError as e:
            # Covers FileNotFoundError when the CLI is not installed.
            output["blocker"] = f"could not run {cmd[0]}: {e}"
        else:
            output["status"] = "complete" if result.returncode == 0 else "partial"
            output["agent_output"] = result.stdout[:500]

        if "blocker" in output:
            print(f"[Coder] Blocked: {output['blocker']}")
        # Logs a message if the report does not match the CoderOutput schema.
        self._validate_output(output)
        return output
  1. Create the two schema files in the schemas/ directory
  2. Implement base_agent.py (pip install anthropic jsonschema)
  3. Implement and test planner_agent.py in isolation
  4. Implement coder_agent.py
  5. Write and run pipeline.py to connect the two agents

Based on the 2 stages you implemented, document the design for the remaining 3 stages.

pipeline_design.md
## Researcher (not yet implemented)
- Input: PlannerOutput
- Role: Explore the codebase, list relevant files, identify dependencies
- Output: context.json (relevant files + summary of key functions)
## QA (to be implemented in Lab 09)
- Input: CoderOutput
- Role: Run tests, measure code coverage, detect regressions
- Output: qa_report.json
## Reviewer (not yet implemented)
- Input: QA report + CoderOutput
- Role: Review code quality, make approve/reject decision
- Output: review_decision.json

Submit a PR to assignments/lab-07/[student-id]/:

  • schemas/planner_output.json, schemas/coder_output.json
  • base_agent.py — Common base class
  • planner_agent.py — Fully functional planner
  • coder_agent.py — Fully functional coder
  • pipeline.py — Script connecting and running Planner→Coder
  • pipeline_design.md — Full 5-stage design document
  • README.md — Execution results and anticipated challenges when extending the pipeline