RAG is the most important pattern in production AI. Not because it is clever, but because it solves the fundamental limitation of LLMs: they do not know your data. Your internal docs, product catalog, support tickets, legal contracts, customer conversations — none of that is in the model’s training set. RAG bridges this gap by retrieving relevant documents and injecting them into the prompt as context. The model generates answers grounded in your actual data instead of hallucinating.
This lesson builds a complete RAG pipeline from scratch — document loading, chunking, embedding, storage, retrieval, augmentation, and generation. Every piece is production code you can deploy. We will use ChromaDB for simplicity here; Lesson 6 covers choosing between vector databases for production.
RAG Architecture Overview
The pipeline has two phases:
Ingestion (offline, runs once or on schedule):
Documents → Load → Chunk → Embed → Store in Vector DB
Query (online, runs per user request):
User Query → Embed → Search Vector DB → Rank Results → Build Prompt → LLM → Response
The ingestion phase is a batch job. You run it when your documents change. The query phase is a real-time API call. Optimizing the query phase is what matters for user experience.
Why RAG Beats Fine-Tuning
| Factor | RAG | Fine-Tuning |
|---|---|---|
| Setup time | Hours | Days to weeks |
| Data updates | Instant (re-index) | Re-train required |
| Cost | Embedding + storage + per-query retrieval | Training compute + hosting |
| Accuracy on your data | High (with good retrieval) | High (if enough training data) |
| Hallucination control | Good (model sees the source) | Poor (no source attribution) |
| Interpretability | Can show source documents | Black box |
| Maintenance | Update index when docs change | Retrain periodically |
Fine-tuning is the right choice when you need to change the model’s behavior (writing style, domain vocabulary, output format). RAG is the right choice when you need to give the model access to knowledge. Most production use cases need knowledge, not behavior changes.
Stage 1: Document Loading
Before you can build a pipeline, you need to load documents. Here are practical loaders for the most common formats.
PDF Loading
import fitz # PyMuPDF — pip install pymupdf
def load_pdf(file_path: str) -> list[dict]:
"""Load a PDF and return pages as documents with metadata."""
documents = []
doc = fitz.open(file_path)
for page_num in range(len(doc)):
page = doc[page_num]
text = page.get_text("text")
if not text.strip():
continue
documents.append({
"content": text.strip(),
"metadata": {
"source": file_path,
"page": page_num + 1,
"total_pages": len(doc),
},
})
doc.close()
return documents
# Usage
pages = load_pdf("company-handbook.pdf")
print(f"Loaded {len(pages)} pages")
print(f"First page preview: {pages[0]['content'][:200]}")
Markdown Loading
import os
import re
def load_markdown(file_path: str) -> list[dict]:
"""Load a markdown file, split by headers for natural sections."""
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
# Split on headers (##, ###, etc.)
sections = re.split(r'\n(#{1,4}\s+.+)\n', content)
documents = []
current_header = os.path.basename(file_path)
for i, section in enumerate(sections):
if re.match(r'^#{1,4}\s+', section):
current_header = section.strip("# ").strip()
continue
text = section.strip()
if not text or len(text) < 50: # Skip very short sections
continue
documents.append({
"content": text,
"metadata": {
"source": file_path,
"section": current_header,
"section_index": i,
},
})
return documents
HTML Loading
from bs4 import BeautifulSoup # pip install beautifulsoup4
def load_html(file_path: str, selector: str = "article") -> list[dict]:
"""Load an HTML file, extracting text from a specific element."""
with open(file_path, "r", encoding="utf-8") as f:
soup = BeautifulSoup(f.read(), "html.parser")
# Remove scripts and styles
for tag in soup(["script", "style", "nav", "footer"]):
tag.decompose()
# Try to find the main content area
main_content = soup.select_one(selector)
if not main_content:
main_content = soup.body or soup
text = main_content.get_text(separator="\n", strip=True)
return [{
"content": text,
"metadata": {
"source": file_path,
"title": soup.title.string if soup.title else None,
},
}]
CSV Loading
import csv
def load_csv(file_path: str, content_columns: list[str], id_column: str = None) -> list[dict]:
"""Load a CSV file, combining specified columns into document content."""
documents = []
with open(file_path, "r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for i, row in enumerate(reader):
# Combine specified columns into content
parts = []
for col in content_columns:
if col in row and row[col]:
parts.append(f"{col}: {row[col]}")
content = "\n".join(parts)
if not content.strip():
continue
doc_id = row.get(id_column, str(i)) if id_column else str(i)
documents.append({
"content": content,
"metadata": {
"source": file_path,
"row_id": doc_id,
**{k: v for k, v in row.items() if k not in content_columns},
},
})
return documents
# Usage: load product FAQs
docs = load_csv(
"faqs.csv",
content_columns=["question", "answer"],
id_column="faq_id",
)
Universal Loader
from pathlib import Path
def load_documents(path: str) -> list[dict]:
"""Load documents from a file or directory, auto-detecting format."""
p = Path(path)
if p.is_file():
return _load_single_file(p)
if p.is_dir():
documents = []
for file_path in sorted(p.rglob("*")):
if file_path.is_file() and file_path.suffix in LOADERS:
try:
docs = _load_single_file(file_path)
documents.extend(docs)
except Exception as e:
print(f"Warning: failed to load {file_path}: {e}")
return documents
raise FileNotFoundError(f"Path not found: {path}")
LOADERS = {
".pdf": load_pdf,
".md": load_markdown,
".html": load_html,
".htm": load_html,
}
def _load_single_file(path: Path) -> list[dict]:
loader = LOADERS.get(path.suffix)
if not loader:
# Fallback: read as plain text
with open(path, "r", encoding="utf-8") as f:
return [{"content": f.read(), "metadata": {"source": str(path)}}]
return loader(str(path))
Stage 2: Chunking
Documents are too long to embed and retrieve as single units. You need to split them into chunks — small enough to be relevant, large enough to be useful.
Lesson 7 covers chunking strategies in depth. Here is a production-ready recursive chunker to get you started:
def recursive_chunk(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
separators: list[str] = None,
) -> list[str]:
"""Split text into overlapping chunks using recursive character splitting.
Tries to split on natural boundaries (paragraphs, sentences, words)
before falling back to character-level splitting.
"""
if separators is None:
separators = ["\n\n", "\n", ". ", " ", ""]
if len(text) <= chunk_size:
return [text] if text.strip() else []
# Find the best separator that creates meaningful splits
for sep in separators:
if sep and sep in text:
parts = text.split(sep)
break
else:
# No separator works — split by characters
chunks = []
for i in range(0, len(text), chunk_size - chunk_overlap):
chunk = text[i:i + chunk_size]
if chunk.strip():
chunks.append(chunk.strip())
if i + chunk_size >= len(text):
break
return chunks
# Merge parts into chunks that respect the size limit
chunks = []
current = ""
for part in parts:
test = f"{current}{sep}{part}" if current else part
if len(test) <= chunk_size:
current = test
else:
if current.strip():
chunks.append(current.strip())
if len(part) > chunk_size:
# Recurse on oversized parts
sub_chunks = recursive_chunk(
part, chunk_size, chunk_overlap,
separators[separators.index(sep) + 1:]
)
chunks.extend(sub_chunks)
current = ""
else:
current = part
if current.strip():
chunks.append(current.strip())
# Add overlap between chunks
if chunk_overlap > 0 and len(chunks) > 1:
overlapped = [chunks[0]]
for i in range(1, len(chunks)):
prev_end = chunks[i - 1][-chunk_overlap:]
overlapped.append(prev_end + " " + chunks[i])
chunks = overlapped
return chunks
def chunk_documents(documents: list[dict], chunk_size: int = 512) -> list[dict]:
"""Chunk a list of documents, preserving metadata."""
chunked = []
for doc in documents:
chunks = recursive_chunk(doc["content"], chunk_size=chunk_size)
for i, chunk in enumerate(chunks):
chunked.append({
"content": chunk,
"metadata": {
**doc["metadata"],
"chunk_index": i,
"total_chunks": len(chunks),
},
})
return chunked
# Usage
raw_docs = load_documents("./knowledge_base/")
chunks = chunk_documents(raw_docs, chunk_size=512)
print(f"Created {len(chunks)} chunks from {len(raw_docs)} documents")
Chunk Size Guidelines
| Chunk Size | Good For | Problems |
|---|---|---|
| 128-256 tokens | Precise retrieval, FAQ matching | Loses context, many chunks to store |
| 256-512 tokens | General-purpose, most RAG systems | Good default balance |
| 512-1024 tokens | Technical docs, legal documents | Less precise retrieval |
| 1024+ tokens | Long-form analysis, narratives | Poor retrieval precision, expensive |
Start with 512 tokens, 50-token overlap. Adjust based on your evaluation metrics.
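Note that the table is in tokens while the recursive chunker above measures characters; for English text a rough rule of thumb is about 4 characters per token. To see where your chunks actually land, here is a minimal sketch that assumes tiktoken is installed (pip install tiktoken):
import tiktoken
enc = tiktoken.get_encoding("cl100k_base")  # tokenizer family used by OpenAI embedding models

def token_lengths(chunks: list[dict]) -> list[int]:
    """Token count per chunk, for sanity-checking against the size guidelines."""
    return [len(enc.encode(c["content"])) for c in chunks]

# Usage: inspect the distribution before committing to a chunk_size
lengths = token_lengths(chunks)
print(f"chunks: {len(lengths)}, min: {min(lengths)}, max: {max(lengths)}, avg: {sum(lengths) / len(lengths):.0f} tokens")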
Stage 3: Embedding
Embeddings convert text into vectors — lists of numbers that represent meaning. Similar texts produce similar vectors. This is what enables semantic search.
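To make this concrete, here is a minimal sketch using the local sentence-transformers model introduced later in this lesson; any embedding model shows the same behavior:
from sentence_transformers import SentenceTransformer, util

model = SentenceTransformer("all-MiniLM-L6-v2")
texts = [
    "How do I reset my password?",
    "Steps to recover a forgotten login",
    "Quarterly revenue grew 12% year over year",
]
vectors = model.encode(texts)
# Cosine similarity: texts about the same topic score high, unrelated texts score low
print(float(util.cos_sim(vectors[0], vectors[1])))  # high (both about password recovery)
print(float(util.cos_sim(vectors[0], vectors[2])))  # low (unrelated topics)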
Embedding Model Comparison
| Model | Dimensions | Speed | Quality | Cost |
|---|---|---|---|---|
| OpenAI text-embedding-3-small | 1536 | Fast | Good | $0.02/1M tokens |
| OpenAI text-embedding-3-large | 3072 | Fast | Better | $0.13/1M tokens |
| Cohere embed-english-v3.0 | 1024 | Fast | Good | $0.10/1M tokens |
| sentence-transformers/all-MiniLM-L6-v2 | 384 | Local, fast | Decent | Free |
| BAAI/bge-large-en-v1.5 | 1024 | Local, slow | Very good | Free |
For production, start with text-embedding-3-small. It is cheap, fast, and good enough. Switch to a local model if you have data privacy requirements or want to eliminate API costs.
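To put the cost in perspective: embedding 10,000 chunks of roughly 512 tokens each is about 5.1M tokens, which at the table's $0.02 per 1M tokens comes to roughly $0.10 with text-embedding-3-small. At that price, re-embedding an entire corpus whenever your chunking strategy changes is a non-issue.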
Embedding Implementation
from openai import OpenAI
client = OpenAI()
def embed_texts(texts: list[str], model: str = "text-embedding-3-small") -> list[list[float]]:
"""Embed a batch of texts using OpenAI's API.
Handles batching for large inputs (API limit: 2048 texts per call).
"""
all_embeddings = []
batch_size = 2048
for i in range(0, len(texts), batch_size):
batch = texts[i:i + batch_size]
response = client.embeddings.create(
model=model,
input=batch,
)
batch_embeddings = [item.embedding for item in response.data]
all_embeddings.extend(batch_embeddings)
return all_embeddings
def embed_with_retry(
texts: list[str],
model: str = "text-embedding-3-small",
max_retries: int = 3,
) -> list[list[float]]:
"""Embed texts with retry logic for production use."""
import time
for attempt in range(max_retries):
try:
return embed_texts(texts, model)
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt # Exponential backoff
print(f"Embedding failed (attempt {attempt + 1}): {e}. Retrying in {wait_time}s...")
time.sleep(wait_time)
Local Embeddings with sentence-transformers
from sentence_transformers import SentenceTransformer
# Load model once (downloads on first use, ~90MB for MiniLM)
model = SentenceTransformer("all-MiniLM-L6-v2")
def embed_texts_local(texts: list[str]) -> list[list[float]]:
"""Embed texts locally — no API calls, no cost, full privacy."""
embeddings = model.encode(texts, show_progress_bar=True)
return embeddings.tolist()
# Usage
chunks = ["How to reset password", "Billing FAQ", "API rate limits"]
vectors = embed_texts_local(chunks)
print(f"Embedded {len(vectors)} chunks, dimension: {len(vectors[0])}")
# Embedded 3 chunks, dimension: 384
Stage 4: Vector Storage with ChromaDB
ChromaDB is the simplest vector database to start with. Zero configuration, runs in-process, supports persistence. Perfect for prototyping and small to medium production workloads.
pip install chromadb
Complete Ingestion Pipeline
import chromadb
from chromadb.utils import embedding_functions
# Initialize ChromaDB with persistence
chroma_client = chromadb.PersistentClient(path="./chroma_db")
# Use OpenAI embeddings (or switch to a local model)
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key="your-openai-api-key",
model_name="text-embedding-3-small",
)
# Create or get a collection
collection = chroma_client.get_or_create_collection(
name="knowledge_base",
embedding_function=openai_ef,
metadata={"hnsw:space": "cosine"}, # Use cosine similarity
)
def ingest_documents(documents_path: str):
"""Full ingestion pipeline: load → chunk → embed → store."""
# Step 1: Load documents
raw_docs = load_documents(documents_path)
print(f"Loaded {len(raw_docs)} documents")
# Step 2: Chunk documents
chunks = chunk_documents(raw_docs, chunk_size=512)
print(f"Created {len(chunks)} chunks")
# Step 3: Store in ChromaDB (embedding happens automatically)
batch_size = 100
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
collection.add(
ids=[f"chunk_{i+j}" for j in range(len(batch))],
documents=[chunk["content"] for chunk in batch],
metadatas=[chunk["metadata"] for chunk in batch],
)
print(f"Stored batch {i // batch_size + 1} ({len(batch)} chunks)")
print(f"Ingestion complete. Collection has {collection.count()} documents.")
# Run ingestion
ingest_documents("./knowledge_base/")
Querying ChromaDB
def search(query: str, n_results: int = 5, where: dict = None) -> list[dict]:
"""Search the vector database for relevant chunks."""
search_params = {
"query_texts": [query],
"n_results": n_results,
}
if where:
search_params["where"] = where
results = collection.query(**search_params)
# Flatten results into a list of documents
documents = []
for i in range(len(results["documents"][0])):
documents.append({
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i],
})
return documents
# Simple search
results = search("How do I reset my password?")
for r in results:
print(f"[{r['distance']:.3f}] {r['content'][:100]}...")
# Filtered search (only search specific sources)
results = search(
"refund policy",
where={"source": "policies/refund-policy.md"},
)
The Query Pipeline: Retrieval + Augmentation + Generation
Now we wire everything together. This is the core of your RAG system.
import json
from openai import OpenAI
client = OpenAI()
class RAGPipeline:
"""Complete RAG pipeline: retrieve → augment → generate."""
def __init__(self, collection, model: str = "gpt-4o"):
self.collection = collection
self.model = model
def query(
self,
question: str,
n_results: int = 5,
where: dict = None,
) -> dict:
"""Answer a question using RAG."""
# Step 1: Retrieve relevant chunks
retrieval_results = self._retrieve(question, n_results, where)
if not retrieval_results:
return {
"answer": "I couldn't find any relevant information to answer your question.",
"sources": [],
"context_used": 0,
}
# Step 2: Build augmented prompt
prompt = self._build_prompt(question, retrieval_results)
# Step 3: Generate answer
answer = self._generate(prompt)
return {
"answer": answer,
"sources": [
{
"content": r["content"][:200] + "...",
"metadata": r["metadata"],
"relevance_score": 1 - r["distance"],
}
for r in retrieval_results
],
"context_used": len(retrieval_results),
}
def _retrieve(self, query: str, n_results: int, where: dict = None) -> list[dict]:
"""Retrieve relevant documents from the vector store."""
search_params = {
"query_texts": [query],
"n_results": n_results,
}
if where:
search_params["where"] = where
results = self.collection.query(**search_params)
documents = []
for i in range(len(results["documents"][0])):
doc = {
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"distance": results["distances"][0][i],
}
# Filter out low-relevance results
if doc["distance"] < 1.5: # Cosine distance threshold
documents.append(doc)
return documents
def _build_prompt(self, question: str, context_docs: list[dict]) -> list[dict]:
"""Build the augmented prompt with retrieved context."""
# Format context with source attribution
context_parts = []
for i, doc in enumerate(context_docs, 1):
source = doc["metadata"].get("source", "unknown")
section = doc["metadata"].get("section", "")
source_label = f"{source}"
if section:
source_label += f" > {section}"
context_parts.append(f"[Source {i}: {source_label}]\n{doc['content']}")
context_text = "\n\n---\n\n".join(context_parts)
system_prompt = """You are a helpful assistant that answers questions based on the provided context.
Rules:
1. Only use information from the provided context to answer questions.
2. If the context doesn't contain enough information, say so clearly.
3. Cite your sources by referencing [Source N] when using information from a specific document.
4. Be concise and direct. Don't repeat the question.
5. If multiple sources provide conflicting information, note the discrepancy."""
user_prompt = f"""Context:
{context_text}
---
Question: {question}
Answer based on the context above:"""
return [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
]
def _generate(self, messages: list[dict]) -> str:
"""Generate an answer using the LLM."""
try:
response = client.chat.completions.create(
model=self.model,
messages=messages,
temperature=0,
max_tokens=1024,
)
return response.choices[0].message.content
except Exception as e:
return f"Error generating response: {e}"
# Usage
rag = RAGPipeline(collection=collection)
result = rag.query("What is our refund policy for enterprise customers?")
print(result["answer"])
print(f"\n--- Sources ({result['context_used']} documents used) ---")
for source in result["sources"]:
print(f" [{source['relevance_score']:.2f}] {source['metadata'].get('source', 'unknown')}")
Prompt Templates for RAG
The prompt template is where most RAG systems succeed or fail. Here are battle-tested templates for different use cases.
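Each template below exposes {context} and {question} placeholders, so swapping one into the pipeline is a single .format() call. A minimal sketch, assuming context_text is the source-labelled string that RAGPipeline._build_prompt assembles:
# Hypothetical wiring: replace the pipeline's hard-coded prompt with a template string
messages = [
    {"role": "user", "content": STRICT_QA_TEMPLATE.format(context=context_text, question=question)},
]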
Grounded Q&A (Strict — No Hallucination)
STRICT_QA_TEMPLATE = """You answer questions based ONLY on the provided documents.
RULES:
- If the answer is in the documents, provide it with source citations.
- If the answer is NOT in the documents, respond with:
"I don't have enough information in the provided documents to answer this question."
- NEVER make up information or use knowledge outside the provided documents.
- Quote directly from the documents when possible.
DOCUMENTS:
{context}
QUESTION: {question}
ANSWER:"""
Conversational RAG (Friendly, With Citations)
CONVERSATIONAL_TEMPLATE = """You are a helpful support agent. Use the knowledge base
articles below to answer the customer's question.
If the articles contain the answer, explain it clearly and cite which article
you used like this: (Source: article-name).
If the articles don't cover the topic, let the customer know and suggest
they contact [email protected] for further help.
Knowledge Base Articles:
{context}
Customer Question: {question}
Your Response:"""
Analytical RAG (Compare, Synthesize)
ANALYTICAL_TEMPLATE = """You are a research analyst. Synthesize information from
multiple sources to provide a comprehensive answer.
For each claim you make:
1. Cite the source(s) that support it
2. Note if sources disagree
3. Indicate your confidence level (high/medium/low)
Sources:
{context}
Research Question: {question}
Analysis:"""
Error Handling for Production
A production RAG system must handle failures gracefully. Here are the common failure modes and how to handle them.
class ProductionRAGPipeline(RAGPipeline):
"""RAG pipeline with production error handling."""
def query(self, question: str, n_results: int = 5, where: dict = None) -> dict:
"""Query with comprehensive error handling."""
# Validate input
if not question or not question.strip():
return self._error_response("Empty question provided")
if len(question) > 10000:
return self._error_response("Question too long (max 10,000 characters)")
try:
# Retrieve
retrieval_results = self._retrieve(question, n_results, where)
except Exception as e:
return self._error_response(f"Retrieval failed: {e}", fallback=True)
# Handle empty results
if not retrieval_results:
return {
"answer": (
"I couldn't find any relevant documents for your question. "
"Try rephrasing or broadening your query."
),
"sources": [],
"context_used": 0,
"status": "no_results",
}
# Check context window limits
total_context_length = sum(len(doc["content"]) for doc in retrieval_results)
max_context_chars = 30000 # ~7500 tokens, safe for most models
if total_context_length > max_context_chars:
# Truncate results to fit
truncated = []
running_length = 0
for doc in retrieval_results:
if running_length + len(doc["content"]) > max_context_chars:
break
truncated.append(doc)
running_length += len(doc["content"])
retrieval_results = truncated
# Build prompt and generate
try:
prompt = self._build_prompt(question, retrieval_results)
answer = self._generate(prompt)
except Exception as e:
return self._error_response(
f"Generation failed: {e}",
sources=retrieval_results,
)
return {
"answer": answer,
"sources": [
{
"content": r["content"][:200],
"metadata": r["metadata"],
"relevance_score": round(1 - r["distance"], 3),
}
for r in retrieval_results
],
"context_used": len(retrieval_results),
"status": "success",
}
def _error_response(
self, error: str, fallback: bool = False, sources: list = None
) -> dict:
"""Generate a structured error response."""
response = {
"answer": "I'm having trouble answering your question right now. Please try again.",
"sources": [],
"context_used": 0,
"status": "error",
"error": error,
}
if fallback:
# Could fall back to a direct LLM call without RAG
response["answer"] = (
"I'm unable to search the knowledge base right now. "
"Please try again in a moment."
)
return response
Complete End-to-End Working Code
Here are two scripts you can run today: one for ingestion, one for querying.
Script 1: ingest.py
"""
Ingest documents into the RAG pipeline.
Usage:
python ingest.py ./documents/
Requirements:
pip install chromadb openai pymupdf beautifulsoup4
"""
import sys
import os
import re
from pathlib import Path
import fitz
import chromadb
from chromadb.utils import embedding_functions
def load_pdf(file_path: str) -> list[dict]:
docs = []
doc = fitz.open(file_path)
for i in range(len(doc)):
text = doc[i].get_text("text").strip()
if text:
docs.append({"content": text, "metadata": {"source": file_path, "page": i + 1}})
doc.close()
return docs
def load_text(file_path: str) -> list[dict]:
with open(file_path, "r", encoding="utf-8") as f:
return [{"content": f.read(), "metadata": {"source": file_path}}]
def load_documents(path: str) -> list[dict]:
p = Path(path)
loaders = {".pdf": load_pdf, ".md": load_text, ".txt": load_text}
if p.is_file():
loader = loaders.get(p.suffix, load_text)
return loader(str(p))
docs = []
for fp in sorted(p.rglob("*")):
if fp.is_file() and fp.suffix in loaders:
try:
docs.extend(loaders[fp.suffix](str(fp)))
except Exception as e:
print(f" Warning: failed {fp}: {e}")
return docs
def recursive_chunk(text: str, chunk_size: int = 512, overlap: int = 50) -> list[str]:
if len(text) <= chunk_size:
return [text] if text.strip() else []
separators = ["\n\n", "\n", ". ", " "]
for sep in separators:
if sep in text:
parts = text.split(sep)
break
else:
return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size - overlap)]
chunks, current = [], ""
for part in parts:
candidate = f"{current}{sep}{part}" if current else part
if len(candidate) <= chunk_size:
current = candidate
else:
if current.strip():
chunks.append(current.strip())
current = part if len(part) <= chunk_size else part[:chunk_size]
if current.strip():
chunks.append(current.strip())
return chunks
def main():
if len(sys.argv) < 2:
print("Usage: python ingest.py <documents_path>")
sys.exit(1)
documents_path = sys.argv[1]
# Load
print(f"Loading documents from {documents_path}...")
raw_docs = load_documents(documents_path)
print(f" Loaded {len(raw_docs)} documents")
# Chunk
chunks = []
for doc in raw_docs:
for i, chunk_text in enumerate(recursive_chunk(doc["content"])):
chunks.append({
"content": chunk_text,
"metadata": {**doc["metadata"], "chunk_index": i},
})
print(f" Created {len(chunks)} chunks")
# Store
print("Storing in ChromaDB...")
db = chromadb.PersistentClient(path="./chroma_db")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.environ["OPENAI_API_KEY"],
model_name="text-embedding-3-small",
)
collection = db.get_or_create_collection(
name="knowledge_base",
embedding_function=openai_ef,
)
batch_size = 100
for i in range(0, len(chunks), batch_size):
batch = chunks[i:i + batch_size]
collection.add(
ids=[f"chunk_{i+j}" for j in range(len(batch))],
documents=[c["content"] for c in batch],
metadatas=[c["metadata"] for c in batch],
)
print(f" Stored batch {i // batch_size + 1}")
print(f"Done. Collection has {collection.count()} chunks.")
if __name__ == "__main__":
main()
Script 2: query.py
"""
Query the RAG pipeline.
Usage:
python query.py "What is our refund policy?"
Requirements:
pip install chromadb openai
"""
import sys
import os
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
def main():
if len(sys.argv) < 2:
print("Usage: python query.py <question>")
sys.exit(1)
question = sys.argv[1]
# Connect to ChromaDB
db = chromadb.PersistentClient(path="./chroma_db")
openai_ef = embedding_functions.OpenAIEmbeddingFunction(
api_key=os.environ["OPENAI_API_KEY"],
model_name="text-embedding-3-small",
)
collection = db.get_collection(
name="knowledge_base",
embedding_function=openai_ef,
)
# Retrieve
results = collection.query(query_texts=[question], n_results=5)
if not results["documents"][0]:
print("No relevant documents found.")
return
# Build context
context_parts = []
for i, (doc, meta) in enumerate(
zip(results["documents"][0], results["metadatas"][0]), 1
):
source = meta.get("source", "unknown")
context_parts.append(f"[Source {i}: {source}]\n{doc}")
context = "\n\n---\n\n".join(context_parts)
# Generate
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": (
"Answer based ONLY on the provided context. "
"Cite sources as [Source N]. If the context doesn't "
"contain the answer, say so."
),
},
{
"role": "user",
"content": f"Context:\n{context}\n\nQuestion: {question}",
},
],
temperature=0,
max_tokens=1024,
)
answer = response.choices[0].message.content
print(f"\nQuestion: {question}")
print(f"\nAnswer: {answer}")
print(f"\n--- Sources ---")
for i, meta in enumerate(results["metadatas"][0], 1):
score = 1 - results["distances"][0][i - 1]
print(f" [{i}] {meta.get('source', 'unknown')} (relevance: {score:.2f})")
if __name__ == "__main__":
main()
Testing Your RAG Pipeline
Before you automate anything, test manually. Here is a simple evaluation script:
def evaluate_rag(pipeline, test_cases: list[dict]) -> dict:
"""Run test cases against your RAG pipeline and score results.
Each test case: {"question": "...", "expected_keywords": [...], "expected_source": "..."}
"""
results = []
for test in test_cases:
response = pipeline.query(test["question"])
answer = response["answer"].lower()
# Check if expected keywords appear in the answer
keyword_hits = sum(
1 for kw in test.get("expected_keywords", [])
if kw.lower() in answer
)
keyword_score = (
keyword_hits / len(test["expected_keywords"])
if test.get("expected_keywords")
else 0
)
# Check if the expected source was retrieved
source_hit = any(
test.get("expected_source", "") in s["metadata"].get("source", "")
for s in response.get("sources", [])
)
results.append({
"question": test["question"],
"keyword_score": keyword_score,
"source_hit": source_hit,
"answer_preview": response["answer"][:200],
})
# Aggregate metrics
avg_keyword_score = sum(r["keyword_score"] for r in results) / len(results)
source_hit_rate = sum(1 for r in results if r["source_hit"]) / len(results)
return {
"avg_keyword_score": round(avg_keyword_score, 3),
"source_hit_rate": round(source_hit_rate, 3),
"total_tests": len(results),
"details": results,
}
# Define test cases
test_cases = [
{
"question": "What is our refund policy?",
"expected_keywords": ["30 days", "full refund", "enterprise"],
"expected_source": "refund-policy",
},
{
"question": "How do I reset my API key?",
"expected_keywords": ["settings", "regenerate", "api key"],
"expected_source": "api-docs",
},
{
"question": "What are the rate limits?",
"expected_keywords": ["requests", "per minute", "limit"],
"expected_source": "api-docs",
},
]
# Run evaluation
metrics = evaluate_rag(rag, test_cases)
print(f"Keyword Score: {metrics['avg_keyword_score']:.1%}")
print(f"Source Hit Rate: {metrics['source_hit_rate']:.1%}")
When RAG Is Not the Answer
RAG is not a universal solution. Sometimes other approaches work better.
Context Stuffing
If your entire knowledge base fits in the context window (under ~100K tokens for modern models), just stuff it all in. No vector DB, no retrieval, no chunking. Simpler and often more accurate.
def context_stuffing_query(question: str, knowledge_base: str) -> str:
"""When your KB is small enough, just put it all in the prompt."""
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": f"Use this knowledge base:\n\n{knowledge_base}"},
{"role": "user", "content": question},
],
temperature=0,
)
return response.choices[0].message.content
# If your KB is under 50K tokens, this is simpler and often better
with open("small_knowledge_base.md") as f:
kb = f.read()
answer = context_stuffing_query("What is the refund policy?", kb)
Fine-Tuning
When you need to change how the model behaves — writing style, domain terminology, consistent formatting — fine-tuning is better than RAG. RAG gives the model information; fine-tuning changes the model itself.
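For contrast, fine-tuning does not take documents at all; it takes example conversations that demonstrate the behavior you want. A minimal sketch of one training example, assuming OpenAI-style chat fine-tuning data (one JSON object per line of a JSONL file); the company name and wording are purely illustrative:
# One illustrative training example: it teaches tone and format, not facts
training_example = {
    "messages": [
        {"role": "system", "content": "You are Acme's support assistant. Answer in Acme's house style."},
        {"role": "user", "content": "Can I get a refund?"},
        {"role": "assistant", "content": "Absolutely. Here is how our refund process works: ..."},
    ]
}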
Hybrid: RAG + Fine-Tuning
For some use cases, you want both: a fine-tuned model that understands your domain vocabulary and writing style, augmented with RAG for up-to-date facts. This is the most powerful (and most expensive) approach.
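Because RAGPipeline takes the model name as a parameter, the hybrid setup is mechanically simple: point the pipeline at your fine-tuned model. A minimal sketch (the model id below is a hypothetical placeholder):
# Hypothetical fine-tuned model id; substitute the id returned by your training job
hybrid_rag = RAGPipeline(collection=collection, model="ft:gpt-4o-mini-2024-07-18:acme::abc123")
result = hybrid_rag.query("What is our refund policy for enterprise customers?")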
Key Takeaways
- RAG has four stages: ingest, retrieve, augment, generate. Get each stage working independently before wiring them together. Most bugs live in ingestion and retrieval, not generation.
- Retrieval quality matters more than generation quality. If you retrieve the wrong chunks, the best model in the world cannot give you a good answer. Invest 80% of your optimization effort in retrieval.
- Start with ChromaDB and text-embedding-3-small. Both are simple, cheap, and good enough. You can always swap them out later. Do not over-engineer your first version.
- Chunk size of 512 tokens with 50-token overlap is a good default. Adjust based on your evaluation metrics. See Lesson 7 for a deep dive on chunking strategies.
- Your prompt template makes or breaks the system. Use grounding instructions (“only use the provided context”), require citations, and tell the model what to do when it does not have enough information.
- Test manually before automating. Build 10-20 test cases that cover your expected queries, edge cases, and failure modes. Run them by hand. Fix the failures. Then build your evaluation pipeline.
- Handle errors at every stage. Empty results, context overflow, embedding API failures, and LLM errors all happen in production. Build fallback responses for each failure mode.
- If your knowledge base is small, consider context stuffing first. No vector DB, no embeddings, no retrieval. If it fits in the context window, it is simpler and often more accurate than RAG.