arrow_backBACK TO LLM ENGINEERING IN PRODUCTION
Lesson 14LLM Engineering in Production17 min read

LLM Observability and Monitoring

April 01, 2026

TL;DR

You can't debug what you can't see. Log every LLM call: model, tokens, latency, cost, and a sample of input/output. Trace multi-step chains end-to-end with correlation IDs. Track four dashboards: cost per day, P95 latency, error rate, and output quality score. Set alerts on cost spikes and latency degradation. Use Langfuse or LangSmith for LLM-specific tracing. Quality drift is the silent killer — run eval suites on a schedule, not just at deploy time.

Traditional application monitoring asks: “Is the server up? Is latency acceptable? Are there errors?” LLM monitoring asks all of that plus: “Is the model still giving good answers? Why did cost triple overnight? Did that prompt change make things worse? Why did the agent take 47 seconds on a simple question?”

LLM applications are uniquely hard to monitor because their behavior is non-deterministic, their costs are variable per request, and “failure” often looks like a successful HTTP 200 response with a terrible answer. You need purpose-built observability for this.

What Makes LLM Observability Different

In a traditional web service, a 200 status code means success. In an LLM application, a 200 status code means the API accepted your request and returned tokens. Those tokens might be a perfect answer, a hallucination, a refusal, or complete nonsense. The HTTP layer tells you nothing about output quality.

Here’s what changes:

Traditional App LLM App
Response time is predictable Response time varies 2-30x depending on output length
Cost per request is ~constant Cost varies by input/output token count
Errors are explicit (4xx, 5xx) “Errors” are often valid responses with bad content
Testing is deterministic Same input can produce different outputs
Degradation is binary (works/broken) Degradation is gradual (quality drifts over time)

What to Log

Every LLM call should capture these fields:

from dataclasses import dataclass, field, asdict
from datetime import datetime
from typing import Optional
import hashlib
import json
import time

@dataclass
class LLMCallRecord:
    # Identity
    call_id: str                          # Unique ID for this call
    trace_id: str                         # Correlation ID for multi-step chains
    span_name: str                        # e.g., "rag_generate", "agent_step_3"

    # Request
    model: str                            # e.g., "gpt-4o", "claude-sonnet-4-20250514"
    temperature: float
    max_tokens: int
    messages_hash: str                    # Hash of input messages (NOT the full text)
    message_count: int                    # Number of messages in the conversation
    system_prompt_version: str            # Version tag for your system prompt

    # Response
    finish_reason: str                    # "stop", "length", "tool_calls", "content_filter"
    output_length: int                    # Character count of the response

    # Tokens & Cost
    prompt_tokens: int
    completion_tokens: int
    total_tokens: int
    estimated_cost_usd: float

    # Performance
    latency_ms: float                     # Total request time
    time_to_first_token_ms: Optional[float] = None  # For streaming responses

    # Quality signals
    has_tool_calls: bool = False
    tool_names: list = field(default_factory=list)
    error: Optional[str] = None

    # Metadata
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())
    user_id: Optional[str] = None
    environment: str = "production"

What NOT to Log

This is equally important:

# NEVER log these:
# - Full user messages (contain PII — names, emails, medical info, etc.)
# - Full model responses (may contain PII echoed back, or sensitive generated content)
# - API keys (obviously)
# - Raw embeddings (massive, useless in logs)

# INSTEAD, log:
# - A hash of the input messages (for deduplication and debugging)
# - The first 100 characters of the response (for quick debugging)
# - A quality score (if you have one)

def hash_messages(messages: list) -> str:
    """Create a deterministic hash of the message content."""
    content = json.dumps(messages, sort_keys=True)
    return hashlib.sha256(content.encode()).hexdigest()[:16]

def safe_preview(text: str, max_length: int = 100) -> str:
    """Truncated preview for logs — reduces PII exposure."""
    if len(text) <= max_length:
        return text
    return text[:max_length] + "..."

If you need to debug a specific call, store full inputs/outputs in a separate, access-controlled data store with a retention policy and encryption. Never put them in your general application logs.

Building an LLM Call Wrapper

Wrap every LLM call to capture metrics automatically. This is the single most valuable piece of observability infrastructure you can build:

import openai
import time
import uuid
import logging
import json
from typing import Optional, Generator
from contextlib import contextmanager

logger = logging.getLogger("llm_observability")

# Cost per token (update these as pricing changes)
MODEL_COSTS = {
    "gpt-4o": {"input": 2.50 / 1_000_000, "output": 10.00 / 1_000_000},
    "gpt-4o-mini": {"input": 0.15 / 1_000_000, "output": 0.60 / 1_000_000},
    "claude-sonnet-4-20250514": {"input": 3.00 / 1_000_000, "output": 15.00 / 1_000_000},
}


def estimate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
    """Calculate estimated cost for an LLM call."""
    costs = MODEL_COSTS.get(model, {"input": 0.01 / 1_000_000, "output": 0.03 / 1_000_000})
    return (prompt_tokens * costs["input"]) + (completion_tokens * costs["output"])


class ObservableLLMClient:
    """
    Wrapper around OpenAI client that automatically logs every call.
    Drop-in replacement — same API, added observability.
    """

    def __init__(self, trace_id: Optional[str] = None, user_id: Optional[str] = None,
                 environment: str = "production"):
        self.client = openai.OpenAI()
        self.trace_id = trace_id or str(uuid.uuid4())
        self.user_id = user_id
        self.environment = environment
        self.call_log: list[LLMCallRecord] = []

    def chat_completion(
        self,
        messages: list,
        model: str = "gpt-4o",
        temperature: float = 0.0,
        max_tokens: int = 4096,
        tools: Optional[list] = None,
        span_name: str = "default",
        system_prompt_version: str = "unknown",
        **kwargs
    ):
        """Make an LLM call with automatic observability."""
        call_id = str(uuid.uuid4())[:8]
        start_time = time.time()

        try:
            create_kwargs = {
                "model": model,
                "messages": messages,
                "temperature": temperature,
                "max_tokens": max_tokens,
                **kwargs
            }
            if tools:
                create_kwargs["tools"] = tools

            response = self.client.chat.completions.create(**create_kwargs)
            latency_ms = (time.time() - start_time) * 1000

            message = response.choices[0].message
            usage = response.usage

            record = LLMCallRecord(
                call_id=call_id,
                trace_id=self.trace_id,
                span_name=span_name,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                messages_hash=hash_messages(messages),
                message_count=len(messages),
                system_prompt_version=system_prompt_version,
                finish_reason=response.choices[0].finish_reason,
                output_length=len(message.content or ""),
                prompt_tokens=usage.prompt_tokens,
                completion_tokens=usage.completion_tokens,
                total_tokens=usage.total_tokens,
                estimated_cost_usd=estimate_cost(model, usage.prompt_tokens, usage.completion_tokens),
                latency_ms=round(latency_ms, 1),
                has_tool_calls=bool(message.tool_calls),
                tool_names=[tc.function.name for tc in (message.tool_calls or [])],
                user_id=self.user_id,
                environment=self.environment,
            )

            self.call_log.append(record)
            self._emit(record)

            return response

        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000

            record = LLMCallRecord(
                call_id=call_id,
                trace_id=self.trace_id,
                span_name=span_name,
                model=model,
                temperature=temperature,
                max_tokens=max_tokens,
                messages_hash=hash_messages(messages),
                message_count=len(messages),
                system_prompt_version=system_prompt_version,
                finish_reason="error",
                output_length=0,
                prompt_tokens=0,
                completion_tokens=0,
                total_tokens=0,
                estimated_cost_usd=0.0,
                latency_ms=round(latency_ms, 1),
                error=f"{type(e).__name__}: {str(e)}",
                user_id=self.user_id,
                environment=self.environment,
            )

            self.call_log.append(record)
            self._emit(record)
            raise

    def _emit(self, record: LLMCallRecord):
        """Emit the record to your logging infrastructure."""
        log_data = asdict(record)
        logger.info(json.dumps(log_data))

    def get_trace_summary(self) -> dict:
        """Get a summary of all calls in this trace."""
        return {
            "trace_id": self.trace_id,
            "total_calls": len(self.call_log),
            "total_tokens": sum(r.total_tokens for r in self.call_log),
            "total_cost": round(sum(r.estimated_cost_usd for r in self.call_log), 6),
            "total_latency_ms": round(sum(r.latency_ms for r in self.call_log), 1),
            "errors": sum(1 for r in self.call_log if r.error),
            "models_used": list(set(r.model for r in self.call_log)),
        }


# Usage
llm = ObservableLLMClient(user_id="user_123", environment="production")

response = llm.chat_completion(
    messages=[
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Explain DNS in one sentence."}
    ],
    model="gpt-4o-mini",
    span_name="simple_question",
    system_prompt_version="v2.1"
)

print(response.choices[0].message.content)
print(f"Trace summary: {json.dumps(llm.get_trace_summary(), indent=2)}")

Distributed Tracing for Multi-Step Chains

A RAG pipeline involves multiple LLM calls: embedding the query, retrieving documents, generating the answer, maybe a reranking step. An agent might make 5-10 LLM calls per user query. You need to trace all of these as a single operation.

import uuid
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class Span:
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]
    name: str
    started_at: float
    ended_at: Optional[float] = None
    metadata: dict = field(default_factory=dict)
    status: str = "ok"
    error: Optional[str] = None

    @property
    def duration_ms(self) -> float:
        if self.ended_at is None:
            return 0
        return round((self.ended_at - self.started_at) * 1000, 1)


class TraceContext:
    """Lightweight distributed tracing for LLM pipelines."""

    def __init__(self, trace_id: Optional[str] = None):
        self.trace_id = trace_id or str(uuid.uuid4())
        self.spans: list[Span] = []
        self._span_stack: list[str] = []

    @contextmanager
    def span(self, name: str, metadata: Optional[dict] = None):
        """Create a traced span. Spans can be nested."""
        span = Span(
            span_id=str(uuid.uuid4())[:8],
            trace_id=self.trace_id,
            parent_span_id=self._span_stack[-1] if self._span_stack else None,
            name=name,
            started_at=time.time(),
            metadata=metadata or {}
        )
        self._span_stack.append(span.span_id)

        try:
            yield span
            span.status = "ok"
        except Exception as e:
            span.status = "error"
            span.error = str(e)
            raise
        finally:
            span.ended_at = time.time()
            self._span_stack.pop()
            self.spans.append(span)

    def summary(self) -> dict:
        total_ms = sum(s.duration_ms for s in self.spans if s.parent_span_id is None)
        return {
            "trace_id": self.trace_id,
            "total_spans": len(self.spans),
            "total_duration_ms": total_ms,
            "spans": [
                {
                    "name": s.name,
                    "duration_ms": s.duration_ms,
                    "status": s.status,
                    "parent": s.parent_span_id,
                    "metadata": s.metadata
                }
                for s in self.spans
            ]
        }


# Example: tracing a RAG pipeline
def traced_rag_pipeline(query: str) -> dict:
    trace = TraceContext()

    with trace.span("rag_pipeline", {"query_length": len(query)}) as root:

        with trace.span("embed_query") as embed_span:
            # query_embedding = embed(query)
            time.sleep(0.05)  # Simulated
            embed_span.metadata["model"] = "text-embedding-3-small"
            embed_span.metadata["dimensions"] = 1536

        with trace.span("vector_search") as search_span:
            # results = vector_store.search(query_embedding, top_k=5)
            time.sleep(0.1)  # Simulated
            search_span.metadata["top_k"] = 5
            search_span.metadata["results_found"] = 5

        with trace.span("generate_answer") as gen_span:
            # answer = llm.generate(query, results)
            time.sleep(0.3)  # Simulated
            gen_span.metadata["model"] = "gpt-4o"
            gen_span.metadata["prompt_tokens"] = 1500
            gen_span.metadata["completion_tokens"] = 200

    result = trace.summary()
    logger.info(json.dumps(result))
    return result


# Output:
# {
#   "trace_id": "abc-123",
#   "total_spans": 4,
#   "total_duration_ms": 462.3,
#   "spans": [
#     {"name": "embed_query", "duration_ms": 51.2, "status": "ok", ...},
#     {"name": "vector_search", "duration_ms": 102.1, "status": "ok", ...},
#     {"name": "generate_answer", "duration_ms": 305.8, "status": "ok", ...},
#     {"name": "rag_pipeline", "duration_ms": 462.3, "status": "ok", ...}
#   ]
# }

This tells you exactly where time is spent. If your RAG pipeline takes 2 seconds, the trace shows whether it’s the embedding (50ms), vector search (100ms), or generation (1850ms) that’s slow.

OpenTelemetry Integration

If your organization already uses OpenTelemetry, you can export LLM spans alongside your existing traces:

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor, ConsoleSpanExporter
from opentelemetry.sdk.resources import Resource

# Setup (once at application startup)
resource = Resource.create({"service.name": "llm-service"})
provider = TracerProvider(resource=resource)
provider.add_span_processor(BatchSpanProcessor(ConsoleSpanExporter()))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("llm-service")


def otel_llm_call(messages: list, model: str = "gpt-4o") -> str:
    """LLM call instrumented with OpenTelemetry."""
    with tracer.start_as_current_span("llm.chat_completion") as span:
        span.set_attribute("llm.model", model)
        span.set_attribute("llm.message_count", len(messages))
        span.set_attribute("llm.system_prompt_hash", hash_messages(messages[:1]))

        start = time.time()
        try:
            client = openai.OpenAI()
            response = client.chat.completions.create(
                model=model,
                messages=messages
            )

            usage = response.usage
            span.set_attribute("llm.prompt_tokens", usage.prompt_tokens)
            span.set_attribute("llm.completion_tokens", usage.completion_tokens)
            span.set_attribute("llm.total_tokens", usage.total_tokens)
            span.set_attribute("llm.finish_reason", response.choices[0].finish_reason)
            span.set_attribute("llm.latency_ms", (time.time() - start) * 1000)
            span.set_attribute("llm.cost_usd", estimate_cost(model, usage.prompt_tokens, usage.completion_tokens))
            span.set_status(trace.StatusCode.OK)

            return response.choices[0].message.content

        except Exception as e:
            span.set_status(trace.StatusCode.ERROR, str(e))
            span.record_exception(e)
            raise

The advantage of OpenTelemetry is that your LLM spans appear alongside HTTP spans, database spans, and cache spans in the same trace. You can see the full picture: user request → API handler → LLM call → database write → response.

Langfuse Integration

Langfuse is an open-source LLM observability platform purpose-built for this problem. It gives you traces, scoring, cost tracking, and prompt management in a single tool.

from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context

# Initialize (set LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, LANGFUSE_HOST in env)
langfuse = Langfuse()


@observe(as_type="generation")
def generate_answer(query: str, context: str) -> str:
    """Langfuse automatically traces this function as an LLM generation."""
    client = openai.OpenAI()

    messages = [
        {"role": "system", "content": "Answer based on the provided context only."},
        {"role": "user", "content": f"Context: {context}\n\nQuestion: {query}"}
    ]

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        temperature=0.0
    )

    result = response.choices[0].message.content

    # Report usage to Langfuse for cost tracking
    langfuse_context.update_current_observation(
        model="gpt-4o",
        usage={
            "input": response.usage.prompt_tokens,
            "output": response.usage.completion_tokens
        },
        metadata={"query_length": len(query), "context_length": len(context)}
    )

    return result


@observe()  # Traces the full pipeline
def rag_pipeline(query: str) -> str:
    """Full RAG pipeline traced in Langfuse."""
    # Each sub-function creates a nested span
    context = retrieve_documents(query)
    answer = generate_answer(query, context)
    return answer


@observe()
def retrieve_documents(query: str) -> str:
    """Document retrieval step."""
    # Your retrieval logic here
    langfuse_context.update_current_observation(
        metadata={"top_k": 5, "vector_db": "chromadb"}
    )
    return "Retrieved context goes here..."


# Scoring — rate the quality of outputs
def score_response(trace_id: str, score: float, comment: str = ""):
    """Score a trace for quality tracking."""
    langfuse.score(
        trace_id=trace_id,
        name="quality",
        value=score,  # 0.0 to 1.0
        comment=comment
    )

Langfuse gives you a web dashboard where you can see every trace, drill into individual spans, review inputs/outputs (with access controls), track costs over time, and score outputs for quality.

Key Dashboards

You need four dashboards. Build these from your logged data:

1. Cost Dashboard

from collections import defaultdict
from datetime import datetime, timedelta

def calculate_cost_metrics(records: list[LLMCallRecord]) -> dict:
    """Aggregate cost metrics from call records."""
    daily_costs = defaultdict(float)
    model_costs = defaultdict(float)
    user_costs = defaultdict(float)

    for r in records:
        day = r.timestamp[:10]  # "2026-04-01"
        daily_costs[day] += r.estimated_cost_usd
        model_costs[r.model] += r.estimated_cost_usd
        if r.user_id:
            user_costs[r.user_id] += r.estimated_cost_usd

    return {
        "total_cost": round(sum(r.estimated_cost_usd for r in records), 4),
        "daily_costs": dict(daily_costs),
        "cost_by_model": dict(model_costs),
        "top_users_by_cost": dict(sorted(user_costs.items(), key=lambda x: x[1], reverse=True)[:10]),
        "avg_cost_per_call": round(sum(r.estimated_cost_usd for r in records) / max(len(records), 1), 6),
    }

What to track: daily cost, cost by model, cost by feature, cost by user, average cost per request. Alert when daily cost exceeds 1.5x the 7-day average.

2. Latency Dashboard

import statistics

def calculate_latency_metrics(records: list[LLMCallRecord]) -> dict:
    """Calculate latency percentiles."""
    latencies = [r.latency_ms for r in records if r.error is None]

    if not latencies:
        return {"error": "No successful calls to analyze"}

    latencies.sort()

    def percentile(data, p):
        k = (len(data) - 1) * (p / 100)
        f = int(k)
        c = f + 1 if f + 1 < len(data) else f
        return data[f] + (k - f) * (data[c] - data[f])

    return {
        "p50_ms": round(percentile(latencies, 50), 1),
        "p90_ms": round(percentile(latencies, 90), 1),
        "p95_ms": round(percentile(latencies, 95), 1),
        "p99_ms": round(percentile(latencies, 99), 1),
        "mean_ms": round(statistics.mean(latencies), 1),
        "min_ms": round(min(latencies), 1),
        "max_ms": round(max(latencies), 1),
        "total_calls": len(latencies),
    }

What to track: P50, P95, P99 latency, broken down by model and endpoint. Alert when P95 exceeds 2x the baseline.

3. Error Rate Dashboard

def calculate_error_metrics(records: list[LLMCallRecord]) -> dict:
    """Track error rates and types."""
    errors = [r for r in records if r.error is not None]
    error_types = defaultdict(int)

    for r in errors:
        # Extract error type from the error string
        error_type = r.error.split(":")[0] if ":" in r.error else r.error
        error_types[error_type] += 1

    # Track finish_reason distribution
    finish_reasons = defaultdict(int)
    for r in records:
        finish_reasons[r.finish_reason] += 1

    return {
        "total_calls": len(records),
        "error_count": len(errors),
        "error_rate": round(len(errors) / max(len(records), 1) * 100, 2),
        "error_types": dict(error_types),
        "finish_reasons": dict(finish_reasons),
        # "content_filter" finish_reason means the model refused to answer
        "refusal_rate": round(finish_reasons.get("content_filter", 0) / max(len(records), 1) * 100, 2),
    }

What to track: overall error rate, error rate by type (timeout, rate limit, content filter, invalid response), and refusal rate. Alert on error rate above 2%.

4. Quality Score Dashboard

def calculate_quality_metrics(scored_records: list[dict]) -> dict:
    """
    Track quality over time.
    Each record has: {"timestamp": ..., "score": 0.0-1.0, "evaluator": "auto"|"human"}
    """
    if not scored_records:
        return {"error": "No scored records"}

    scores = [r["score"] for r in scored_records]
    auto_scores = [r["score"] for r in scored_records if r["evaluator"] == "auto"]
    human_scores = [r["score"] for r in scored_records if r["evaluator"] == "human"]

    # Calculate weekly trend
    weekly_scores = defaultdict(list)
    for r in scored_records:
        # ISO week number
        dt = datetime.fromisoformat(r["timestamp"])
        week_key = f"{dt.year}-W{dt.isocalendar()[1]:02d}"
        weekly_scores[week_key].append(r["score"])

    weekly_averages = {
        week: round(statistics.mean(scores), 3)
        for week, scores in sorted(weekly_scores.items())
    }

    return {
        "overall_mean": round(statistics.mean(scores), 3),
        "overall_median": round(statistics.median(scores), 3),
        "auto_eval_mean": round(statistics.mean(auto_scores), 3) if auto_scores else None,
        "human_eval_mean": round(statistics.mean(human_scores), 3) if human_scores else None,
        "weekly_trend": weekly_averages,
        "below_threshold": sum(1 for s in scores if s < 0.7),
        "below_threshold_rate": round(sum(1 for s in scores if s < 0.7) / len(scores) * 100, 2),
    }

Cost Monitoring and Alerting

Cost spikes are the most common LLM production incident. A prompt regression that doubles output length, a bug that retries in a loop, or a sudden traffic increase can burn through your budget fast.

class CostMonitor:
    """Monitor LLM costs and alert on anomalies."""

    def __init__(self, daily_budget_usd: float = 100.0, alert_callback=None):
        self.daily_budget = daily_budget_usd
        self.alert_callback = alert_callback or self._default_alert
        self.daily_spend: dict[str, float] = defaultdict(float)
        self.hourly_spend: dict[str, float] = defaultdict(float)

    def record_cost(self, cost_usd: float, model: str):
        now = datetime.utcnow()
        day_key = now.strftime("%Y-%m-%d")
        hour_key = now.strftime("%Y-%m-%d-%H")

        self.daily_spend[day_key] += cost_usd
        self.hourly_spend[hour_key] += cost_usd

        # Check daily budget
        if self.daily_spend[day_key] > self.daily_budget:
            self.alert_callback(
                severity="critical",
                message=f"Daily budget exceeded: ${self.daily_spend[day_key]:.2f} / ${self.daily_budget:.2f}",
                context={"day": day_key, "model": model}
            )

        # Check for hourly spike (more than 1/8 of daily budget in one hour)
        hourly_threshold = self.daily_budget / 8
        if self.hourly_spend[hour_key] > hourly_threshold:
            self.alert_callback(
                severity="warning",
                message=f"Hourly cost spike: ${self.hourly_spend[hour_key]:.2f} (threshold: ${hourly_threshold:.2f})",
                context={"hour": hour_key, "model": model}
            )

    def _default_alert(self, severity: str, message: str, context: dict):
        logger.warning(f"[{severity.upper()}] {message} | {json.dumps(context)}")

    def get_daily_report(self) -> dict:
        today = datetime.utcnow().strftime("%Y-%m-%d")
        return {
            "date": today,
            "spend": round(self.daily_spend.get(today, 0), 4),
            "budget": self.daily_budget,
            "utilization": round(self.daily_spend.get(today, 0) / self.daily_budget * 100, 1),
        }


# Usage
cost_monitor = CostMonitor(daily_budget_usd=50.0)

# After each LLM call:
cost_monitor.record_cost(0.003, "gpt-4o")

Latency Breakdown

LLM latency isn’t a single number. Understanding where time goes is essential for optimization:

import time
import openai

def measure_latency_breakdown(messages: list, model: str = "gpt-4o") -> dict:
    """Measure detailed latency breakdown for an LLM call."""
    client = openai.OpenAI()

    # Non-streaming: total latency
    start = time.time()
    response = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=500
    )
    total_latency = time.time() - start

    tokens = response.usage.completion_tokens

    return {
        "total_ms": round(total_latency * 1000, 1),
        "completion_tokens": tokens,
        "ms_per_token": round((total_latency * 1000) / max(tokens, 1), 1),
    }


def measure_streaming_latency(messages: list, model: str = "gpt-4o") -> dict:
    """Measure time-to-first-token and total time for streaming."""
    client = openai.OpenAI()

    start = time.time()
    first_token_time = None
    token_count = 0

    stream = client.chat.completions.create(
        model=model,
        messages=messages,
        max_tokens=500,
        stream=True
    )

    for chunk in stream:
        if chunk.choices[0].delta.content:
            if first_token_time is None:
                first_token_time = time.time()
            token_count += 1

    end_time = time.time()

    return {
        "time_to_first_token_ms": round((first_token_time - start) * 1000, 1) if first_token_time else None,
        "total_ms": round((end_time - start) * 1000, 1),
        "generation_ms": round((end_time - first_token_time) * 1000, 1) if first_token_time else None,
        "tokens_generated": token_count,
    }

In a typical RAG call, the time breakdown looks like:

  • Embedding: 50-100ms
  • Vector search: 10-50ms
  • LLM generation (TTFT): 200-500ms
  • LLM generation (full): 1000-3000ms
  • Total: 1200-3600ms

The LLM generation dominates. Optimizing your retrieval from 50ms to 20ms is irrelevant when generation takes 2 seconds. Focus on model selection and prompt length.

Quality Drift Detection

This is the most dangerous problem in LLM production. The model doesn’t crash — it just gradually gets worse. Maybe OpenAI rolled out a new model version. Maybe your data changed. Maybe your prompts interact differently with the new model weights.

import statistics
from datetime import datetime, timedelta

class QualityDriftDetector:
    """
    Detect when LLM output quality degrades over time.
    Uses a simple statistical process control approach.
    """

    def __init__(self, baseline_scores: list[float], sensitivity: float = 2.0):
        """
        baseline_scores: quality scores from your initial evaluation
        sensitivity: number of standard deviations before alerting (lower = more sensitive)
        """
        self.baseline_mean = statistics.mean(baseline_scores)
        self.baseline_std = statistics.stdev(baseline_scores) if len(baseline_scores) > 1 else 0.1
        self.sensitivity = sensitivity
        self.lower_bound = self.baseline_mean - (sensitivity * self.baseline_std)
        self.recent_scores: list[dict] = []

    def add_score(self, score: float, metadata: dict = None):
        """Add a new quality score."""
        self.recent_scores.append({
            "score": score,
            "timestamp": datetime.utcnow().isoformat(),
            "metadata": metadata or {}
        })

    def check_drift(self, window_size: int = 50) -> dict:
        """Check if recent scores indicate quality drift."""
        if len(self.recent_scores) < window_size:
            return {"status": "insufficient_data", "samples": len(self.recent_scores)}

        recent = [s["score"] for s in self.recent_scores[-window_size:]]
        recent_mean = statistics.mean(recent)

        # Is the recent mean significantly below the baseline?
        drift_detected = recent_mean < self.lower_bound

        # Calculate Z-score
        z_score = (recent_mean - self.baseline_mean) / max(self.baseline_std, 0.001)

        return {
            "status": "drift_detected" if drift_detected else "ok",
            "baseline_mean": round(self.baseline_mean, 4),
            "recent_mean": round(recent_mean, 4),
            "z_score": round(z_score, 2),
            "lower_bound": round(self.lower_bound, 4),
            "window_size": window_size,
            "drift_detected": drift_detected,
        }


# Usage: run this on a schedule (daily or after deployments)
def scheduled_quality_check():
    """Run eval suite and check for drift. Call this from a cron job."""
    # Load your test dataset
    test_cases = [
        {"query": "What is our return policy?", "expected_keywords": ["30 days", "refund"]},
        {"query": "How do I reset my password?", "expected_keywords": ["settings", "email"]},
        # ... 50-100 test cases
    ]

    scores = []
    for case in test_cases:
        answer = rag_pipeline(case["query"])  # Your production pipeline
        score = evaluate_answer(answer, case["expected_keywords"])
        scores.append(score)

    detector = QualityDriftDetector(
        baseline_scores=[0.85, 0.90, 0.88, 0.92, 0.87, 0.91],  # From initial evaluation
        sensitivity=2.0
    )

    for s in scores:
        detector.add_score(s)

    result = detector.check_drift()

    if result["drift_detected"]:
        # Send alert
        logger.critical(f"Quality drift detected! Recent mean: {result['recent_mean']}, baseline: {result['baseline_mean']}")
        # send_pagerduty_alert(result)

    return result

Debugging Production Issues

When a user reports “the bot gave me a wrong answer,” you need to trace back from the output to the inputs. Here’s a systematic approach:

def debug_llm_response(trace_id: str) -> dict:
    """
    Given a trace ID, reconstruct what happened.
    In production, this queries your logging/tracing backend.
    """
    # 1. Find all spans for this trace
    # spans = tracing_backend.get_spans(trace_id)

    # 2. For each LLM call, check:
    debug_report = {
        "trace_id": trace_id,
        "checks": []
    }

    # Check: Was the right model used?
    debug_report["checks"].append({
        "check": "model_version",
        "question": "Was the expected model used?",
        # "model_used": span.model,
        # "model_expected": "gpt-4o"
    })

    # Check: Was the system prompt correct?
    debug_report["checks"].append({
        "check": "system_prompt",
        "question": "Was the correct system prompt version deployed?",
        # "prompt_version": span.system_prompt_version,
        # "current_version": config.get("system_prompt_version")
    })

    # Check: What context was retrieved? (for RAG)
    debug_report["checks"].append({
        "check": "retrieved_context",
        "question": "Were relevant documents retrieved?",
        # "document_ids": retrieval_span.metadata["doc_ids"],
        # "relevance_scores": retrieval_span.metadata["scores"]
    })

    # Check: Were there token truncations?
    debug_report["checks"].append({
        "check": "truncation",
        "question": "Was the context truncated to fit the token limit?",
        # "total_tokens": span.prompt_tokens,
        # "max_tokens": config.get("max_context_tokens")
    })

    # Check: What was the finish_reason?
    debug_report["checks"].append({
        "check": "finish_reason",
        "question": "Did the model finish normally?",
        # "finish_reason": span.finish_reason
        # "length" means the response was cut off
        # "content_filter" means the model refused
    })

    return debug_report

The most common root causes, in order:

  1. Bad retrieval — the right documents weren’t found
  2. Context truncation — relevant information was cut to fit the token limit
  3. Prompt regression — a prompt change made things worse
  4. Model update — the provider updated the model weights
  5. New edge case — input the system wasn’t designed to handle

Alerting Rules

Not everything deserves a page. Set up tiered alerting:

ALERT_RULES = {
    # Critical — pages the on-call engineer
    "daily_cost_exceeded": {
        "condition": lambda metrics: metrics["daily_cost"] > 200.0,
        "severity": "critical",
        "message": "Daily LLM cost exceeded $200"
    },
    "error_rate_high": {
        "condition": lambda metrics: metrics["error_rate"] > 5.0,
        "severity": "critical",
        "message": "LLM error rate above 5%"
    },

    # Warning — sends to Slack
    "p95_latency_degraded": {
        "condition": lambda metrics: metrics["p95_latency_ms"] > 5000,
        "severity": "warning",
        "message": "P95 latency above 5 seconds"
    },
    "quality_drift": {
        "condition": lambda metrics: metrics.get("quality_mean", 1.0) < 0.75,
        "severity": "warning",
        "message": "Quality score dropped below 0.75"
    },
    "cost_spike_hourly": {
        "condition": lambda metrics: metrics["hourly_cost"] > metrics["daily_budget"] / 8,
        "severity": "warning",
        "message": "Hourly cost spike detected"
    },

    # Info — logged but no notification
    "token_usage_high": {
        "condition": lambda metrics: metrics["avg_tokens_per_call"] > 4000,
        "severity": "info",
        "message": "Average token usage per call above 4000"
    },
}


def evaluate_alerts(metrics: dict) -> list[dict]:
    """Check all alert rules against current metrics."""
    triggered = []
    for rule_name, rule in ALERT_RULES.items():
        try:
            if rule["condition"](metrics):
                triggered.append({
                    "rule": rule_name,
                    "severity": rule["severity"],
                    "message": rule["message"],
                    "timestamp": datetime.utcnow().isoformat()
                })
        except (KeyError, TypeError):
            pass  # Missing metric, skip this rule

    return triggered

Integrating with Existing Monitoring

If you’re already using Datadog, Grafana, or similar platforms, emit LLM metrics as custom metrics:

# Datadog integration
try:
    from datadog import statsd

    def emit_datadog_metrics(record: LLMCallRecord):
        """Send LLM metrics to Datadog."""
        tags = [
            f"model:{record.model}",
            f"span:{record.span_name}",
            f"environment:{record.environment}",
        ]

        statsd.increment("llm.calls.total", tags=tags)
        statsd.histogram("llm.latency_ms", record.latency_ms, tags=tags)
        statsd.histogram("llm.tokens.total", record.total_tokens, tags=tags)
        statsd.histogram("llm.cost_usd", record.estimated_cost_usd, tags=tags)

        if record.error:
            statsd.increment("llm.calls.errors", tags=tags + [f"error_type:{record.error.split(':')[0]}"])

        if record.finish_reason == "content_filter":
            statsd.increment("llm.calls.filtered", tags=tags)

except ImportError:
    pass  # Datadog not installed


# Prometheus integration
try:
    from prometheus_client import Counter, Histogram, Gauge

    llm_calls_total = Counter("llm_calls_total", "Total LLM API calls", ["model", "span", "status"])
    llm_latency = Histogram("llm_latency_seconds", "LLM call latency", ["model"])
    llm_tokens = Histogram("llm_tokens_total", "Tokens per LLM call", ["model", "direction"])
    llm_cost = Counter("llm_cost_usd_total", "Cumulative LLM cost in USD", ["model"])

    def emit_prometheus_metrics(record: LLMCallRecord):
        status = "error" if record.error else "ok"
        llm_calls_total.labels(model=record.model, span=record.span_name, status=status).inc()
        llm_latency.labels(model=record.model).observe(record.latency_ms / 1000)
        llm_tokens.labels(model=record.model, direction="input").observe(record.prompt_tokens)
        llm_tokens.labels(model=record.model, direction="output").observe(record.completion_tokens)
        llm_cost.labels(model=record.model).inc(record.estimated_cost_usd)

except ImportError:
    pass  # Prometheus client not installed

Putting It All Together

Here’s a minimal but complete observability layer you can drop into any LLM application:

"""
llm_observability.py — Drop-in observability for LLM applications.
No external dependencies beyond the OpenAI SDK.
"""
import json
import time
import uuid
import logging
from datetime import datetime
from typing import Optional
from dataclasses import dataclass, field, asdict

logger = logging.getLogger("llm")
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
logger.setLevel(logging.INFO)


@dataclass
class CallMetrics:
    call_id: str
    trace_id: str
    model: str
    prompt_tokens: int = 0
    completion_tokens: int = 0
    latency_ms: float = 0.0
    cost_usd: float = 0.0
    finish_reason: str = ""
    error: Optional[str] = None
    timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat())


class LLMObserver:
    """Minimal observability. Wraps any OpenAI-compatible client."""

    def __init__(self):
        self.metrics: list[CallMetrics] = []
        self._daily_cost: float = 0.0

    def record(self, metrics: CallMetrics):
        self.metrics.append(metrics)
        self._daily_cost += metrics.cost_usd
        logger.info(json.dumps(asdict(metrics), default=str))

        if self._daily_cost > 100:
            logger.critical(f"DAILY COST ALERT: ${self._daily_cost:.2f}")

    def summary(self) -> dict:
        if not self.metrics:
            return {}
        latencies = [m.latency_ms for m in self.metrics]
        return {
            "calls": len(self.metrics),
            "total_tokens": sum(m.prompt_tokens + m.completion_tokens for m in self.metrics),
            "total_cost": round(sum(m.cost_usd for m in self.metrics), 4),
            "avg_latency_ms": round(sum(latencies) / len(latencies), 1),
            "errors": sum(1 for m in self.metrics if m.error),
        }


# Global instance
observer = LLMObserver()

Key Takeaways

  1. Log every LLM call with structured data. Model, tokens, latency, cost, finish reason, and a message hash. This is non-negotiable for production.

  2. Never log full user messages or model responses in general logs. They contain PII. Use hashes for correlation, store full content in an access-controlled system with retention policies.

  3. Use correlation IDs to trace multi-step chains. A single user request might trigger 5 LLM calls across retrieval, generation, and validation. Link them with a trace ID so you can debug end-to-end.

  4. Track four metrics religiously: daily cost, P95 latency, error rate, and quality score. If you only have bandwidth for one dashboard, make it cost.

  5. Set up cost alerting before you need it. A runaway loop or prompt regression can burn $1000 in hours. Alert on daily budget thresholds and hourly spikes.

  6. Quality drift is the silent killer. Your system won’t crash — it will just start giving worse answers. Run eval suites on a schedule, not just at deploy time. Track quality scores over time and alert on statistical deviations.

  7. Time-to-first-token matters more than total latency for user experience. Streaming lets users start reading while the model generates. Measure TTFT separately from total response time.

  8. Integrate with your existing monitoring stack. Don’t build a parallel monitoring system. Emit LLM metrics to Datadog, Prometheus, or whatever your team already uses. Add LLM-specific dashboards on top.

  9. Build debugging workflows before you need them. When a user reports a bad response, you need to trace back from the output to the inputs within minutes, not hours. Prepare the queries and dashboards in advance.

  10. Langfuse or LangSmith are worth using. For LLM-specific tracing, scoring, and prompt management, purpose-built tools save weeks of development time over building your own.