Lesson 10 | LLM Engineering in Production | 17 min read

Cost Optimization for LLM Applications

April 01, 2026

TL;DR

LLM costs are dominated by tokens. Cut them four ways: (1) Cache — semantic cache for repeated/similar queries saves 60-80%. (2) Route — send simple queries to cheap models, hard ones to expensive models. (3) Compress — shorter prompts, summarized history, fewer examples. (4) Batch — use batch APIs for non-real-time work at 50% discount. A well-optimized system costs 5-10x less than a naive one with the same output quality.

Your LLM prototype costs $2/day. Your production system serving 10,000 users costs $2,000/day. Your CFO asks why the AI bill tripled last month. You check the logs and discover that 40% of your tokens go to repeating the same 2,000-word system prompt on every request, 30% go to conversation history that nobody reads, and 20% go to queries that a model 10x cheaper could handle. This is the norm, not the exception. Most LLM applications waste 50-90% of their token budget.

This lesson covers every practical strategy for cutting costs without cutting quality. We’ll start by understanding where your tokens actually go, then systematically attack each waste category.

Understanding Your Cost Profile

Before optimizing anything, you need to know where the money goes. LLM costs are billed per token — both input and output. Here’s where tokens typically go in a production application:

Component | % of Total Tokens | Example (GPT-4o)
System prompt | 15-30% | 500-2,000 tokens repeated every request
Conversation history | 20-40% | Grows with each turn, often 3,000+ tokens
Retrieved context (RAG) | 15-30% | 3-5 chunks at 500 tokens each
User query | 2-5% | Usually short
Model output | 10-25% | Varies by task

The insight: input tokens dominate, and most of them are repetitive. The system prompt is identical across requests. Retrieved context is often similar. Conversation history accumulates redundancy. This is good news — repetitive things can be cached.
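To make the split concrete, here is a quick back-of-envelope breakdown for a single request, using assumed component sizes within the ranges above and GPT-4o's example pricing:

```python
# Rough per-request cost split (component sizes are illustrative assumptions)
INPUT_PRICE = 2.50 / 1_000_000    # $/token, GPT-4o input
OUTPUT_PRICE = 10.00 / 1_000_000  # $/token, GPT-4o output

components = {
    "system_prompt": 1_200,
    "history": 3_000,
    "rag_context": 2_000,
    "user_query": 150,
}
output_tokens = 400

input_cost = sum(components.values()) * INPUT_PRICE
output_cost = output_tokens * OUTPUT_PRICE
total = input_cost + output_cost

for name, tokens in components.items():
    print(f"{name}: {tokens * INPUT_PRICE / total * 100:.0f}% of request cost")
print(f"output: {output_cost / total * 100:.0f}% of request cost")
```

Even with output priced 4x higher per token, the three repetitive input components (system prompt, history, RAG context) account for most of the request cost — which is exactly what caching and compression attack.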

Token Counting in Production

You can’t optimize what you don’t measure. Every LLM request should be tracked with its token count and cost.

import tiktoken
from openai import OpenAI
from dataclasses import dataclass
from datetime import datetime
import json

# Pricing per 1M tokens (as of early 2026 — check current pricing)
MODEL_PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "gpt-4.1": {"input": 2.00, "output": 8.00},
    "gpt-4.1-mini": {"input": 0.40, "output": 1.60},
    "gpt-4.1-nano": {"input": 0.10, "output": 0.40},
    "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
    "claude-haiku-3.5": {"input": 0.80, "output": 4.00},
}

@dataclass
class UsageRecord:
    timestamp: datetime
    model: str
    input_tokens: int
    output_tokens: int
    cached_tokens: int
    cost_usd: float
    user_id: str
    endpoint: str

def calculate_cost(model: str, input_tokens: int, output_tokens: int,
                   cached_tokens: int = 0) -> float:
    """Calculate the cost of a single LLM request."""
    pricing = MODEL_PRICING.get(model)
    if not pricing:
        raise ValueError(f"Unknown model: {model}")

    # Cached tokens are typically 50% cheaper (OpenAI) or 90% cheaper (Anthropic)
    regular_input = input_tokens - cached_tokens
    cached_discount = 0.5  # Adjust per provider

    input_cost = (regular_input * pricing["input"] / 1_000_000 +
                  cached_tokens * pricing["input"] * cached_discount / 1_000_000)
    output_cost = output_tokens * pricing["output"] / 1_000_000

    return round(input_cost + output_cost, 6)


def track_llm_usage(response, model: str, user_id: str, endpoint: str) -> UsageRecord:
    """Extract usage from an OpenAI response and log it."""
    usage = response.usage

    # prompt_tokens_details is an object (not a dict) in recent SDK versions,
    # so read the attribute rather than calling .get on it
    details = getattr(usage, "prompt_tokens_details", None)
    cached = (getattr(details, "cached_tokens", 0) or 0) if details else 0

    record = UsageRecord(
        timestamp=datetime.utcnow(),
        model=model,
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens,
        cached_tokens=cached,
        cost_usd=calculate_cost(
            model, usage.prompt_tokens, usage.completion_tokens, cached
        ),
        user_id=user_id,
        endpoint=endpoint,
    )

    # Log to your analytics pipeline
    print(json.dumps({
        "event": "llm_usage",
        "model": record.model,
        "input_tokens": record.input_tokens,
        "output_tokens": record.output_tokens,
        "cost_usd": record.cost_usd,
        "user_id": record.user_id,
        "endpoint": record.endpoint,
    }))

    return record

Use this on every request. After a week, you’ll know exactly which endpoints, users, and query types cost the most.
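Once usage records are flowing, a small aggregation answers the first question you'll have: which endpoint is burning the budget. A minimal sketch, assuming `records` is any iterable of objects with the UsageRecord fields:

```python
from collections import defaultdict

def cost_by_endpoint(records):
    """Aggregate logged usage records by endpoint, most expensive first."""
    totals = defaultdict(lambda: {"cost": 0.0, "requests": 0, "tokens": 0})
    for r in records:
        agg = totals[r.endpoint]
        agg["cost"] += r.cost_usd
        agg["requests"] += 1
        agg["tokens"] += r.input_tokens + r.output_tokens
    return sorted(totals.items(), key=lambda kv: kv[1]["cost"], reverse=True)
```

Run the same aggregation keyed by `model` or `user_id` to find expensive models and heavy users.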

Prompt Caching: Built-in Provider Support

Both OpenAI and Anthropic offer server-side prompt caching. When you send the same prefix repeatedly, the provider caches it and charges less for subsequent requests.

How it works: If the first N tokens of your prompt match a cached prefix, those tokens are served from cache at a discounted rate. This happens automatically on OpenAI (for prompts over 1,024 tokens) and with explicit cache control on Anthropic.

OpenAI Automatic Prompt Caching

OpenAI caches automatically. No code changes needed — just make sure your prompts share a common prefix.

from openai import OpenAI

client = OpenAI()

# This system prompt is 1,500+ tokens. After the first request,
# subsequent requests with the same prefix get cached pricing.
SYSTEM_PROMPT = """You are a senior financial analyst assistant.
You help users analyze quarterly earnings reports, SEC filings,
and market data. You follow these rules:

1. Always cite specific numbers from the provided documents.
2. When comparing periods, use percentage changes.
3. Flag any inconsistencies between reported numbers.
4. Use professional financial terminology.
5. Format currency as $X.XM or $X.XB.
... (imagine 1,500 tokens of detailed instructions) ..."""

def analyze_earnings(user_query: str, context: str) -> str:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user", "content": f"Context:\n{context}\n\nQuestion: {user_query}"},
        ],
    )

    # Check if caching kicked in
    if hasattr(response.usage, "prompt_tokens_details"):
        cached = response.usage.prompt_tokens_details.cached_tokens
        total = response.usage.prompt_tokens
        print(f"Cache hit: {cached}/{total} tokens ({cached/total*100:.0f}%)")

    return response.choices[0].message.content

Key rules for OpenAI caching:

  • Minimum 1,024 tokens in the prefix
  • Cache lives for 5-10 minutes of inactivity
  • The prefix must be identical — a single-character difference breaks the cache
  • Put the stable part (system prompt) first, variable part (user query) last
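The most common way to break the prefix rule is injecting volatile data (a date, a request ID) at the top of the system prompt. A sketch of the anti-pattern and the fix, with a placeholder `SYSTEM_PROMPT`:

```python
from datetime import date

SYSTEM_PROMPT = "You are a financial analyst... (imagine 1,500 tokens of instructions)"

# BAD: the date changes the very first tokens, so no two days' requests
# share a prefix and caching never kicks in
def build_messages_bad(query: str) -> list[dict]:
    return [
        {"role": "system", "content": f"Today is {date.today()}.\n{SYSTEM_PROMPT}"},
        {"role": "user", "content": query},
    ]

# GOOD: stable instructions first, volatile data appended after the
# cacheable prefix
def build_messages_good(query: str) -> list[dict]:
    return [
        {"role": "system", "content": f"{SYSTEM_PROMPT}\nToday is {date.today()}."},
        {"role": "user", "content": query},
    ]
```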

Anthropic Explicit Cache Control

Anthropic lets you mark exactly which parts of the prompt to cache using cache_control blocks.

import anthropic

client = anthropic.Anthropic()

# Mark the system prompt for caching
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    system=[
        {
            "type": "text",
            "text": "You are a senior financial analyst assistant...(long prompt)...",
            "cache_control": {"type": "ephemeral"}  # Cache this block
        }
    ],
    messages=[
        {"role": "user", "content": "Analyze Q3 revenue trends."}
    ],
)

# Anthropic returns cache stats directly
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
# Cache read tokens cost 90% less than regular input tokens on Anthropic

Anthropic caching saves 90% on cached reads (vs. 50% on OpenAI); note that cache writes cost 25% more than regular input tokens, so the cache pays for itself after the second read. If your system prompt is 2,000 tokens and you make 1,000 requests/hour, that's 2M tokens/hour. At $3/1M tokens, that's $6/hour; with 90% caching it drops to roughly $0.60/hour. Over a month, that's about $3,900 saved on the system prompt alone.
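The arithmetic generalizes to a small estimator (rates and discount are assumptions — check current pricing, and note this ignores the one-time cache-write premium):

```python
def monthly_cache_savings(prompt_tokens: int, requests_per_hour: int,
                          price_per_m: float, cache_discount: float) -> float:
    """Estimated monthly savings from caching a fixed prompt prefix."""
    tokens_per_hour = prompt_tokens * requests_per_hour
    full_cost = tokens_per_hour / 1_000_000 * price_per_m     # $/hour uncached
    cached_cost = full_cost * (1 - cache_discount)            # $/hour with cache
    return (full_cost - cached_cost) * 24 * 30                # ~720 hours/month

# 2,000-token prompt, 1,000 req/hour, $3/1M input, 90% read discount
print(monthly_cache_savings(2_000, 1_000, 3.00, 0.90))  # ≈ 3888.0
```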

Semantic Caching: Cache Similar Queries

Prompt caching handles identical prefixes. But what about similar user queries? “What’s your return policy?” and “How do I return an item?” should produce the same answer.

Semantic caching uses embeddings to find similar past queries and returns the cached response instead of calling the LLM.

import hashlib
import json
import time
import numpy as np
import redis
from openai import OpenAI

client = OpenAI()
cache = redis.Redis(host="localhost", port=6379, db=0)

SIMILARITY_THRESHOLD = 0.95  # Cosine similarity — tune this carefully
CACHE_TTL = 3600  # 1 hour

def get_embedding(text: str) -> list[float]:
    """Get embedding for a text string."""
    response = client.embeddings.create(
        model="text-embedding-3-small",
        input=text,
    )
    return response.data[0].embedding

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Calculate cosine similarity between two vectors."""
    a_np, b_np = np.array(a), np.array(b)
    return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))

class SemanticCache:
    def __init__(self, redis_client: redis.Redis, threshold: float = 0.95):
        self.redis = redis_client
        self.threshold = threshold

    def _cache_key(self, prefix: str, query_hash: str) -> str:
        return f"sem_cache:{prefix}:{query_hash}"

    def get(self, query: str, namespace: str = "default") -> str | None:
        """Look up a semantically similar cached response."""
        query_embedding = get_embedding(query)

        # Get all cached embeddings in this namespace
        pattern = f"sem_cache:{namespace}:*"
        keys = self.redis.keys(pattern)

        best_match = None
        best_similarity = 0.0

        for key in keys:
            cached = json.loads(self.redis.get(key))
            similarity = cosine_similarity(query_embedding, cached["embedding"])

            if similarity > best_similarity:
                best_similarity = similarity
                best_match = cached

        if best_match and best_similarity >= self.threshold:
            return best_match["response"]

        return None

    def set(self, query: str, response: str, namespace: str = "default"):
        """Cache a query-response pair with its embedding."""
        embedding = get_embedding(query)
        query_hash = hashlib.md5(query.encode()).hexdigest()

        data = {
            "query": query,
            "response": response,
            "embedding": embedding,
            "timestamp": time.time(),
        }

        key = self._cache_key(namespace, query_hash)
        self.redis.setex(key, CACHE_TTL, json.dumps(data))


# Usage
sem_cache = SemanticCache(cache, threshold=0.95)

def ask_with_cache(query: str) -> str:
    # Check semantic cache first
    cached = sem_cache.get(query, namespace="support")
    if cached:
        print("Semantic cache hit!")
        return cached

    # Cache miss — call the LLM
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful support agent."},
            {"role": "user", "content": query},
        ],
        temperature=0,  # Deterministic — safe to cache
    )

    result = response.choices[0].message.content
    sem_cache.set(query, result, namespace="support")
    return result

Important: Only cache responses where temperature=0. Non-deterministic responses should not be cached — users expect variety.

Production warning: The naive approach above scans all keys linearly. For production, use a vector database (pgvector, Pinecone, Qdrant) for the embedding lookup. The pattern is the same — you’re just replacing the Redis scan with a proper vector similarity search.

Response Caching with Redis

For deterministic queries where the input is exactly the same (not just similar), a simple key-value cache works.

import hashlib
import json
import redis
from openai import OpenAI

client = OpenAI()
cache = redis.Redis(host="localhost", port=6379, db=0)

def cached_llm_call(
    messages: list[dict],
    model: str = "gpt-4o",
    temperature: float = 0,
    cache_ttl: int = 3600,
) -> str:
    """LLM call with exact-match response caching."""
    if temperature > 0:
        # Non-deterministic — skip cache
        response = client.chat.completions.create(
            model=model, messages=messages, temperature=temperature
        )
        return response.choices[0].message.content

    # Build a cache key from the full request
    cache_input = json.dumps({
        "model": model,
        "messages": messages,
        "temperature": temperature,
    }, sort_keys=True)
    cache_key = f"llm:exact:{hashlib.sha256(cache_input.encode()).hexdigest()}"

    # Check cache
    cached = cache.get(cache_key)
    if cached:
        return cached.decode("utf-8")

    # Cache miss
    response = client.chat.completions.create(
        model=model, messages=messages, temperature=temperature
    )
    result = response.choices[0].message.content

    cache.setex(cache_key, cache_ttl, result)
    return result

This is dead simple and saves the most money for applications with repeated identical queries — FAQ bots, classification endpoints, extraction pipelines.

Model Routing: Cheap Models for Easy Queries

Not every query needs GPT-4o or Claude Sonnet. A question like “What are your business hours?” can be answered by a model that costs 10-25x less. Model routing sends each query to the cheapest model that can handle it well.

from openai import OpenAI
from enum import Enum

client = OpenAI()

class ModelTier(Enum):
    CHEAP = "gpt-4.1-nano"      # $0.10/1M input
    MEDIUM = "gpt-4.1-mini"     # $0.40/1M input
    EXPENSIVE = "gpt-4.1"       # $2.00/1M input

# Cost ratios: CHEAP is 20x cheaper than EXPENSIVE

def classify_query_complexity(query: str) -> ModelTier:
    """Use a cheap model to classify query complexity."""
    response = client.chat.completions.create(
        model="gpt-4.1-nano",  # Classification itself uses the cheapest model
        messages=[
            {
                "role": "system",
                "content": """Classify the user query complexity.
Reply with ONLY one word: SIMPLE, MEDIUM, or COMPLEX.

SIMPLE: Factual lookups, yes/no questions, greetings, simple formatting.
MEDIUM: Summaries, comparisons, explanations of concepts, multi-step but straightforward.
COMPLEX: Analysis, creative writing, code generation, multi-document reasoning, nuanced judgment.""",
            },
            {"role": "user", "content": query},
        ],
        max_tokens=10,
        temperature=0,
    )

    classification = response.choices[0].message.content.strip().upper()

    mapping = {
        "SIMPLE": ModelTier.CHEAP,
        "MEDIUM": ModelTier.MEDIUM,
        "COMPLEX": ModelTier.EXPENSIVE,
    }
    return mapping.get(classification, ModelTier.MEDIUM)


def route_and_respond(query: str, system_prompt: str) -> tuple[str, str]:
    """Route query to appropriate model tier and get response."""
    tier = classify_query_complexity(query)
    model = tier.value

    response = client.chat.completions.create(
        model=model,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
    )

    return response.choices[0].message.content, model


# Example usage
answer, model_used = route_and_respond(
    "What are your business hours?",
    "You are a helpful customer support agent for Acme Corp."
)
print(f"Answered by {model_used}: {answer}")

Smarter routing with confidence: If the cheap model’s response has low confidence, escalate to a more expensive model.

def route_with_fallback(query: str, system_prompt: str) -> str:
    """Try cheap model first, escalate if confidence is low."""
    # First attempt with cheap model
    response = client.chat.completions.create(
        model=ModelTier.CHEAP.value,
        messages=[
            {"role": "system", "content": system_prompt + "\nEnd your response with CONFIDENCE: HIGH or CONFIDENCE: LOW."},
            {"role": "user", "content": query},
        ],
    )

    result = response.choices[0].message.content

    if "CONFIDENCE: LOW" in result:
        # Escalate to expensive model
        response = client.chat.completions.create(
            model=ModelTier.EXPENSIVE.value,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": query},
            ],
        )
        return response.choices[0].message.content

    # Strip the confidence tag and return
    return result.replace("CONFIDENCE: HIGH", "").strip()

Real-world routing distribution: In a typical support chatbot, 50-60% of queries are SIMPLE, 25-30% are MEDIUM, and 10-20% are COMPLEX. If you route correctly, your blended cost is 60-70% lower than using the expensive model for everything.
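You can sanity-check that claim with a blended-price calculation. This sketch covers input tokens only and ignores the classifier's own cost, so realized savings will be somewhat lower; tier names and rates are illustrative:

```python
def blended_input_price(distribution: dict[str, float],
                        prices: dict[str, float]) -> float:
    """Weighted-average $/1M input tokens across routed model tiers."""
    return sum(share * prices[tier] for tier, share in distribution.items())

prices = {"nano": 0.10, "mini": 0.40, "full": 2.00}  # $/1M input (example rates)
routed = blended_input_price({"nano": 0.55, "mini": 0.30, "full": 0.15}, prices)
unrouted = prices["full"]
print(f"blended ${routed:.3f}/1M vs ${unrouted:.2f}/1M "
      f"({(1 - routed / unrouted) * 100:.0f}% cheaper)")
```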

Prompt Compression

Shorter prompts = fewer tokens = less money. Here are three practical compression techniques.

1. Shorter System Prompts

Most system prompts are bloated. They repeat instructions, use verbose language, and include examples that aren’t needed for every query.

# BEFORE: 850 tokens
VERBOSE_PROMPT = """
You are a helpful customer support assistant for TechCorp, a leading
technology solutions provider. Your role is to assist customers with
their inquiries about our products and services. When responding to
customers, you should always be polite, professional, and thorough.

When a customer asks about pricing, you should provide accurate pricing
information based on the current pricing tiers. If you do not know the
exact pricing, you should direct the customer to the pricing page on
our website at https://techcorp.com/pricing.

When a customer asks about technical issues, you should first try to
troubleshoot the issue by asking clarifying questions. If you cannot
resolve the issue, you should create a support ticket and provide the
customer with the ticket number.

Here are some examples of how to respond:
Customer: "What's the price of the Pro plan?"
You: "The Pro plan is $49/month when billed annually..."
(... 15 more examples ...)
"""

# AFTER: 180 tokens — same behavior
COMPRESSED_PROMPT = """TechCorp support agent. Be concise and accurate.

Rules:
- Pricing questions: give exact tier pricing. Unknown? Link to techcorp.com/pricing
- Technical issues: troubleshoot first, then create ticket with number
- Tone: professional, not verbose

Response format: direct answer, then next steps if any."""

The compressed version produces the same quality responses. Test both with your eval suite to verify.

2. Conversation History Summarization

Conversation history grows linearly and becomes the biggest token sink in multi-turn applications. Summarize older turns instead of keeping them verbatim.

from openai import OpenAI

client = OpenAI()

def summarize_history(
    messages: list[dict], keep_recent: int = 4
) -> list[dict]:
    """Summarize old messages, keep recent ones verbatim.

    keep_recent=0 summarizes everything (messages[:-0] would be an empty
    slice, so the slicing is guarded below).
    """
    if not messages or (keep_recent > 0 and len(messages) <= keep_recent):
        return messages

    old_messages = messages[:-keep_recent] if keep_recent > 0 else messages
    recent_messages = messages[-keep_recent:] if keep_recent > 0 else []

    # Summarize old messages with a cheap model
    history_text = "\n".join(
        f"{m['role']}: {m['content']}" for m in old_messages
    )

    summary_response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {
                "role": "system",
                "content": "Summarize this conversation in 2-3 sentences. Keep key facts, decisions, and unanswered questions.",
            },
            {"role": "user", "content": history_text},
        ],
        max_tokens=150,
    )

    summary = summary_response.choices[0].message.content

    return [
        {"role": "system", "content": f"Previous conversation summary: {summary}"},
        *recent_messages,
    ]


# Example: 20-turn conversation compressed to summary + last 4 turns
# Before: ~4,000 tokens of history
# After: ~600 tokens (150 summary + 4 recent messages)

3. Selective Few-Shot Examples

Instead of including 10 examples in every prompt, select the most relevant 1-2 examples based on the query.

from openai import OpenAI
import numpy as np

client = OpenAI()

# Pre-computed examples with embeddings
FEW_SHOT_EXAMPLES = [
    {
        "query": "I want to cancel my subscription",
        "response": "I can help with cancellation. Your current plan...",
        "embedding": [0.1, 0.2, ...],  # Pre-computed
        "category": "cancellation",
    },
    {
        "query": "The app crashes when I upload files",
        "response": "Let's troubleshoot the upload issue. First...",
        "embedding": [0.3, 0.4, ...],
        "category": "technical",
    },
    # ... 50 more examples
]

def select_examples(query: str, examples: list[dict], top_k: int = 2) -> list[dict]:
    """Select the most relevant few-shot examples for the query."""
    query_emb = get_embedding(query)  # Reuse from earlier

    scored = []
    for ex in examples:
        sim = cosine_similarity(query_emb, ex["embedding"])
        scored.append((sim, ex))

    scored.sort(key=lambda x: x[0], reverse=True)
    return [ex for _, ex in scored[:top_k]]


def build_prompt_with_selected_examples(query: str, system_base: str) -> list[dict]:
    """Build prompt with only the most relevant examples."""
    examples = select_examples(query, FEW_SHOT_EXAMPLES, top_k=2)

    examples_text = "\n\n".join(
        f"User: {ex['query']}\nAssistant: {ex['response']}"
        for ex in examples
    )

    return [
        {
            "role": "system",
            "content": f"{system_base}\n\nExamples:\n{examples_text}",
        },
        {"role": "user", "content": query},
    ]

Batch APIs: 50% Off for Async Work

OpenAI’s Batch API processes requests asynchronously at a 50% discount. Perfect for workloads that don’t need real-time responses: nightly report generation, bulk classification, content moderation queues, embedding large document sets.

import json
import time
from openai import OpenAI

client = OpenAI()

def create_batch_file(requests: list[dict], filename: str = "/tmp/batch_input.jsonl"):
    """Create a JSONL file for batch processing."""
    with open(filename, "w") as f:
        for i, req in enumerate(requests):
            batch_request = {
                "custom_id": f"request-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": req.get("model", "gpt-4o-mini"),
                    "messages": req["messages"],
                    "max_tokens": req.get("max_tokens", 500),
                    "temperature": 0,
                },
            }
            f.write(json.dumps(batch_request) + "\n")

    return filename


def submit_batch(filename: str) -> str:
    """Upload file and submit batch job."""
    # Upload the JSONL file
    with open(filename, "rb") as f:
        file_obj = client.files.create(file=f, purpose="batch")

    # Create the batch
    batch = client.batches.create(
        input_file_id=file_obj.id,
        endpoint="/v1/chat/completions",
        completion_window="24h",
    )

    print(f"Batch submitted: {batch.id}")
    return batch.id


def poll_batch(batch_id: str, interval: int = 30) -> list[dict]:
    """Poll until batch completes and return results."""
    while True:
        batch = client.batches.retrieve(batch_id)
        print(f"Status: {batch.status} | Completed: {batch.request_counts.completed}/{batch.request_counts.total}")

        if batch.status == "completed":
            # Download results
            result_file = client.files.content(batch.output_file_id)
            results = []
            for line in result_file.text.strip().split("\n"):
                results.append(json.loads(line))
            return results

        if batch.status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"Batch {batch.status}: {batch.errors}")

        time.sleep(interval)


# Example: Classify 10,000 support tickets at 50% off
tickets = [
    {"messages": [
        {"role": "system", "content": "Classify this ticket as: billing, technical, feature_request, or other. Reply with one word."},
        {"role": "user", "content": ticket_text},
    ]}
    for ticket_text in load_tickets()  # Your function to load tickets
]

batch_file = create_batch_file(tickets)
batch_id = submit_batch(batch_file)
results = poll_batch(batch_id)

# Cost: 10,000 tickets * ~200 tokens each * $0.15/1M (mini) * 0.5 (batch discount)
# = $0.15 instead of $0.30. At GPT-4o scale: $2.50 instead of $5.00

Context Window Management

As conversations grow, you need strategies to keep the context window under control without losing important information.

import tiktoken

def manage_context_window(
    messages: list[dict],
    system_prompt: str,
    max_context_tokens: int = 8000,
    model: str = "gpt-4o",
) -> list[dict]:
    """Keep conversation within token budget using sliding window + summarization."""
    enc = tiktoken.encoding_for_model(model)

    system_tokens = len(enc.encode(system_prompt))
    available = max_context_tokens - system_tokens - 500  # Reserve 500 for response

    # Start from most recent, add messages until budget exhausted
    selected = []
    token_count = 0

    for msg in reversed(messages):
        msg_tokens = len(enc.encode(msg["content"]))
        if token_count + msg_tokens > available:
            break
        selected.insert(0, msg)
        token_count += msg_tokens

    # If we dropped messages, prepend a summary of what was dropped
    if len(selected) < len(messages):
        dropped = messages[:len(messages) - len(selected)]
        summary = summarize_history(dropped, keep_recent=0)

        if summary:
            # summarize_history already returns a system message carrying
            # the summary, so prepend it as-is rather than re-wrapping it
            selected = summary + selected

    return [{"role": "system", "content": system_prompt}] + selected

Output Length Control

Output tokens are 2-5x more expensive than input tokens. Controlling output length is a direct cost lever.

# Strategy 1: Aggressive max_tokens
response = client.chat.completions.create(
    model="gpt-4o",
    messages=messages,
    max_tokens=150,  # Force concise responses
)

# Strategy 2: Stop sequences for structured output
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": "Answer in one paragraph. End with [DONE]."},
        {"role": "user", "content": query},
    ],
    stop=["[DONE]"],  # Stop generating as soon as [DONE] is produced
)

# Strategy 3: Instruct conciseness in the system prompt
CONCISE_PROMPT = """Answer the user's question in 2-3 sentences maximum.
If a longer answer is needed, provide the key point first, then ask
if the user wants more detail. Never repeat the question back."""

Cost impact: If your average output is 500 tokens but could be 150, you cut output costs by 70%. For GPT-4o at $10/1M output tokens, that’s significant at scale.

Smaller Models as First Pass

Use tiny models as classifiers, filters, and pre-processors before sending anything to expensive models.

def needs_llm_response(query: str) -> bool:
    """Use a tiny model to check if the query even needs an LLM response."""
    response = client.chat.completions.create(
        model="gpt-4.1-nano",
        messages=[
            {
                "role": "system",
                "content": """Classify if this query needs an LLM response or can be handled
by a simple lookup/redirect. Reply YES or NO.

NO examples: "hi", "thanks", "ok", greetings, single word messages.
YES examples: questions, requests, complaints, anything needing a real answer.""",
            },
            {"role": "user", "content": query},
        ],
        max_tokens=5,
        temperature=0,
    )

    return "YES" in response.choices[0].message.content.upper()


def prefilter_rag_results(query: str, chunks: list[str]) -> list[str]:
    """Use a cheap model to filter irrelevant RAG chunks before the expensive call."""
    relevant = []

    for chunk in chunks:
        response = client.chat.completions.create(
            model="gpt-4.1-nano",
            messages=[
                {
                    "role": "system",
                    "content": "Is this document chunk relevant to the query? Reply YES or NO only.",
                },
                {
                    "role": "user",
                    "content": f"Query: {query}\n\nChunk: {chunk[:500]}",
                },
            ],
            max_tokens=5,
            temperature=0,
        )

        if "YES" in response.choices[0].message.content.upper():
            relevant.append(chunk)

    return relevant
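One caveat: the loop above issues one cheap call per chunk sequentially, which adds latency. A small helper can fan the checks out over a thread pool — the predicate is whatever per-chunk check you already have, and `max_workers` should respect your rate limits:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable

def concurrent_filter(items: list[str], predicate: Callable[[str], bool],
                      max_workers: int = 8) -> list[str]:
    """Run a per-item relevance check concurrently, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        flags = list(pool.map(predicate, items))
    return [item for item, keep in zip(items, flags) if keep]

# e.g. with a hypothetical chunk_is_relevant(query, chunk) wrapping the
# per-chunk LLM call from prefilter_rag_results:
# relevant = concurrent_filter(chunks, lambda c: chunk_is_relevant(query, c))
```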

Real Cost Analysis: Before vs. After

Here are three real scenarios with actual math.

Scenario 1: Customer Support Chatbot

Metric | Before | After | Savings
Model | GPT-4o for everything | Routed (60% nano, 30% mini, 10% 4o) | -
Requests/day | 10,000 | 10,000 | -
Avg input tokens | 3,500 (2K system + 1K history + 500 query) | 1,200 (compressed prompt + summarized history) | 66% fewer
Avg output tokens | 400 | 150 (concise mode) | 62% fewer
Semantic cache hit rate | 0% | 35% | 35% of requests free
Daily input cost | $87.50 | $4.80 | 94%
Daily output cost | $40.00 | $5.20 | 87%
Daily total | $127.50 | $10.00 | 92%

Scenario 2: RAG Document Q&A

Metric | Before | After | Savings
Model | GPT-4o | GPT-4o with prompt caching | -
Requests/day | 5,000 | 5,000 | -
System prompt tokens | 2,000 x 5,000 = 10M | 2,000 x 5,000 = 10M (90% cached) | 90% cheaper
RAG context tokens | 2,500 x 5,000 = 12.5M | 1,500 x 5,000 = 7.5M (pre-filtered) | 40% fewer
Response cache hit | 0% | 20% | 20% of requests free
Daily total | $86.25 | $22.50 | 74%

Scenario 3: Batch Content Processing

Metric | Before | After | Savings
Task | Categorize 100K articles/day | Same | -
Model | GPT-4o real-time | GPT-4o-mini batch API | -
Per-article tokens | 800 in + 50 out | 400 in + 20 out (compressed) | 50% fewer
API pricing | Standard | 50% batch discount | 50% off
Daily total | $250.00 | $6.00 | 97.6%

Monitoring Costs: Dashboards and Alerts

Build cost monitoring into your application from day one.

import time
from collections import defaultdict
from dataclasses import dataclass, field

@dataclass
class CostTracker:
    """Track LLM costs with alerts and budget limits."""
    daily_budget: float = 100.0  # USD
    alert_threshold: float = 0.8  # Alert at 80% of budget

    _daily_costs: dict = field(default_factory=lambda: defaultdict(float))
    _request_counts: dict = field(default_factory=lambda: defaultdict(int))

    def record(self, cost: float, model: str, endpoint: str):
        today = time.strftime("%Y-%m-%d")
        self._daily_costs[today] += cost
        self._request_counts[f"{today}:{model}:{endpoint}"] += 1

        # Check budget
        if self._daily_costs[today] > self.daily_budget * self.alert_threshold:
            self._send_alert(
                f"LLM cost alert: ${self._daily_costs[today]:.2f} "
                f"({self._daily_costs[today]/self.daily_budget*100:.0f}% of daily budget)"
            )

        if self._daily_costs[today] > self.daily_budget:
            self._send_alert(f"BUDGET EXCEEDED: ${self._daily_costs[today]:.2f}")
            # Optionally: switch to cheaper models or enable aggressive caching
            return False  # Signal to caller that budget is exceeded

        return True

    def get_daily_report(self) -> dict:
        today = time.strftime("%Y-%m-%d")
        return {
            "date": today,
            "total_cost": self._daily_costs[today],
            "budget_remaining": self.daily_budget - self._daily_costs[today],
            "budget_utilization": f"{self._daily_costs[today]/self.daily_budget*100:.1f}%",
        }

    def _send_alert(self, message: str):
        # Integrate with Slack, PagerDuty, email, etc.
        print(f"ALERT: {message}")


# Global cost tracker
cost_tracker = CostTracker(daily_budget=200.0)

# Use it after every LLM call
def tracked_llm_call(messages, model="gpt-4o", **kwargs):
    response = client.chat.completions.create(
        model=model, messages=messages, **kwargs
    )
    cost = calculate_cost(
        model,
        response.usage.prompt_tokens,
        response.usage.completion_tokens,
    )
    within_budget = cost_tracker.record(cost, model, "chat")

    if not within_budget:
        # Fallback: switch to cheaper model for remaining requests
        print("Budget exceeded — switching to cheaper model")

    return response

The Optimization Playbook

Apply these optimizations in order of effort vs. impact:

Priority | Strategy | Effort | Cost Reduction | When to Use
1 | Response caching (exact match) | Low | 20-40% | Any app with repeated queries
2 | Prompt caching (provider-side) | None | 15-30% | System prompts > 1K tokens
3 | Shorter system prompts | Low | 10-20% | Always — most prompts are bloated
4 | Output length control | Low | 10-25% | Always
5 | Model routing | Medium | 40-70% | Apps with varied query complexity
6 | Batch API | Medium | 50% | Non-real-time workloads
7 | Semantic caching | Medium | 20-40% | High-volume, repetitive query patterns
8 | History summarization | Medium | 20-40% | Multi-turn conversations
9 | RAG chunk filtering | Medium | 15-30% | RAG applications
10 | Smaller models as filters | Medium | 10-30% | Pipelines with pre-processing steps

Start with items 1-4. They require minimal code changes and deliver the biggest bang for the effort. Then move to routing and batching. Semantic caching and the rest are for when you’re already optimized and need to squeeze out more.
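As a rough planning tool, you can model stacked strategies as multiplicative reductions. This is an assumption — real strategies overlap and interact, so treat the result as an optimistic estimate:

```python
def stacked_cost_factor(reductions: list[float]) -> float:
    """Fraction of the original cost remaining after applying each reduction,
    assuming the reductions are independent (an optimistic simplification)."""
    remaining = 1.0
    for r in reductions:
        remaining *= (1 - r)
    return remaining

# e.g. exact caching 30%, prompt caching 20%, shorter prompts 15%, output control 15%
factor = stacked_cost_factor([0.30, 0.20, 0.15, 0.15])
print(f"{(1 - factor) * 100:.0f}% total reduction")  # ≈ 60%
```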

Key Takeaways

  1. Measure first. Track per-request token counts and costs before optimizing anything. You’ll be surprised where the tokens actually go.

  2. Provider prompt caching is free money. Structure your prompts so the stable prefix is long and identical across requests. OpenAI gives 50% off cached tokens, Anthropic gives 90% off.

  3. Semantic caching saves 20-40% for applications with repetitive query patterns. Use it only with temperature=0.

  4. Model routing is the biggest single lever. Most queries don’t need your most expensive model. A simple classifier routing 60% of traffic to a tiny model cuts costs by 60-70%.

  5. Compress everything. Shorter system prompts, summarized history, fewer examples, filtered RAG chunks, concise output instructions. Every token costs money.

  6. Batch APIs give 50% off for any workload that doesn’t need real-time responses. Nightly jobs, content processing, bulk classification — batch all of it.

  7. Set budget alerts from day one. A single runaway prompt loop can burn through your monthly budget in hours. Monitor and alert before it becomes a CFO conversation.

  8. Combine strategies. The real savings come from stacking: prompt caching + routing + compression + caching together can reduce costs by 90%+ compared to a naive implementation.