# Lab 11: Telemetry & Monitoring

Level: Advanced
Due: 2026-05-26
## Objectives

- Add instrumentation to the agent pipeline using the OpenTelemetry SDK
- Configure a Prometheus metrics endpoint and connect a Grafana dashboard
- Collect key metrics including token usage, latency, and error rates
## Monitoring Architecture

```
OBSERVABILITY ARCHITECTURE

Agent Code (OpenTelemetry SDK)
            ↓
      OTel Collector
            ↓
  traces  → Jaeger (distributed tracing)
  metrics → Prometheus → Grafana
  logs    → Loki → Grafana
```

The diagram shows the general pattern. In this lab's setup the SDK exports traces directly to Jaeger's OTLP endpoint and exposes a `/metrics` endpoint for Prometheus to scrape; no standalone Collector or Loki is deployed.
## Implementation Requirements

### 1. Docker Compose Infrastructure Setup

`docker-compose.yml`:

```yaml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:v2.51.0
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.retention.time=7d'

  grafana:
    image: grafana/grafana:10.4.0
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_USERS_ALLOW_SIGN_UP=false
    volumes:
      - grafana_data:/var/lib/grafana
      - ./grafana/provisioning:/etc/grafana/provisioning
    depends_on:
      - prometheus

  jaeger:
    image: jaegertracing/all-in-one:1.56
    ports:
      - "16686:16686"  # UI
      - "4317:4317"    # OTLP gRPC
      - "4318:4318"    # OTLP HTTP

volumes:
  prometheus_data:
  grafana_data:
```

`prometheus.yml`:

```yaml
global:
  scrape_interval: 15s
  evaluation_interval: 15s

scrape_configs:
  - job_name: 'ai-agent'
    static_configs:
      - targets: ['host.docker.internal:8001']
    metrics_path: '/metrics'
```
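Prometheus pulls metrics by scraping a plain-text exposition format from the target's `/metrics` path. As a sketch of what a scrape returns, here is a minimal parser over an illustrative payload (the sample lines are made up, not real output from this lab; real payloads also carry optional timestamps, and label values containing spaces would need a real parser):

```python
def parse_exposition(text: str) -> dict[str, float]:
    """Parse Prometheus text-format lines into {metric{labels}: value}.

    Simplified: skips HELP/TYPE comment lines and splits each sample
    at the last space between the metric name and its value.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        name, _, value = line.rpartition(" ")
        samples[name] = float(value)
    return samples

# Illustrative payload, not actual agent output.
SAMPLE = """\
# HELP agent_tokens_total Total token usage
# TYPE agent_tokens_total counter
agent_tokens_total{model="claude-sonnet-4-6"} 1234.0
agent_errors_total{error_type="RateLimitError"} 2.0
"""

parsed = parse_exposition(SAMPLE)
```

Prometheus performs this scrape itself every `scrape_interval`; the parser here is only to make the wire format concrete.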
### 2. telemetry.py — OpenTelemetry Setup

```python
from opentelemetry import trace, metrics
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from prometheus_client import start_http_server


def setup_telemetry(
    service_name: str = "ai-agent",
    otlp_endpoint: str = "http://localhost:4318",
    metrics_port: int = 8001,
):
    """Initializes OpenTelemetry tracing and metrics."""
    # Attach the service name as a resource attribute so Jaeger groups
    # spans under it instead of "unknown_service".
    resource = Resource.create({"service.name": service_name})

    # Tracing setup: export spans over OTLP/HTTP to Jaeger's 4318 port
    tracer_provider = TracerProvider(resource=resource)
    otlp_exporter = OTLPSpanExporter(endpoint=f"{otlp_endpoint}/v1/traces")
    tracer_provider.add_span_processor(BatchSpanProcessor(otlp_exporter))
    trace.set_tracer_provider(tracer_provider)

    # Metrics setup (Prometheus pull model: expose a scrape endpoint)
    prometheus_reader = PrometheusMetricReader()
    meter_provider = MeterProvider(resource=resource, metric_readers=[prometheus_reader])
    metrics.set_meter_provider(meter_provider)

    # Start the Prometheus scrape endpoint
    start_http_server(port=metrics_port)
    print(f"[Telemetry] Prometheus metrics: http://localhost:{metrics_port}/metrics")

    return (
        trace.get_tracer(service_name),
        metrics.get_meter(service_name),
    )
```
### 3. instrumented_agent.py — Instrumented Agent

```python
import time

import anthropic
from opentelemetry.trace import Status, StatusCode

from telemetry import setup_telemetry

tracer, meter = setup_telemetry(service_name="lab11-agent")

# Metric definitions
token_counter = meter.create_counter(
    "agent.tokens.total", description="Total token usage", unit="tokens"
)
request_duration = meter.create_histogram(
    "agent.request.duration", description="API request duration", unit="ms"
)
error_counter = meter.create_counter(
    "agent.errors.total", description="API error count"
)
iteration_counter = meter.create_counter(
    "agent.iterations.total", description="Ralph loop iteration count"
)


class InstrumentedAgent:
    def __init__(self):
        self.client = anthropic.Anthropic()
        self.messages: list[dict] = []

    def call(self, prompt: str, iteration: int = 1) -> str:
        with tracer.start_as_current_span("agent.call") as span:
            span.set_attribute("iteration", iteration)
            span.set_attribute("prompt_length", len(prompt))

            start_ms = time.perf_counter() * 1000

            try:
                self.messages.append({"role": "user", "content": prompt})
                response = self.client.messages.create(
                    model="claude-sonnet-4-6",
                    max_tokens=2048,
                    messages=self.messages,
                )

                elapsed_ms = time.perf_counter() * 1000 - start_ms
                text = response.content[0].text
                self.messages.append({"role": "assistant", "content": text})

                # Record metrics
                labels = {"model": "claude-sonnet-4-6", "iteration": str(iteration)}
                token_counter.add(
                    response.usage.input_tokens + response.usage.output_tokens,
                    labels,
                )
                request_duration.record(elapsed_ms, labels)
                iteration_counter.add(1, labels)

                # Add trace attributes
                span.set_attribute("input_tokens", response.usage.input_tokens)
                span.set_attribute("output_tokens", response.usage.output_tokens)
                span.set_attribute("duration_ms", elapsed_ms)
                span.set_status(Status(StatusCode.OK))

                return text

            except Exception as e:
                span.set_status(Status(StatusCode.ERROR, str(e)))
                span.record_exception(e)
                error_counter.add(1, {"error_type": type(e).__name__})
                raise
```
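The measure-then-record-or-count pattern in `call` generalizes beyond this one method. A stdlib-only sketch (no OpenTelemetry required) of the same shape as a reusable decorator, with plain callables standing in for the meter instruments:

```python
import time
from functools import wraps


def timed(record_ms, count_error):
    """Wrap a function: report duration (ms) on success, error type on failure.

    record_ms / count_error are stand-ins for meter instruments such as
    request_duration.record and error_counter.add.
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = fn(*args, **kwargs)
            except Exception as e:
                count_error(type(e).__name__)   # mirrors error_counter.add
                raise
            record_ms((time.perf_counter() - start) * 1000)
            return result
        return wrapper
    return decorator


durations, errors = [], []

@timed(durations.append, errors.append)
def flaky(x: int) -> int:
    if x < 0:
        raise ValueError("negative input")
    return x * 2


flaky(3)                 # records one duration sample
try:
    flaky(-1)            # records one error name, re-raises
except ValueError:
    pass
```

In the real agent the span attributes still have to be set inside the method, which is why the lab keeps the instrumentation inline rather than in a decorator.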
### 4. Grafana Dashboard Provisioning

`grafana/provisioning/dashboards/agent-dashboard.json`:

```json
{
  "title": "AI Agent Monitoring",
  "panels": [
    {
      "title": "Token Usage Rate (tokens/sec)",
      "type": "graph",
      "targets": [
        { "expr": "rate(agent_tokens_total[5m])", "legendFormat": "{{model}}" }
      ]
    },
    {
      "title": "API Request Latency (P95)",
      "type": "stat",
      "targets": [
        {
          "expr": "histogram_quantile(0.95, rate(agent_request_duration_bucket[5m]))",
          "legendFormat": "P95 Latency (ms)"
        }
      ]
    },
    {
      "title": "Error Rate",
      "type": "graph",
      "targets": [
        { "expr": "rate(agent_errors_total[5m])", "legendFormat": "{{error_type}}" }
      ]
    },
    {
      "title": "Iteration Count",
      "type": "stat",
      "targets": [
        { "expr": "sum(agent_iterations_total)" }
      ]
    }
  ]
}
```
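`histogram_quantile` estimates percentiles from cumulative `le`-bucket counters by linear interpolation inside the bucket that contains the target rank. A minimal stdlib sketch of the idea (the bucket bounds are illustrative, not the SDK's actual defaults):

```python
import bisect

# Illustrative bucket upper bounds in ms (not the SDK's real defaults).
BOUNDS = [50, 100, 250, 500, 1000, 2500, float("inf")]


def to_cumulative_buckets(samples: list[float]) -> list[int]:
    """Count latency samples into cumulative le-buckets, Prometheus-style."""
    counts = [0] * len(BOUNDS)
    for s in samples:
        counts[bisect.bisect_left(BOUNDS, s)] += 1
    for i in range(1, len(counts)):          # make counts cumulative
        counts[i] += counts[i - 1]
    return counts


def quantile(q: float, counts: list[int]) -> float:
    """Estimate the q-quantile the way histogram_quantile does."""
    rank = q * counts[-1]
    for i, c in enumerate(counts):
        if c >= rank:
            lower = BOUNDS[i - 1] if i > 0 else 0.0
            prev = counts[i - 1] if i > 0 else 0
            in_bucket = c - prev
            if BOUNDS[i] == float("inf") or in_bucket == 0:
                return lower                 # cannot interpolate past +Inf
            return lower + (BOUNDS[i] - lower) * (rank - prev) / in_bucket
    return 0.0


counts = to_cumulative_buckets([10, 60, 120, 300, 800])
p95 = quantile(0.95, counts)                 # ≈ 875 ms
```

This is why the accuracy of the P95 panel depends on how well the histogram's bucket bounds match the actual latency distribution.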
### 5. Load Generation Script

`load_generator.py`:

```python
import asyncio
import random

from instrumented_agent import InstrumentedAgent

PROMPTS = [
    "Write a function to print the Fibonacci sequence in Python.",
    "What is the most efficient way to remove duplicates from a list?",
    "Explain how to sort a dictionary by its values.",
    "Write Python code to read a file and count the number of lines.",
    "Explain best practices for exception handling (try/except).",
]


async def run_load(n_requests: int = 20):
    agent = InstrumentedAgent()
    for i in range(n_requests):
        prompt = random.choice(PROMPTS)
        try:
            response = agent.call(prompt, iteration=i + 1)
            print(f"[{i+1}/{n_requests}] OK ({len(response)} chars)")
        except Exception as e:
            print(f"[{i+1}/{n_requests}] ERROR: {e}")
        await asyncio.sleep(0.5)


if __name__ == "__main__":
    asyncio.run(run_load(20))
```

To run the lab:

- Start Prometheus, Grafana, and Jaeger with `docker-compose up -d`
- Install packages: `pip install opentelemetry-sdk opentelemetry-exporter-otlp opentelemetry-exporter-prometheus prometheus-client`
- Run `python instrumented_agent.py` and check `http://localhost:8001/metrics`
- Generate load with `python load_generator.py`
- Check the dashboard in Grafana (`http://localhost:3000`, admin/admin)
- Check distributed traces in Jaeger (`http://localhost:16686`)
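Besides the Grafana UI, Prometheus answers PromQL over its HTTP API, which is handy for checking that metrics are actually being ingested. A small stdlib sketch (assumes the compose stack above is running on localhost; `build_query_url` and `instant_query` are hypothetical helpers, not part of the lab files):

```python
import json
import urllib.parse
import urllib.request

PROM_URL = "http://localhost:9090"  # assumes the docker-compose stack is up


def build_query_url(base: str, expr: str) -> str:
    """Build an instant-query URL for Prometheus's HTTP API (/api/v1/query)."""
    return f"{base}/api/v1/query?" + urllib.parse.urlencode({"query": expr})


def instant_query(expr: str) -> dict:
    """Run a PromQL instant query and return the decoded JSON response."""
    with urllib.request.urlopen(build_query_url(PROM_URL, expr), timeout=5) as resp:
        return json.loads(resp.read())


# Example, once load has been generated:
# instant_query("sum(agent_iterations_total)")["data"]["result"]
```

A successful response has `"status": "success"` and the matching series under `data.result`; an empty `result` list usually means the scrape target is down or the metric name is wrong.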
## Deliverables

Submit a PR to `assignments/lab-11/[student-id]/`:

- `docker-compose.yml` — Monitoring infrastructure
- `prometheus.yml` — Scraping configuration
- `telemetry.py` — OTel initialization
- `instrumented_agent.py` — Agent instrumented with 4 metrics
- `load_generator.py` — Load generation script
- `screenshots/grafana_dashboard.png` — Grafana dashboard screenshot
- `screenshots/jaeger_trace.png` — Jaeger trace screenshot
- `README.md` — Metric interpretation, performance issues found, improvement proposals