Every serious AI application I've seen ship in the last two years has RAG at its core. Here's the complete blueprint.
I remember the exact meeting where RAG saved our project.
We'd built a customer support chatbot backed by a fine-tuned model. It was clever, well-trained, and confidently wrong about half the time. It would answer questions about product features that had changed six months ago, cite policies that no longer existed, and occasionally hallucinate workflows that had never existed. Our users were frustrated. Our support team was more frustrated — they were fielding escalations from users who'd followed the bot's bad advice.
A colleague suggested RAG. I was skeptical — it felt like a workaround, not a real solution. We spent a week rebuilding the system. The hallucination rate dropped by over 80%. The bot started citing exact sentences from our documentation. Users began trusting it.
That was the moment I understood RAG isn't just an architecture pattern. It's the bridge between what LLMs are good at (reasoning, synthesis, natural language) and what they're bad at (staying current, staying grounded, staying accurate).
This article is the complete guide I wish I'd had before that project.
Why RAG Exists: The Core Problem
Language models are trained on static snapshots of the world. Once training ends, their knowledge freezes. Ask GPT-4 about something that happened after its training cutoff, or about your company's internal policies, or about a product feature you shipped last month — and you'll get either a hallucination or an honest "I don't know."
Even within their training window, models don't cite — they synthesize. They can't tell you which page of your 500-page knowledge base their answer came from. They can't be updated without expensive retraining. And they confidently state things that are wrong with the same tone as things that are right.
RAG solves this by separating knowledge storage from reasoning. You store your knowledge externally (in a vector database, a document store, an API). At inference time, you retrieve the relevant pieces and hand them to the model as context. The model reasons over what you give it — it doesn't have to remember it.
The result: a system that's always current, always grounded, and always citable.
The RAG Architecture: A Mental Model
At its simplest, RAG has two phases:
Indexing (offline): Documents are chunked, embedded, and stored in a vector database.
Retrieval + Generation (online): When a query arrives, it's embedded, the most relevant chunks are retrieved, and those chunks are injected into the LLM's prompt alongside the query.
┌─────────────────────────────────────────────────────────┐
│ INDEXING PIPELINE │
│ │
│ Documents → Chunker → Embedder → Vector Store │
│ (PDFs, docs, (split (encode (Pinecone, │
│ web pages, text) meaning) pgvector, Chroma) │
│ databases) │
└─────────────────────────────────────────────────────────┘
↓ (one-time or scheduled)
┌─────────────────────────────────────────────────────────┐
│ INFERENCE PIPELINE │
│ │
│ User Query → Embed Query → Vector Search │
│ ↓ │
│ Top-K Chunks Retrieved │
│ ↓ │
│ [Query + Retrieved Chunks] → LLM │
│ ↓ │
│ Grounded Answer │
└─────────────────────────────────────────────────────────┘

This is the "naive RAG" baseline. It works surprisingly well. And it's the foundation for every more sophisticated variant.
Building a Production RAG System: Step by Step
Let's build this properly, with real code.
Step 1: Document ingestion and chunking
import hashlib
from dataclasses import dataclass, field
from typing import Optional
from pathlib import Path
@dataclass
class Document:
content: str
source: str
chunk_index: int
total_chunks: int
metadata: dict = field(default_factory=dict)
doc_id: str = field(init=False)
def __post_init__(self):
# Deterministic ID based on content - enables deduplication
self.doc_id = hashlib.md5(
f"{self.source}:{self.chunk_index}:{self.content[:100]}".encode()
).hexdigest()
def chunk_document(
text: str,
source: str,
chunk_size: int = 400,
overlap: int = 80,
) -> list[Document]:
"""
Chunk a document into overlapping windows.
chunk_size: Target tokens per chunk (approximate - we use words as proxy)
overlap: Words shared between adjacent chunks to preserve context at boundaries
"""
# Split at paragraph boundaries first, then by size
paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
chunks = []
current_words = []
for para in paragraphs:
para_words = para.split()
if len(current_words) + len(para_words) > chunk_size and current_words:
# Emit current chunk
chunk_text = ' '.join(current_words)
chunks.append(chunk_text)
# Carry over overlap words
current_words = current_words[-overlap:] + para_words
else:
current_words.extend(para_words)
if current_words:
chunks.append(' '.join(current_words))
return [
Document(
content=chunk,
source=source,
chunk_index=i,
total_chunks=len(chunks),
)
for i, chunk in enumerate(chunks)
    ]

Step 2: The embedding layer
from typing import Protocol
class EmbeddingModel(Protocol):
"""Protocol for any embedding provider."""
def embed(self, texts: list[str]) -> list[list[float]]: ...
class VoyageEmbedder:
"""
    Batched embedder (add retry/backoff on top for production use).
Swap for OpenAI, Cohere, or sentence-transformers as needed.
"""
def __init__(self, model: str = "voyage-3", batch_size: int = 128):
import voyageai
self.client = voyageai.Client()
self.model = model
self.batch_size = batch_size
def embed(self, texts: list[str]) -> list[list[float]]:
all_embeddings = []
for i in range(0, len(texts), self.batch_size):
batch = texts[i:i + self.batch_size]
            result = self.client.embed(batch, model=self.model, input_type="document")
all_embeddings.extend(result.embeddings)
return all_embeddings
def embed_query(self, query: str) -> list[float]:
"""Queries often get a different instruction than documents."""
result = self.client.embed(
[query],
model=self.model,
input_type="query" # Asymmetric embedding for better retrieval
)
        return result.embeddings[0]

Step 3: The vector store
import psycopg2
from psycopg2.extras import execute_values
import json
from typing import Optional
class VectorStore:
"""
pgvector-backed document store.
For hosted alternatives: Pinecone, Weaviate, Qdrant, or Chroma.
"""
def __init__(self, connection_string: str, embedding_dim: int = 1024):
self.conn = psycopg2.connect(connection_string)
self.embedding_dim = embedding_dim
self._initialize_schema()
def _initialize_schema(self):
with self.conn.cursor() as cur:
cur.execute("CREATE EXTENSION IF NOT EXISTS vector")
cur.execute(f"""
CREATE TABLE IF NOT EXISTS rag_documents (
id VARCHAR PRIMARY KEY,
content TEXT NOT NULL,
source VARCHAR NOT NULL,
chunk_index INTEGER,
metadata JSONB DEFAULT '{{}}',
embedding vector({self.embedding_dim}),
created_at TIMESTAMP DEFAULT NOW()
)
""")
# IVFFlat index for approximate nearest neighbor (fast at scale)
cur.execute("""
CREATE INDEX IF NOT EXISTS rag_embedding_idx
ON rag_documents USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100)
""")
self.conn.commit()
def upsert(self, documents: list[Document], embeddings: list[list[float]]):
"""Upsert documents - idempotent based on doc_id."""
records = [
(
doc.doc_id,
doc.content,
doc.source,
doc.chunk_index,
json.dumps(doc.metadata),
embedding
)
for doc, embedding in zip(documents, embeddings)
]
with self.conn.cursor() as cur:
execute_values(cur, """
INSERT INTO rag_documents (id, content, source, chunk_index, metadata, embedding)
VALUES %s
ON CONFLICT (id) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
metadata = EXCLUDED.metadata
""", records)
self.conn.commit()
print(f"Upserted {len(records)} documents.")
def search(
self,
query_embedding: list[float],
top_k: int = 5,
source_filter: Optional[str] = None
) -> list[dict]:
"""Retrieve top-k most semantically similar chunks."""
where_clause = "WHERE source = %(source)s" if source_filter else ""
with self.conn.cursor() as cur:
cur.execute(f"""
SELECT
id,
content,
source,
chunk_index,
metadata,
1 - (embedding <=> %(embedding)s::vector) AS similarity
FROM rag_documents
{where_clause}
ORDER BY embedding <=> %(embedding)s::vector
LIMIT %(top_k)s
""", {
"embedding": query_embedding,
"top_k": top_k,
"source": source_filter
})
rows = cur.fetchall()
return [
{
"id": row[0],
"content": row[1],
"source": row[2],
"chunk_index": row[3],
"metadata": row[4],
"similarity": float(row[5])
}
for row in rows
        ]

Step 4: The generation layer
import anthropic
from typing import Optional
class RAGPipeline:
"""
End-to-end RAG pipeline: retrieve relevant context, generate grounded answer.
"""
SYSTEM_PROMPT = """You are a helpful assistant that answers questions based on provided context.
Rules:
1. Answer ONLY based on the provided context. Do not use outside knowledge.
2. If the context doesn't contain enough information, say so clearly.
3. Always cite your sources using [Source: filename, chunk N] format.
4. Never fabricate information not present in the context.
5. If multiple sources conflict, acknowledge the conflict and cite both."""
def __init__(
self,
vector_store: VectorStore,
embedder: VoyageEmbedder,
top_k: int = 5,
similarity_threshold: float = 0.6,
):
self.vector_store = vector_store
self.embedder = embedder
self.client = anthropic.Anthropic()
self.top_k = top_k
self.similarity_threshold = similarity_threshold
def retrieve(self, query: str) -> list[dict]:
"""Retrieve relevant chunks for a query."""
query_embedding = self.embedder.embed_query(query)
results = self.vector_store.search(query_embedding, top_k=self.top_k)
# Filter out low-relevance results
return [r for r in results if r["similarity"] >= self.similarity_threshold]
def build_context(self, retrieved_chunks: list[dict]) -> str:
"""Format retrieved chunks into a structured context block."""
if not retrieved_chunks:
return "No relevant context found."
sections = []
for i, chunk in enumerate(retrieved_chunks, 1):
source_label = f"{chunk['source']} (chunk {chunk['chunk_index']})"
sections.append(
f"[Source {i}: {source_label}]\n{chunk['content']}"
)
return "\n\n---\n\n".join(sections)
def answer(
self,
query: str,
conversation_history: Optional[list[dict]] = None
) -> dict:
"""
Generate a grounded answer for a query.
Returns:
{
"answer": str,
"sources": list[dict],
"context_used": str,
"had_relevant_context": bool
}
"""
# Retrieve
retrieved = self.retrieve(query)
context = self.build_context(retrieved)
had_context = bool(retrieved)
# Build messages
messages = conversation_history or []
messages = messages + [{
"role": "user",
"content": f"""Context:
{context}
---
Question: {query}"""
}]
# Generate
response = self.client.messages.create(
model="claude-opus-4-5",
max_tokens=1024,
system=self.SYSTEM_PROMPT,
messages=messages
)
return {
"answer": response.content[0].text,
"sources": retrieved,
"context_used": context,
"had_relevant_context": had_context,
}
# Usage
pipeline = RAGPipeline(
vector_store=VectorStore("postgresql://localhost/ragdb"),
embedder=VoyageEmbedder(),
top_k=5,
similarity_threshold=0.65
)
result = pipeline.answer("What is our refund policy for annual subscriptions?")
print(result["answer"])
print("\nSources used:")
for source in result["sources"]:
print(f" - {source['source']} (similarity: {source['similarity']:.3f})")Advanced RAG: Beyond the Naive Baseline
Naive RAG works. Advanced RAG works better. Here are the three most impactful upgrades.
1. Query rewriting
User queries are often ambiguous, conversational, or missing context. Rewrite them before embedding.
def rewrite_query_for_retrieval(
original_query: str,
conversation_history: list[dict],
client: anthropic.Anthropic
) -> str:
"""
Rewrite a potentially ambiguous query into a self-contained search query.
Critical for multi-turn conversations where "it" or "that" refers to earlier context.
"""
if not conversation_history:
return original_query
history_text = "\n".join([
f"{m['role'].upper()}: {m['content'][:200]}"
for m in conversation_history[-4:] # Last 2 turns
])
response = client.messages.create(
model="claude-haiku-4-5-20251001",
max_tokens=150,
messages=[{
"role": "user",
"content": f"""Given this conversation history:
{history_text}

Rewrite this follow-up question as a complete, standalone search query that contains all necessary context:
"{original_query}"
Return ONLY the rewritten query, nothing else."""
}]
)
return response.content[0].text.strip()
# Example
client = anthropic.Anthropic()
history = [
{"role": "user", "content": "What's your cancellation policy?"},
{"role": "assistant", "content": "You can cancel anytime from your account settings..."},
]
query = "What about refunds?"
# Without rewriting: searches for "refunds" - misses context
# After rewriting: "What is the refund policy when cancelling a subscription?"
rewritten = rewrite_query_for_retrieval(query, history, client)
2. Hybrid search (semantic + keyword)
Pure semantic search misses exact matches. Combine it with BM25 (keyword) search for better coverage.
from rank_bm25 import BM25Okapi
class HybridSearchEngine:
"""
Combines dense vector search (semantic) with BM25 (keyword) using RRF fusion.
Best of both worlds: semantic understanding + exact term matching.
"""
def __init__(self, vector_store: VectorStore, embedder):
self.vector_store = vector_store
self.embedder = embedder
self.bm25 = None
self.corpus_docs = []
def build_bm25_index(self, documents: list[Document]):
"""Build BM25 index from document corpus."""
self.corpus_docs = documents
tokenized_corpus = [doc.content.lower().split() for doc in documents]
self.bm25 = BM25Okapi(tokenized_corpus)
def reciprocal_rank_fusion(
self,
semantic_results: list[dict],
bm25_results: list[tuple[int, float]],
k: int = 60,
semantic_weight: float = 0.6,
bm25_weight: float = 0.4
) -> list[dict]:
"""
Fuse rankings from semantic and BM25 using Reciprocal Rank Fusion.
RRF is robust to scale differences between scoring methods.
"""
doc_scores: dict[str, float] = {}
doc_data: dict[str, dict] = {}
# Semantic ranking contribution
for rank, result in enumerate(semantic_results):
doc_id = result["id"]
doc_scores[doc_id] = doc_scores.get(doc_id, 0) + \
semantic_weight * (1 / (k + rank + 1))
doc_data[doc_id] = result
        # BM25 ranking contribution
        for rank, (corpus_idx, _score) in enumerate(bm25_results):
            doc = self.corpus_docs[corpus_idx]
            doc_scores[doc.doc_id] = doc_scores.get(doc.doc_id, 0) + \
                bm25_weight * (1 / (k + rank + 1))
            # Make sure BM25-only hits survive the final merge
            # (doc_data is otherwise populated only from semantic results)
            doc_data.setdefault(doc.doc_id, {
                "id": doc.doc_id,
                "content": doc.content,
                "source": doc.source,
                "chunk_index": doc.chunk_index,
                "metadata": doc.metadata,
            })
# Sort by fused score
ranked = sorted(doc_scores.items(), key=lambda x: x[1], reverse=True)
return [
{**doc_data[doc_id], "rrf_score": score}
for doc_id, score in ranked
if doc_id in doc_data
]
def search(self, query: str, top_k: int = 5) -> list[dict]:
# Semantic search
query_embedding = self.embedder.embed_query(query)
semantic_results = self.vector_store.search(query_embedding, top_k=top_k * 2)
# BM25 search
tokenized_query = query.lower().split()
bm25_scores = self.bm25.get_scores(tokenized_query)
bm25_ranked = sorted(
enumerate(bm25_scores), key=lambda x: x[1], reverse=True
)[:top_k * 2]
# Fuse and return top results
fused = self.reciprocal_rank_fusion(semantic_results, bm25_ranked)
        return fused[:top_k]

3. Re-ranking
After retrieval, re-rank candidates using a cross-encoder model (which sees query and document together, giving more accurate relevance scores than bi-encoder embeddings).
from sentence_transformers import CrossEncoder
class ReRanker:
"""
Re-rank retrieved chunks using a cross-encoder.
Cross-encoders are slower than bi-encoders but more accurate -
appropriate for the small set of candidates after initial retrieval.
"""
def __init__(self, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
self.model = CrossEncoder(model_name)
def rerank(
self,
query: str,
candidates: list[dict],
top_k: int = 3
) -> list[dict]:
"""Re-rank retrieved candidates and return the top-k."""
if not candidates:
return []
# Score each (query, candidate) pair
pairs = [(query, c["content"]) for c in candidates]
scores = self.model.predict(pairs)
# Sort by cross-encoder score
ranked = sorted(
zip(candidates, scores),
key=lambda x: x[1],
reverse=True
)
return [
{**candidate, "rerank_score": float(score)}
for candidate, score in ranked[:top_k]
        ]

The Advanced RAG Architecture
Put it all together:
Query
↓
Query Rewriting (for conversational context)
↓
Hybrid Search (semantic + BM25) → Top-20 candidates
↓
Re-ranking (cross-encoder) → Top-5 candidates
↓
Context Assembly + Prompt Construction
↓
LLM Generation
↓
Answer + Citations

Each step adds roughly 50–200 ms of latency. For real-time applications, profile your pipeline and optimize the bottlenecks. Query rewriting and re-ranking are typically the most expensive — consider making them conditional on query complexity.
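One lightweight way to do that is to decide per query which optional stages to run. The heuristics below are a sketch, not a recommendation — tune them against your own eval set:

def plan_pipeline(query: str, history: list[dict]) -> dict:
    """Decide which optional RAG stages to run for this query (heuristics are illustrative)."""
    words = query.lower().split()
    return {
        # Rewrite only when the query probably leans on earlier turns
        "rewrite": bool(history) and (len(words) < 6 or any(w in ("it", "that", "this", "they") for w in words)),
        # Re-rank only when the question is long enough that ordering the candidates matters
        "rerank": len(words) > 8,
    }

plan = plan_pipeline("What about refunds?", history=[{"role": "user", "content": "What's your cancellation policy?"}])
# -> {"rewrite": True, "rerank": False}: run the rewrite, skip the cross-encoder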
Indexing Strategy: The Decisions That Matter Most
How you chunk and index your documents determines retrieval quality more than almost anything else.
Chunk size: Smaller chunks (128–256 tokens) give more precise retrieval but lose context. Larger chunks (512–1024 tokens) preserve context but reduce precision. The sweet spot for most applications is 300–500 tokens with 15–20% overlap.
Metadata enrichment: Add as much structured metadata as possible at index time — document title, section headers, creation date, document type, tags. Use metadata filters to narrow search before vector similarity.
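As a sketch of what that filtering looks like against the pgvector table above (the JSONB operator is standard Postgres; the filter field and values are examples):

def search_filtered(store: VectorStore, query_embedding: list[float],
                    document_type: str, top_k: int = 5) -> list[dict]:
    """Apply a metadata filter first, then rank the survivors by vector similarity."""
    with store.conn.cursor() as cur:
        cur.execute("""
            SELECT id, content, source,
                   1 - (embedding <=> %(embedding)s::vector) AS similarity
            FROM rag_documents
            WHERE metadata->>'document_type' = %(doc_type)s  -- JSONB filter narrows the candidate set
            ORDER BY embedding <=> %(embedding)s::vector
            LIMIT %(top_k)s
        """, {"embedding": query_embedding, "doc_type": document_type, "top_k": top_k})
        return [
            {"id": r[0], "content": r[1], "source": r[2], "similarity": float(r[3])}
            for r in cur.fetchall()
        ]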
Hierarchical indexing: Index both summaries and full chunks. Use summary embeddings for initial retrieval, then expand to the full parent chunk for generation. This is the "parent-child" chunking pattern.
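A rough sketch of that pattern, reusing chunk_document from Step 1 (the helper name and chunk sizes are illustrative):

def build_parent_child_chunks(text: str, source: str) -> tuple[list[Document], dict[str, Document]]:
    """Small child chunks for retrieval precision; large parents for generation context."""
    parents = chunk_document(text, source, chunk_size=1200, overlap=0)
    children: list[Document] = []
    parent_lookup: dict[str, Document] = {}
    for parent in parents:
        for child in chunk_document(parent.content, source, chunk_size=200, overlap=40):
            child.metadata["parent_id"] = parent.doc_id  # remember where it came from
            children.append(child)
            parent_lookup[child.doc_id] = parent
    return children, parent_lookup

# Embed and upsert the children; at query time, expand each hit to its parent:
# hits = vector_store.search(embedder.embed_query(query), top_k=5)
# context_chunks = [parent_lookup[h["id"]] for h in hits if h["id"] in parent_lookup]

And here's metadata enrichment (plus the contextual-prefix trick) applied at index time: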
def index_with_metadata_enrichment(
text: str,
source: str,
document_type: str,
section_title: str = ""
) -> list[Document]:
"""Index with enriched metadata for better filtering and retrieval."""
chunks = chunk_document(text, source)
for chunk in chunks:
chunk.metadata = {
"document_type": document_type,
"section_title": section_title,
"source": source,
"word_count": len(chunk.content.split()),
}
# Prepend context to each chunk for better standalone retrieval
# ("Contextual retrieval" — Anthropic research, 2024)
chunk.content = f"[From: {source} | Section: {section_title}]\n\n{chunk.content}"
    return chunks

When NOT to Use RAG
RAG is powerful but not a universal solution.
Don't use RAG for reasoning-heavy tasks. If the answer requires synthesizing across 50 documents simultaneously, RAG's context window limits will bite you. Consider fine-tuning or purpose-built retrieval pipelines instead.
Don't use RAG when latency is critical. A full RAG pipeline adds 200–600ms. For real-time applications (voice, live chat), profile carefully.
Don't use RAG for small, static knowledge bases. If your entire knowledge base fits in a context window (< 100k tokens), just inject it all. No retrieval needed.
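That "just inject it all" path is simpler than it sounds. A minimal sketch, reusing the same Anthropic client as above (the 4-characters-per-token estimate is a rough heuristic, not an exact count):

def answer_without_rag(question: str, docs: list[str], client: anthropic.Anthropic) -> str:
    """If the whole knowledge base fits in context, skip retrieval entirely."""
    full_context = "\n\n---\n\n".join(docs)
    # Rough heuristic: ~4 characters per token for English text
    if len(full_context) / 4 > 100_000:
        raise ValueError("Knowledge base too large for full-context injection - use RAG")
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=1024,
        system="Answer only from the provided documents and cite them.",
        messages=[{"role": "user", "content": f"Documents:\n{full_context}\n\nQuestion: {question}"}],
    )
    return response.content[0].text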
Don't assume RAG eliminates hallucinations. It dramatically reduces them, but a model can still misinterpret retrieved context or fill gaps with invented information. Always evaluate.
Real-World Impact Metrics
Teams moving from pure LLM to RAG-based systems typically report:
- 70–85% reduction in factual errors on domain-specific questions
- Near-zero "knowledge cutoff" failures for current documentation
- 40–60% improvement in user trust scores when citations are shown
- Support ticket deflection rate increases of 25–40% for self-service chatbots
Practical Recommendations
Start with naive RAG and measure before adding complexity. Query rewriting, hybrid search, and re-ranking each add engineering overhead — only add them if your eval metrics show they're needed.
Invest heavily in your chunking strategy before anything else. It's the highest-leverage decision in your entire pipeline.
Always show citations. Even imperfect citations build user trust and give you a debugging surface when the model gets something wrong.
Build an evaluation set before you go to production. You need ground truth to know if your changes are helping or hurting.
Monitor similarity scores in production. A sudden drop in average retrieval similarity is an early warning that your query distribution has shifted.
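A minimal retrieval eval harness along those lines, assuming a hand-built list of (question, expected source) pairs drawn from real user queries:

def evaluate_retrieval(pipeline: RAGPipeline, eval_set: list[dict]) -> dict:
    """eval_set items look like {"question": ..., "expected_source": ...}."""
    hits, similarities = 0, []
    for case in eval_set:
        retrieved = pipeline.retrieve(case["question"])
        similarities.extend(r["similarity"] for r in retrieved)
        # Recall@k: did the chunk we expected show up at all?
        if any(r["source"] == case["expected_source"] for r in retrieved):
            hits += 1
    return {
        "recall_at_k": hits / len(eval_set),
        "avg_similarity": sum(similarities) / max(len(similarities), 1),
    }

# Run this on every index rebuild and on a sample of production traffic;
# a drop in recall_at_k or avg_similarity is your early warning.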
The Bottom Line
RAG is the architecture that made enterprise AI practical. It solves the three problems that kill LLM deployments in production: stale knowledge, hallucinations, and untraceability.
The pattern is simple. The engineering decisions — chunking strategy, retrieval method, re-ranking, context assembly — are where the craft lives.
Build it right and you get a system that's always current, always grounded, and always honest about what it knows.