Why Private RAG Matters in Healthcare & Enterprise
In 2026, data privacy isn't just a checkbox — it's a fundamental requirement. When building Retrieval-Augmented Generation (RAG) systems for healthcare, finance, or enterprise applications, sending sensitive data to third-party APIs isn't an option. This is where private, self-hosted RAG becomes critical.
In this comprehensive guide, I'll show you how to build a production-ready RAG system using Django, PostgreSQL with pgvector, and Hugging Face embeddings — all running on your own infrastructure.
What We're Building
A complete RAG pipeline that:
- Keeps all data on your servers (supporting HIPAA/GDPR compliance)
- Uses pgvector for blazing-fast similarity search
- Leverages Hugging Face embeddings (no API keys needed)
- Scales to millions of documents
- Integrates seamlessly with Django ORM
Architecture Overview
User Query → Django View → Embedding Model → pgvector Search → Context Retrieval → LLM Response
Prerequisites
# System requirements
Python 3.10+
PostgreSQL 15+ with pgvector extension
Django 5.0+
Step 1: PostgreSQL Setup with pgvector
First, let's install and configure pgvector:
# Install PostgreSQL and development headers
sudo apt-get install postgresql postgresql-contrib postgresql-server-dev-15
# Clone and install pgvector
git clone https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install
# Connect to PostgreSQL
sudo -u postgres psql
# Enable extension in your database
CREATE EXTENSION IF NOT EXISTS vector;
Database Configuration
# settings.py
import os

DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'rag_db',
        'USER': 'postgres',
        # Read the password from the environment instead of hardcoding it
        'PASSWORD': os.environ.get('DB_PASSWORD', ''),
        'HOST': 'localhost',
        'PORT': '5432',
    }
}
INSTALLED_APPS = [
    # ... other apps
    'django.contrib.postgres',  # enables ArrayField lookups and full-text search
    'your_rag_app',
]
# Note: the pgvector package is not a Django app; its VectorField is
# imported directly, so it does not go in INSTALLED_APPS.
Step 2: Django Models for Document Storage
Create models to store documents and their embeddings:
# models.py
from django.db import models
from django.contrib.postgres.fields import ArrayField
from pgvector.django import VectorField
class Document(models.Model):
"""Stores original documents with metadata"""
title = models.CharField(max_length=500)
content = models.TextField()
source = models.CharField(max_length=255, blank=True)
document_type = models.CharField(max_length=50)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
is_active = models.BooleanField(default=True)
    # Metadata for filtering (ArrayField rather than JSONField so queries
    # can use the __overlap lookup shown later)
    tags = ArrayField(models.CharField(max_length=100), default=list, blank=True)
metadata = models.JSONField(default=dict, blank=True)
class Meta:
db_table = 'documents'
indexes = [
models.Index(fields=['document_type', 'is_active']),
models.Index(fields=['created_at']),
]
def __str__(self):
return f"{self.title} ({self.document_type})"
class DocumentChunk(models.Model):
"""Stores document chunks with embeddings for RAG"""
document = models.ForeignKey(
Document,
on_delete=models.CASCADE,
related_name='chunks'
)
chunk_text = models.TextField()
chunk_index = models.IntegerField()
# Vector embedding (384 dimensions for all-MiniLM-L6-v2)
embedding = VectorField(dimensions=384)
# Token count for context management
token_count = models.IntegerField(default=0)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'document_chunks'
indexes = [
models.Index(fields=['document', 'chunk_index']),
]
# Important: Create vector index for fast similarity search
# Add this in a migration:
# migrations.RunSQL(
# "CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);"
# )
def __str__(self):
return f"Chunk {self.chunk_index} of {self.document.title}"
class QueryHistory(models.Model):
"""Track queries for analytics and improvement"""
query_text = models.TextField()
query_embedding = VectorField(dimensions=384)
response = models.TextField(blank=True)
retrieved_chunks = models.JSONField(default=list)
relevance_score = models.FloatField(null=True, blank=True)
user_feedback = models.IntegerField(null=True, blank=True) # 1-5 rating
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'query_history'
indexes = [
models.Index(fields=['-created_at']),
        ]
Migration for Vector Index
# migrations/0002_add_vector_index.py
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('your_rag_app', '0001_initial'),
]
operations = [
migrations.RunSQL(
# IVFFlat index for fast approximate nearest neighbor search
sql="""
CREATE INDEX document_chunks_embedding_idx
ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
""",
reverse_sql="DROP INDEX IF EXISTS document_chunks_embedding_idx;"
),
    ]
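Side note: pgvector's Django integration also ships a VectorExtension migration operation, so the CREATE EXTENSION step can live in your migrations rather than psql. A sketch (it must run before any migration that creates VectorField columns, and requires database superuser rights):
# Alternative: make VectorExtension() the first operation of your app's
# initial migration so the extension exists before any vector columns
from django.db import migrations
from pgvector.django import VectorExtension

class Migration(migrations.Migration):
    initial = True
    dependencies = []
    operations = [
        VectorExtension(),
        # ... followed by the CreateModel operations for Document, etc.
    ]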
Step 3: Embedding Service with Hugging Face
Create a service to generate embeddings locally:
# services/embedding_service.py
from sentence_transformers import SentenceTransformer
from typing import List, Union
import numpy as np
from django.conf import settings
import logging
logger = logging.getLogger(__name__)
class EmbeddingService:
"""
Handles text embedding generation using Hugging Face models.
Runs entirely on your infrastructure—no API calls needed.
"""
def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
"""
Initialize the embedding model.
Popular models:
- all-MiniLM-L6-v2: Fast, 384 dims, great for most use cases
- all-mpnet-base-v2: Slower, 768 dims, higher quality
- multi-qa-MiniLM-L6-cos-v1: Optimized for question-answering
"""
self.model_name = model_name
self._model = None
self.dimension = 384 if "MiniLM-L6" in model_name else 768
@property
def model(self):
"""Lazy load the model to save memory"""
if self._model is None:
logger.info(f"Loading embedding model: {self.model_name}")
self._model = SentenceTransformer(self.model_name)
logger.info("Model loaded successfully")
return self._model
def generate_embedding(self, text: str) -> List[float]:
"""Generate embedding for a single text"""
try:
embedding = self.model.encode(text, convert_to_numpy=True)
return embedding.tolist()
except Exception as e:
logger.error(f"Error generating embedding: {e}")
raise
def generate_embeddings_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
"""Generate embeddings for multiple texts efficiently"""
try:
embeddings = self.model.encode(
texts,
convert_to_numpy=True,
batch_size=batch_size,
show_progress_bar=True
)
return embeddings.tolist()
except Exception as e:
logger.error(f"Error generating batch embeddings: {e}")
raise
def similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
"""Calculate cosine similarity between two embeddings"""
vec1 = np.array(embedding1)
vec2 = np.array(embedding2)
return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))
# Singleton instance
embedding_service = EmbeddingService()
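A quick sanity check from the Django shell (the import path assumes the app layout used in this guide; exact scores vary by model):
# python manage.py shell
from your_rag_app.services.embedding_service import embedding_service

vec = embedding_service.generate_embedding("Patient presents with chest pain.")
print(len(vec))  # 384 for all-MiniLM-L6-v2

score = embedding_service.similarity(
    embedding_service.generate_embedding("high blood pressure"),
    embedding_service.generate_embedding("hypertension"),
)
print(score)  # related phrases score well above unrelated pairs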
Step 4: Document Chunking Strategy
Intelligent document chunking is crucial for RAG performance:
# services/chunking_service.py
from typing import List, Dict
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter
class ChunkingService:
"""
Handles intelligent document chunking with overlap for context preservation.
"""
def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
"""
Args:
chunk_size: Target size in characters
chunk_overlap: Overlap between chunks to preserve context
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# RecursiveCharacterTextSplitter tries to split on natural boundaries
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
separators=["\n\n", "\n", ". ", " ", ""],
length_function=len,
)
def chunk_document(self, text: str, metadata: Dict = None) -> List[Dict]:
"""
Chunk a document into smaller pieces with metadata.
Returns:
List of dicts with 'text', 'index', and 'metadata'
"""
chunks = self.text_splitter.split_text(text)
chunked_data = []
for idx, chunk_text in enumerate(chunks):
chunk_data = {
'text': chunk_text.strip(),
'index': idx,
                'token_count': len(chunk_text.split()),  # rough estimate: whitespace words, not model tokens
'metadata': metadata or {}
}
chunked_data.append(chunk_data)
return chunked_data
def chunk_with_semantic_splitting(self, text: str) -> List[str]:
"""
Advanced: Split on semantic boundaries (paragraphs, sections)
"""
# Split by double newlines (paragraphs)
paragraphs = text.split('\n\n')
chunks = []
current_chunk = ""
for para in paragraphs:
para = para.strip()
if not para:
continue
# If adding this paragraph exceeds chunk_size, save current chunk
if len(current_chunk) + len(para) > self.chunk_size and current_chunk:
chunks.append(current_chunk.strip())
current_chunk = para
else:
current_chunk += "\n\n" + para if current_chunk else para
# Add remaining text
if current_chunk:
chunks.append(current_chunk.strip())
return chunks
chunking_service = ChunkingService()
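Before committing to a chunk size, it helps to eyeball the chunker's output; a short illustration (the sample file name is hypothetical):
# Inspect chunk boundaries and sizes
sample_text = open("sample_policy.txt").read()
for chunk in chunking_service.chunk_document(sample_text, metadata={"source": "demo"}):
    print(chunk['index'], chunk['token_count'], repr(chunk['text'][:60]))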
Step 5: Document Ingestion Pipeline
Create a service to ingest and process documents:
# services/ingestion_service.py
from django.db import transaction
from typing import List, Dict
import logging
from ..models import Document, DocumentChunk
from .embedding_service import embedding_service
from .chunking_service import chunking_service
logger = logging.getLogger(__name__)
class IngestionService:
"""
Handles document ingestion, chunking, and embedding generation.
"""
@transaction.atomic
def ingest_document(self, title: str, content: str, document_type: str,
metadata: Dict = None, tags: List[str] = None) -> Document:
"""
Ingest a single document with automatic chunking and embedding.
Args:
title: Document title
content: Full document text
document_type: Type classification (e.g., 'medical_record', 'policy')
metadata: Additional metadata
tags: List of tags for filtering
Returns:
Created Document instance with embedded chunks
"""
try:
# Create document
document = Document.objects.create(
title=title,
content=content,
document_type=document_type,
metadata=metadata or {},
tags=tags or []
)
logger.info(f"Created document: {document.id} - {title}")
# Chunk the document
chunks = chunking_service.chunk_document(content, metadata)
logger.info(f"Generated {len(chunks)} chunks")
# Generate embeddings for all chunks
chunk_texts = [chunk['text'] for chunk in chunks]
embeddings = embedding_service.generate_embeddings_batch(chunk_texts)
# Create DocumentChunk instances
chunk_objects = []
for chunk_data, embedding in zip(chunks, embeddings):
chunk_obj = DocumentChunk(
document=document,
chunk_text=chunk_data['text'],
chunk_index=chunk_data['index'],
embedding=embedding,
token_count=chunk_data['token_count']
)
chunk_objects.append(chunk_obj)
# Bulk create for efficiency
DocumentChunk.objects.bulk_create(chunk_objects)
logger.info(f"Created {len(chunk_objects)} chunk embeddings")
return document
except Exception as e:
logger.error(f"Error ingesting document: {e}")
raise
@transaction.atomic
def ingest_batch(self, documents: List[Dict]) -> List[Document]:
"""
Ingest multiple documents in batch.
Args:
documents: List of dicts with 'title', 'content', 'document_type', etc.
"""
created_docs = []
for doc_data in documents:
try:
doc = self.ingest_document(**doc_data)
created_docs.append(doc)
except Exception as e:
logger.error(f"Failed to ingest document {doc_data.get('title')}: {e}")
continue
return created_docs
@transaction.atomic
def update_document(self, document_id: int, content: str) -> Document:
"""
Update document content and regenerate embeddings.
"""
document = Document.objects.get(id=document_id)
# Delete old chunks
document.chunks.all().delete()
# Update content
document.content = content
document.save()
# Regenerate chunks and embeddings
chunks = chunking_service.chunk_document(content)
chunk_texts = [chunk['text'] for chunk in chunks]
embeddings = embedding_service.generate_embeddings_batch(chunk_texts)
chunk_objects = [
DocumentChunk(
document=document,
chunk_text=chunk_data['text'],
chunk_index=chunk_data['index'],
embedding=embedding,
token_count=chunk_data['token_count']
)
for chunk_data, embedding in zip(chunks, embeddings)
]
DocumentChunk.objects.bulk_create(chunk_objects)
return document
ingestion_service = IngestionService()
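For bulk-loading documents from disk, wrapping the service in a Django management command keeps ingestion scriptable. A sketch (the app name your_rag_app and the .txt-only layout are assumptions):
# your_rag_app/management/commands/ingest_files.py
from pathlib import Path
from django.core.management.base import BaseCommand
from your_rag_app.services.ingestion_service import ingestion_service

class Command(BaseCommand):
    help = "Ingest every .txt file in a directory"

    def add_arguments(self, parser):
        parser.add_argument('directory', type=str)
        parser.add_argument('--document-type', default='general')

    def handle(self, *args, **options):
        for path in sorted(Path(options['directory']).glob('*.txt')):
            doc = ingestion_service.ingest_document(
                title=path.stem,
                content=path.read_text(),
                document_type=options['document_type'],
            )
            self.stdout.write(f"Ingested {doc.id}: {doc.title}")
Run it with: python manage.py ingest_files ./docs --document-type policy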
Step 6: RAG Retrieval Service
The core RAG retrieval logic:
# services/retrieval_service.py
from typing import List, Dict, Optional
from pgvector.django import CosineDistance
from ..models import DocumentChunk, QueryHistory
from .embedding_service import embedding_service
import logging
logger = logging.getLogger(__name__)
class RetrievalService:
"""
Handles semantic search and context retrieval for RAG.
"""
def __init__(self, top_k: int = 5, similarity_threshold: float = 0.7):
"""
Args:
top_k: Number of chunks to retrieve
similarity_threshold: Minimum similarity score (0-1)
"""
self.top_k = top_k
self.similarity_threshold = similarity_threshold
    def retrieve_context(self, query: str, document_type: Optional[str] = None,
                         tags: Optional[List[str]] = None,
                         save_history: bool = True,
                         top_k: Optional[int] = None) -> Dict:
"""
Retrieve relevant context for a query using semantic search.
Args:
query: User query text
document_type: Optional filter by document type
tags: Optional filter by tags
            save_history: Whether to save query to history
            top_k: Optional per-call override of the default top_k
Returns:
Dict with 'chunks', 'query_embedding', 'scores'
"""
try:
# Generate query embedding
query_embedding = embedding_service.generate_embedding(query)
# Build queryset with filters
queryset = DocumentChunk.objects.filter(
document__is_active=True
).select_related('document')
if document_type:
queryset = queryset.filter(document__document_type=document_type)
if tags:
# Filter documents that have ANY of the specified tags
queryset = queryset.filter(document__tags__overlap=tags)
# Perform vector similarity search
similar_chunks = queryset.annotate(
distance=CosineDistance('embedding', query_embedding)
            ).order_by('distance')[:top_k or self.top_k]
# Filter by similarity threshold
# Note: CosineDistance returns distance (lower is better)
# Convert to similarity: similarity = 1 - distance
results = []
for chunk in similar_chunks:
similarity = 1 - chunk.distance
if similarity >= self.similarity_threshold:
                    results.append({
                        'chunk_id': chunk.id,
                        'document_id': chunk.document.id,
                        'document_title': chunk.document.title,
                        'document_type': chunk.document.document_type,
                        'text': chunk.chunk_text,
                        'similarity': similarity,
                        'metadata': chunk.document.metadata,
                        'chunk_index': chunk.chunk_index,
                    })
# Save to query history
if save_history:
QueryHistory.objects.create(
query_text=query,
query_embedding=query_embedding,
retrieved_chunks=[r['chunk_id'] for r in results]
)
logger.info(f"Retrieved {len(results)} chunks for query: {query[:50]}...")
return {
'chunks': results,
'query_embedding': query_embedding,
'query': query
}
except Exception as e:
logger.error(f"Error during retrieval: {e}")
raise
def hybrid_search(self, query: str, top_k: int = None) -> List[Dict]:
"""
Hybrid search combining vector similarity + keyword matching.
Useful for queries with specific terms.
"""
if top_k is None:
top_k = self.top_k
query_embedding = embedding_service.generate_embedding(query)
# Vector search
vector_results = DocumentChunk.objects.filter(
document__is_active=True
).annotate(
distance=CosineDistance('embedding', query_embedding)
).order_by('distance')[:top_k * 2]
# Keyword search (full-text search in PostgreSQL)
from django.contrib.postgres.search import SearchQuery, SearchRank, SearchVector
search_vector = SearchVector('chunk_text', weight='A') + SearchVector('document__title', weight='B')
search_query = SearchQuery(query)
keyword_results = DocumentChunk.objects.filter(
document__is_active=True
).annotate(
rank=SearchRank(search_vector, search_query)
).filter(rank__gte=0.1).order_by('-rank')[:top_k * 2]
# Combine and deduplicate
combined_ids = set()
combined_results = []
for chunk in vector_results:
if chunk.id not in combined_ids:
combined_ids.add(chunk.id)
similarity = 1 - chunk.distance
combined_results.append({
'chunk_id': chunk.id,
'text': chunk.chunk_text,
'similarity': similarity,
'document_title': chunk.document.title,
'score_type': 'vector'
})
for chunk in keyword_results:
if chunk.id not in combined_ids:
combined_ids.add(chunk.id)
combined_results.append({
'chunk_id': chunk.id,
'text': chunk.chunk_text,
'similarity': float(chunk.rank),
'document_title': chunk.document.title,
'score_type': 'keyword'
})
        # Re-sort by combined score (note: vector similarities and ts_rank
        # values are not on the same scale; consider Reciprocal Rank Fusion
        # for a more principled merge in production)
        combined_results.sort(key=lambda x: x['similarity'], reverse=True)
return combined_results[:top_k]
def get_similar_queries(self, query: str, top_k: int = 5) -> List[Dict]:
"""
Find similar past queries for analytics or suggestion.
"""
query_embedding = embedding_service.generate_embedding(query)
similar_queries = QueryHistory.objects.annotate(
distance=CosineDistance('query_embedding', query_embedding)
).order_by('distance')[:top_k]
return [
{
'query': q.query_text,
'similarity': 1 - q.distance,
'created_at': q.created_at,
'response': q.response
}
for q in similar_queries
]
retrieval_service = RetrievalService()
Step 7: Django REST API Views
Create API endpoints for the RAG system:
# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from django.core.paginator import Paginator
from .services.ingestion_service import ingestion_service
from .services.retrieval_service import retrieval_service
from .models import Document, DocumentChunk
from .serializers import DocumentSerializer, ChunkSerializer
class DocumentIngestionView(APIView):
"""
API endpoint for ingesting documents.
POST /api/documents/ingest/
"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""
Ingest a new document.
Body:
{
"title": "Patient Medical History",
"content": "Full document text...",
"document_type": "medical_record",
"tags": ["cardiology", "patient-123"],
"metadata": {"patient_id": "123", "date": "2026-01-01"}
}
"""
try:
title = request.data.get('title')
content = request.data.get('content')
document_type = request.data.get('document_type', 'general')
tags = request.data.get('tags', [])
metadata = request.data.get('metadata', {})
if not title or not content:
return Response(
{'error': 'Title and content are required'},
status=status.HTTP_400_BAD_REQUEST
)
document = ingestion_service.ingest_document(
title=title,
content=content,
document_type=document_type,
tags=tags,
metadata=metadata
)
serializer = DocumentSerializer(document)
return Response({
'message': 'Document ingested successfully',
'document': serializer.data
}, status=status.HTTP_201_CREATED)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class BatchIngestionView(APIView):
"""
API endpoint for batch document ingestion.
POST /api/documents/ingest-batch/
"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""
Ingest multiple documents.
Body:
{
"documents": [
{"title": "...", "content": "...", "document_type": "..."},
...
]
}
"""
try:
documents = request.data.get('documents', [])
if not documents:
return Response(
{'error': 'No documents provided'},
status=status.HTTP_400_BAD_REQUEST
)
created_docs = ingestion_service.ingest_batch(documents)
return Response({
'message': f'Successfully ingested {len(created_docs)} documents',
'document_count': len(created_docs)
}, status=status.HTTP_201_CREATED)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class RAGQueryView(APIView):
"""
API endpoint for RAG queries.
POST /api/rag/query/
"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""
Perform RAG retrieval for a query.
Body:
{
"query": "What are the symptoms of hypertension?",
"document_type": "medical_record", # optional
"tags": ["cardiology"], # optional
"top_k": 5 # optional
}
"""
try:
query = request.data.get('query')
document_type = request.data.get('document_type')
tags = request.data.get('tags')
top_k = request.data.get('top_k', 5)
if not query:
return Response(
{'error': 'Query is required'},
status=status.HTTP_400_BAD_REQUEST
)
            # Pass top_k per call; mutating the shared service instance
            # would not be thread-safe under concurrent requests
            results = retrieval_service.retrieve_context(
                query=query,
                document_type=document_type,
                tags=tags,
                top_k=top_k
            )
return Response({
'query': query,
'results': results['chunks'],
'retrieved_count': len(results['chunks'])
}, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class HybridSearchView(APIView):
"""
API endpoint for hybrid search.
POST /api/rag/hybrid-search/
"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""
Perform hybrid search (vector + keyword).
"""
try:
query = request.data.get('query')
top_k = request.data.get('top_k', 5)
if not query:
return Response(
{'error': 'Query is required'},
status=status.HTTP_400_BAD_REQUEST
)
results = retrieval_service.hybrid_search(query, top_k)
return Response({
'query': query,
'results': results,
'retrieved_count': len(results)
}, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class DocumentListView(APIView):
"""
API endpoint to list documents.
GET /api/documents/
"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""
List all documents with pagination and filtering.
Query params:
- page: Page number
- page_size: Items per page
- document_type: Filter by type
- tags: Filter by tags (comma-separated)
"""
try:
page_num = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 20))
document_type = request.query_params.get('document_type')
tags = request.query_params.get('tags')
queryset = Document.objects.filter(is_active=True)
if document_type:
queryset = queryset.filter(document_type=document_type)
if tags:
tag_list = [t.strip() for t in tags.split(',')]
queryset = queryset.filter(tags__overlap=tag_list)
queryset = queryset.order_by('-created_at')
paginator = Paginator(queryset, page_size)
page = paginator.get_page(page_num)
serializer = DocumentSerializer(page, many=True)
return Response({
'count': paginator.count,
'total_pages': paginator.num_pages,
'current_page': page_num,
'results': serializer.data
}, status=status.HTTP_200_OK)
except Exception as e:
return Response(
{'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR
)
class DocumentDetailView(APIView):
"""
API endpoint for document details.
GET /api/documents/<id>/
"""
permission_classes = [IsAuthenticated]
def get(self, request, document_id):
"""Get document with all chunks."""
try:
document = Document.objects.prefetch_related('chunks').get(id=document_id)
serializer = DocumentSerializer(document)
return Response(serializer.data, status=status.HTTP_200_OK)
except Document.DoesNotExist:
return Response(
{'error': 'Document not found'},
status=status.HTTP_404_NOT_FOUND
)
def delete(self, request, document_id):
"""Soft delete a document."""
try:
document = Document.objects.get(id=document_id)
document.is_active = False
document.save()
return Response(
{'message': 'Document deleted successfully'},
status=status.HTTP_200_OK
)
except Document.DoesNotExist:
return Response(
{'error': 'Document not found'},
status=status.HTTP_404_NOT_FOUND
            )
Step 8: Serializers
# serializers.py
from rest_framework import serializers
from .models import Document, DocumentChunk
class ChunkSerializer(serializers.ModelSerializer):
class Meta:
model = DocumentChunk
fields = ['id', 'chunk_text', 'chunk_index', 'token_count']
class DocumentSerializer(serializers.ModelSerializer):
chunk_count = serializers.SerializerMethodField()
chunks = ChunkSerializer(many=True, read_only=True)
class Meta:
model = Document
fields = [
'id', 'title', 'content', 'source', 'document_type',
'tags', 'metadata', 'is_active', 'created_at', 'updated_at',
'chunk_count', 'chunks'
]
def get_chunk_count(self, obj):
        return obj.chunks.count()
Step 9: URL Configuration
# urls.py
from django.urls import path
from .views import (
DocumentIngestionView,
BatchIngestionView,
RAGQueryView,
HybridSearchView,
DocumentListView,
DocumentDetailView
)
urlpatterns = [
# Ingestion endpoints
path('documents/ingest/', DocumentIngestionView.as_view(), name='ingest-document'),
path('documents/ingest-batch/', BatchIngestionView.as_view(), name='ingest-batch'),
# Query endpoints
path('rag/query/', RAGQueryView.as_view(), name='rag-query'),
path('rag/hybrid-search/', HybridSearchView.as_view(), name='hybrid-search'),
# Document management
path('documents/', DocumentListView.as_view(), name='document-list'),
path('documents/<int:document_id>/', DocumentDetailView.as_view(), name='document-detail'),
]
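Assuming these patterns are included under an /api/ prefix in your project urls.py and you're using DRF token authentication (both project-level assumptions), you can smoke-test the endpoints with curl:
# Ingest a document
curl -X POST http://localhost:8000/api/documents/ingest/ \
  -H "Authorization: Token <your-token>" \
  -H "Content-Type: application/json" \
  -d '{"title": "Hypertension Guidelines", "content": "Hypertension is defined as...", "document_type": "medical_guideline"}'

# Query it
curl -X POST http://localhost:8000/api/rag/query/ \
  -H "Authorization: Token <your-token>" \
  -H "Content-Type: application/json" \
  -d '{"query": "What is hypertension?", "top_k": 3}'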
Step 10: Requirements & Dependencies
# requirements.txt
Django==5.0.1
djangorestframework==3.14.0
psycopg2-binary==2.9.9
pgvector==0.2.4
sentence-transformers==2.3.1
torch==2.1.2
transformers==4.36.2
langchain==0.1.0
numpy==1.26.3
Performance Optimization Tips
1. Vector Index Tuning
-- IVFFlat index (faster inserts, approximate search)
CREATE INDEX ON document_chunks
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- HNSW index (better speed/recall trade-off, but slower builds and more memory)
CREATE INDEX ON document_chunks
USING hnsw (embedding vector_cosine_ops);
-- Note: both IVFFlat and HNSW are approximate indexes; for exact (but
-- slower) search, simply query without a vector index.
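With IVFFlat, recall can also be tuned at query time by raising the number of lists probed per query (pgvector's ivfflat.probes setting; higher values trade latency for recall):
-- Probe more lists per query (default is 1)
SET ivfflat.probes = 10;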
2. Batch Processing
# Process large documents in batches
def ingest_large_document(file_path, batch_size=100):
    with open(file_path, 'r') as f:
        content = f.read()
    document = Document.objects.create(title=file_path, content=content, document_type='general')
    chunks = chunking_service.chunk_document(content)
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        # Embed and persist one batch at a time to bound memory usage
        embeddings = embedding_service.generate_embeddings_batch([c['text'] for c in batch])
        DocumentChunk.objects.bulk_create([
            DocumentChunk(document=document, chunk_text=c['text'], chunk_index=c['index'],
                          embedding=e, token_count=c['token_count'])
            for c, e in zip(batch, embeddings)
        ])
3. Caching Embeddings
# Use Redis (or any Django cache backend) for a query-embedding cache
import hashlib
from django.core.cache import cache

def get_cached_embedding(text):
    # hashlib gives stable keys across processes; Python's built-in hash()
    # is randomized per process and unsuitable for cache keys
    cache_key = f"embedding:{hashlib.sha256(text.encode()).hexdigest()}"
    embedding = cache.get(cache_key)
    if embedding is None:
        embedding = embedding_service.generate_embedding(text)
        cache.set(cache_key, embedding, timeout=3600)
    return embedding
Testing Your RAG System
# tests/test_rag.py
from django.test import TestCase
from ..services.ingestion_service import ingestion_service
from ..services.retrieval_service import retrieval_service
class RAGSystemTest(TestCase):
def setUp(self):
# Ingest test documents
self.doc1 = ingestion_service.ingest_document(
title="Hypertension Guidelines",
content="Hypertension is defined as blood pressure >140/90...",
document_type="medical_guideline"
)
def test_retrieval(self):
results = retrieval_service.retrieve_context(
query="What is high blood pressure?"
)
self.assertGreater(len(results['chunks']), 0)
self.assertIn('similarity', results['chunks'][0])
def test_filtering(self):
results = retrieval_service.retrieve_context(
query="treatment guidelines",
document_type="medical_guideline"
)
for chunk in results['chunks']:
            self.assertEqual(chunk['document_type'], 'medical_guideline')
Production Deployment Checklist
Database
- Enable pgvector extension
- Create appropriate indexes
- Set up connection pooling (pgBouncer)
Model Management
- Download and cache Hugging Face models
- Use GPU if available (CUDA)
- Implement model versioning
Monitoring
- Track query latency
- Monitor similarity scores
- Log retrieval failures
Security
- Implement rate limiting
- Validate input sanitization
- Encrypt embeddings at rest
- Use HTTPS for all endpoints
Scaling
- Use Celery for async ingestion (see the sketch after this list)
- Implement read replicas for queries
- Consider vector database sharding
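On the Celery point above: embedding generation is compute-bound, so moving ingestion onto a worker keeps API latency flat. A minimal sketch, assuming a configured Celery app (the module path is an assumption):
# your_rag_app/tasks.py
from celery import shared_task
from .services.ingestion_service import ingestion_service

@shared_task(bind=True, max_retries=3)
def ingest_document_task(self, title, content, document_type, metadata=None, tags=None):
    """Run chunking and embedding outside the request/response cycle."""
    try:
        doc = ingestion_service.ingest_document(
            title=title, content=content, document_type=document_type,
            metadata=metadata, tags=tags,
        )
        return doc.id
    except Exception as exc:
        # Retry with exponential backoff on transient failures
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
The ingestion view can then return 202 Accepted immediately and call ingest_document_task.delay(...) instead of the synchronous service.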
Real-World Use Cases
Healthcare Example
# Ingest patient records
ingestion_service.ingest_document(
title="Patient Chart - John Doe",
content="Patient presents with chest pain...",
document_type="medical_record",
tags=["cardiology", "emergency"],
metadata={"patient_id": "P12345", "visit_date": "2026-01-15"}
)
# Query for similar cases
results = retrieval_service.retrieve_context(
query="chest pain with elevated troponin",
document_type="medical_record",
tags=["cardiology"]
)
Legal Document Search
# Ingest contracts
ingestion_service.ingest_document(
title="Software License Agreement",
content="This agreement is made between...",
document_type="contract",
tags=["software", "licensing"]
)
# Find relevant clauses
results = retrieval_service.retrieve_context(
query="liability limitations in software contracts"
)
Advanced Features to Add
- Re-ranking: Add a cross-encoder for better result ranking (sketched below)
- Query expansion: Use synonyms and related terms
- Multi-modal: Support images and PDFs with vision models
- Feedback loop: Learn from user interactions
- Version control: Track document changes
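Re-ranking is usually the easiest of these wins. A sketch using a cross-encoder from sentence-transformers, applied on top of retrieve_context (the model name is a common public default, not a requirement):
# services/reranking_service.py
from sentence_transformers import CrossEncoder

# Cross-encoders score (query, passage) pairs jointly: too slow to scan
# the whole corpus, but accurate for re-ordering a small candidate set
reranker = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

def rerank(query: str, chunks: list, top_k: int = 5) -> list:
    """Re-order retrieved chunks by cross-encoder relevance score."""
    scores = reranker.predict([(query, chunk['text']) for chunk in chunks])
    ranked = sorted(zip(chunks, scores), key=lambda pair: pair[1], reverse=True)
    return [chunk for chunk, _ in ranked[:top_k]]
Retrieve a generous candidate set from pgvector (say top_k=20), then rerank down to the handful of chunks you actually pass to the LLM.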
Conclusion
You now have a production-ready, privacy-first RAG system that:
- Runs entirely on your infrastructure
- Scales to millions of documents
- Provides fast semantic search
- Supports HIPAA/GDPR compliance by keeping data in your environment
The beauty of this architecture is its flexibility — swap out embedding models, add custom chunking strategies, or integrate with any LLM of your choice.
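For example, the generation step left implicit in the pipeline above can stay on-premises too. A minimal sketch using a locally downloaded Hugging Face model via the transformers pipeline (the model name is a placeholder; choose one that fits your hardware and licensing):
# services/generation_service.py
from transformers import pipeline
from .retrieval_service import retrieval_service

# Loads from the local Hugging Face cache; no external API calls
generator = pipeline("text-generation", model="your-local-instruct-model")

def answer(query: str) -> str:
    """Retrieve context via pgvector, then generate an answer locally."""
    retrieval = retrieval_service.retrieve_context(query)
    context = "\n\n".join(chunk['text'] for chunk in retrieval['chunks'])
    prompt = (
        "Answer the question using only the context below.\n\n"
        f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
    )
    output = generator(prompt, max_new_tokens=256, do_sample=False)
    return output[0]['generated_text']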
Have questions about implementing RAG in your Django application? Drop a comment below!