Lesson 05 · LLM Engineering in Production · 16 min read

Building a RAG Pipeline from Scratch

April 01, 2026

TL;DR

RAG = retrieve relevant docs → stuff them into the prompt → let the LLM answer with context. Build it in four stages: ingest (load + chunk + embed + store), retrieve (embed query + similarity search), augment (build prompt with context), generate (call LLM). Start simple with ChromaDB, add reranking and hybrid search once you have evaluation metrics. The retrieval step matters more than the generation step.

RAG is the most important pattern in production AI. Not because it is clever, but because it solves the fundamental limitation of LLMs: they do not know your data. Your internal docs, product catalog, support tickets, legal contracts, customer conversations — none of that is in the model’s training set. RAG bridges this gap by retrieving relevant documents and injecting them into the prompt as context. The model generates answers grounded in your actual data instead of hallucinating.

This lesson builds a complete RAG pipeline from scratch — document loading, chunking, embedding, storage, retrieval, augmentation, and generation. Every piece is production code you can deploy. We will use ChromaDB for simplicity here; Lesson 6 covers choosing between vector databases for production.

RAG Architecture Overview

The pipeline has two phases:

Ingestion (offline, runs once or on schedule):

Documents → Load → Chunk → Embed → Store in Vector DB

Query (online, runs per user request):

User Query → Embed → Search Vector DB → Rank Results → Build Prompt → LLM → Response

The ingestion phase is a batch job. You run it when your documents change. The query phase is a real-time API call. Optimizing the query phase is what matters for user experience.

Why RAG Beats Fine-Tuning

| Factor | RAG | Fine-Tuning |
| --- | --- | --- |
| Setup time | Hours | Days to weeks |
| Data updates | Instant (re-index) | Re-train required |
| Cost | Embedding + storage + per-query retrieval | Training compute + hosting |
| Accuracy on your data | High (with good retrieval) | High (if enough training data) |
| Hallucination control | Good (model sees the source) | Poor (no source attribution) |
| Interpretability | Can show source documents | Black box |
| Maintenance | Update index when docs change | Retrain periodically |

Fine-tuning is the right choice when you need to change the model’s behavior (writing style, domain vocabulary, output format). RAG is the right choice when you need to give the model access to knowledge. Most production use cases need knowledge, not behavior changes.

Stage 1: Document Loading

Before you can build a pipeline, you need to load documents. Here are practical loaders for the most common formats.

PDF Loading

import fitz  # PyMuPDF — pip install pymupdf

def load_pdf(file_path: str) -> list[dict]:
    """Load a PDF and return pages as documents with metadata."""
    documents = []
    doc = fitz.open(file_path)
    
    for page_num in range(len(doc)):
        page = doc[page_num]
        text = page.get_text("text")
        
        if not text.strip():
            continue
        
        documents.append({
            "content": text.strip(),
            "metadata": {
                "source": file_path,
                "page": page_num + 1,
                "total_pages": len(doc),
            },
        })
    
    doc.close()
    return documents


# Usage
pages = load_pdf("company-handbook.pdf")
print(f"Loaded {len(pages)} pages")
print(f"First page preview: {pages[0]['content'][:200]}")

Markdown Loading

import os
import re

def load_markdown(file_path: str) -> list[dict]:
    """Load a markdown file, split by headers for natural sections."""
    with open(file_path, "r", encoding="utf-8") as f:
        content = f.read()
    
    # Split on headers (##, ###, etc.)
    sections = re.split(r'\n(#{1,4}\s+.+)\n', content)
    
    documents = []
    current_header = os.path.basename(file_path)
    
    for i, section in enumerate(sections):
        if re.match(r'^#{1,4}\s+', section):
            current_header = section.strip("# ").strip()
            continue
        
        text = section.strip()
        if not text or len(text) < 50:  # Skip very short sections
            continue
        
        documents.append({
            "content": text,
            "metadata": {
                "source": file_path,
                "section": current_header,
                "section_index": i,
            },
        })
    
    return documents

HTML Loading

from bs4 import BeautifulSoup  # pip install beautifulsoup4

def load_html(file_path: str, selector: str = "article") -> list[dict]:
    """Load an HTML file, extracting text from a specific element."""
    with open(file_path, "r", encoding="utf-8") as f:
        soup = BeautifulSoup(f.read(), "html.parser")
    
    # Remove scripts and styles
    for tag in soup(["script", "style", "nav", "footer"]):
        tag.decompose()
    
    # Try to find the main content area
    main_content = soup.select_one(selector)
    if not main_content:
        main_content = soup.body or soup
    
    text = main_content.get_text(separator="\n", strip=True)
    
    return [{
        "content": text,
        "metadata": {
            "source": file_path,
            "title": soup.title.string if soup.title else None,
        },
    }]

CSV Loading

import csv

def load_csv(file_path: str, content_columns: list[str], id_column: str | None = None) -> list[dict]:
    """Load a CSV file, combining specified columns into document content."""
    documents = []
    
    with open(file_path, "r", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        
        for i, row in enumerate(reader):
            # Combine specified columns into content
            parts = []
            for col in content_columns:
                if col in row and row[col]:
                    parts.append(f"{col}: {row[col]}")
            
            content = "\n".join(parts)
            if not content.strip():
                continue
            
            doc_id = row.get(id_column, str(i)) if id_column else str(i)
            
            documents.append({
                "content": content,
                "metadata": {
                    "source": file_path,
                    "row_id": doc_id,
                    **{k: v for k, v in row.items() if k not in content_columns},
                },
            })
    
    return documents


# Usage: load product FAQs
docs = load_csv(
    "faqs.csv",
    content_columns=["question", "answer"],
    id_column="faq_id",
)

Universal Loader

from pathlib import Path

def load_documents(path: str) -> list[dict]:
    """Load documents from a file or directory, auto-detecting format."""
    p = Path(path)
    
    if p.is_file():
        return _load_single_file(p)
    
    if p.is_dir():
        documents = []
        for file_path in sorted(p.rglob("*")):
            if file_path.is_file() and file_path.suffix in LOADERS:
                try:
                    docs = _load_single_file(file_path)
                    documents.extend(docs)
                except Exception as e:
                    print(f"Warning: failed to load {file_path}: {e}")
        return documents
    
    raise FileNotFoundError(f"Path not found: {path}")

LOADERS = {
    ".pdf": load_pdf,
    ".md": load_markdown,
    ".html": load_html,
    ".htm": load_html,
}

def _load_single_file(path: Path) -> list[dict]:
    loader = LOADERS.get(path.suffix)
    if not loader:
        # Fallback: read as plain text
        with open(path, "r", encoding="utf-8") as f:
            return [{"content": f.read(), "metadata": {"source": str(path)}}]
    return loader(str(path))

Stage 2: Chunking

Documents are too long to embed and retrieve as single units. You need to split them into chunks — small enough to be relevant, large enough to be useful.

Lesson 7 covers chunking strategies in depth. Here is a production-ready recursive chunker to get you started:

def recursive_chunk(
    text: str,
    chunk_size: int = 512,
    chunk_overlap: int = 50,
    separators: list[str] | None = None,
) -> list[str]:
    """Split text into overlapping chunks using recursive character splitting.
    
    Tries to split on natural boundaries (paragraphs, sentences, words)
    before falling back to character-level splitting.
    """
    if separators is None:
        separators = ["\n\n", "\n", ". ", " ", ""]
    
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    
    # Find the best separator that creates meaningful splits
    for sep in separators:
        if sep and sep in text:
            parts = text.split(sep)
            break
    else:
        # No separator works — split by characters
        chunks = []
        for i in range(0, len(text), chunk_size - chunk_overlap):
            chunk = text[i:i + chunk_size]
            if chunk.strip():
                chunks.append(chunk.strip())
            if i + chunk_size >= len(text):
                break
        return chunks
    
    # Merge parts into chunks that respect the size limit
    chunks = []
    current = ""
    
    for part in parts:
        test = f"{current}{sep}{part}" if current else part
        
        if len(test) <= chunk_size:
            current = test
        else:
            if current.strip():
                chunks.append(current.strip())
            
            if len(part) > chunk_size:
                # Recurse on oversized parts
                sub_chunks = recursive_chunk(
                    part, chunk_size, chunk_overlap,
                    separators[separators.index(sep) + 1:]
                )
                chunks.extend(sub_chunks)
                current = ""
            else:
                current = part
    
    if current.strip():
        chunks.append(current.strip())
    
    # Add overlap between chunks
    if chunk_overlap > 0 and len(chunks) > 1:
        overlapped = [chunks[0]]
        for i in range(1, len(chunks)):
            prev_end = chunks[i - 1][-chunk_overlap:]
            overlapped.append(prev_end + " " + chunks[i])
        chunks = overlapped
    
    return chunks


def chunk_documents(documents: list[dict], chunk_size: int = 512) -> list[dict]:
    """Chunk a list of documents, preserving metadata."""
    chunked = []
    
    for doc in documents:
        chunks = recursive_chunk(doc["content"], chunk_size=chunk_size)
        
        for i, chunk in enumerate(chunks):
            chunked.append({
                "content": chunk,
                "metadata": {
                    **doc["metadata"],
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                },
            })
    
    return chunked


# Usage
raw_docs = load_documents("./knowledge_base/")
chunks = chunk_documents(raw_docs, chunk_size=512)
print(f"Created {len(chunks)} chunks from {len(raw_docs)} documents")

Chunk Size Guidelines

| Chunk Size | Good For | Problems |
| --- | --- | --- |
| 128-256 tokens | Precise retrieval, FAQ matching | Loses context, many chunks to store |
| 256-512 tokens | General-purpose, most RAG systems | Good default balance |
| 512-1024 tokens | Technical docs, legal documents | Less precise retrieval |
| 1024+ tokens | Long-form analysis, narratives | Poor retrieval precision, expensive |

Start with 512 tokens, 50-token overlap. Adjust based on your evaluation metrics.
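One caveat: the guidelines above are in tokens, but recursive_chunk measures characters. For English text, one token is roughly four characters, so a quick conversion keeps the two in sync. A minimal sketch (the 4:1 ratio is a heuristic, not exact; use a real tokenizer such as tiktoken when you need precise counts):

```python
def token_budget_to_chars(tokens: int, chars_per_token: float = 4.0) -> int:
    """Rough conversion for English text: ~4 characters per token.

    recursive_chunk above measures characters, so a 512-token target
    corresponds to roughly a 2048-character chunk_size.
    """
    return int(tokens * chars_per_token)

chunk_size_chars = token_budget_to_chars(512)  # 2048
overlap_chars = token_budget_to_chars(50)      # 200
print(chunk_size_chars, overlap_chars)
```

Pass these character budgets to recursive_chunk when you want its output to line up with the token guidelines in the table.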

Stage 3: Embedding

Embeddings convert text into vectors — lists of numbers that represent meaning. Similar texts produce similar vectors. This is what enables semantic search.
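"Similar texts produce similar vectors" is usually measured with cosine similarity. A minimal sketch in pure Python (the three-dimensional vectors are toy values for illustration; real embeddings have hundreds or thousands of dimensions):

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity: 1.0 = same direction, 0.0 = orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Toy "embeddings" for three support topics
reset_password = [0.9, 0.1, 0.0]
forgot_login = [0.8, 0.2, 0.1]
billing_faq = [0.1, 0.9, 0.2]

print(cosine_similarity(reset_password, forgot_login))  # high: related topics
print(cosine_similarity(reset_password, billing_faq))   # low: unrelated topics
```

Vector databases run exactly this kind of comparison (or an approximation of it) between your query vector and every stored chunk vector.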

Embedding Model Comparison

| Model | Dimensions | Speed | Quality | Cost |
| --- | --- | --- | --- | --- |
| OpenAI text-embedding-3-small | 1536 | Fast | Good | $0.02/1M tokens |
| OpenAI text-embedding-3-large | 3072 | Fast | Better | $0.13/1M tokens |
| Cohere embed-english-v3.0 | 1024 | Fast | Good | $0.10/1M tokens |
| sentence-transformers/all-MiniLM-L6-v2 | 384 | Local, fast | Decent | Free |
| BAAI/bge-large-en-v1.5 | 1024 | Local, slow | Very good | Free |

For production, start with text-embedding-3-small. It is cheap, fast, and good enough. Switch to a local model if you have data privacy requirements or want to eliminate API costs.
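Before committing to a model, it is worth estimating ingestion cost from the table above. A back-of-envelope sketch (the corpus size is a made-up example; the price comes from the comparison table):

```python
def embedding_cost_usd(
    num_chunks: int, avg_tokens_per_chunk: int, price_per_million: float
) -> float:
    """Estimate the one-time cost of embedding a corpus."""
    total_tokens = num_chunks * avg_tokens_per_chunk
    return total_tokens / 1_000_000 * price_per_million

# Example: 50,000 chunks of ~512 tokens with text-embedding-3-small ($0.02/1M tokens)
cost = embedding_cost_usd(50_000, 512, 0.02)
print(f"${cost:.2f}")  # $0.51
```

Even large corpora are cheap to embed with the small model; re-embedding on every schema change is usually affordable, which argues for iterating on chunking before optimizing anything else.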

Embedding Implementation

from openai import OpenAI

client = OpenAI()

def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
    """Embed a batch of texts using OpenAI's API.
    
    Handles batching for large inputs (API limit: 2048 texts per call).
    """
    all_embeddings = []
    batch_size = 2048
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        
        response = client.embeddings.create(
            model=model,
            input=batch,
        )
        
        batch_embeddings = [item.embedding for item in response.data]
        all_embeddings.extend(batch_embeddings)
    
    return all_embeddings


def embed_with_retry(
    texts: list[str],
    model: str = "text-embedding-3-small",
    max_retries: int = 3,
) -> list[list[float]]:
    """Embed texts with retry logic for production use."""
    import time
    
    for attempt in range(max_retries):
        try:
            return embed_texts(texts, model)
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            
            wait_time = 2 ** attempt  # Exponential backoff
            print(f"Embedding failed (attempt {attempt + 1}): {e}. Retrying in {wait_time}s...")
            time.sleep(wait_time)

Local Embeddings with sentence-transformers

from sentence_transformers import SentenceTransformer

# Load model once (downloads on first use, ~90MB for MiniLM)
model = SentenceTransformer("all-MiniLM-L6-v2")

def embed_texts_local(texts: list[str]) -> list[list[float]]:
    """Embed texts locally — no API calls, no cost, full privacy."""
    embeddings = model.encode(texts, show_progress_bar=True)
    return embeddings.tolist()


# Usage
chunks = ["How to reset password", "Billing FAQ", "API rate limits"]
vectors = embed_texts_local(chunks)
print(f"Embedded {len(vectors)} chunks, dimension: {len(vectors[0])}")
# Embedded 3 chunks, dimension: 384

Stage 4: Vector Storage with ChromaDB

ChromaDB is the simplest vector database to start with. Zero configuration, runs in-process, supports persistence. Perfect for prototyping and small to medium production workloads.

pip install chromadb

Complete Ingestion Pipeline

import os

import chromadb
from chromadb.utils import embedding_functions

# Initialize ChromaDB with persistence
chroma_client = chromadb.PersistentClient(path="./chroma_db")

# Use OpenAI embeddings (or switch to a local model)
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
    api_key=os.environ["OPENAI_API_KEY"],  # read from the environment; never hardcode keys
    model_name="text-embedding-3-small",
)

# Create or get a collection
collection = chroma_client.get_or_create_collection(
    name="knowledge_base",
    embedding_function=openai_ef,
    metadata={"hnsw:space": "cosine"},  # Use cosine similarity
)

def ingest_documents(documents_path: str):
    """Full ingestion pipeline: load → chunk → embed → store."""
    
    # Step 1: Load documents
    raw_docs = load_documents(documents_path)
    print(f"Loaded {len(raw_docs)} documents")
    
    # Step 2: Chunk documents
    chunks = chunk_documents(raw_docs, chunk_size=512)
    print(f"Created {len(chunks)} chunks")
    
    # Step 3: Store in ChromaDB (embedding happens automatically)
    batch_size = 100
    
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        
        collection.add(
            ids=[f"chunk_{i+j}" for j in range(len(batch))],
            documents=[chunk["content"] for chunk in batch],
            metadatas=[chunk["metadata"] for chunk in batch],
        )
        
        print(f"Stored batch {i // batch_size + 1} ({len(batch)} chunks)")
    
    print(f"Ingestion complete. Collection has {collection.count()} documents.")


# Run ingestion
ingest_documents("./knowledge_base/")

Querying ChromaDB

def search(query: str, n_results: int = 5, where: dict | None = None) -> list[dict]:
    """Search the vector database for relevant chunks."""
    
    search_params = {
        "query_texts": [query],
        "n_results": n_results,
    }
    
    if where:
        search_params["where"] = where
    
    results = collection.query(**search_params)
    
    # Flatten results into a list of documents
    documents = []
    for i in range(len(results["documents"][0])):
        documents.append({
            "content": results["documents"][0][i],
            "metadata": results["metadatas"][0][i],
            "distance": results["distances"][0][i],
        })
    
    return documents


# Simple search
results = search("How do I reset my password?")
for r in results:
    print(f"[{r['distance']:.3f}] {r['content'][:100]}...")

# Filtered search (only search specific sources)
results = search(
    "refund policy",
    where={"source": "policies/refund-policy.md"},
)

The Query Pipeline: Retrieval + Augmentation + Generation

Now we wire everything together. This is the core of your RAG system.

from openai import OpenAI

client = OpenAI()

class RAGPipeline:
    """Complete RAG pipeline: retrieve → augment → generate."""
    
    def __init__(self, collection, model: str = "gpt-4o"):
        self.collection = collection
        self.model = model
    
    def query(
        self,
        question: str,
        n_results: int = 5,
        where: dict | None = None,
    ) -> dict:
        """Answer a question using RAG."""
        
        # Step 1: Retrieve relevant chunks
        retrieval_results = self._retrieve(question, n_results, where)
        
        if not retrieval_results:
            return {
                "answer": "I couldn't find any relevant information to answer your question.",
                "sources": [],
                "context_used": 0,
            }
        
        # Step 2: Build augmented prompt
        prompt = self._build_prompt(question, retrieval_results)
        
        # Step 3: Generate answer
        answer = self._generate(prompt)
        
        return {
            "answer": answer,
            "sources": [
                {
                    "content": r["content"][:200] + "...",
                    "metadata": r["metadata"],
                    "relevance_score": 1 - r["distance"],
                }
                for r in retrieval_results
            ],
            "context_used": len(retrieval_results),
        }
    
    def _retrieve(self, query: str, n_results: int, where: dict | None = None) -> list[dict]:
        """Retrieve relevant documents from the vector store."""
        search_params = {
            "query_texts": [query],
            "n_results": n_results,
        }
        if where:
            search_params["where"] = where
        
        results = self.collection.query(**search_params)
        
        documents = []
        for i in range(len(results["documents"][0])):
            doc = {
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "distance": results["distances"][0][i],
            }
            
            # Filter out low-relevance results
            if doc["distance"] < 1.5:  # Cosine distance threshold
                documents.append(doc)
        
        return documents
    
    def _build_prompt(self, question: str, context_docs: list[dict]) -> list[dict]:
        """Build the augmented prompt with retrieved context."""
        
        # Format context with source attribution
        context_parts = []
        for i, doc in enumerate(context_docs, 1):
            source = doc["metadata"].get("source", "unknown")
            section = doc["metadata"].get("section", "")
            source_label = f"{source}"
            if section:
                source_label += f" > {section}"
            
            context_parts.append(f"[Source {i}: {source_label}]\n{doc['content']}")
        
        context_text = "\n\n---\n\n".join(context_parts)
        
        system_prompt = """You are a helpful assistant that answers questions based on the provided context.

Rules:
1. Only use information from the provided context to answer questions.
2. If the context doesn't contain enough information, say so clearly.
3. Cite your sources by referencing [Source N] when using information from a specific document.
4. Be concise and direct. Don't repeat the question.
5. If multiple sources provide conflicting information, note the discrepancy."""
        
        user_prompt = f"""Context:
{context_text}

---

Question: {question}

Answer based on the context above:"""
        
        return [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
    
    def _generate(self, messages: list[dict]) -> str:
        """Generate an answer using the LLM."""
        try:
            response = client.chat.completions.create(
                model=self.model,
                messages=messages,
                temperature=0,
                max_tokens=1024,
            )
            return response.choices[0].message.content
        except Exception as e:
            return f"Error generating response: {e}"


# Usage
rag = RAGPipeline(collection=collection)
result = rag.query("What is our refund policy for enterprise customers?")

print(result["answer"])
print(f"\n--- Sources ({result['context_used']} documents used) ---")
for source in result["sources"]:
    print(f"  [{source['relevance_score']:.2f}] {source['metadata'].get('source', 'unknown')}")

Prompt Templates for RAG

The prompt template is where most RAG systems succeed or fail. Here are battle-tested templates for different use cases.

Grounded Q&A (Strict — No Hallucination)

STRICT_QA_TEMPLATE = """You answer questions based ONLY on the provided documents.

RULES:
- If the answer is in the documents, provide it with source citations.
- If the answer is NOT in the documents, respond with: 
  "I don't have enough information in the provided documents to answer this question."
- NEVER make up information or use knowledge outside the provided documents.
- Quote directly from the documents when possible.

DOCUMENTS:
{context}

QUESTION: {question}

ANSWER:"""

Conversational RAG (Friendly, With Citations)

CONVERSATIONAL_TEMPLATE = """You are a helpful support agent. Use the knowledge base 
articles below to answer the customer's question.

If the articles contain the answer, explain it clearly and cite which article 
you used like this: (Source: article-name).

If the articles don't cover the topic, let the customer know and suggest 
they contact [email protected] for further help.

Knowledge Base Articles:
{context}

Customer Question: {question}

Your Response:"""

Analytical RAG (Compare, Synthesize)

ANALYTICAL_TEMPLATE = """You are a research analyst. Synthesize information from 
multiple sources to provide a comprehensive answer.

For each claim you make:
1. Cite the source(s) that support it
2. Note if sources disagree
3. Indicate your confidence level (high/medium/low)

Sources:
{context}

Research Question: {question}

Analysis:"""
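All three templates share the same {context} and {question} placeholders, so a single helper can render any of them. A minimal sketch (the inline TEMPLATE and the document snippets are made up for illustration):

```python
# Inline stand-in with the same {context}/{question} placeholders as the templates above
TEMPLATE = "DOCUMENTS:\n{context}\n\nQUESTION: {question}\n\nANSWER:"

def render_template(template: str, context_docs: list[str], question: str) -> str:
    """Fill a RAG prompt template with retrieved chunks and the user question."""
    context = "\n\n---\n\n".join(
        f"[Source {i}] {doc}" for i, doc in enumerate(context_docs, 1)
    )
    return template.format(context=context, question=question)

prompt = render_template(
    TEMPLATE,
    ["Refunds are processed within 14 days.", "Enterprise plans include SLA credits."],
    "What is the refund window?",
)
print(prompt)
```

Keeping the placeholder names identical across templates lets you swap templates per use case without touching the rendering code.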

Error Handling for Production

A production RAG system must handle failures gracefully. Here are the common failure modes and how to handle them.

class ProductionRAGPipeline(RAGPipeline):
    """RAG pipeline with production error handling."""
    
    def query(self, question: str, n_results: int = 5, where: dict | None = None) -> dict:
        """Query with comprehensive error handling."""
        
        # Validate input
        if not question or not question.strip():
            return self._error_response("Empty question provided")
        
        if len(question) > 10000:
            return self._error_response("Question too long (max 10,000 characters)")
        
        try:
            # Retrieve
            retrieval_results = self._retrieve(question, n_results, where)
        except Exception as e:
            return self._error_response(f"Retrieval failed: {e}", fallback=True)
        
        # Handle empty results
        if not retrieval_results:
            return {
                "answer": (
                    "I couldn't find any relevant documents for your question. "
                    "Try rephrasing or broadening your query."
                ),
                "sources": [],
                "context_used": 0,
                "status": "no_results",
            }
        
        # Check context window limits
        total_context_length = sum(len(doc["content"]) for doc in retrieval_results)
        max_context_chars = 30000  # ~7500 tokens, safe for most models
        
        if total_context_length > max_context_chars:
            # Truncate results to fit
            truncated = []
            running_length = 0
            for doc in retrieval_results:
                if running_length + len(doc["content"]) > max_context_chars:
                    break
                truncated.append(doc)
                running_length += len(doc["content"])
            retrieval_results = truncated
        
        # Build prompt and generate
        try:
            prompt = self._build_prompt(question, retrieval_results)
            answer = self._generate(prompt)
        except Exception as e:
            return self._error_response(
                f"Generation failed: {e}",
                sources=retrieval_results,
            )
        
        return {
            "answer": answer,
            "sources": [
                {
                    "content": r["content"][:200],
                    "metadata": r["metadata"],
                    "relevance_score": round(1 - r["distance"], 3),
                }
                for r in retrieval_results
            ],
            "context_used": len(retrieval_results),
            "status": "success",
        }
    
    def _error_response(
        self, error: str, fallback: bool = False, sources: list = None
    ) -> dict:
        """Generate a structured error response."""
        response = {
            "answer": "I'm having trouble answering your question right now. Please try again.",
            "sources": [],
            "context_used": 0,
            "status": "error",
            "error": error,
        }
        
        if fallback:
            # Could fall back to a direct LLM call without RAG
            response["answer"] = (
                "I'm unable to search the knowledge base right now. "
                "Please try again in a moment."
            )
        
        return response
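Callers should branch on the status field rather than parsing answer text. A sketch of caller-side handling (the handle_rag_response helper and its print-based logging are illustrative, not part of the pipeline above):

```python
def handle_rag_response(result: dict) -> str:
    """Route a structured RAG response by its status field."""
    status = result.get("status", "success")
    if status == "success":
        return result["answer"]
    if status == "no_results":
        # Good place to log the query for knowledge-base gap analysis
        return result["answer"]
    # status == "error": surface the safe message, alert on the detail
    print(f"RAG error: {result.get('error')}")  # replace with real logging/alerting
    return result["answer"]

print(handle_rag_response({"status": "no_results", "answer": "No documents found."}))
```

This keeps user-facing copy in the pipeline and operational concerns (logging, alerting, gap analysis) in the caller.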

Complete End-to-End Working Code

Here are two scripts you can run today: one for ingestion, one for querying.

Script 1: ingest.py

"""
Ingest documents into the RAG pipeline.

Usage:
    python ingest.py ./documents/

Requirements:
    pip install chromadb openai pymupdf beautifulsoup4
"""
import sys
import os
import re
from pathlib import Path
import fitz
import chromadb
from chromadb.utils import embedding_functions


def load_pdf(file_path: str) -> list[dict]:
    docs = []
    doc = fitz.open(file_path)
    for i in range(len(doc)):
        text = doc[i].get_text("text").strip()
        if text:
            docs.append({"content": text, "metadata": {"source": file_path, "page": i + 1}})
    doc.close()
    return docs


def load_text(file_path: str) -> list[dict]:
    with open(file_path, "r", encoding="utf-8") as f:
        return [{"content": f.read(), "metadata": {"source": file_path}}]


def load_documents(path: str) -> list[dict]:
    p = Path(path)
    loaders = {".pdf": load_pdf, ".md": load_text, ".txt": load_text}
    
    if p.is_file():
        loader = loaders.get(p.suffix, load_text)
        return loader(str(p))
    
    docs = []
    for fp in sorted(p.rglob("*")):
        if fp.is_file() and fp.suffix in loaders:
            try:
                docs.extend(loaders[fp.suffix](str(fp)))
            except Exception as e:
                print(f"  Warning: failed {fp}: {e}")
    return docs


def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    
    separators = ["\n\n", "\n", ". ", " "]
    for sep in separators:
        if sep in text:
            parts = text.split(sep)
            break
    else:
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size - overlap)]
    
    chunks, current = [], ""
    for part in parts:
        candidate = f"{current}{sep}{part}" if current else part
        if len(candidate) <= chunk_size:
            current = candidate
        else:
            if current.strip():
                chunks.append(current.strip())
            if len(part) <= chunk_size:
                current = part
            else:
                # Slice oversized parts instead of silently truncating them
                chunks.extend(
                    part[j:j + chunk_size]
                    for j in range(0, len(part), chunk_size - overlap)
                )
                current = ""
    
    if current.strip():
        chunks.append(current.strip())
    
    return chunks


def main():
    if len(sys.argv) < 2:
        print("Usage: python ingest.py <documents_path>")
        sys.exit(1)
    
    documents_path = sys.argv[1]
    
    # Load
    print(f"Loading documents from {documents_path}...")
    raw_docs = load_documents(documents_path)
    print(f"  Loaded {len(raw_docs)} documents")
    
    # Chunk
    chunks = []
    for doc in raw_docs:
        for i, chunk_text in enumerate(recursive_chunk(doc["content"])):
            chunks.append({
                "content": chunk_text,
                "metadata": {**doc["metadata"], "chunk_index": i},
            })
    print(f"  Created {len(chunks)} chunks")
    
    # Store
    print("Storing in ChromaDB...")
    db = chromadb.PersistentClient(path="./chroma_db")
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.environ["OPENAI_API_KEY"],
        model_name="text-embedding-3-small",
    )
    collection = db.get_or_create_collection(
        name="knowledge_base",
        embedding_function=openai_ef,
    )
    
    batch_size = 100
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        collection.add(
            ids=[f"chunk_{i+j}" for j in range(len(batch))],
            documents=[c["content"] for c in batch],
            metadatas=[c["metadata"] for c in batch],
        )
        print(f"  Stored batch {i // batch_size + 1}")
    
    print(f"Done. Collection has {collection.count()} chunks.")


if __name__ == "__main__":
    main()

Script 2: query.py

"""
Query the RAG pipeline.

Usage:
    python query.py "What is our refund policy?"

Requirements:
    pip install chromadb openai
"""
import sys
import os
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI


def main():
    if len(sys.argv) < 2:
        print("Usage: python query.py <question>")
        sys.exit(1)
    
    question = sys.argv[1]
    
    # Connect to ChromaDB
    db = chromadb.PersistentClient(path="./chroma_db")
    openai_ef = embedding_functions.OpenAIEmbeddingFunction(
        api_key=os.environ["OPENAI_API_KEY"],
        model_name="text-embedding-3-small",
    )
    collection = db.get_collection(
        name="knowledge_base",
        embedding_function=openai_ef,
    )
    
    # Retrieve
    results = collection.query(query_texts=[question], n_results=5)
    
    if not results["documents"][0]:
        print("No relevant documents found.")
        return
    
    # Build context
    context_parts = []
    for i, (doc, meta) in enumerate(
        zip(results["documents"][0], results["metadatas"][0]), 1
    ):
        source = meta.get("source", "unknown")
        context_parts.append(f"[Source {i}: {source}]\n{doc}")
    
    context = "\n\n---\n\n".join(context_parts)
    
    # Generate
    client = OpenAI()
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {
                "role": "system",
                "content": (
                    "Answer based ONLY on the provided context. "
                    "Cite sources as [Source N]. If the context doesn't "
                    "contain the answer, say so."
                ),
            },
            {
                "role": "user",
                "content": f"Context:\n{context}\n\nQuestion: {question}",
            },
        ],
        temperature=0,
        max_tokens=1024,
    )
    
    answer = response.choices[0].message.content
    
    print(f"\nQuestion: {question}")
    print(f"\nAnswer: {answer}")
    print("\n--- Sources ---")
    for i, meta in enumerate(results["metadatas"][0], 1):
        # 1 - distance is a valid relevance score only when the collection
        # uses cosine distance (ChromaDB defaults to L2; set the space
        # explicitly at collection creation if you rely on this).
        score = 1 - results["distances"][0][i - 1]
        print(f"  [{i}] {meta.get('source', 'unknown')} (relevance: {score:.2f})")


if __name__ == "__main__":
    main()

Testing Your RAG Pipeline

Before you automate anything, test manually. Here is a simple evaluation script:

def evaluate_rag(pipeline, test_cases: list[dict]) -> dict:
    """Run test cases against your RAG pipeline and score results.
    
    Each test case: {"question": "...", "expected_keywords": [...], "expected_source": "..."}
    """
    results = []
    
    for test in test_cases:
        response = pipeline.query(test["question"])
        answer = response["answer"].lower()
        
        # Check if expected keywords appear in the answer
        keyword_hits = sum(
            1 for kw in test.get("expected_keywords", [])
            if kw.lower() in answer
        )
        keyword_score = (
            keyword_hits / len(test["expected_keywords"])
            if test.get("expected_keywords")
            else 0
        )
        
        # Check if the expected source was retrieved
        source_hit = any(
            test.get("expected_source", "") in s["metadata"].get("source", "")
            for s in response.get("sources", [])
        )
        
        results.append({
            "question": test["question"],
            "keyword_score": keyword_score,
            "source_hit": source_hit,
            "answer_preview": response["answer"][:200],
        })
    
    # Aggregate metrics
    avg_keyword_score = sum(r["keyword_score"] for r in results) / len(results)
    source_hit_rate = sum(1 for r in results if r["source_hit"]) / len(results)
    
    return {
        "avg_keyword_score": round(avg_keyword_score, 3),
        "source_hit_rate": round(source_hit_rate, 3),
        "total_tests": len(results),
        "details": results,
    }


# Define test cases
test_cases = [
    {
        "question": "What is our refund policy?",
        "expected_keywords": ["30 days", "full refund", "enterprise"],
        "expected_source": "refund-policy",
    },
    {
        "question": "How do I reset my API key?",
        "expected_keywords": ["settings", "regenerate", "api key"],
        "expected_source": "api-docs",
    },
    {
        "question": "What are the rate limits?",
        "expected_keywords": ["requests", "per minute", "limit"],
        "expected_source": "api-docs",
    },
]

# Run evaluation
metrics = evaluate_rag(rag, test_cases)
print(f"Keyword Score: {metrics['avg_keyword_score']:.1%}")
print(f"Source Hit Rate: {metrics['source_hit_rate']:.1%}")
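Because retrieval failures dominate RAG quality, it also helps to score retrieval in isolation, before the LLM ever runs. A minimal sketch (the `score_retrieval` helper and the run format are assumptions, not part of the pipeline above): each run records the ranked source names the retriever returned plus the source that should have been found, and we compute hit rate and mean reciprocal rank (MRR).

```python
def score_retrieval(runs: list[dict], k: int = 5) -> dict:
    """Score retrieval in isolation, without the generation step.

    Each run: {"retrieved": [source names in rank order], "expected": "source"}
    Returns hit rate at k and mean reciprocal rank (MRR).
    """
    hits = 0
    rr_total = 0.0
    for run in runs:
        top_k = run["retrieved"][:k]
        if run["expected"] in top_k:
            hits += 1
            # Reciprocal rank: 1.0 for rank 1, 0.5 for rank 2, and so on.
            rr_total += 1.0 / (top_k.index(run["expected"]) + 1)
    n = len(runs)
    return {"hit_rate": hits / n, "mrr": rr_total / n}
```

If hit rate is low here, no amount of prompt engineering in the generation step will save you; fix chunking or embeddings first.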

When RAG Is Not the Answer

RAG is not a universal solution. Sometimes other approaches work better.

Context Stuffing

If your entire knowledge base fits in the context window (under ~100K tokens for modern models), just stuff it all in. No vector DB, no retrieval, no chunking. Simpler and often more accurate.

from openai import OpenAI

client = OpenAI()  # reads OPENAI_API_KEY from the environment


def context_stuffing_query(question: str, knowledge_base: str) -> str:
    """When your KB is small enough, just put it all in the prompt."""
    
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": f"Use this knowledge base:\n\n{knowledge_base}"},
            {"role": "user", "content": question},
        ],
        temperature=0,
    )
    return response.choices[0].message.content


# If your KB is well under the context window (say, under 50K tokens,
# leaving headroom for the question and answer), this is simpler and often better
with open("small_knowledge_base.md") as f:
    kb = f.read()

answer = context_stuffing_query("What is the refund policy?", kb)
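To decide whether context stuffing is viable at all, estimate the token count first. A rough sketch (the `rough_token_count` and `fits_in_context` helpers are hypothetical; the 1.3 tokens-per-word figure is a common rule of thumb for English text, and a real tokenizer such as tiktoken gives exact counts):

```python
def rough_token_count(text: str) -> int:
    # ~1.3 tokens per English word is a rough heuristic;
    # use a real tokenizer (e.g. tiktoken) for exact counts.
    return int(len(text.split()) * 1.3)


def fits_in_context(text: str, window: int = 100_000) -> bool:
    # Use at most half the window for the knowledge base, leaving
    # headroom for the system prompt, the question, and the answer.
    return rough_token_count(text) < window // 2
```

Run this check before choosing context stuffing; if the knowledge base fails it, fall back to the full RAG pipeline.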

Fine-Tuning

When you need to change how the model behaves — writing style, domain terminology, consistent formatting — fine-tuning is better than RAG. RAG gives the model information; fine-tuning changes the model itself.

Hybrid: RAG + Fine-Tuning

For some use cases, you want both: a fine-tuned model that understands your domain vocabulary and writing style, augmented with RAG for up-to-date facts. This is the most powerful (and most expensive) approach.

Key Takeaways

  1. RAG has four stages: ingest, retrieve, augment, generate. Get each stage working independently before wiring them together. Most bugs live in ingestion and retrieval, not generation.

  2. Retrieval quality matters more than generation quality. If you retrieve the wrong chunks, the best model in the world cannot give you a good answer. Invest 80% of your optimization effort in retrieval.

  3. Start with ChromaDB and text-embedding-3-small. Both are simple, cheap, and good enough. You can always swap them out later. Do not over-engineer your first version.

  4. Chunk size of 512 tokens with 50-token overlap is a good default. Adjust based on your evaluation metrics. See Lesson 7 for a deep dive on chunking strategies.

  5. Your prompt template makes or breaks the system. Use grounding instructions (“only use the provided context”), require citations, and tell the model what to do when it does not have enough information.

  6. Test manually before automating. Build 10-20 test cases that cover your expected queries, edge cases, and failure modes. Run them by hand. Fix the failures. Then build your evaluation pipeline.

  7. Handle errors at every stage. Empty results, context overflow, embedding API failures, and LLM errors all happen in production. Build fallback responses for each failure mode.

  8. If your knowledge base is small, consider context stuffing first. No vector DB, no embeddings, no retrieval. If it fits in the context window, it is simpler and often more accurate than RAG.
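The error-handling advice in takeaway 7 can be sketched as a thin wrapper around the two online stages. This is a minimal illustration, not the pipeline code above: `safe_query` and its `retrieve`/`generate` callables are hypothetical names for whatever your own retrieval and generation functions are.

```python
def safe_query(
    question: str,
    retrieve,  # callable: question -> list of context chunks
    generate,  # callable: (question, chunks) -> answer string
    fallback: str = "Sorry, I can't answer that right now.",
) -> str:
    """Give every pipeline stage an explicit failure path."""
    try:
        chunks = retrieve(question)
    except Exception:
        # Embedding API or vector DB is down.
        return fallback
    if not chunks:
        # Empty retrieval is a distinct failure mode: say so honestly
        # rather than letting the LLM improvise.
        return "I couldn't find anything relevant in the knowledge base."
    try:
        return generate(question, chunks)
    except Exception:
        # LLM API error or context overflow.
        return fallback
```

In production you would also log each failure path and add retries with backoff for transient API errors, but the principle stands: every stage returns something sensible instead of raising to the user.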