arrow_backBACK TO LLM ENGINEERING IN PRODUCTION
Lesson 11LLM Engineering in Production16 min read

Securing Your LLM Application

April 01, 2026

TL;DR

LLM apps have a unique attack surface: the model can be manipulated through its input, and it can leak sensitive data through its output. Secure both sides. Input: rate limit, validate, sanitize, and detect PII before it reaches the model. Output: filter, validate structure, and scan for leaked secrets. Infrastructure: rotate API keys, use least-privilege access, log everything, and never trust the model's output as safe-to-execute code or SQL.

Traditional web applications have well-understood security models: validate input, sanitize output, authenticate users, authorize access. LLM applications inherit all of those concerns and add entirely new ones. The model itself is an attack surface — it can be tricked into ignoring instructions, leaking system prompts, generating harmful content, or exfiltrating data. Its output is unpredictable by design, which means you can’t just validate against a schema and call it a day.

This lesson covers the security fundamentals that every LLM application needs before going to production. The next lesson goes deep on prompt injection specifically — here we cover the broader security architecture.

The Unique Security Surface of LLM Applications

A traditional API processes structured input and returns structured output. An LLM application processes natural language (anything goes) and returns natural language (anything can come out). This creates two attack surfaces that don’t exist in traditional applications:

Input-side risks:

  • Users can manipulate the model’s behavior through crafted prompts
  • Sensitive data (PII, credentials) can leak into the LLM provider’s systems
  • Adversarial inputs can cause the model to generate harmful or policy-violating content
  • High token counts can be weaponized as a denial-of-wallet attack

Output-side risks:

  • The model can leak system prompt contents, internal URLs, or API patterns
  • Generated code or SQL can contain vulnerabilities or malicious payloads
  • The model can hallucinate PII, generating fake-but-realistic personal data
  • Unfiltered output can include harmful, biased, or legally problematic content

The core principle: treat the LLM as an untrusted component. Never assume its output is safe. Never send it data you wouldn’t want leaked. Build security layers around it, not inside it.

API Key Management

Your LLM API keys are the most expensive credentials in your system. A leaked OpenAI key can cost you thousands of dollars in minutes. Treat them with the same care as database root passwords.

Never Hardcode Keys

# NEVER do this
client = OpenAI(api_key="sk-proj-abc123...")

# WRONG: .env file committed to git
# .env
# OPENAI_API_KEY=sk-proj-abc123...

# CORRECT: Environment variable, set outside the application
import os
from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from environment automatically

Use a Secret Manager in Production

Environment variables are fine for development. In production, use a secrets manager that provides rotation, auditing, and access control.

import boto3
import json
from functools import lru_cache

@lru_cache(maxsize=1)
def get_llm_api_key() -> str:
    """Retrieve the LLM API key from AWS Secrets Manager."""
    client = boto3.client("secretsmanager", region_name="us-east-1")
    response = client.get_secret_value(SecretId="prod/llm/openai-api-key")
    secret = json.loads(response["SecretString"])
    return secret["api_key"]


# For HashiCorp Vault
import hvac

def get_api_key_from_vault() -> str:
    """Retrieve the LLM API key from HashiCorp Vault."""
    client = hvac.Client(url="https://vault.internal.company.com:8200")
    client.token = os.environ["VAULT_TOKEN"]  # Service account token

    secret = client.secrets.kv.v2.read_secret_version(
        path="llm/openai",
        mount_point="secret",
    )
    return secret["data"]["data"]["api_key"]

Key Rotation Strategy

import time
import threading

class RotatingAPIKeyManager:
    """Manage API keys with automatic rotation support."""

    def __init__(self, secret_name: str, refresh_interval: int = 300):
        self.secret_name = secret_name
        self.refresh_interval = refresh_interval  # Seconds
        self._current_key = None
        self._lock = threading.Lock()
        self._last_refresh = 0

    def get_key(self) -> str:
        """Get the current API key, refreshing if stale."""
        now = time.time()
        if now - self._last_refresh > self.refresh_interval:
            with self._lock:
                # Double-check after acquiring lock
                if now - self._last_refresh > self.refresh_interval:
                    self._current_key = self._fetch_from_secret_manager()
                    self._last_refresh = now
        return self._current_key

    def _fetch_from_secret_manager(self) -> str:
        """Fetch the latest key from the secrets manager."""
        client = boto3.client("secretsmanager")
        response = client.get_secret_value(SecretId=self.secret_name)
        return json.loads(response["SecretString"])["api_key"]


# Usage
key_manager = RotatingAPIKeyManager("prod/llm/openai-api-key")
client = OpenAI(api_key=key_manager.get_key())

Least-Privilege: Separate Keys per Environment

Environment Key Scope Rate Limit Budget
Development Tier 1, mini models only 10 RPM $5/day
Staging Tier 2, all models 100 RPM $50/day
Production Tier 4, all models 5,000 RPM $500/day
Batch jobs Tier 3, batch API only 1,000 RPM $200/day

Use separate API keys (or separate organization accounts) for each environment. A leaked dev key should not be usable against production.

Rate Limiting

Rate limiting protects against abuse, cost overruns, and denial-of-wallet attacks. Implement it at multiple layers.

Per-User Token-Based Rate Limiting

Standard request-per-minute limits aren’t enough for LLM applications. A single request with a 100K-token context costs more than 1,000 simple requests. Rate limit on tokens, not just requests.

import time
import redis

class TokenRateLimiter:
    """Rate limit based on token consumption, not just request count."""

    def __init__(
        self,
        redis_client: redis.Redis,
        max_tokens_per_minute: int = 50_000,
        max_requests_per_minute: int = 30,
    ):
        self.redis = redis_client
        self.max_tokens = max_tokens_per_minute
        self.max_requests = max_requests_per_minute

    def check_and_consume(
        self, user_id: str, estimated_tokens: int
    ) -> tuple[bool, dict]:
        """Check if the request is allowed and consume quota."""
        now = int(time.time())
        window_key = f"ratelimit:{user_id}:{now // 60}"  # 1-minute window

        pipe = self.redis.pipeline()
        pipe.hincrby(window_key, "tokens", 0)  # Get current value
        pipe.hincrby(window_key, "requests", 0)
        result = pipe.execute()

        current_tokens = int(result[0])
        current_requests = int(result[1])

        # Check limits
        if current_tokens + estimated_tokens > self.max_tokens:
            return False, {
                "error": "token_limit_exceeded",
                "current_tokens": current_tokens,
                "limit": self.max_tokens,
                "retry_after_seconds": 60 - (now % 60),
            }

        if current_requests + 1 > self.max_requests:
            return False, {
                "error": "request_limit_exceeded",
                "current_requests": current_requests,
                "limit": self.max_requests,
                "retry_after_seconds": 60 - (now % 60),
            }

        # Consume quota
        pipe = self.redis.pipeline()
        pipe.hincrby(window_key, "tokens", estimated_tokens)
        pipe.hincrby(window_key, "requests", 1)
        pipe.expire(window_key, 120)  # Expire after 2 minutes
        pipe.execute()

        return True, {
            "remaining_tokens": self.max_tokens - current_tokens - estimated_tokens,
            "remaining_requests": self.max_requests - current_requests - 1,
        }


# Usage in a FastAPI endpoint
from fastapi import FastAPI, HTTPException, Request
import tiktoken

app = FastAPI()
limiter = TokenRateLimiter(
    redis.Redis(), max_tokens_per_minute=50_000, max_requests_per_minute=30
)
enc = tiktoken.encoding_for_model("gpt-4o")

@app.post("/api/chat")
async def chat(request: Request, body: dict):
    user_id = request.state.user_id  # From auth middleware

    # Estimate input tokens
    estimated = sum(
        len(enc.encode(m["content"])) for m in body["messages"]
    )

    allowed, info = limiter.check_and_consume(user_id, estimated)
    if not allowed:
        raise HTTPException(
            status_code=429,
            detail=info,
            headers={"Retry-After": str(info["retry_after_seconds"])},
        )

    # Process the request...
    return {"response": "..."}

Tiered Rate Limits

Different users get different limits based on their plan.

TIER_LIMITS = {
    "free": {"tokens_per_minute": 10_000, "requests_per_minute": 10, "max_input_tokens": 4_000},
    "pro": {"tokens_per_minute": 100_000, "requests_per_minute": 60, "max_input_tokens": 32_000},
    "enterprise": {"tokens_per_minute": 500_000, "requests_per_minute": 300, "max_input_tokens": 128_000},
}

def get_limiter_for_user(user_id: str, user_tier: str) -> TokenRateLimiter:
    limits = TIER_LIMITS[user_tier]
    return TokenRateLimiter(
        redis.Redis(),
        max_tokens_per_minute=limits["tokens_per_minute"],
        max_requests_per_minute=limits["requests_per_minute"],
    )

Input Validation

Validate everything before it reaches the LLM.

import re
import tiktoken

class InputValidator:
    """Validate and sanitize user input before sending to the LLM."""

    def __init__(self, max_input_tokens: int = 4000, max_message_length: int = 10_000):
        self.max_input_tokens = max_input_tokens
        self.max_message_length = max_message_length
        self.encoder = tiktoken.encoding_for_model("gpt-4o")

    def validate(self, messages: list[dict]) -> tuple[bool, str | None]:
        """Validate a list of messages. Returns (is_valid, error_message)."""
        for msg in messages:
            content = msg.get("content", "")

            # Length check (characters)
            if len(content) > self.max_message_length:
                return False, f"Message exceeds {self.max_message_length} character limit"

            # Token count check
            tokens = len(self.encoder.encode(content))
            if tokens > self.max_input_tokens:
                return False, f"Message exceeds {self.max_input_tokens} token limit"

            # Role validation
            if msg.get("role") not in ("user", "assistant", "system"):
                return False, f"Invalid role: {msg.get('role')}"

            # Check for null bytes and control characters
            if re.search(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", content):
                return False, "Message contains invalid control characters"

            # Check for encoding attacks (unicode tricks)
            if self._has_unicode_tricks(content):
                return False, "Message contains suspicious unicode sequences"

        return True, None

    def _has_unicode_tricks(self, text: str) -> bool:
        """Detect common unicode obfuscation techniques."""
        # Right-to-left override characters
        if re.search(r"[\u200e\u200f\u202a-\u202e\u2066-\u2069]", text):
            return True
        # Zero-width characters used for hidden text
        if re.search(r"[\u200b\u200c\u200d\ufeff]{3,}", text):
            return True
        return False

    def sanitize(self, content: str) -> str:
        """Remove potentially dangerous content from user input."""
        # Strip null bytes
        content = content.replace("\x00", "")
        # Normalize unicode
        import unicodedata
        content = unicodedata.normalize("NFKC", content)
        # Remove excessive whitespace
        content = re.sub(r"\n{5,}", "\n\n\n", content)
        content = re.sub(r" {10,}", " ", content)
        return content.strip()


# Usage
validator = InputValidator(max_input_tokens=4000)

def handle_user_input(messages: list[dict]) -> list[dict]:
    is_valid, error = validator.validate(messages)
    if not is_valid:
        raise ValueError(f"Input validation failed: {error}")

    # Sanitize each message
    return [
        {**msg, "content": validator.sanitize(msg["content"])}
        for msg in messages
    ]

PII Detection and Redaction

Users will paste sensitive data into your LLM application. Credit card numbers, social security numbers, medical records, passwords — all of it. Detect and redact PII before it reaches the LLM provider.

Regex-Based PII Detection

import re
from dataclasses import dataclass

@dataclass
class PIIMatch:
    type: str
    value: str
    start: int
    end: int

class PIIDetector:
    """Detect common PII patterns in text."""

    PATTERNS = {
        "ssn": r"\b\d{3}-\d{2}-\d{4}\b",
        "credit_card": r"\b(?:\d{4}[-\s]?){3}\d{4}\b",
        "email": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
        "phone_us": r"\b(?:\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b",
        "ip_address": r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b",
        "aws_key": r"\b(?:AKIA|ASIA)[A-Z0-9]{16}\b",
        "api_key_generic": r"\b(?:sk-|pk_|rk_)[a-zA-Z0-9]{20,}\b",
        "date_of_birth": r"\b(?:0[1-9]|1[0-2])/(?:0[1-9]|[12]\d|3[01])/(?:19|20)\d{2}\b",
    }

    def detect(self, text: str) -> list[PIIMatch]:
        """Find all PII instances in text."""
        matches = []
        for pii_type, pattern in self.PATTERNS.items():
            for match in re.finditer(pattern, text):
                matches.append(PIIMatch(
                    type=pii_type,
                    value=match.group(),
                    start=match.start(),
                    end=match.end(),
                ))
        return matches

    def redact(self, text: str) -> tuple[str, list[PIIMatch]]:
        """Replace PII with type-specific placeholders."""
        matches = self.detect(text)

        # Sort by position (reverse) to maintain correct indices
        matches.sort(key=lambda m: m.start, reverse=True)

        redacted = text
        for match in matches:
            placeholder = f"[{match.type.upper()}_REDACTED]"
            redacted = redacted[:match.start] + placeholder + redacted[match.end:]

        return redacted, matches


# Usage
detector = PIIDetector()

user_input = "My SSN is 123-45-6789 and my card is 4111 1111 1111 1111. Email: [email protected]"
redacted, findings = detector.redact(user_input)

print(redacted)
# "My SSN is [SSN_REDACTED] and my card is [CREDIT_CARD_REDACTED]. Email: [EMAIL_REDACTED]"

print(f"Found {len(findings)} PII instances")
# Found 3 PII instances

Using Microsoft Presidio for Advanced PII Detection

Regex catches obvious patterns but misses names, addresses, and context-dependent PII. Microsoft Presidio uses NLP for better detection.

from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine

# Initialize Presidio
analyzer = AnalyzerEngine()
anonymizer = AnonymizerEngine()

# Add custom recognizers for your domain
api_key_recognizer = PatternRecognizer(
    supported_entity="API_KEY",
    patterns=[
        Pattern("OpenAI Key", r"sk-[a-zA-Z0-9]{20,}", 0.9),
        Pattern("Anthropic Key", r"sk-ant-[a-zA-Z0-9]{20,}", 0.95),
    ],
)
analyzer.registry.add_recognizer(api_key_recognizer)

def detect_and_redact_pii(text: str) -> tuple[str, list]:
    """Use Presidio for comprehensive PII detection and redaction."""
    # Analyze text for PII
    results = analyzer.analyze(
        text=text,
        language="en",
        entities=[
            "PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER",
            "CREDIT_CARD", "US_SSN", "LOCATION",
            "IP_ADDRESS", "API_KEY",
        ],
        score_threshold=0.7,  # Confidence threshold
    )

    if not results:
        return text, []

    # Anonymize detected PII
    anonymized = anonymizer.anonymize(text=text, analyzer_results=results)

    findings = [
        {"type": r.entity_type, "score": r.score, "start": r.start, "end": r.end}
        for r in results
    ]

    return anonymized.text, findings


# Use as middleware before any LLM call
def safe_llm_call(messages: list[dict], **kwargs) -> str:
    """Redact PII from messages before sending to the LLM."""
    cleaned_messages = []
    all_findings = []

    for msg in messages:
        if msg["role"] == "user":
            redacted_content, findings = detect_and_redact_pii(msg["content"])
            cleaned_messages.append({**msg, "content": redacted_content})
            all_findings.extend(findings)
        else:
            cleaned_messages.append(msg)

    if all_findings:
        print(f"Warning: Redacted {len(all_findings)} PII instances before LLM call")

    # Now safe to send to the LLM
    response = client.chat.completions.create(messages=cleaned_messages, **kwargs)
    return response.choices[0].message.content

Output Filtering

The model’s output is untrusted. It might leak system prompt fragments, internal URLs, PII (real or hallucinated), or generate harmful content. Filter everything.

import re
import json
from dataclasses import dataclass

@dataclass
class OutputFilterResult:
    clean_text: str
    violations: list[str]
    was_modified: bool

class OutputFilter:
    """Filter LLM output for security and compliance."""

    def __init__(self):
        self.pii_detector = PIIDetector()  # From earlier

        # Patterns that should never appear in output
        self.blocked_patterns = {
            "internal_url": r"https?://(?:internal|staging|dev|localhost)[^\s]*",
            "system_prompt_leak": r"(?:system prompt|my instructions|I was told to|my guidelines say)",
            "api_key": r"\b(?:sk-|pk_|AKIA)[a-zA-Z0-9]{16,}\b",
            "internal_ip": r"\b(?:10\.\d{1,3}|172\.(?:1[6-9]|2\d|3[01])|192\.168)\.\d{1,3}\.\d{1,3}\b",
            "file_path": r"(?:/home/\w+|/var/|/etc/|C:\\Users\\)\S+",
            "connection_string": r"(?:mongodb|postgresql|mysql|redis)://[^\s]+",
        }

    def filter(self, text: str) -> OutputFilterResult:
        """Filter the LLM output for security violations."""
        violations = []
        filtered = text

        # Check for PII in output
        pii_matches = self.pii_detector.detect(filtered)
        if pii_matches:
            filtered, _ = self.pii_detector.redact(filtered)
            violations.append(
                f"PII detected in output: {[m.type for m in pii_matches]}"
            )

        # Check for blocked patterns
        for name, pattern in self.blocked_patterns.items():
            matches = re.findall(pattern, filtered, re.IGNORECASE)
            if matches:
                for match in matches:
                    filtered = filtered.replace(match, f"[{name.upper()}_REDACTED]")
                violations.append(f"Blocked pattern '{name}' found in output")

        return OutputFilterResult(
            clean_text=filtered,
            violations=violations,
            was_modified=len(violations) > 0,
        )

    def validate_structured_output(
        self, text: str, expected_format: str = "json"
    ) -> tuple[bool, str | None]:
        """Validate that structured output matches expected format."""
        if expected_format == "json":
            try:
                json.loads(text)
                return True, None
            except json.JSONDecodeError as e:
                return False, f"Invalid JSON: {e}"

        return True, None


# Usage
output_filter = OutputFilter()

def filtered_llm_call(messages: list[dict], **kwargs) -> str:
    """Make an LLM call with output filtering."""
    response = client.chat.completions.create(messages=messages, **kwargs)
    raw_output = response.choices[0].message.content

    result = output_filter.filter(raw_output)

    if result.violations:
        # Log violations for security review
        print(f"Output violations detected: {result.violations}")

    return result.clean_text

Data Privacy: What Goes to the Provider

Every message you send to OpenAI or Anthropic is transmitted to their servers. Understand what this means for your data.

What the Providers See

Data OpenAI API Anthropic API
User messages Yes (not used for training on API) Yes (not used for training on API)
System prompts Yes Yes
File uploads Yes Yes
Conversation history Whatever you send in messages Whatever you send in messages
Your API key usage Yes Yes

Key facts:

  • Both OpenAI and Anthropic state they do not train on API data by default
  • Data may be retained for up to 30 days for abuse monitoring (check current policies)
  • Enterprise agreements can reduce or eliminate retention
  • Some compliance frameworks (HIPAA, SOC 2) require specific data processing agreements

Practical Data Privacy Measures

class PrivacyAwareLLMClient:
    """Wrapper that enforces data privacy policies."""

    def __init__(self, client, privacy_config: dict):
        self.client = client
        self.config = privacy_config
        self.pii_detector = PIIDetector()

    def chat(self, messages: list[dict], **kwargs) -> str:
        # Enforce PII redaction if configured
        if self.config.get("redact_pii", True):
            messages = self._redact_pii_in_messages(messages)

        # Enforce data residency by selecting the right endpoint
        if self.config.get("region") == "eu":
            # Some providers offer EU endpoints
            pass

        # Log what we're sending (without the actual content)
        self._audit_log(messages, kwargs)

        response = self.client.chat.completions.create(
            messages=messages, **kwargs
        )
        return response.choices[0].message.content

    def _redact_pii_in_messages(self, messages: list[dict]) -> list[dict]:
        cleaned = []
        for msg in messages:
            redacted, findings = self.pii_detector.redact(msg["content"])
            if findings:
                print(f"Redacted {len(findings)} PII items from {msg['role']} message")
            cleaned.append({**msg, "content": redacted})
        return cleaned

    def _audit_log(self, messages: list[dict], kwargs: dict):
        """Log metadata about the request without logging content."""
        import tiktoken
        enc = tiktoken.encoding_for_model(kwargs.get("model", "gpt-4o"))

        log_entry = {
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ"),
            "model": kwargs.get("model"),
            "message_count": len(messages),
            "total_tokens_estimate": sum(
                len(enc.encode(m["content"])) for m in messages
            ),
            # Do NOT log message content
        }
        print(f"LLM audit: {json.dumps(log_entry)}")

GDPR and CCPA Considerations

If your users are in the EU or California, you need to address:

  1. Data processing agreements (DPA): Sign a DPA with your LLM provider before processing personal data.
  2. Right to deletion: Users can request deletion of their data. You need to be able to delete conversation histories, and understand what the provider retains.
  3. Data minimization: Only send the minimum data necessary to the LLM. Don’t include user IDs, email addresses, or names in the prompt unless essential.
  4. Transparency: Your privacy policy should disclose that user queries may be processed by third-party AI providers.
  5. Consent: In some cases, explicit consent may be required before processing user data through an LLM.

SQL Injection Through LLMs

This is one of the most dangerous patterns in LLM applications. If your app generates SQL from natural language and executes it, you have a SQL injection vulnerability with an AI-powered attack surface.

# EXTREMELY DANGEROUS — never do this
def natural_language_query(user_question: str) -> list:
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Convert the user's question to SQL for our users table."},
            {"role": "user", "content": user_question},
        ],
    )
    sql = response.choices[0].message.content
    return db.execute(sql)  # Direct execution of LLM-generated SQL!

# An attacker asks: "Show me all users; DROP TABLE users; --"
# Or more subtly: "Show me users, and also list all table names in the database"

Safe pattern: parameterized queries with allowlists.

from enum import Enum

class AllowedTable(Enum):
    USERS = "users"
    ORDERS = "orders"
    PRODUCTS = "products"

class AllowedColumn(Enum):
    USER_NAME = "name"
    USER_EMAIL = "email"
    ORDER_DATE = "order_date"
    ORDER_TOTAL = "total"
    PRODUCT_NAME = "product_name"
    PRODUCT_PRICE = "price"

def safe_natural_language_query(user_question: str) -> list:
    """Generate SQL safely using structured output and allowlists."""
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": """Convert the user's question to a query specification.
Return JSON with these fields:
- table: one of [users, orders, products]
- columns: list of column names from [name, email, order_date, total, product_name, price]
- where_conditions: list of {column, operator, value} where operator is one of [=, >, <, >=, <=, LIKE]
- order_by: optional column name
- limit: optional integer (max 100)

ONLY return the JSON, nothing else.""",
            },
            {"role": "user", "content": user_question},
        ],
        temperature=0,
        response_format={"type": "json_object"},
    )

    spec = json.loads(response.choices[0].message.content)

    # Validate every field against allowlists
    if spec["table"] not in [t.value for t in AllowedTable]:
        raise ValueError(f"Table not allowed: {spec['table']}")

    for col in spec.get("columns", []):
        if col not in [c.value for c in AllowedColumn]:
            raise ValueError(f"Column not allowed: {col}")

    # Build parameterized query
    columns = ", ".join(spec["columns"])
    query = f"SELECT {columns} FROM {spec['table']}"

    params = []
    if spec.get("where_conditions"):
        conditions = []
        for cond in spec["where_conditions"]:
            if cond["column"] not in [c.value for c in AllowedColumn]:
                raise ValueError(f"Where column not allowed: {cond['column']}")
            if cond["operator"] not in ("=", ">", "<", ">=", "<=", "LIKE"):
                raise ValueError(f"Operator not allowed: {cond['operator']}")
            conditions.append(f"{cond['column']} {cond['operator']} %s")
            params.append(cond["value"])
        query += " WHERE " + " AND ".join(conditions)

    limit = min(spec.get("limit", 100), 100)
    query += f" LIMIT {limit}"

    # Execute with parameterized query — safe from injection
    return db.execute(query, params)

Logging and Auditing

Log everything about LLM interactions for debugging, security review, and compliance. But be careful about what you log.

import logging
import hashlib

# Configure structured logging
logger = logging.getLogger("llm_audit")

class LLMAuditLogger:
    """Secure audit logging for LLM interactions."""

    def __init__(self, log_content: bool = False):
        # In production, log_content should almost always be False
        self.log_content = log_content

    def log_request(self, request_id: str, user_id: str, messages: list[dict],
                    model: str):
        """Log request metadata (not content by default)."""
        entry = {
            "event": "llm_request",
            "request_id": request_id,
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest()[:16],
            "model": model,
            "message_count": len(messages),
            "roles": [m["role"] for m in messages],
            "input_char_count": sum(len(m["content"]) for m in messages),
        }

        if self.log_content:
            # Only in debug environments — NEVER in production
            entry["messages"] = messages

        logger.info(json.dumps(entry))

    def log_response(self, request_id: str, output_tokens: int,
                     input_tokens: int, cost: float,
                     violations: list[str] | None = None):
        """Log response metadata."""
        entry = {
            "event": "llm_response",
            "request_id": request_id,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": cost,
        }

        if violations:
            entry["security_violations"] = violations
            logger.warning(json.dumps(entry))
        else:
            logger.info(json.dumps(entry))

    def log_blocked_request(self, request_id: str, user_id: str,
                            reason: str):
        """Log when a request is blocked by security filters."""
        entry = {
            "event": "llm_blocked",
            "request_id": request_id,
            "user_id_hash": hashlib.sha256(user_id.encode()).hexdigest()[:16],
            "reason": reason,
        }
        logger.warning(json.dumps(entry))

What to log: Request metadata (model, token counts, costs, timing), security violations, rate limit hits, PII detection events.

What NOT to log: User message content (PII risk), system prompts (intellectual property), API keys, full response content. If you must log content for debugging, use a separate short-retention log with restricted access.

Content Moderation

Use moderation APIs to flag harmful content in both input and output.

from openai import OpenAI

client = OpenAI()

def moderate_content(text: str) -> tuple[bool, dict]:
    """Check content against OpenAI's moderation API."""
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )

    result = response.results[0]

    if result.flagged:
        flagged_categories = {
            cat: score
            for cat, score in result.category_scores.__dict__.items()
            if getattr(result.categories, cat, False)
        }
        return False, {
            "flagged": True,
            "categories": flagged_categories,
        }

    return True, {"flagged": False}


def moderated_llm_call(messages: list[dict], **kwargs) -> str:
    """LLM call with content moderation on input and output."""
    # Moderate user input
    for msg in messages:
        if msg["role"] == "user":
            is_safe, details = moderate_content(msg["content"])
            if not is_safe:
                return f"I can't process this request. Content policy violation: {list(details['categories'].keys())}"

    # Get LLM response
    response = client.chat.completions.create(messages=messages, **kwargs)
    output = response.choices[0].message.content

    # Moderate output
    is_safe, details = moderate_content(output)
    if not is_safe:
        return "I generated a response that didn't meet our content policy. Please rephrase your question."

    return output

Defense-in-Depth Architecture

No single security measure is sufficient. Layer your defenses so that a failure at one layer is caught by the next.

User Request
    |
    v
[Layer 1: Rate Limiting]          -- Block abuse early
    |
    v
[Layer 2: Input Validation]       -- Length, encoding, structure
    |
    v
[Layer 3: PII Detection]          -- Redact sensitive data
    |
    v
[Layer 4: Content Moderation]     -- Block harmful input
    |
    v
[Layer 5: Prompt Injection Check] -- Detect manipulation attempts
    |
    v
[LLM Call]                        -- The actual model inference
    |
    v
[Layer 6: Output Filtering]       -- Redact leaked secrets, PII
    |
    v
[Layer 7: Content Moderation]     -- Block harmful output
    |
    v
[Layer 8: Structured Validation]  -- Verify output format
    |
    v
User Response

Here’s a middleware that implements this full pipeline:

from dataclasses import dataclass
import uuid

@dataclass
class SecurityContext:
    request_id: str
    user_id: str
    user_tier: str

class LLMSecurityMiddleware:
    """Complete security middleware for LLM applications."""

    def __init__(self, config: dict):
        self.rate_limiter = TokenRateLimiter(
            redis.Redis(),
            max_tokens_per_minute=config.get("max_tokens_per_minute", 50_000),
        )
        self.input_validator = InputValidator(
            max_input_tokens=config.get("max_input_tokens", 4000),
        )
        self.pii_detector = PIIDetector()
        self.output_filter = OutputFilter()
        self.audit_logger = LLMAuditLogger()

    def process(
        self, messages: list[dict], ctx: SecurityContext, **llm_kwargs
    ) -> str:
        """Process a request through the full security pipeline."""
        request_id = ctx.request_id or str(uuid.uuid4())

        # Layer 1: Rate limiting
        estimated_tokens = sum(len(m["content"]) // 4 for m in messages)
        allowed, info = self.rate_limiter.check_and_consume(
            ctx.user_id, estimated_tokens
        )
        if not allowed:
            self.audit_logger.log_blocked_request(
                request_id, ctx.user_id, f"rate_limit: {info['error']}"
            )
            raise RateLimitError(info)

        # Layer 2: Input validation
        is_valid, error = self.input_validator.validate(messages)
        if not is_valid:
            self.audit_logger.log_blocked_request(
                request_id, ctx.user_id, f"validation: {error}"
            )
            raise ValidationError(error)

        # Layer 3: PII redaction
        cleaned_messages = []
        for msg in messages:
            if msg["role"] == "user":
                redacted, findings = self.pii_detector.redact(msg["content"])
                cleaned_messages.append({**msg, "content": redacted})
            else:
                cleaned_messages.append(msg)

        # Layer 4: Content moderation (input)
        for msg in cleaned_messages:
            if msg["role"] == "user":
                is_safe, details = moderate_content(msg["content"])
                if not is_safe:
                    self.audit_logger.log_blocked_request(
                        request_id, ctx.user_id, f"moderation: {details}"
                    )
                    raise ContentPolicyError("Input violates content policy")

        # Log the request
        self.audit_logger.log_request(
            request_id, ctx.user_id, cleaned_messages,
            llm_kwargs.get("model", "unknown"),
        )

        # Make the LLM call
        response = client.chat.completions.create(
            messages=cleaned_messages, **llm_kwargs
        )
        raw_output = response.choices[0].message.content

        # Layer 6: Output filtering
        filter_result = self.output_filter.filter(raw_output)

        # Layer 7: Content moderation (output)
        is_safe, details = moderate_content(filter_result.clean_text)
        if not is_safe:
            self.audit_logger.log_response(
                request_id,
                response.usage.completion_tokens,
                response.usage.prompt_tokens,
                0.0,
                violations=["output_moderation_failed"],
            )
            return "I wasn't able to generate an appropriate response. Please try rephrasing."

        # Log the response
        self.audit_logger.log_response(
            request_id,
            response.usage.completion_tokens,
            response.usage.prompt_tokens,
            calculate_cost(
                llm_kwargs.get("model", "gpt-4o"),
                response.usage.prompt_tokens,
                response.usage.completion_tokens,
            ),
            violations=filter_result.violations if filter_result.was_modified else None,
        )

        return filter_result.clean_text


# Custom exceptions
class RateLimitError(Exception):
    pass

class ValidationError(Exception):
    pass

class ContentPolicyError(Exception):
    pass

Security Checklist for Launch

Before your LLM application goes to production, verify every item on this list:

API Keys and Secrets

  • No API keys in source code or git history
  • Keys stored in a secrets manager (not just env vars in production)
  • Separate keys per environment (dev/staging/prod)
  • Key rotation process documented and tested
  • Budget limits set on API provider dashboards

Rate Limiting

  • Per-user rate limits (requests AND tokens)
  • Per-IP rate limits for unauthenticated endpoints
  • Tiered limits based on user plan
  • Graceful degradation when limits are hit (not just 429 errors)

Input Security

  • Maximum input length enforced (characters and tokens)
  • PII detection and redaction active
  • Content moderation on user input
  • Unicode and encoding attacks handled
  • Prompt injection defenses in place (see next lesson)

Output Security

  • Output filtering for PII, secrets, and internal URLs
  • Content moderation on LLM output
  • Structured output validation where applicable
  • LLM-generated code/SQL never executed directly

Data Privacy

  • Data processing agreement signed with LLM provider
  • Privacy policy updated to disclose AI processing
  • User data minimized in prompts
  • Audit logging active (without logging content)
  • Data retention policies defined and enforced

Monitoring

  • Cost alerts configured
  • Security violation alerts configured
  • Rate limit hit alerts configured
  • Audit logs shipped to central logging system
  • Incident response plan for security breaches

Key Takeaways

  1. Treat the LLM as untrusted. It processes arbitrary input and produces unpredictable output. Never execute its output directly, never trust it to be safe, and never assume it follows your instructions.

  2. Protect both sides. Input security prevents sensitive data from reaching the provider and blocks manipulation attempts. Output security catches leaked secrets, PII, and policy violations before they reach the user.

  3. API keys are your most expensive credentials. Use a secrets manager, rotate regularly, set budget limits, and use separate keys per environment. A leaked key can cost you thousands in minutes.

  4. Rate limit on tokens, not just requests. A single 100K-token request costs more than a thousand simple ones. Token-based rate limiting prevents denial-of-wallet attacks.

  5. PII will end up in your prompts. Users paste credit card numbers, SSNs, and passwords. Detect and redact automatically before sending anything to the provider.

  6. Never execute LLM-generated SQL or code directly. Use structured output, validate against allowlists, and parameterize queries. The LLM can be tricked into generating malicious payloads.

  7. Layer your defenses. No single security measure is reliable. Rate limiting, input validation, PII detection, content moderation, output filtering, and audit logging all work together. Each layer catches what the others miss.

  8. Log metadata, not content. Track token counts, costs, model usage, and security violations. Avoid logging user messages or model responses in production — that creates a data privacy liability.