Traditional web applications have well-understood security models: validate input, sanitize output, authenticate users, authorize access. LLM applications inherit all of those concerns and add entirely new ones. The model itself is an attack surface — it can be tricked into ignoring instructions, leaking system prompts, generating harmful content, or exfiltrating data. Its output is unpredictable by design, which means you can’t just validate against a schema and call it a day.
This lesson covers the security fundamentals that every LLM application needs before going to production. The next lesson goes deep on prompt injection specifically — here we cover the broader security architecture.
The Unique Security Surface of LLM Applications
A traditional API processes structured input and returns structured output. An LLM application processes natural language (anything goes) and returns natural language (anything can come out). This creates two attack surfaces that don’t exist in traditional applications:
Input-side risks:
- Users can manipulate the model’s behavior through crafted prompts
- Sensitive data (PII, credentials) can leak into the LLM provider’s systems
- Adversarial inputs can cause the model to generate harmful or policy-violating content
- High token counts can be weaponized as a denial-of-wallet attack
Output-side risks:
- The model can leak system prompt contents, internal URLs, or API patterns
- Generated code or SQL can contain vulnerabilities or malicious payloads
- The model can hallucinate PII, generating fake-but-realistic personal data
- Unfiltered output can include harmful, biased, or legally problematic content
The core principle: treat the LLM as an untrusted component. Never assume its output is safe. Never send it data you wouldn’t want leaked. Build security layers around it, not inside it.
API Key Management
Your LLM API keys are the most expensive credentials in your system. A leaked OpenAI key can cost you thousands of dollars in minutes. Treat them with the same care as database root passwords.
Never Hardcode Keys
```python
# NEVER do this
client = OpenAI(api_key="sk-proj-abc123...")

# WRONG: .env file committed to git
# .env
# OPENAI_API_KEY=sk-proj-abc123...

# CORRECT: Environment variable, set outside the application
import os
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from the environment automatically
```

Use a Secret Manager in Production
Environment variables are fine for development. In production, use a secrets manager that provides rotation, auditing, and access control.
```python
import json
import os
from functools import lru_cache

import boto3

@lru_cache(maxsize=1)
def get_llm_api_key() -> str:
    """Retrieve the LLM API key from AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name="us-east-1")
    response = client.get_secret_value(SecretId="prod/llm/openai-api-key")
    secret = json.loads(response["SecretString"])
    return secret["api_key"]

# For HashiCorp Vault
import hvac

def get_api_key_from_vault() -> str:
    """Retrieve the LLM API key from HashiCorp Vault."""
    client = hvac.Client(url="https://vault.internal.company.com:8200")
    client.token = os.environ["VAULT_TOKEN"]  # Service account token
    secret = client.secrets.kv.v2.read_secret_version(
        path="llm/openai",
        mount_point="secret",
    )
    return secret["data"]["data"]["api_key"]
```

Key Rotation Strategy
```python
import json
import threading
import time

import boto3

class RotatingAPIKeyManager:
    """Manage API keys with automatic rotation support."""

    def __init__(self, secret_name: str, refresh_interval: int = 300):
        self.secret_name = secret_name
        self.refresh_interval = refresh_interval  # Seconds
        self._current_key = None
        self._lock = threading.Lock()
        self._last_refresh = 0

    def get_key(self) -> str:
        """Get the current API key, refreshing it if stale."""
        now = time.time()
        if now - self._last_refresh > self.refresh_interval:
            with self._lock:
                # Double-check after acquiring the lock
                if now - self._last_refresh > self.refresh_interval:
                    self._current_key = self._fetch_from_secret_manager()
                    self._last_refresh = now
        return self._current_key

    def _fetch_from_secret_manager(self) -> str:
        """Fetch the latest key from the secrets manager."""
        client = boto3.client("secretsmanager")
        response = client.get_secret_value(SecretId=self.secret_name)
        return json.loads(response["SecretString"])["api_key"]

# Usage
key_manager = RotatingAPIKeyManager("prod/llm/openai-api-key")
client = OpenAI(api_key=key_manager.get_key())
```

Least-Privilege: Separate Keys per Environment
| Environment | Key Scope | Rate Limit | Budget |
|---|---|---|---|
| Development | Tier 1, mini models only | 10 RPM | $5/day |
| Staging | Tier 2, all models | 100 RPM | $50/day |
| Production | Tier 4, all models | 5,000 RPM | $500/day |
| Batch jobs | Tier 3, batch API only | 1,000 RPM | $200/day |
Use separate API keys (or separate organization accounts) for each environment. A leaked dev key should not be usable against production.
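One lightweight way to enforce this separation is to resolve the key strictly by environment and fail loudly rather than fall back to another environment's key. This is a sketch; the environment-variable names are assumptions you would adapt to your own secrets layout:

```python
import os

# Hypothetical variable names -- adapt to your secrets layout
ENV_KEY_VARS = {
    "development": "OPENAI_API_KEY_DEV",
    "staging": "OPENAI_API_KEY_STAGING",
    "production": "OPENAI_API_KEY_PROD",
}

def api_key_for_environment(env: str) -> str:
    """Return the key scoped to this environment; never fall back to another."""
    var = ENV_KEY_VARS.get(env)
    if var is None:
        raise ValueError(f"Unknown environment: {env}")
    key = os.environ.get(var)
    if not key:
        raise RuntimeError(f"{var} is not set for environment '{env}'")
    return key
```

Failing hard on a missing key is deliberate: silently reusing a production key in a dev environment is exactly the leak path this separation is meant to prevent.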
Rate Limiting
Rate limiting protects against abuse, cost overruns, and denial-of-wallet attacks. Implement it at multiple layers.
Per-User Token-Based Rate Limiting
Standard request-per-minute limits aren’t enough for LLM applications. A single request with a 100K-token context costs more than 1,000 simple requests. Rate limit on tokens, not just requests.
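To see why, compare one large request with many small ones at an illustrative price (the per-million-token rate below is an assumption for the arithmetic, not current pricing):

```python
price_per_million_input = 2.50   # illustrative rate, not current pricing
large_request = 100_000          # tokens in one context-stuffed request
small_request = 80               # tokens in a short chat turn

large_cost = large_request / 1_000_000 * price_per_million_input
small_cost = small_request / 1_000_000 * price_per_million_input
print(f"One large request costs as much as {large_cost / small_cost:.0f} small ones")
```

A pure request-per-minute limit treats both requests identically, so an attacker who maxes out the context window multiplies your bill by three orders of magnitude without tripping any limit.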
```python
import time

import redis

class TokenRateLimiter:
    """Rate limit based on token consumption, not just request count."""

    def __init__(
        self,
        redis_client: redis.Redis,
        max_tokens_per_minute: int = 50_000,
        max_requests_per_minute: int = 30,
    ):
        self.redis = redis_client
        self.max_tokens = max_tokens_per_minute
        self.max_requests = max_requests_per_minute

    def check_and_consume(
        self, user_id: str, estimated_tokens: int
    ) -> tuple[bool, dict]:
        """Check whether the request is allowed and consume quota."""
        now = int(time.time())
        window_key = f"ratelimit:{user_id}:{now // 60}"  # 1-minute window

        pipe = self.redis.pipeline()
        pipe.hincrby(window_key, "tokens", 0)  # Increment by 0 to read the current value
        pipe.hincrby(window_key, "requests", 0)
        result = pipe.execute()
        current_tokens = int(result[0])
        current_requests = int(result[1])

        # Check limits
        if current_tokens + estimated_tokens > self.max_tokens:
            return False, {
                "error": "token_limit_exceeded",
                "current_tokens": current_tokens,
                "limit": self.max_tokens,
                "retry_after_seconds": 60 - (now % 60),
            }
        if current_requests + 1 > self.max_requests:
            return False, {
                "error": "request_limit_exceeded",
                "current_requests": current_requests,
                "limit": self.max_requests,
                "retry_after_seconds": 60 - (now % 60),
            }

        # Consume quota
        pipe = self.redis.pipeline()
        pipe.hincrby(window_key, "tokens", estimated_tokens)
        pipe.hincrby(window_key, "requests", 1)
        pipe.expire(window_key, 120)  # Expire after 2 minutes
        pipe.execute()

        return True, {
            "remaining_tokens": self.max_tokens - current_tokens - estimated_tokens,
            "remaining_requests": self.max_requests - current_requests - 1,
        }

# Usage in a FastAPI endpoint
from fastapi import FastAPI, HTTPException, Request
import tiktoken

app = FastAPI()
limiter = TokenRateLimiter(
    redis.Redis(), max_tokens_per_minute=50_000, max_requests_per_minute=30
)
enc = tiktoken.encoding_for_model("gpt-4o")

@app.post("/api/chat")
async def chat(request: Request, body: dict):
    user_id = request.state.user_id  # From auth middleware

    # Estimate input tokens
    estimated = sum(
        len(enc.encode(m["content"])) for m in body["messages"]
    )

    allowed, info = limiter.check_and_consume(user_id, estimated)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=info,
            headers={"Retry-After": str(info["retry_after_seconds"])},
        )

    # Process the request...
    return {"response": "..."}
```

Tiered Rate Limits
Different users get different limits based on their plan.
```python
TIER_LIMITS = {
    "free": {"tokens_per_minute": 10_000, "requests_per_minute": 10, "max_input_tokens": 4_000},
    "pro": {"tokens_per_minute": 100_000, "requests_per_minute": 60, "max_input_tokens": 32_000},
    "enterprise": {"tokens_per_minute": 500_000, "requests_per_minute": 300, "max_input_tokens": 128_000},
}

def get_limiter_for_user(user_id: str, user_tier: str) -> TokenRateLimiter:
    limits = TIER_LIMITS[user_tier]
    return TokenRateLimiter(
        redis.Redis(),
        max_tokens_per_minute=limits["tokens_per_minute"],
        max_requests_per_minute=limits["requests_per_minute"],
    )
```

Input Validation
Validate everything before it reaches the LLM.
```python
import re
import unicodedata

import tiktoken

class InputValidator:
    """Validate and sanitize user input before sending it to the LLM."""

    def __init__(self, max_input_tokens: int = 4000, max_message_length: int = 10_000):
        self.max_input_tokens = max_input_tokens
        self.max_message_length = max_message_length
        self.encoder = tiktoken.encoding_for_model("gpt-4o")

    def validate(self, messages: list[dict]) -> tuple[bool, str | None]:
        """Validate a list of messages. Returns (is_valid, error_message)."""
        for msg in messages:
            content = msg.get("content", "")

            # Length check (characters)
            if len(content) > self.max_message_length:
                return False, f"Message exceeds {self.max_message_length} character limit"

            # Token count check
            tokens = len(self.encoder.encode(content))
            if tokens > self.max_input_tokens:
                return False, f"Message exceeds {self.max_input_tokens} token limit"

            # Role validation
            if msg.get("role") not in ("user", "assistant", "system"):
                return False, f"Invalid role: {msg.get('role')}"

            # Check for null bytes and control characters
            if re.search(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", content):
                return False, "Message contains invalid control characters"

            # Check for encoding attacks (unicode tricks)
            if self._has_unicode_tricks(content):
                return False, "Message contains suspicious unicode sequences"

        return True, None

    def _has_unicode_tricks(self, text: str) -> bool:
        """Detect common unicode obfuscation techniques."""
        # Right-to-left override and directional formatting characters
        if re.search(r"[\u200e\u200f\u202a-\u202e\u2066-\u2069]", text):
            return True
        # Runs of zero-width characters used to hide text
        if re.search(r"[\u200b\u200c\u200d\ufeff]{3,}", text):
            return True
        return False

    def sanitize(self, content: str) -> str:
        """Remove potentially dangerous content from user input."""
        # Strip null bytes
        content = content.replace("\x00", "")
        # Normalize unicode
        content = unicodedata.normalize("NFKC", content)
        # Collapse excessive whitespace
        content = re.sub(r"\n{5,}", "\n\n\n", content)
        content = re.sub(r" {10,}", " ", content)
        return content.strip()

# Usage
validator = InputValidator(max_input_tokens=4000)

def handle_user_input(messages: list[dict]) -> list[dict]:
    is_valid, error = validator.validate(messages)
    if not is_valid:
        raise ValueError(f"Input validation failed: {error}")
    # Sanitize each message
    return [
        {**msg, "content": validator.sanitize(msg["content"])}
        for msg in messages
    ]
```

PII Detection and Redaction
Users will paste sensitive data into your LLM application. Credit card numbers, social security numbers, medical records, passwords — all of it. Detect and redact PII before it reaches the LLM provider.
Regex-Based PII Detection
```python
import re
from dataclasses import dataclass

@dataclass
class PIIMatch:
    type: str
    value: str
    start: int
    end: int

class PIIDetector:
    """Detect common PII patterns in text."""

    PATTERNS = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
        "phone_us": r"\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
        "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
        "aws_key": r"\b(?:AKIA|ASIA)[A-Z0-9]{16}\b",
        "api_key_generic": r"\b(?:sk-|pk_|rk_)[a-zA-Z0-9]{20,}\b",
        "date_of_birth": r"\b(?:0[1-9]|1[0-2])/(?:0[1-9]|[12]\d|3[01])/(?:19|20)\d{2}\b",
    }

    def detect(self, text: str) -> list[PIIMatch]:
        """Find all PII instances in text."""
        matches = []
        for pii_type, pattern in self.PATTERNS.items():
            for match in re.finditer(pattern, text):
                matches.append(PIIMatch(
                    type=pii_type,
                    value=match.group(),
                    start=match.start(),
                    end=match.end(),
                ))
        return matches

    def redact(self, text: str) -> tuple[str, list[PIIMatch]]:
        """Replace PII with type-specific placeholders."""
        matches = self.detect(text)
        # Sort by position (reverse) so earlier indices stay valid while replacing
        matches.sort(key=lambda m: m.start, reverse=True)
        redacted = text
        for match in matches:
            placeholder = f"[{match.type.upper()}_REDACTED]"
            redacted = redacted[:match.start] + placeholder + redacted[match.end:]
        return redacted, matches

# Usage
detector = PIIDetector()
user_input = "My SSN is 123-45-6789 and my card is 4111 1111 1111 1111. Email: jane@example.com"
redacted, findings = detector.redact(user_input)
print(redacted)
# "My SSN is [SSN_REDACTED] and my card is [CREDIT_CARD_REDACTED]. Email: [EMAIL_REDACTED]"
print(f"Found {len(findings)} PII instances")
# Found 3 PII instances
```

Using Microsoft Presidio for Advanced PII Detection
Regex catches obvious patterns but misses names, addresses, and context-dependent PII. Microsoft Presidio uses NLP for better detection.
```python
from presidio_analyzer import AnalyzerEngine, Pattern, PatternRecognizer
from presidio_anonymizer import AnonymizerEngine

# Initialize Presidio
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Add custom recognizers for your domain
api_key_recognizer = PatternRecognizer(
    supported_entity="API_KEY",
    patterns=[
        Pattern("OpenAI Key", r"sk-[a-zA-Z0-9]{20,}", 0.9),
        Pattern("Anthropic Key", r"sk-ant-[a-zA-Z0-9]{20,}", 0.95),
    ],
)
analyzer.registry.add_recognizer(api_key_recognizer)

def detect_and_redact_pii(text: str) -> tuple[str, list]:
    """Use Presidio for comprehensive PII detection and redaction."""
    # Analyze text for PII
    results = analyzer.analyze(
        text=text,
        language="en",
        entities=[
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
            "CREDIT_CARD", "US_SSN", "LOCATION",
            "IP_ADDRESS", "API_KEY",
        ],
        score_threshold=0.7,  # Confidence threshold
    )
    if not results:
        return text, []

    # Anonymize detected PII
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)
    findings = [
        {"type": r.entity_type, "score": r.score, "start": r.start, "end": r.end}
        for r in results
    ]
    return anonymized.text, findings

# Use as middleware before any LLM call
def safe_llm_call(messages: list[dict], **kwargs) -> str:
    """Redact PII from messages before sending them to the LLM."""
    cleaned_messages = []
    all_findings = []
    for msg in messages:
        if msg["role"] == "user":
            redacted_content, findings = detect_and_redact_pii(msg["content"])
            cleaned_messages.append({**msg, "content": redacted_content})
            all_findings.extend(findings)
        else:
            cleaned_messages.append(msg)

    if all_findings:
        print(f"Warning: redacted {len(all_findings)} PII instances before LLM call")

    # Now safe to send to the LLM
    response = client.chat.completions.create(messages=cleaned_messages, **kwargs)
    return response.choices[0].message.content
```

Output Filtering
The model’s output is untrusted. It might leak system prompt fragments, internal URLs, PII (real or hallucinated), or generate harmful content. Filter everything.
```python
import json
import re
from dataclasses import dataclass

@dataclass
class OutputFilterResult:
    clean_text: str
    violations: list[str]
    was_modified: bool

class OutputFilter:
    """Filter LLM output for security and compliance."""

    def __init__(self):
        self.pii_detector = PIIDetector()  # From earlier

        # Patterns that should never appear in output
        self.blocked_patterns = {
            "internal_url": r"https?://(?:internal|staging|dev|localhost)[^\s]*",
            "system_prompt_leak": r"(?:system prompt|my instructions|I was told to|my guidelines say)",
            "api_key": r"\b(?:sk-|pk_|AKIA)[a-zA-Z0-9]{16,}\b",
            "internal_ip": r"\b(?:10\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b",
            "file_path": r"(?:/home/\w+|/var/|/etc/|C:\\Users\\)\S+",
            "connection_string": r"(?:mongodb|postgresql|mysql|redis)://[^\s]+",
        }

    def filter(self, text: str) -> OutputFilterResult:
        """Filter the LLM output for security violations."""
        violations = []
        filtered = text

        # Check for PII in output
        pii_matches = self.pii_detector.detect(filtered)
        if pii_matches:
            filtered, _ = self.pii_detector.redact(filtered)
            violations.append(
                f"PII detected in output: {[m.type for m in pii_matches]}"
            )

        # Check for blocked patterns
        for name, pattern in self.blocked_patterns.items():
            matches = re.findall(pattern, filtered, re.IGNORECASE)
            if matches:
                for match in matches:
                    filtered = filtered.replace(match, f"[{name.upper()}_REDACTED]")
                violations.append(f"Blocked pattern '{name}' found in output")

        return OutputFilterResult(
            clean_text=filtered,
            violations=violations,
            was_modified=len(violations) > 0,
        )

    def validate_structured_output(
        self, text: str, expected_format: str = "json"
    ) -> tuple[bool, str | None]:
        """Validate that structured output matches the expected format."""
        if expected_format == "json":
            try:
                json.loads(text)
                return True, None
            except json.JSONDecodeError as e:
                return False, f"Invalid JSON: {e}"
        return True, None

# Usage
output_filter = OutputFilter()

def filtered_llm_call(messages: list[dict], **kwargs) -> str:
    """Make an LLM call with output filtering."""
    response = client.chat.completions.create(messages=messages, **kwargs)
    raw_output = response.choices[0].message.content
    result = output_filter.filter(raw_output)
    if result.violations:
        # Log violations for security review
        print(f"Output violations detected: {result.violations}")
    return result.clean_text
```

Data Privacy: What Goes to the Provider
Every message you send to OpenAI or Anthropic is transmitted to their servers. Understand what this means for your data.
What the Providers See
| Data | OpenAI API | Anthropic API |
|---|---|---|
| User messages | Yes (not used for training on API) | Yes (not used for training on API) |
| System prompts | Yes | Yes |
| File uploads | Yes | Yes |
| Conversation history | Whatever you send in messages | Whatever you send in messages |
| Your API key usage | Yes | Yes |
Key facts:
- Both OpenAI and Anthropic state they do not train on API data by default
- Data may be retained for up to 30 days for abuse monitoring (check current policies)
- Enterprise agreements can reduce or eliminate retention
- Some compliance frameworks (HIPAA, SOC 2) require specific data processing agreements
Practical Data Privacy Measures
```python
import json
import time

import tiktoken

class PrivacyAwareLLMClient:
    """Wrapper that enforces data privacy policies."""

    def __init__(self, client, privacy_config: dict):
        self.client = client
        self.config = privacy_config
        self.pii_detector = PIIDetector()

    def chat(self, messages: list[dict], **kwargs) -> str:
        # Enforce PII redaction if configured
        if self.config.get("redact_pii", True):
            messages = self._redact_pii_in_messages(messages)

        # Enforce data residency by selecting the right endpoint
        if self.config.get("region") == "eu":
            # Some providers offer EU endpoints
            pass

        # Log what we're sending (without the actual content)
        self._audit_log(messages, kwargs)

        response = self.client.chat.completions.create(
            messages=messages, **kwargs
        )
        return response.choices[0].message.content

    def _redact_pii_in_messages(self, messages: list[dict]) -> list[dict]:
        cleaned = []
        for msg in messages:
            redacted, findings = self.pii_detector.redact(msg["content"])
            if findings:
                print(f"Redacted {len(findings)} PII items from {msg['role']} message")
            cleaned.append({**msg, "content": redacted})
        return cleaned

    def _audit_log(self, messages: list[dict], kwargs: dict):
        """Log metadata about the request without logging content."""
        enc = tiktoken.encoding_for_model(kwargs.get("model", "gpt-4o"))
        log_entry = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "model": kwargs.get("model"),
            "message_count": len(messages),
            "total_tokens_estimate": sum(
                len(enc.encode(m["content"])) for m in messages
            ),
            # Do NOT log message content
        }
        print(f"LLM audit: {json.dumps(log_entry)}")
```

GDPR and CCPA Considerations
If your users are in the EU or California, you need to address:
- Data processing agreements (DPA): Sign a DPA with your LLM provider before processing personal data.
- Right to deletion: Users can request deletion of their data. You need to be able to delete conversation histories, and understand what the provider retains.
- Data minimization: Only send the minimum data necessary to the LLM. Don’t include user IDs, email addresses, or names in the prompt unless essential.
- Transparency: Your privacy policy should disclose that user queries may be processed by third-party AI providers.
- Consent: In some cases, explicit consent may be required before processing user data through an LLM.
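Data minimization can often be mechanized. As a sketch (the function name and salt handling are illustrative assumptions, not a prescribed scheme), you can replace real identifiers with stable pseudonyms before they enter a prompt, so the provider never sees the actual value but the model can still refer to distinct users:

```python
import hashlib

def pseudonymize(identifier: str, salt: str) -> str:
    """Map a real identifier to a stable, non-reversible pseudonym."""
    digest = hashlib.sha256((salt + identifier).encode()).hexdigest()[:12]
    return f"user_{digest}"

# The prompt references a pseudonym, never the real email address
prompt = f"Summarize recent activity for {pseudonymize('jane@example.com', salt='app-secret')}."
```

Because the mapping is deterministic for a given salt, you can also translate pseudonyms in the model's response back to real identifiers on your side of the trust boundary.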
SQL Injection Through LLMs
This is one of the most dangerous patterns in LLM applications. If your app generates SQL from natural language and executes it, you have a SQL injection vulnerability with an AI-powered attack surface.
```python
# EXTREMELY DANGEROUS — never do this
def natural_language_query(user_question: str) -> list:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Convert the user's question to SQL for our users table."},
            {"role": "user", "content": user_question},
        ],
    )
    sql = response.choices[0].message.content
    return db.execute(sql)  # Direct execution of LLM-generated SQL!

# An attacker asks: "Show me all users; DROP TABLE users; --"
# Or more subtly: "Show me users, and also list all table names in the database"
```

Safe pattern: parameterized queries with allowlists.
```python
import json
from enum import Enum

class AllowedTable(Enum):
    USERS = "users"
    ORDERS = "orders"
    PRODUCTS = "products"

class AllowedColumn(Enum):
    USER_NAME = "name"
    USER_EMAIL = "email"
    ORDER_DATE = "order_date"
    ORDER_TOTAL = "total"
    PRODUCT_NAME = "product_name"
    PRODUCT_PRICE = "price"

def safe_natural_language_query(user_question: str) -> list:
    """Generate SQL safely using structured output and allowlists."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """Convert the user's question to a query specification.
Return JSON with these fields:
- table: one of [users, orders, products]
- columns: list of column names from [name, email, order_date, total, product_name, price]
- where_conditions: list of {column, operator, value} where operator is one of [=, >, <, >=, <=, LIKE]
- order_by: optional column name
- limit: optional integer (max 100)
ONLY return the JSON, nothing else.""",
            },
            {"role": "user", "content": user_question},
        ],
        temperature=0,
        response_format={"type": "json_object"},
    )
    spec = json.loads(response.choices[0].message.content)

    # Validate every field against allowlists
    if spec["table"] not in [t.value for t in AllowedTable]:
        raise ValueError(f"Table not allowed: {spec['table']}")
    for col in spec.get("columns", []):
        if col not in [c.value for c in AllowedColumn]:
            raise ValueError(f"Column not allowed: {col}")

    # Build a parameterized query
    columns = ", ".join(spec["columns"])
    query = f"SELECT {columns} FROM {spec['table']}"
    params = []

    if spec.get("where_conditions"):
        conditions = []
        for cond in spec["where_conditions"]:
            if cond["column"] not in [c.value for c in AllowedColumn]:
                raise ValueError(f"Where column not allowed: {cond['column']}")
            if cond["operator"] not in ("=", ">", "<", ">=", "<=", "LIKE"):
                raise ValueError(f"Operator not allowed: {cond['operator']}")
            conditions.append(f"{cond['column']} {cond['operator']} %s")
            params.append(cond["value"])
        query += " WHERE " + " AND ".join(conditions)

    limit = min(spec.get("limit", 100), 100)
    query += f" LIMIT {limit}"

    # Execute with a parameterized query — safe from injection
    return db.execute(query, params)
```

Logging and Auditing
Log everything about LLM interactions for debugging, security review, and compliance. But be careful about what you log.
```python
import hashlib
import json
import logging

# Configure structured logging
logger = logging.getLogger("llm_audit")

class LLMAuditLogger:
    """Secure audit logging for LLM interactions."""

    def __init__(self, log_content: bool = False):
        # In production, log_content should almost always be False
        self.log_content = log_content

    def log_request(self, request_id: str, user_id: str, messages: list[dict],
                    model: str):
        """Log request metadata (not content by default)."""
        entry = {
            "event": "llm_request",
            "request_id": request_id,
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest()[:16],
            "model": model,
            "message_count": len(messages),
            "roles": [m["role"] for m in messages],
            "input_char_count": sum(len(m["content"]) for m in messages),
        }
        if self.log_content:
            # Only in debug environments — NEVER in production
            entry["messages"] = messages
        logger.info(json.dumps(entry))

    def log_response(self, request_id: str, output_tokens: int,
                     input_tokens: int, cost: float,
                     violations: list[str] | None = None):
        """Log response metadata."""
        entry = {
            "event": "llm_response",
            "request_id": request_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
        }
        if violations:
            entry["security_violations"] = violations
            logger.warning(json.dumps(entry))
        else:
            logger.info(json.dumps(entry))

    def log_blocked_request(self, request_id: str, user_id: str,
                            reason: str):
        """Log when a request is blocked by security filters."""
        entry = {
            "event": "llm_blocked",
            "request_id": request_id,
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest()[:16],
            "reason": reason,
        }
        logger.warning(json.dumps(entry))
```

What to log: request metadata (model, token counts, costs, timing), security violations, rate limit hits, and PII detection events.
What NOT to log: User message content (PII risk), system prompts (intellectual property), API keys, full response content. If you must log content for debugging, use a separate short-retention log with restricted access.
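If there is any risk that content fields slip into structured log entries anyway, one defensive option (a sketch; the `CONTENT_FIELDS` names are assumptions about your log schema) is a `logging.Filter` that strips those keys from JSON-formatted records before they are emitted:

```python
import json
import logging

# Hypothetical field names — match these to your own log schema
CONTENT_FIELDS = {"messages", "content", "prompt", "completion"}

class ContentScrubFilter(logging.Filter):
    """Drop message-content fields from JSON-structured log records."""

    def filter(self, record: logging.LogRecord) -> bool:
        try:
            entry = json.loads(record.getMessage())
        except (json.JSONDecodeError, TypeError):
            return True  # Not structured JSON; pass it through unchanged
        if not isinstance(entry, dict):
            return True
        scrubbed = {k: v for k, v in entry.items() if k not in CONTENT_FIELDS}
        record.msg = json.dumps(scrubbed)
        record.args = ()
        return True
```

Attach it with `logging.getLogger("llm_audit").addFilter(ContentScrubFilter())` so the scrub happens centrally, not in each call site.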
Content Moderation
Use moderation APIs to flag harmful content in both input and output.
```python
from openai import OpenAI

client = OpenAI()

def moderate_content(text: str) -> tuple[bool, dict]:
    """Check content against OpenAI's moderation API."""
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )
    result = response.results[0]
    if result.flagged:
        flagged_categories = {
            cat: score
            for cat, score in result.category_scores.__dict__.items()
            if getattr(result.categories, cat, False)
        }
        return False, {
            "flagged": True,
            "categories": flagged_categories,
        }
    return True, {"flagged": False}

def moderated_llm_call(messages: list[dict], **kwargs) -> str:
    """LLM call with content moderation on input and output."""
    # Moderate user input
    for msg in messages:
        if msg["role"] == "user":
            is_safe, details = moderate_content(msg["content"])
            if not is_safe:
                return f"I can't process this request. Content policy violation: {list(details['categories'].keys())}"

    # Get the LLM response
    response = client.chat.completions.create(messages=messages, **kwargs)
    output = response.choices[0].message.content

    # Moderate the output
    is_safe, details = moderate_content(output)
    if not is_safe:
        return "I generated a response that didn't meet our content policy. Please rephrase your question."

    return output
```

Defense-in-Depth Architecture
No single security measure is sufficient. Layer your defenses so that a failure at one layer is caught by the next.
```
User Request
     |
     v
[Layer 1: Rate Limiting]          -- Block abuse early
     |
     v
[Layer 2: Input Validation]       -- Length, encoding, structure
     |
     v
[Layer 3: PII Detection]          -- Redact sensitive data
     |
     v
[Layer 4: Content Moderation]     -- Block harmful input
     |
     v
[Layer 5: Prompt Injection Check] -- Detect manipulation attempts
     |
     v
[LLM Call]                        -- The actual model inference
     |
     v
[Layer 6: Output Filtering]       -- Redact leaked secrets, PII
     |
     v
[Layer 7: Content Moderation]     -- Block harmful output
     |
     v
[Layer 8: Structured Validation]  -- Verify output format
     |
     v
User Response
```

Here's a middleware that implements this full pipeline:
```python
import uuid
from dataclasses import dataclass

import redis

@dataclass
class SecurityContext:
    request_id: str
    user_id: str
    user_tier: str

class LLMSecurityMiddleware:
    """Complete security middleware for LLM applications."""

    def __init__(self, config: dict):
        self.rate_limiter = TokenRateLimiter(
            redis.Redis(),
            max_tokens_per_minute=config.get("max_tokens_per_minute", 50_000),
        )
        self.input_validator = InputValidator(
            max_input_tokens=config.get("max_input_tokens", 4000),
        )
        self.pii_detector = PIIDetector()
        self.output_filter = OutputFilter()
        self.audit_logger = LLMAuditLogger()

    def process(
        self, messages: list[dict], ctx: SecurityContext, **llm_kwargs
    ) -> str:
        """Process a request through the full security pipeline."""
        request_id = ctx.request_id or str(uuid.uuid4())

        # Layer 1: Rate limiting
        estimated_tokens = sum(len(m["content"]) // 4 for m in messages)
        allowed, info = self.rate_limiter.check_and_consume(
            ctx.user_id, estimated_tokens
        )
        if not allowed:
            self.audit_logger.log_blocked_request(
                request_id, ctx.user_id, f"rate_limit: {info['error']}"
            )
            raise RateLimitError(info)

        # Layer 2: Input validation
        is_valid, error = self.input_validator.validate(messages)
        if not is_valid:
            self.audit_logger.log_blocked_request(
                request_id, ctx.user_id, f"validation: {error}"
            )
            raise ValidationError(error)

        # Layer 3: PII redaction
        cleaned_messages = []
        for msg in messages:
            if msg["role"] == "user":
                redacted, findings = self.pii_detector.redact(msg["content"])
                cleaned_messages.append({**msg, "content": redacted})
            else:
                cleaned_messages.append(msg)

        # Layer 4: Content moderation (input)
        for msg in cleaned_messages:
            if msg["role"] == "user":
                is_safe, details = moderate_content(msg["content"])
                if not is_safe:
                    self.audit_logger.log_blocked_request(
                        request_id, ctx.user_id, f"moderation: {details}"
                    )
                    raise ContentPolicyError("Input violates content policy")

        # Log the request
        self.audit_logger.log_request(
            request_id, ctx.user_id, cleaned_messages,
            llm_kwargs.get("model", "unknown"),
        )

        # Make the LLM call
        response = client.chat.completions.create(
            messages=cleaned_messages, **llm_kwargs
        )
        raw_output = response.choices[0].message.content

        # Layer 6: Output filtering
        filter_result = self.output_filter.filter(raw_output)

        # Layer 7: Content moderation (output)
        is_safe, details = moderate_content(filter_result.clean_text)
        if not is_safe:
            self.audit_logger.log_response(
                request_id,
                response.usage.completion_tokens,
                response.usage.prompt_tokens,
                0.0,
                violations=["output_moderation_failed"],
            )
            return "I wasn't able to generate an appropriate response. Please try rephrasing."

        # Log the response
        self.audit_logger.log_response(
            request_id,
            response.usage.completion_tokens,
            response.usage.prompt_tokens,
            calculate_cost(  # Cost helper assumed from your cost-tracking code
                llm_kwargs.get("model", "gpt-4o"),
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
            ),
            violations=filter_result.violations if filter_result.was_modified else None,
        )
        return filter_result.clean_text

# Custom exceptions
class RateLimitError(Exception):
    pass

class ValidationError(Exception):
    pass

class ContentPolicyError(Exception):
    pass
```

Security Checklist for Launch
Before your LLM application goes to production, verify every item on this list:
API Keys and Secrets
- No API keys in source code or git history
- Keys stored in a secrets manager (not just env vars in production)
- Separate keys per environment (dev/staging/prod)
- Key rotation process documented and tested
- Budget limits set on API provider dashboards
Rate Limiting
- Per-user rate limits (requests AND tokens)
- Per-IP rate limits for unauthenticated endpoints
- Tiered limits based on user plan
- Graceful degradation when limits are hit (not just 429 errors)
Input Security
- Maximum input length enforced (characters and tokens)
- PII detection and redaction active
- Content moderation on user input
- Unicode and encoding attacks handled
- Prompt injection defenses in place (see next lesson)
Output Security
- Output filtering for PII, secrets, and internal URLs
- Content moderation on LLM output
- Structured output validation where applicable
- LLM-generated code/SQL never executed directly
Data Privacy
- Data processing agreement signed with LLM provider
- Privacy policy updated to disclose AI processing
- User data minimized in prompts
- Audit logging active (without logging content)
- Data retention policies defined and enforced
Monitoring
- Cost alerts configured
- Security violation alerts configured
- Rate limit hit alerts configured
- Audit logs shipped to central logging system
- Incident response plan for security breaches
Key Takeaways
- Treat the LLM as untrusted. It processes arbitrary input and produces unpredictable output. Never execute its output directly, never trust it to be safe, and never assume it follows your instructions.
- Protect both sides. Input security prevents sensitive data from reaching the provider and blocks manipulation attempts. Output security catches leaked secrets, PII, and policy violations before they reach the user.
- API keys are your most expensive credentials. Use a secrets manager, rotate regularly, set budget limits, and use separate keys per environment. A leaked key can cost you thousands in minutes.
- Rate limit on tokens, not just requests. A single 100K-token request costs more than a thousand simple ones. Token-based rate limiting prevents denial-of-wallet attacks.
- PII will end up in your prompts. Users paste credit card numbers, SSNs, and passwords. Detect and redact automatically before sending anything to the provider.
- Never execute LLM-generated SQL or code directly. Use structured output, validate against allowlists, and parameterize queries. The LLM can be tricked into generating malicious payloads.
- Layer your defenses. No single security measure is reliable. Rate limiting, input validation, PII detection, content moderation, output filtering, and audit logging all work together. Each layer catches what the others miss.
- Log metadata, not content. Track token counts, costs, model usage, and security violations. Avoid logging user messages or model responses in production — that creates a data privacy liability.