Skip to content

Lab 11: Telemetry & Monitoring

Advanced · Due: 2026-05-26
  • Add instrumentation to the agent pipeline using the OpenTelemetry SDK
  • Configure a Prometheus metrics endpoint and connect a Grafana dashboard
  • Collect key metrics including token usage, latency, and error rates
OBSERVABILITY ARCHITECTURE
Agent Code → OpenTelemetry SDK
  → OTel Collector
    → traces  → Jaeger (distributed tracing)
    → metrics → Prometheus → Grafana
    → logs    → Loki → Grafana
docker-compose.yml
# Monitoring stack for Lab 11: Prometheus (metrics), Grafana (dashboards),
# Jaeger (distributed tracing via OTLP).
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:v2.51.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      # Keep one week of metric history.
      - '--storage.tsdb.retention.time=7d'

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - "3000:3000"
    environment:
      # Default login admin/admin; sign-up disabled for the lab.
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana_data:/var/lib/grafana
      # Auto-provision datasources/dashboards from the repo.
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus

  jaeger:
    image: jaegertracing/all-in-one:1.56
    ports:
      - "16686:16686" # UI
      - "4317:4317" # OTLP gRPC
      - "4318:4318" # OTLP HTTP

volumes:
  prometheus_data:
  grafana_data:
prometheus.yml
# Prometheus scrape configuration: pull the agent's metrics every 15 s.
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ai-agent'
    static_configs:
      # host.docker.internal resolves to the host machine from inside the
      # container; the agent serves /metrics on the host at port 8001.
      - targets: ['host.docker.internal:8001']
    metrics_path: '/metrics'
telemetry.py
from opentelemetry import trace, metrics
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from prometheus_client import start_http_server
def setup_telemetry(
    service_name: str = "ai-agent",
    otlp_endpoint: str = "http://localhost:4318",
    metrics_port: int = 8001,
):
    """Initialize OpenTelemetry tracing and metrics for this process.

    Args:
        service_name: Logical service name. Attached to the providers as a
            resource so Jaeger/Prometheus label telemetry with this name
            instead of ``unknown_service``.
        otlp_endpoint: Base URL of the OTLP/HTTP receiver (Jaeger here);
            ``/v1/traces`` is appended for the span exporter.
        metrics_port: Port on which ``prometheus_client`` serves the
            ``/metrics`` scrape endpoint.

    Returns:
        A ``(tracer, meter)`` tuple named after ``service_name``.
    """
    # Shared identity stamped onto every span and metric.
    resource = Resource.create({"service.name": service_name})

    # Tracing setup: batch spans and export them over OTLP/HTTP.
    tracer_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces")
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Metrics setup: pull-based Prometheus reader (scraped, not pushed).
    prometheus_reader = PrometheusMetricReader()
    meter_provider = MeterProvider(
        resource=resource, metric_readers=[prometheus_reader]
    )
    metrics.set_meter_provider(meter_provider)

    # Start the HTTP server Prometheus scrapes (see prometheus.yml job).
    start_http_server(port=metrics_port)
    print(f"[Telemetry] Prometheus metrics: http://localhost:{metrics_port}/metrics")

    return (
        trace.get_tracer(service_name),
        metrics.get_meter(service_name)
    )

3. instrumented_agent.py — Instrumented Agent

Section titled “3. instrumented_agent.py — Instrumented Agent”
instrumented_agent.py
import anthropic
import time
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
from opentelemetry import metrics
from telemetry import setup_telemetry
# One-time telemetry bootstrap; the tracer/meter pair is shared module-wide.
tracer, meter = setup_telemetry(service_name="lab11-agent")

# --- Metric instruments ------------------------------------------------------

# Combined input + output tokens consumed per request.
token_counter = meter.create_counter(
    "agent.tokens.total", description="Total token usage", unit="tokens"
)

# Wall-clock latency of each API request, in milliseconds.
request_duration = meter.create_histogram(
    "agent.request.duration", description="API request duration", unit="ms"
)

# Failed API calls, labeled by exception type at the call site.
error_counter = meter.create_counter(
    "agent.errors.total", description="API error count"
)

# Number of Ralph loop iterations executed.
iteration_counter = meter.create_counter(
    "agent.iterations.total", description="Ralph loop iteration count"
)
class InstrumentedAgent:
    """Anthropic chat agent instrumented with OpenTelemetry spans and metrics.

    Keeps a running conversation history and records, per call: a trace span,
    token usage, request latency, iteration count, and (on failure) an error
    counter keyed by exception type.
    """

    def __init__(self):
        self.client = anthropic.Anthropic()
        # Conversation history of {"role": ..., "content": ...} dicts.
        self.messages: list[dict] = []

    def call(self, prompt: str, iteration: int = 1) -> str:
        """Send ``prompt`` to the model and return the assistant's text reply.

        Args:
            prompt: User message appended to the conversation history.
            iteration: Ralph-loop iteration number, recorded as a span
                attribute and metric label.

        Raises:
            Exception: Any API error is re-raised after the span is marked
                ERROR and the error counter is incremented.
        """
        with tracer.start_as_current_span("agent.call") as span:
            span.set_attribute("iteration", iteration)
            span.set_attribute("prompt_length", len(prompt))
            start_ms = time.perf_counter() * 1000
            self.messages.append({"role": "user", "content": prompt})
            try:
                response = self.client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=2048,
                    messages=self.messages
                )
                elapsed_ms = time.perf_counter() * 1000 - start_ms
                text = response.content[0].text
                self.messages.append({"role": "assistant", "content": text})

                # Record metrics. NOTE(review): "iteration" as a label creates
                # one time series per iteration — fine for a lab, high
                # cardinality in production.
                labels = {"model": "claude-sonnet-4-6", "iteration": str(iteration)}
                token_counter.add(
                    response.usage.input_tokens + response.usage.output_tokens,
                    labels
                )
                request_duration.record(elapsed_ms, labels)
                iteration_counter.add(1, labels)

                # Mirror key numbers onto the trace span.
                span.set_attribute("input_tokens", response.usage.input_tokens)
                span.set_attribute("output_tokens", response.usage.output_tokens)
                span.set_attribute("duration_ms", elapsed_ms)
                span.set_status(Status(StatusCode.OK))
                return text
            except Exception as e:
                # Roll back the unanswered user turn so a failed request does
                # not leave a dangling message that corrupts the history sent
                # on every subsequent call.
                self.messages.pop()
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                error_counter.add(1, {"error_type": type(e).__name__})
                raise
grafana/provisioning/dashboards/agent-dashboard.json
{
"title": "AI Agent Monitoring",
"panels": [
{
"title": "Token Usage per Hour",
"type": "graph",
"targets": [
{
"expr": "rate(agent_tokens_total[5m])",
"legendFormat": "{{model}}"
}
]
},
{
"title": "API Request Latency (P95)",
"type": "stat",
"targets": [
{
"expr": "histogram_quantile(0.95, rate(agent_request_duration_bucket[5m]))",
"legendFormat": "P95 Latency (ms)"
}
]
},
{
"title": "Error Rate",
"type": "graph",
"targets": [
{
"expr": "rate(agent_errors_total[5m])",
"legendFormat": "{{error_type}}"
}
]
},
{
"title": "Iteration Count",
"type": "stat",
"targets": [
{
"expr": "sum(agent_iterations_total)"
}
]
}
]
}
load_generator.py
import asyncio
import random
from instrumented_agent import InstrumentedAgent
# Fixed pool of coding questions; the load generator picks one at random
# per request to produce varied token counts and latencies.
PROMPTS = [
"Write a function to print the Fibonacci sequence in Python.",
"What is the most efficient way to remove duplicates from a list?",
"Explain how to sort a dictionary by its values.",
"Write Python code to read a file and count the number of lines.",
"Explain best practices for exception handling (try/except).",
]
async def run_load(n_requests: int = 20):
    """Fire ``n_requests`` sequential prompts at the agent to generate telemetry.

    Errors are printed and swallowed so the load run continues; a 0.5 s pause
    between requests spreads the load over time for nicer dashboards.
    """
    agent = InstrumentedAgent()
    for i in range(n_requests):
        prompt = random.choice(PROMPTS)
        try:
            # agent.call is a blocking synchronous API call; run it in a
            # worker thread so it does not stall the event loop.
            response = await asyncio.to_thread(agent.call, prompt, iteration=i + 1)
            print(f"[{i+1}/{n_requests}] OK ({len(response)} chars)")
        except Exception as e:
            print(f"[{i+1}/{n_requests}] ERROR: {e}")
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(run_load(20))
  1. Start Prometheus, Grafana, and Jaeger with docker-compose up -d
  2. Install packages: pip install opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-exporter-prometheus prometheus-client
  3. Run python instrumented_agent.py and check http://localhost:8001/metrics
  4. Generate load with python load_generator.py
  5. Check the dashboard in Grafana (http://localhost:3000, admin/admin)
  6. Check distributed traces in Jaeger (http://localhost:16686)

Submit a PR to assignments/lab-11/[student-id]/:

  • docker-compose.yml — Monitoring infrastructure
  • prometheus.yml — Scraping configuration
  • telemetry.py — OTel initialization
  • instrumented_agent.py — Agent instrumented with 4 metrics
  • load_generator.py — Load generation script
  • screenshots/grafana_dashboard.png — Grafana dashboard screenshot
  • screenshots/jaeger_trace.png — Jaeger trace screenshot
  • README.md — Metric interpretation, performance issues found, improvement proposals