Your LLM prototype costs $2/day. Your production system serving 10,000 users costs $2,000/day. Your CFO asks why the AI bill tripled last month. You check the logs and discover that 40% of your tokens go to repeating the same 2,000-word system prompt on every request, 30% go to conversation history that nobody reads, and 20% go to queries that a model 10x cheaper could handle. This is the norm, not the exception. Most LLM applications waste 50-90% of their token budget.
This lesson covers every practical strategy for cutting costs without cutting quality. We’ll start by understanding where your tokens actually go, then systematically attack each waste category.
Understanding Your Cost Profile
Before optimizing anything, you need to know where the money goes. LLM costs are billed per token — both input and output. Here’s where tokens typically go in a production application:
| Component | % of Total Tokens | Example (GPT-4o) |
|---|---|---|
| System prompt | 15-30% | 500-2,000 tokens repeated every request |
| Conversation history | 20-40% | Grows with each turn, often 3,000+ tokens |
| Retrieved context (RAG) | 15-30% | 3-5 chunks at 500 tokens each |
| User query | 2-5% | Usually short |
| Model output | 10-25% | Varies by task |
The insight: input tokens dominate, and most of them are repetitive. The system prompt is identical across requests. Retrieved context is often similar. Conversation history accumulates redundancy. This is good news — repetitive things can be cached.
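To get this breakdown for your own traffic, count each component separately before you send the request. A minimal sketch using tiktoken (breakdown_tokens is an illustrative helper, and it ignores the few tokens of per-message formatting overhead):
import tiktoken
def breakdown_tokens(system_prompt: str, history: list[str],
                     retrieved_chunks: list[str], user_query: str,
                     model: str = "gpt-4o") -> dict[str, int]:
    """Rough per-component token counts for a single request."""
    enc = tiktoken.encoding_for_model(model)  # assumes tiktoken recognizes this model name
    def count(text: str) -> int:
        return len(enc.encode(text))
    return {
        "system_prompt": count(system_prompt),
        "history": sum(count(m) for m in history),
        "retrieved_context": sum(count(c) for c in retrieved_chunks),
        "user_query": count(user_query),
    }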
Token Counting in Production
You can’t optimize what you don’t measure. Every LLM request should be tracked with its token count and cost.
import tiktoken
from openai import OpenAI
from dataclasses import dataclass
from datetime import datetime
import json
# Pricing per 1M tokens (as of early 2026 — check current pricing)
MODEL_PRICING = {
"gpt-4o": {"input": 2.50, "output": 10.00},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4.1": {"input": 2.00, "output": 8.00},
"gpt-4.1-mini": {"input": 0.40, "output": 1.60},
"gpt-4.1-nano": {"input": 0.10, "output": 0.40},
"claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
"claude-haiku-3.5": {"input": 0.80, "output": 4.00},
}
@dataclass
class UsageRecord:
timestamp: datetime
model: str
input_tokens: int
output_tokens: int
cached_tokens: int
cost_usd: float
user_id: str
endpoint: str
def calculate_cost(model: str, input_tokens: int, output_tokens: int,
cached_tokens: int = 0) -> float:
"""Calculate the cost of a single LLM request."""
pricing = MODEL_PRICING.get(model)
if not pricing:
raise ValueError(f"Unknown model: {model}")
# Cached tokens are typically 50% cheaper (OpenAI) or 90% cheaper (Anthropic)
regular_input = input_tokens - cached_tokens
cached_discount = 0.5 # Adjust per provider
input_cost = (regular_input * pricing["input"] / 1_000_000 +
cached_tokens * pricing["input"] * cached_discount / 1_000_000)
output_cost = output_tokens * pricing["output"] / 1_000_000
return round(input_cost + output_cost, 6)
def track_llm_usage(response, model: str, user_id: str, endpoint: str) -> UsageRecord:
"""Extract usage from an OpenAI response and log it."""
usage = response.usage
    # Cached-token details are only present when caching applied
    details = getattr(usage, "prompt_tokens_details", None)
    cached = getattr(details, "cached_tokens", 0) if details else 0
    record = UsageRecord(
        timestamp=datetime.utcnow(),
        model=model,
        input_tokens=usage.prompt_tokens,
        output_tokens=usage.completion_tokens,
        cached_tokens=cached,
        cost_usd=calculate_cost(
            model, usage.prompt_tokens, usage.completion_tokens,
            cached_tokens=cached,
        ),
user_id=user_id,
endpoint=endpoint,
)
# Log to your analytics pipeline
print(json.dumps({
"event": "llm_usage",
"model": record.model,
"input_tokens": record.input_tokens,
"output_tokens": record.output_tokens,
"cost_usd": record.cost_usd,
"user_id": record.user_id,
"endpoint": record.endpoint,
}))
    return record
Use this on every request. After a week, you’ll know exactly which endpoints, users, and query types cost the most.
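Once the records land somewhere queryable (a database table or your analytics pipeline), that per-endpoint breakdown is a short aggregation away. A sketch (cost_by_endpoint is an illustrative helper, not part of any SDK):
from collections import defaultdict
def cost_by_endpoint(records: list[UsageRecord]) -> dict[str, float]:
    """Sum cost per endpoint, most expensive first."""
    totals: dict[str, float] = defaultdict(float)
    for r in records:
        totals[r.endpoint] += r.cost_usd
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))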
Prompt Caching: Built-in Provider Support
Both OpenAI and Anthropic offer server-side prompt caching. When you send the same prefix repeatedly, the provider caches it and charges less for subsequent requests.
How it works: If the first N tokens of your prompt match a cached prefix, those tokens are served from cache at a discounted rate. This happens automatically on OpenAI (for prompts over 1,024 tokens) and with explicit cache control on Anthropic.
OpenAI Automatic Prompt Caching
OpenAI caches automatically. No code changes needed — just make sure your prompts share a common prefix.
from openai import OpenAI
client = OpenAI()
# This system prompt is 1,500+ tokens. After the first request,
# subsequent requests with the same prefix get cached pricing.
SYSTEM_PROMPT = """You are a senior financial analyst assistant.
You help users analyze quarterly earnings reports, SEC filings,
and market data. You follow these rules:
1. Always cite specific numbers from the provided documents.
2. When comparing periods, use percentage changes.
3. Flag any inconsistencies between reported numbers.
4. Use professional financial terminology.
5. Format currency as $X.XM or $X.XB.
... (imagine 1,500 tokens of detailed instructions) ..."""
def analyze_earnings(user_query: str, context: str) -> str:
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": f"Context:\n{context}\n\nQuestion: {user_query}"},
],
)
# Check if caching kicked in
if hasattr(response.usage, "prompt_tokens_details"):
cached = response.usage.prompt_tokens_details.cached_tokens
total = response.usage.prompt_tokens
print(f"Cache hit: {cached}/{total} tokens ({cached/total*100:.0f}%)")
    return response.choices[0].message.content
Key rules for OpenAI caching:
- Minimum 1,024 tokens in the prefix
- Cache lives for 5-10 minutes of inactivity
- Same prefix must be identical — one character difference breaks the cache
- Put the stable part (system prompt) first and the variable part (user query) last, as shown in the sketch below
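A common way to break that last rule is to prepend request-specific data to the system prompt. A sketch of the difference (today, user_name, and user_query are hypothetical request values; SYSTEM_PROMPT is the long stable prompt from above):
# Hypothetical request-specific values
today, user_name, user_query = "2026-02-03", "Dana", "Summarize Q3 revenue."
# BAD: variable data comes first, so the prefix differs on every request
# and nothing after the first divergent token can be served from cache.
bad_messages = [
    {"role": "system", "content": f"Today is {today}. User: {user_name}.\n\n{SYSTEM_PROMPT}"},
    {"role": "user", "content": user_query},
]
# GOOD: the identical system prompt is the shared prefix; everything
# request-specific comes after it.
good_messages = [
    {"role": "system", "content": SYSTEM_PROMPT},
    {"role": "user", "content": f"Today is {today}. User: {user_name}.\n\n{user_query}"},
]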
Anthropic Explicit Cache Control
Anthropic lets you mark exactly which parts of the prompt to cache using cache_control blocks.
import anthropic
client = anthropic.Anthropic()
# Mark the system prompt for caching
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system=[
{
"type": "text",
"text": "You are a senior financial analyst assistant...(long prompt)...",
"cache_control": {"type": "ephemeral"} # Cache this block
}
],
messages=[
{"role": "user", "content": "Analyze Q3 revenue trends."}
],
)
# Anthropic returns cache stats directly
print(f"Cache read tokens: {response.usage.cache_read_input_tokens}")
print(f"Cache creation tokens: {response.usage.cache_creation_input_tokens}")
# Cache read tokens cost 90% less than regular input tokens on Anthropic
Anthropic caching saves 90% on cached tokens (vs. 50% on OpenAI). If your system prompt is 2,000 tokens and you make 1,000 requests/hour, that’s 2M tokens/hour. At $3/1M tokens, that’s $6/hour. With 90% caching it drops to about $0.60/hour, a saving of $5.40/hour. Running around the clock, that’s roughly $3,900/month saved on the system prompt alone.
Semantic Caching: Cache Similar Queries
Prompt caching handles identical prefixes. But what about similar user queries? “What’s your return policy?” and “How do I return an item?” should produce the same answer.
Semantic caching uses embeddings to find similar past queries and returns the cached response instead of calling the LLM.
import hashlib
import json
import time
import numpy as np
import redis
from openai import OpenAI
client = OpenAI()
cache = redis.Redis(host="localhost", port=6379, db=0)
SIMILARITY_THRESHOLD = 0.95 # Cosine similarity — tune this carefully
CACHE_TTL = 3600 # 1 hour
def get_embedding(text: str) -> list[float]:
"""Get embedding for a text string."""
response = client.embeddings.create(
model="text-embedding-3-small",
input=text,
)
return response.data[0].embedding
def cosine_similarity(a: list[float], b: list[float]) -> float:
"""Calculate cosine similarity between two vectors."""
a_np, b_np = np.array(a), np.array(b)
return float(np.dot(a_np, b_np) / (np.linalg.norm(a_np) * np.linalg.norm(b_np)))
class SemanticCache:
def __init__(self, redis_client: redis.Redis, threshold: float = 0.95):
self.redis = redis_client
self.threshold = threshold
def _cache_key(self, prefix: str, query_hash: str) -> str:
return f"sem_cache:{prefix}:{query_hash}"
def get(self, query: str, namespace: str = "default") -> str | None:
"""Look up a semantically similar cached response."""
query_embedding = get_embedding(query)
# Get all cached embeddings in this namespace
pattern = f"sem_cache:{namespace}:*"
keys = self.redis.keys(pattern)
best_match = None
best_similarity = 0.0
        for key in keys:
            raw = self.redis.get(key)
            if raw is None:  # entry may have expired between KEYS and GET
                continue
            cached = json.loads(raw)
            similarity = cosine_similarity(query_embedding, cached["embedding"])
if similarity > best_similarity:
best_similarity = similarity
best_match = cached
if best_match and best_similarity >= self.threshold:
return best_match["response"]
return None
def set(self, query: str, response: str, namespace: str = "default"):
"""Cache a query-response pair with its embedding."""
embedding = get_embedding(query)
query_hash = hashlib.md5(query.encode()).hexdigest()
data = {
"query": query,
"response": response,
"embedding": embedding,
"timestamp": time.time(),
}
key = self._cache_key(namespace, query_hash)
self.redis.setex(key, CACHE_TTL, json.dumps(data))
# Usage
sem_cache = SemanticCache(cache, threshold=0.95)
def ask_with_cache(query: str) -> str:
# Check semantic cache first
cached = sem_cache.get(query, namespace="support")
if cached:
print("Semantic cache hit!")
return cached
# Cache miss — call the LLM
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "You are a helpful support agent."},
{"role": "user", "content": query},
],
temperature=0, # Deterministic — safe to cache
)
result = response.choices[0].message.content
sem_cache.set(query, result, namespace="support")
    return result
Important: Only cache responses where temperature=0. Non-deterministic responses should not be cached — users expect variety.
Production warning: The naive approach above scans all keys linearly. For production, use a vector database (pgvector, Pinecone, Qdrant) for the embedding lookup. The pattern is the same — you’re just replacing the Redis scan with a proper vector similarity search.
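As one possible shape for that swap, here is a sketch that keeps cached responses in memory and uses FAISS for the nearest-neighbor lookup (assuming faiss-cpu is installed; the function names are illustrative, and the TTL/eviction logic from the Redis version is omitted):
import faiss
import numpy as np
EMBEDDING_DIM = 1536  # output size of text-embedding-3-small
index = faiss.IndexFlatIP(EMBEDDING_DIM)  # inner product == cosine similarity on unit vectors
cached_responses: list[str] = []          # cached_responses[i] pairs with index vector i
def _embed_normalized(query: str) -> np.ndarray:
    vec = np.array([get_embedding(query)], dtype="float32")
    faiss.normalize_L2(vec)
    return vec
def vector_cache_set(query: str, response: str) -> None:
    index.add(_embed_normalized(query))
    cached_responses.append(response)
def vector_cache_get(query: str, threshold: float = 0.95) -> str | None:
    if index.ntotal == 0:
        return None
    scores, ids = index.search(_embed_normalized(query), k=1)
    if scores[0][0] >= threshold:
        return cached_responses[ids[0][0]]
    return None
The same structure maps directly onto pgvector or a managed vector DB once the cache needs to outlive a single process.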
Response Caching with Redis
For deterministic queries where the input is exactly the same (not just similar), a simple key-value cache works.
import hashlib
import json
import redis
from openai import OpenAI
client = OpenAI()
cache = redis.Redis(host="localhost", port=6379, db=0)
def cached_llm_call(
messages: list[dict],
model: str = "gpt-4o",
temperature: float = 0,
cache_ttl: int = 3600,
) -> str:
"""LLM call with exact-match response caching."""
if temperature > 0:
# Non-deterministic — skip cache
response = client.chat.completions.create(
model=model, messages=messages, temperature=temperature
)
return response.choices[0].message.content
# Build a cache key from the full request
cache_input = json.dumps({
"model": model,
"messages": messages,
"temperature": temperature,
}, sort_keys=True)
cache_key = f"llm:exact:{hashlib.sha256(cache_input.encode()).hexdigest()}"
# Check cache
cached = cache.get(cache_key)
if cached:
return cached.decode("utf-8")
# Cache miss
response = client.chat.completions.create(
model=model, messages=messages, temperature=temperature
)
result = response.choices[0].message.content
cache.setex(cache_key, cache_ttl, result)
    return result
This is dead simple and saves the most money for applications with repeated identical queries — FAQ bots, classification endpoints, extraction pipelines.
Model Routing: Cheap Models for Easy Queries
Not every query needs GPT-4o or Claude Sonnet. A question like “What are your business hours?” can be answered by a model that costs 10-25x less. Model routing sends each query to the cheapest model that can handle it well.
from openai import OpenAI
from enum import Enum
client = OpenAI()
class ModelTier(Enum):
CHEAP = "gpt-4.1-nano" # $0.10/1M input
MEDIUM = "gpt-4.1-mini" # $0.40/1M input
EXPENSIVE = "gpt-4.1" # $2.00/1M input
# Cost ratios: CHEAP is 20x cheaper than EXPENSIVE
def classify_query_complexity(query: str) -> ModelTier:
"""Use a cheap model to classify query complexity."""
response = client.chat.completions.create(
model="gpt-4.1-nano", # Classification itself uses the cheapest model
messages=[
{
"role": "system",
"content": """Classify the user query complexity.
Reply with ONLY one word: SIMPLE, MEDIUM, or COMPLEX.
SIMPLE: Factual lookups, yes/no questions, greetings, simple formatting.
MEDIUM: Summaries, comparisons, explanations of concepts, multi-step but straightforward.
COMPLEX: Analysis, creative writing, code generation, multi-document reasoning, nuanced judgment.""",
},
{"role": "user", "content": query},
],
max_tokens=10,
temperature=0,
)
classification = response.choices[0].message.content.strip().upper()
mapping = {
"SIMPLE": ModelTier.CHEAP,
"MEDIUM": ModelTier.MEDIUM,
"COMPLEX": ModelTier.EXPENSIVE,
}
return mapping.get(classification, ModelTier.MEDIUM)
def route_and_respond(query: str, system_prompt: str) -> tuple[str, str]:
"""Route query to appropriate model tier and get response."""
tier = classify_query_complexity(query)
model = tier.value
response = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": query},
],
)
return response.choices[0].message.content, model
# Example usage
answer, model_used = route_and_respond(
"What are your business hours?",
"You are a helpful customer support agent for Acme Corp."
)
print(f"Answered by {model_used}: {answer}")Smarter routing with confidence: If the cheap model’s response has low confidence, escalate to a more expensive model.
def route_with_fallback(query: str, system_prompt: str) -> str:
"""Try cheap model first, escalate if confidence is low."""
# First attempt with cheap model
response = client.chat.completions.create(
model=ModelTier.CHEAP.value,
messages=[
{"role": "system", "content": system_prompt + "\nEnd your response with CONFIDENCE: HIGH or CONFIDENCE: LOW."},
{"role": "user", "content": query},
],
)
result = response.choices[0].message.content
if "CONFIDENCE: LOW" in result:
# Escalate to expensive model
response = client.chat.completions.create(
model=ModelTier.EXPENSIVE.value,
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": query},
],
)
return response.choices[0].message.content
# Strip the confidence tag and return
    return result.replace("CONFIDENCE: HIGH", "").strip()
Real-world routing distribution: In a typical support chatbot, 50-60% of queries are SIMPLE, 25-30% are MEDIUM, and 10-20% are COMPLEX. If you route correctly, your blended cost is 60-70% lower than using the expensive model for everything.
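A quick back-of-the-envelope check of that claim, using the input prices from MODEL_PRICING above (the routing split is illustrative; the extra nano classification call and output tokens pull the end-to-end number down toward the quoted range):
# Blended input cost under a 55/30/15 routing split (illustrative).
distribution = {"gpt-4.1-nano": 0.55, "gpt-4.1-mini": 0.30, "gpt-4.1": 0.15}
input_price = {"gpt-4.1-nano": 0.10, "gpt-4.1-mini": 0.40, "gpt-4.1": 2.00}  # $/1M input
blended = sum(share * input_price[m] for m, share in distribution.items())
baseline = input_price["gpt-4.1"]
print(f"Blended: ${blended:.3f}/1M vs ${baseline:.2f}/1M "
      f"({(1 - blended / baseline) * 100:.0f}% lower on input alone)")
# ~$0.475/1M vs $2.00/1M, roughly 76% lower before classification overhead
# and output tokens are counted.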
Prompt Compression
Shorter prompts = fewer tokens = less money. Here are three practical compression techniques.
1. Shorter System Prompts
Most system prompts are bloated. They repeat instructions, use verbose language, and include examples that aren’t needed for every query.
# BEFORE: 850 tokens
VERBOSE_PROMPT = """
You are a helpful customer support assistant for TechCorp, a leading
technology solutions provider. Your role is to assist customers with
their inquiries about our products and services. When responding to
customers, you should always be polite, professional, and thorough.
When a customer asks about pricing, you should provide accurate pricing
information based on the current pricing tiers. If you do not know the
exact pricing, you should direct the customer to the pricing page on
our website at https://techcorp.com/pricing.
When a customer asks about technical issues, you should first try to
troubleshoot the issue by asking clarifying questions. If you cannot
resolve the issue, you should create a support ticket and provide the
customer with the ticket number.
Here are some examples of how to respond:
Customer: "What's the price of the Pro plan?"
You: "The Pro plan is $49/month when billed annually..."
(... 15 more examples ...)
"""
# AFTER: 180 tokens — same behavior
COMPRESSED_PROMPT = """TechCorp support agent. Be concise and accurate.
Rules:
- Pricing questions: give exact tier pricing. Unknown? Link to techcorp.com/pricing
- Technical issues: troubleshoot first, then create ticket with number
- Tone: professional, not verbose
Response format: direct answer, then next steps if any."""
The compressed version typically produces responses of the same quality. Test both with your eval suite to verify.
2. Conversation History Summarization
Conversation history grows linearly and becomes the biggest token sink in multi-turn applications. Summarize older turns instead of keeping them verbatim.
from openai import OpenAI
client = OpenAI()
def summarize_history(
messages: list[dict], keep_recent: int = 4
) -> list[dict]:
"""Summarize old messages, keep recent ones verbatim."""
if len(messages) <= keep_recent:
return messages
    # keep_recent=0 means summarize the entire history
    old_messages = messages[:-keep_recent] if keep_recent else messages
    recent_messages = messages[-keep_recent:] if keep_recent else []
# Summarize old messages with a cheap model
history_text = "\n".join(
f"{m['role']}: {m['content']}" for m in old_messages
)
summary_response = client.chat.completions.create(
model="gpt-4.1-nano",
messages=[
{
"role": "system",
"content": "Summarize this conversation in 2-3 sentences. Keep key facts, decisions, and unanswered questions.",
},
{"role": "user", "content": history_text},
],
max_tokens=150,
)
summary = summary_response.choices[0].message.content
return [
{"role": "system", "content": f"Previous conversation summary: {summary}"},
*recent_messages,
]
# Example: 20-turn conversation compressed to summary + last 4 turns
# Before: ~4,000 tokens of history
# After: ~600 tokens (150 summary + 4 recent messages)
3. Selective Few-Shot Examples
Instead of including 10 examples in every prompt, select the most relevant 1-2 examples based on the query.
from openai import OpenAI
import numpy as np
client = OpenAI()
# Pre-computed examples with embeddings
FEW_SHOT_EXAMPLES = [
{
"query": "I want to cancel my subscription",
"response": "I can help with cancellation. Your current plan...",
"embedding": [0.1, 0.2, ...], # Pre-computed
"category": "cancellation",
},
{
"query": "The app crashes when I upload files",
"response": "Let's troubleshoot the upload issue. First...",
"embedding": [0.3, 0.4, ...],
"category": "technical",
},
# ... 50 more examples
]
def select_examples(query: str, examples: list[dict], top_k: int = 2) -> list[dict]:
"""Select the most relevant few-shot examples for the query."""
query_emb = get_embedding(query) # Reuse from earlier
scored = []
for ex in examples:
sim = cosine_similarity(query_emb, ex["embedding"])
scored.append((sim, ex))
scored.sort(key=lambda x: x[0], reverse=True)
return [ex for _, ex in scored[:top_k]]
def build_prompt_with_selected_examples(query: str, system_base: str) -> list[dict]:
"""Build prompt with only the most relevant examples."""
examples = select_examples(query, FEW_SHOT_EXAMPLES, top_k=2)
examples_text = "\n\n".join(
f"User: {ex['query']}\nAssistant: {ex['response']}"
for ex in examples
)
return [
{
"role": "system",
"content": f"{system_base}\n\nExamples:\n{examples_text}",
},
{"role": "user", "content": query},
    ]
Batch APIs: 50% Off for Async Work
OpenAI’s Batch API processes requests asynchronously at a 50% discount. Perfect for workloads that don’t need real-time responses: nightly report generation, bulk classification, content moderation queues, embedding large document sets.
import json
import time
from openai import OpenAI
client = OpenAI()
def create_batch_file(requests: list[dict], filename: str = "/tmp/batch_input.jsonl"):
"""Create a JSONL file for batch processing."""
with open(filename, "w") as f:
for i, req in enumerate(requests):
batch_request = {
"custom_id": f"request-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": req.get("model", "gpt-4o-mini"),
"messages": req["messages"],
"max_tokens": req.get("max_tokens", 500),
"temperature": 0,
},
}
f.write(json.dumps(batch_request) + "\n")
return filename
def submit_batch(filename: str) -> str:
"""Upload file and submit batch job."""
# Upload the JSONL file
with open(filename, "rb") as f:
file_obj = client.files.create(file=f, purpose="batch")
# Create the batch
batch = client.batches.create(
input_file_id=file_obj.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
print(f"Batch submitted: {batch.id}")
return batch.id
def poll_batch(batch_id: str, interval: int = 30) -> list[dict]:
"""Poll until batch completes and return results."""
while True:
batch = client.batches.retrieve(batch_id)
print(f"Status: {batch.status} | Completed: {batch.request_counts.completed}/{batch.request_counts.total}")
if batch.status == "completed":
# Download results
result_file = client.files.content(batch.output_file_id)
results = []
for line in result_file.text.strip().split("\n"):
results.append(json.loads(line))
return results
if batch.status in ("failed", "expired", "cancelled"):
raise RuntimeError(f"Batch {batch.status}: {batch.errors}")
time.sleep(interval)
# Example: Classify 10,000 support tickets at 50% off
tickets = [
{"messages": [
{"role": "system", "content": "Classify this ticket as: billing, technical, feature_request, or other. Reply with one word."},
{"role": "user", "content": ticket_text},
]}
for ticket_text in load_tickets() # Your function to load tickets
]
batch_file = create_batch_file(tickets)
batch_id = submit_batch(batch_file)
results = poll_batch(batch_id)
# Cost: 10,000 tickets * ~200 tokens each * $0.15/1M (mini) * 0.5 (batch discount)
# = $0.15 instead of $0.30. With GPT-4o pricing: $2.50 instead of $5.00
Context Window Management
As conversations grow, you need strategies to keep the context window under control without losing important information.
import tiktoken
def manage_context_window(
messages: list[dict],
system_prompt: str,
max_context_tokens: int = 8000,
model: str = "gpt-4o",
) -> list[dict]:
"""Keep conversation within token budget using sliding window + summarization."""
enc = tiktoken.encoding_for_model(model)
system_tokens = len(enc.encode(system_prompt))
available = max_context_tokens - system_tokens - 500 # Reserve 500 for response
# Start from most recent, add messages until budget exhausted
selected = []
token_count = 0
for msg in reversed(messages):
msg_tokens = len(enc.encode(msg["content"]))
if token_count + msg_tokens > available:
break
selected.insert(0, msg)
token_count += msg_tokens
# If we dropped messages, add a summary of what was dropped
if len(selected) < len(messages):
dropped = messages[:len(messages) - len(selected)]
summary = summarize_history(dropped, keep_recent=0)
        if summary:
            # summarize_history already returns a system message containing the summary
            selected.insert(0, summary[0])
return [{"role": "system", "content": system_prompt}] + selectedOutput Length Control
Output tokens are 2-5x more expensive than input tokens. Controlling output length is a direct cost lever.
# Strategy 1: Aggressive max_tokens
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
max_tokens=150, # Force concise responses
)
# Strategy 2: Stop sequences for structured output
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": "Answer in one paragraph. End with [DONE]."},
{"role": "user", "content": query},
],
stop=["[DONE]"], # Stop generating as soon as [DONE] is produced
)
# Strategy 3: Instruct conciseness in the system prompt
CONCISE_PROMPT = """Answer the user's question in 2-3 sentences maximum.
If a longer answer is needed, provide the key point first, then ask
if the user wants more detail. Never repeat the question back."""
Cost impact: If your average output is 500 tokens but could be 150, you cut output costs by 70%. For GPT-4o at $10/1M output tokens, that’s significant at scale.
Smaller Models as First Pass
Use tiny models as classifiers, filters, and pre-processors before sending anything to expensive models.
def needs_llm_response(query: str) -> bool:
"""Use a tiny model to check if the query even needs an LLM response."""
response = client.chat.completions.create(
model="gpt-4.1-nano",
messages=[
{
"role": "system",
"content": """Classify if this query needs an LLM response or can be handled
by a simple lookup/redirect. Reply YES or NO.
NO examples: "hi", "thanks", "ok", greetings, single word messages.
YES examples: questions, requests, complaints, anything needing a real answer.""",
},
{"role": "user", "content": query},
],
max_tokens=5,
temperature=0,
)
return "YES" in response.choices[0].message.content.upper()
def prefilter_rag_results(query: str, chunks: list[str]) -> list[str]:
"""Use a cheap model to filter irrelevant RAG chunks before the expensive call."""
relevant = []
for chunk in chunks:
response = client.chat.completions.create(
model="gpt-4.1-nano",
messages=[
{
"role": "system",
"content": "Is this document chunk relevant to the query? Reply YES or NO only.",
},
{
"role": "user",
"content": f"Query: {query}\n\nChunk: {chunk[:500]}",
},
],
max_tokens=5,
temperature=0,
)
if "YES" in response.choices[0].message.content.upper():
relevant.append(chunk)
    return relevant
Real Cost Analysis: Before vs. After
Here are three representative scenarios with the math worked through; treat the exact dollar figures as illustrative, since your mix of prompts, hit rates, and models will shift them.
Scenario 1: Customer Support Chatbot
| Metric | Before | After | Savings |
|---|---|---|---|
| Model | GPT-4o for everything | Routed (60% nano, 30% mini, 10% 4o) | - |
| Requests/day | 10,000 | 10,000 | - |
| Avg input tokens | 3,500 (2K system + 1K history + 500 query) | 1,200 (compressed prompt + summarized history) | 66% fewer |
| Avg output tokens | 400 | 150 (concise mode) | 62% fewer |
| Semantic cache hit rate | 0% | 35% | 35% requests free |
| Daily input cost | $87.50 | $4.80 | 94% |
| Daily output cost | $40.00 | $5.20 | 87% |
| Daily total | $127.50 | $10.00 | 92% |
Scenario 2: RAG Document Q&A
| Metric | Before | After | Savings |
|---|---|---|---|
| Model | GPT-4o | GPT-4o with prompt caching | - |
| Requests/day | 5,000 | 5,000 | - |
| System prompt tokens | 2,000 x 5,000 = 10M | 2,000 x 5,000 = 10M (~90% served from cache) | 50% off cached tokens |
| RAG context tokens | 2,500 x 5,000 = 12.5M | 1,500 x 5,000 = 7.5M (pre-filtered) | 40% fewer |
| Response cache hit | 0% | 20% | 20% free |
| Daily total | $86.25 | $22.50 | 74% |
Scenario 3: Batch Content Processing
| Metric | Before | After | Savings |
|---|---|---|---|
| Task | Categorize 100K articles/day | Same | - |
| Model | GPT-4o real-time | GPT-4o-mini batch API | - |
| Per-article tokens | 800 in + 50 out | 400 in + 20 out (compressed) | 50% fewer |
| API pricing | Standard | 50% batch discount | 50% off |
| Daily total | $250.00 | $6.00 | 97.6% |
Monitoring Costs: Dashboards and Alerts
Build cost monitoring into your application from day one.
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class CostTracker:
"""Track LLM costs with alerts and budget limits."""
daily_budget: float = 100.0 # USD
alert_threshold: float = 0.8 # Alert at 80% of budget
_daily_costs: dict = field(default_factory=lambda: defaultdict(float))
_request_counts: dict = field(default_factory=lambda: defaultdict(int))
def record(self, cost: float, model: str, endpoint: str):
today = time.strftime("%Y-%m-%d")
self._daily_costs[today] += cost
self._request_counts[f"{today}:{model}:{endpoint}"] += 1
# Check budget
if self._daily_costs[today] > self.daily_budget * self.alert_threshold:
self._send_alert(
f"LLM cost alert: ${self._daily_costs[today]:.2f} "
f"({self._daily_costs[today]/self.daily_budget*100:.0f}% of daily budget)"
)
if self._daily_costs[today] > self.daily_budget:
self._send_alert(f"BUDGET EXCEEDED: ${self._daily_costs[today]:.2f}")
# Optionally: switch to cheaper models or enable aggressive caching
return False # Signal to caller that budget is exceeded
return True
def get_daily_report(self) -> dict:
today = time.strftime("%Y-%m-%d")
return {
"date": today,
"total_cost": self._daily_costs[today],
"budget_remaining": self.daily_budget - self._daily_costs[today],
"budget_utilization": f"{self._daily_costs[today]/self.daily_budget*100:.1f}%",
}
def _send_alert(self, message: str):
# Integrate with Slack, PagerDuty, email, etc.
print(f"ALERT: {message}")
# Global cost tracker
cost_tracker = CostTracker(daily_budget=200.0)
# Use it after every LLM call
def tracked_llm_call(messages, model="gpt-4o", **kwargs):
response = client.chat.completions.create(
model=model, messages=messages, **kwargs
)
cost = calculate_cost(
model,
response.usage.prompt_tokens,
response.usage.completion_tokens,
)
within_budget = cost_tracker.record(cost, model, "chat")
if not within_budget:
# Fallback: switch to cheaper model for remaining requests
print("Budget exceeded — switching to cheaper model")
    return response
The Optimization Playbook
Apply these optimizations in order of effort vs. impact:
| Priority | Strategy | Effort | Cost Reduction | When to Use |
|---|---|---|---|---|
| 1 | Response caching (exact match) | Low | 20-40% | Any app with repeated queries |
| 2 | Prompt caching (provider-side) | None | 15-30% | System prompts > 1K tokens |
| 3 | Shorter system prompts | Low | 10-20% | Always — most prompts are bloated |
| 4 | Output length control | Low | 10-25% | Always |
| 5 | Model routing | Medium | 40-70% | Apps with varied query complexity |
| 6 | Batch API | Medium | 50% | Non-real-time workloads |
| 7 | Semantic caching | Medium | 20-40% | High-volume, repetitive query patterns |
| 8 | History summarization | Medium | 20-40% | Multi-turn conversations |
| 9 | RAG chunk filtering | Medium | 15-30% | RAG applications |
| 10 | Smaller models as filters | Medium | 10-30% | Pipelines with pre-processing steps |
Start with items 1-4. They require minimal code changes and deliver the biggest bang for the effort. Then move to routing and batching. Semantic caching and the rest are for when you’re already optimized and need to squeeze out more.
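To show how the pieces stack, here is a sketch that chains the helpers defined earlier in this lesson (sem_cache, classify_query_complexity, calculate_cost, cost_tracker) into a single request path. Treat it as a starting point under this lesson’s assumptions, not a drop-in implementation:
def optimized_ask(query: str, system_prompt: str) -> str:
    """Sketch: semantic cache -> model routing -> output cap -> cost tracking."""
    # 1. Serve similar past queries from the semantic cache (no LLM call)
    cached = sem_cache.get(query, namespace="support")
    if cached:
        return cached
    # 2. Route to the cheapest model tier that can handle the query
    tier = classify_query_complexity(query)
    # 3. Capped, deterministic (and therefore cacheable) completion
    response = client.chat.completions.create(
        model=tier.value,
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": query},
        ],
        max_tokens=300,
        temperature=0,
    )
    result = response.choices[0].message.content
    # 4. Record spend against the budget and populate the cache
    cost_tracker.record(
        calculate_cost(tier.value, response.usage.prompt_tokens,
                       response.usage.completion_tokens),
        tier.value, "optimized_ask",
    )
    sem_cache.set(query, result, namespace="support")
    return result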
Key Takeaways
- Measure first. Track per-request token counts and costs before optimizing anything. You’ll be surprised where the tokens actually go.
- Provider prompt caching is free money. Structure your prompts so the stable prefix is long and identical across requests. OpenAI gives 50% off cached tokens, Anthropic gives 90% off.
- Semantic caching saves 20-40% for applications with repetitive query patterns. Use it only with temperature=0.
- Model routing is the biggest single lever. Most queries don’t need your most expensive model. A simple classifier routing 60% of traffic to a tiny model cuts costs by 60-70%.
- Compress everything. Shorter system prompts, summarized history, fewer examples, filtered RAG chunks, concise output instructions. Every token costs money.
- Batch APIs give 50% off for any workload that doesn’t need real-time responses. Nightly jobs, content processing, bulk classification — batch all of it.
- Set budget alerts from day one. A single runaway prompt loop can burn through your monthly budget in hours. Monitor and alert before it becomes a CFO conversation.
- Combine strategies. The real savings come from stacking: prompt caching + routing + compression + response caching together can reduce costs by 90%+ compared to a naive implementation.