Skip to content

Lab 11: Telemetry & Monitoring

Advanced Due: 2026-05-26
  • Add instrumentation to the agent pipeline using the OpenTelemetry SDK
  • Configure a Prometheus metrics endpoint and connect a Grafana dashboard
  • Collect key metrics including token usage, latency, and error rates
OBSERVABILITY ARCHITECTURE
Agent Code → OpenTelemetry SDK
OTel Collector
traces → Jaeger (distributed tracing)
metrics → Prometheus → Grafana
logs → Loki → Grafana
docker-compose.yml
# Local observability stack for the lab: Prometheus (metrics),
# Grafana (dashboards) and Jaeger (traces).
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:v2.51.0
    ports:
      - "9090:9090"
    # prometheus.yml scrapes host.docker.internal:8001. Docker Desktop
    # resolves that name automatically; on Linux Docker Engine it only
    # resolves when mapped to the host gateway explicitly.
    extra_hosts:
      - "host.docker.internal:host-gateway"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=7d'

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus

  jaeger:
    image: jaegertracing/all-in-one:1.56
    ports:
      - "16686:16686" # UI
      - "4317:4317" # OTLP gRPC
      - "4318:4318" # OTLP HTTP

# Named volumes so Prometheus and Grafana data survive container restarts.
volumes:
  prometheus_data:
  grafana_data:
prometheus.yml
# Defaults applied to every scrape job and to rule evaluation.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  # Pulls /metrics from port 8001 on the Docker host.
  - job_name: 'ai-agent'
    static_configs:
      - targets: ['host.docker.internal:8001']
    metrics_path: '/metrics'
telemetry.py
from opentelemetry import trace, metrics
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server
def setup_telemetry(
    service_name: str = "ai-agent",
    otlp_endpoint: str = "http://localhost:4318",
    metrics_port: int = 8001,
):
    """Initialize OpenTelemetry tracing and metrics for this process.

    Spans are batched and exported over OTLP/HTTP to
    ``{otlp_endpoint}/v1/traces``. Metrics are exposed for Prometheus to
    scrape from an HTTP server started on ``metrics_port``.

    Args:
        service_name: Reported as the ``service.name`` resource attribute
            and used as the name of the returned tracer and meter.
        otlp_endpoint: Base URL of the OTLP/HTTP receiver; a trailing
            slash is tolerated.
        metrics_port: Local port for the Prometheus scrape endpoint.

    Returns:
        A ``(tracer, meter)`` tuple bound to the global providers.
    """
    # Imported here so this function does not rely on an edit to the
    # module's import block.
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource

    # Without an explicit resource the SDK reports its default service
    # name, so traces would not be filed under `service_name`.
    resource = Resource.create({SERVICE_NAME: service_name})

    # Tracing setup
    tracer_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(
        endpoint=f"{otlp_endpoint.rstrip('/')}/v1/traces"
    )
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Metrics setup (Prometheus scrape endpoint)
    prometheus_reader = PrometheusMetricReader()
    meter_provider = MeterProvider(
        metric_readers=[prometheus_reader],
        resource=resource,
    )
    metrics.set_meter_provider(meter_provider)

    # Start Prometheus HTTP server
    start_http_server(port=metrics_port)
    print(f"[Telemetry] Prometheus metrics: http://localhost:{metrics_port}/metrics")

    return (
        trace.get_tracer(service_name),
        metrics.get_meter(service_name),
    )

3. instrumented_agent.py — Instrumented Agent

Section titled “3. instrumented_agent.py — Instrumented Agent”
instrumented_agent.py
import anthropic
import os
import time
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry import metrics
from event_recorder import EventRecorder
from telemetry import setup_telemetry
# Telemetry is initialised once, at import time; every instrument below is
# created from this meter.
tracer, meter = setup_telemetry(service_name="lab11-agent")

# Metric definitions

# Input plus output tokens consumed by successful requests.
token_counter = meter.create_counter(
    name="agent.tokens.total",
    unit="tokens",
    description="Total token usage",
)

# Wall-clock time of each successful API request, in milliseconds.
request_duration = meter.create_histogram(
    name="agent.request.duration",
    unit="ms",
    description="API request duration",
)

# Number of calls that raised, labelled by exception class at the call site.
error_counter = meter.create_counter(
    name="agent.errors.total",
    description="API error count",
)

# Number of completed agent iterations.
iteration_counter = meter.create_counter(
    name="agent.iterations.total",
    description="Ralph loop iteration count",
)
class InstrumentedAgent:
    """Chat agent whose LLM calls emit a span, metrics and event-log records.

    The conversation is kept in ``self.messages`` and sent in full on every
    request. Lifecycle events are appended to the recorder under
    ``sessions/example``; a ``session.start`` event is written on
    construction and its id is the parent of later request/error events.
    """

    def __init__(self):
        self.client = anthropic.Anthropic()
        self.model = os.environ.get("ANTHROPIC_MODEL", "claude-sonnet-4-6")
        self.messages: list[dict] = []
        self.recorder = EventRecorder("sessions/example")
        self.session_event_id = self.recorder.append(
            "session.start", "agent", {"model": self.model}
        )

    def call(self, prompt: str, iteration: int = 1) -> str:
        """Send ``prompt`` to the model and return the reply text.

        Args:
            prompt: User message appended to the conversation.
            iteration: Caller-supplied sequence number. It is recorded on
                the span and in the event log, but not as a metric label.

        Returns:
            The text of the first content block of the response.

        Raises:
            Exception: Any error from the request or its bookkeeping is
                recorded on the span, counted, logged as an event and then
                re-raised. The conversation history is restored to its
                state before the call.
        """
        with tracer.start_as_current_span("agent.call") as span:
            span.set_attribute("iteration", iteration)
            span.set_attribute("prompt_length", len(prompt))
            start_ms = time.perf_counter() * 1000
            # Remember where the history stood so a failed call can be undone.
            history_len = len(self.messages)
            try:
                self.messages.append({"role": "user", "content": prompt})
                request_event_id = self.recorder.append(
                    "llm.request",
                    "agent",
                    {"iteration": iteration, "prompt_length": len(prompt)},
                    parent_id=self.session_event_id,
                )
                response = self.client.messages.create(
                    model=self.model,
                    max_tokens=2048,
                    messages=self.messages,
                )
                elapsed_ms = time.perf_counter() * 1000 - start_ms
                text = response.content[0].text
                self.messages.append({"role": "assistant", "content": text})

                # Record metrics. The iteration number is deliberately not a
                # label: a per-request label value creates a new time series
                # for every call, so no series ever increases and rate()
                # over them stays at zero.
                labels = {"model": self.model}
                token_counter.add(
                    response.usage.input_tokens + response.usage.output_tokens,
                    labels,
                )
                request_duration.record(elapsed_ms, labels)
                iteration_counter.add(1, labels)

                # Add trace attributes
                span.set_attribute("input_tokens", response.usage.input_tokens)
                span.set_attribute("output_tokens", response.usage.output_tokens)
                span.set_attribute("duration_ms", elapsed_ms)
                span.set_status(Status(StatusCode.OK))

                self.recorder.append(
                    "llm.response",
                    "agent",
                    {
                        "iteration": iteration,
                        "input_tokens": response.usage.input_tokens,
                        "output_tokens": response.usage.output_tokens,
                        "duration_ms": elapsed_ms,
                    },
                    parent_id=request_event_id,
                )
                return text
            except Exception as e:
                # Drop whatever this call appended so the next request does
                # not resend a prompt that never completed.
                del self.messages[history_len:]
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                error_counter.add(1, {"error_type": type(e).__name__})
                self.recorder.append(
                    "hook.fired",
                    "agent",
                    {"error_type": type(e).__name__, "message": str(e)},
                    parent_id=self.session_event_id,
                )
                raise

4. event_recorder.py — Replayable Event Log

Section titled “4. event_recorder.py — Replayable Event Log”

OpenTelemetry is for live monitoring; Agent OS Runtime’s event log is for audit and replay. instrumented_agent.py must also write lifecycle events to .events.jsonl.

event_recorder.py
import json
import time
from pathlib import Path
from uuid import uuid4
class EventRecorder:
def __init__(self, session_dir: str):
self.session_dir = Path(session_dir)
self.session_dir.mkdir(parents=True, exist_ok=True)
self.path = self.session_dir / ".events.jsonl"
def append(self, event_type: str, actor: str, payload: dict, parent_id: str | None = None) -> str:
event = {
"id": str(uuid4()),
"ts": time.time(),
"type": event_type,
"actor": actor,
"payload": payload,
"schema_version": "v1",
"parent_id": parent_id,
}
with self.path.open("a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
return event["id"]

Required event types: session.start, llm.request, llm.response, hook.fired, tool.invoke, tool.result, session.close. Use Agent OS Checklist as the replay standard.

grafana/provisioning/dashboards/agent-dashboard.json
{
  "title": "AI Agent Monitoring",
  "panels": [
    {
      "title": "Token Usage per Hour",
      "type": "graph",
      "targets": [
        {
          "expr": "sum by (model) (increase(agent_tokens_total[1h]))",
          "legendFormat": "{{model}}"
        }
      ]
    },
    {
      "title": "API Request Latency (P95)",
      "type": "stat",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, sum by (le) (rate(agent_request_duration_bucket[5m])))",
          "legendFormat": "P95 Latency (ms)"
        }
      ]
    },
    {
      "title": "Error Rate",
      "type": "graph",
      "targets": [
        {
          "expr": "sum by (error_type) (rate(agent_errors_total[5m]))",
          "legendFormat": "{{error_type}}"
        }
      ]
    },
    {
      "title": "Iteration Count",
      "type": "stat",
      "targets": [
        {
          "expr": "sum(agent_iterations_total)"
        }
      ]
    }
  ]
}
load_generator.py
import asyncio
import random
from instrumented_agent import InstrumentedAgent
# Pool of sample prompts; run_load picks one at random for each request.
PROMPTS = [
    "Write a function to print the Fibonacci sequence in Python.",
    "What is the most efficient way to remove duplicates from a list?",
    "Explain how to sort a dictionary by its values.",
    "Write Python code to read a file and count the number of lines.",
    "Explain best practices for exception handling (try/except).",
]
async def run_load(n_requests: int = 20):
    """Send ``n_requests`` random prompts through one agent, one at a time.

    Each request prints an OK line with the reply length or an ERROR line
    with the exception message; a failure does not stop the run. The loop
    pauses 0.5 seconds after every request.

    Args:
        n_requests: Number of requests to issue; values <= 0 send nothing.
    """
    agent = InstrumentedAgent()
    for i in range(n_requests):
        prompt = random.choice(PROMPTS)
        try:
            # agent.call is a blocking, synchronous call. Running it in a
            # worker thread keeps the event loop free instead of stalling
            # it for the duration of the request.
            response = await asyncio.to_thread(
                agent.call, prompt, iteration=i + 1
            )
            print(f"[{i+1}/{n_requests}] OK ({len(response)} chars)")
        except Exception as e:
            # Top-level boundary of the load run: report and keep going.
            print(f"[{i+1}/{n_requests}] ERROR: {e}")
        await asyncio.sleep(0.5)
# Script entry point: issue 20 requests when run directly.
if __name__ == "__main__":
    asyncio.run(run_load(20))
  1. Start Prometheus, Grafana, and Jaeger with docker-compose up -d
  2. Install packages: pip install anthropic opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-exporter-prometheus prometheus-client
  3. Run python instrumented_agent.py and check http://localhost:8001/metrics
  4. Generate load with python load_generator.py
  5. Check the dashboard in Grafana (http://localhost:3000, admin/admin)
  6. Check distributed traces in Jaeger (http://localhost:16686)

Submit a PR to assignments/lab-11/[student-id]/:

  • docker-compose.yml — Monitoring infrastructure
  • prometheus.yml — Scraping configuration
  • telemetry.py — OTel initialization
  • instrumented_agent.py — Agent instrumented with 4 metrics
  • event_recorder.py — .events.jsonl recorder
  • sessions/example/.events.jsonl — execution trace
  • replay_snapshot.json — final state derived from event log
  • load_generator.py — Load generation script
  • screenshots/grafana_dashboard.png — Grafana dashboard screenshot
  • screenshots/jaeger_trace.png — Jaeger trace screenshot
  • README.md — Metric interpretation, performance issues found, improvement proposals