Why Private RAG Matters in Healthcare & Enterprise

In 2026, data privacy isn't just a checkbox — it's a fundamental requirement. When building Retrieval-Augmented Generation (RAG) systems for healthcare, finance, or enterprise applications, sending sensitive data to third-party APIs isn't an option. This is where private, self-hosted RAG becomes critical.

In this comprehensive guide, I'll show you how to build a production-ready RAG system using Django, PostgreSQL with pgvector, and Hugging Face embeddings — all running on your own infrastructure.

What We're Building

A complete RAG pipeline that:

  • Keeps all data on your servers (supports HIPAA/GDPR compliance)
  • Uses pgvector for blazing-fast similarity search
  • Leverages Hugging Face embeddings (no API keys needed)
  • Scales to millions of documents
  • Integrates seamlessly with Django ORM

Architecture Overview

User Query → Django View → Embedding Model → pgvector Search → Context Retrieval → LLM Response
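Each arrow in the diagram maps to one call in the request path. As a rough sketch of the flow (the three inner functions here are placeholders standing in for the real embedding model, pgvector query, and LLM call, not the services we build below):

```python
# Minimal sketch of the request flow; embed/search/generate are placeholder
# stand-ins for the embedding model, pgvector search, and LLM call
def embed(query):
    return [0.1, 0.2, 0.3]

def search(vector, top_k=2):
    return ["chunk A", "chunk B"]

def generate(query, context):
    return f"Answer to {query!r} using {len(context)} chunks"

def answer(query):
    vector = embed(query)            # Embedding Model
    context = search(vector)         # pgvector Search -> Context Retrieval
    return generate(query, context)  # LLM Response

print(answer("what is RAG?"))
```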

Prerequisites

# System requirements
Python 3.10+
PostgreSQL 15+ with pgvector extension
Django 5.0+
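If the deployment environment is uncertain, a quick runtime check avoids confusing failures later (an illustrative helper, not part of the project code):

```python
import sys

def meets_minimum(version, minimum=(3, 10)):
    # Tuple comparison is lexicographic, so (3, 11, 0) >= (3, 10) holds
    return tuple(version) >= minimum

print(meets_minimum(sys.version_info[:3]))
print(meets_minimum((3, 9, 7)))   # False
```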

Step 1: PostgreSQL Setup with pgvector

First, let's install and configure pgvector:

# Install PostgreSQL and development headers
sudo apt-get install postgresql postgresql-contrib postgresql-server-dev-15
# Clone and install pgvector
git clone https://github.com/pgvector/pgvector.git
cd pgvector
make
sudo make install
# Connect to PostgreSQL
sudo -u postgres psql
# Enable extension in your database
CREATE EXTENSION IF NOT EXISTS vector;

Database Configuration

# settings.py
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': 'rag_db',
        'USER': 'postgres',
        'PASSWORD': 'your_password',
        'HOST': 'localhost',
        'PORT': '5432',
    }
}
INSTALLED_APPS = [
    # ... other apps
    'your_rag_app',
]
# Note: pgvector does not need to be listed in INSTALLED_APPS;
# importing pgvector.django in models.py is sufficient.
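Hardcoding the database password in settings.py is a liability in regulated environments. A common alternative is to read credentials from the environment; a sketch (the `RAG_DB_*` variable names are illustrative, not a Django convention):

```python
import os

# Fall back to development defaults when the variables are unset;
# in production, set RAG_DB_PASSWORD etc. in the service environment
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.postgresql',
        'NAME': os.environ.get('RAG_DB_NAME', 'rag_db'),
        'USER': os.environ.get('RAG_DB_USER', 'postgres'),
        'PASSWORD': os.environ.get('RAG_DB_PASSWORD', ''),
        'HOST': os.environ.get('RAG_DB_HOST', 'localhost'),
        'PORT': os.environ.get('RAG_DB_PORT', '5432'),
    }
}

print(DATABASES['default']['HOST'])
```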

Step 2: Django Models for Document Storage

Create models to store documents and their embeddings:

# models.py
from django.db import models
from pgvector.django import VectorField
class Document(models.Model):
    """Stores original documents with metadata"""
    title = models.CharField(max_length=500)
    content = models.TextField()
    source = models.CharField(max_length=255, blank=True)
    document_type = models.CharField(max_length=50)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)
    is_active = models.BooleanField(default=True)
    
    # Metadata for filtering
    tags = models.JSONField(default=list, blank=True)
    metadata = models.JSONField(default=dict, blank=True)
    
    class Meta:
        db_table = 'documents'
        indexes = [
            models.Index(fields=['document_type', 'is_active']),
            models.Index(fields=['created_at']),
        ]
    
    def __str__(self):
        return f"{self.title} ({self.document_type})"

class DocumentChunk(models.Model):
    """Stores document chunks with embeddings for RAG"""
    document = models.ForeignKey(
        Document, 
        on_delete=models.CASCADE, 
        related_name='chunks'
    )
    chunk_text = models.TextField()
    chunk_index = models.IntegerField()
    
    # Vector embedding (384 dimensions for all-MiniLM-L6-v2)
    embedding = VectorField(dimensions=384)
    
    # Token count for context management
    token_count = models.IntegerField(default=0)
    
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'document_chunks'
        indexes = [
            models.Index(fields=['document', 'chunk_index']),
        ]
        # Important: Create vector index for fast similarity search
        # Add this in a migration:
        # migrations.RunSQL(
        #     "CREATE INDEX ON document_chunks USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);"
        # )
    
    def __str__(self):
        return f"Chunk {self.chunk_index} of {self.document.title}"

class QueryHistory(models.Model):
    """Track queries for analytics and improvement"""
    query_text = models.TextField()
    query_embedding = VectorField(dimensions=384)
    response = models.TextField(blank=True)
    retrieved_chunks = models.JSONField(default=list)
    relevance_score = models.FloatField(null=True, blank=True)
    user_feedback = models.IntegerField(null=True, blank=True)  # 1-5 rating
    created_at = models.DateTimeField(auto_now_add=True)
    
    class Meta:
        db_table = 'query_history'
        indexes = [
            models.Index(fields=['-created_at']),
        ]

Migration for Vector Index

# migrations/0002_add_vector_index.py
from django.db import migrations
class Migration(migrations.Migration):
    dependencies = [
        ('your_rag_app', '0001_initial'),
    ]
    operations = [
        migrations.RunSQL(
            # IVFFlat index for fast approximate nearest neighbor search
            sql="""
                CREATE INDEX document_chunks_embedding_idx 
                ON document_chunks 
                USING ivfflat (embedding vector_cosine_ops) 
                WITH (lists = 100);
            """,
            reverse_sql="DROP INDEX IF EXISTS document_chunks_embedding_idx;"
        ),
    ]
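The `lists = 100` above is just a starting point. pgvector's documentation suggests roughly rows/1000 lists up to about a million rows and sqrt(rows) beyond that; a small helper to compute a value (a heuristic, not an API):

```python
import math

def ivfflat_lists(row_count):
    # Heuristic from pgvector's docs: rows/1000 up to ~1M rows, sqrt(rows) above
    if row_count <= 1_000_000:
        return max(1, row_count // 1000)
    return round(math.sqrt(row_count))

print(ivfflat_lists(100_000))    # 100
print(ivfflat_lists(4_000_000))  # 2000
```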

Step 3: Embedding Service with Hugging Face

Create a service to generate embeddings locally:

# services/embedding_service.py
from sentence_transformers import SentenceTransformer
from typing import List, Union
import numpy as np
from django.conf import settings
import logging
logger = logging.getLogger(__name__)

class EmbeddingService:
    """
    Handles text embedding generation using Hugging Face models.
    Runs entirely on your infrastructure—no API calls needed.
    """
    
    def __init__(self, model_name: str = "sentence-transformers/all-MiniLM-L6-v2"):
        """
        Initialize the embedding model.
        
        Popular models:
        - all-MiniLM-L6-v2: Fast, 384 dims, great for most use cases
        - all-mpnet-base-v2: Slower, 768 dims, higher quality
        - multi-qa-MiniLM-L6-cos-v1: Optimized for question-answering
        """
        self.model_name = model_name
        self._model = None
        # Name-based heuristic covering the models listed above; for other
        # models, use self.model.get_sentence_embedding_dimension() after loading
        self.dimension = 384 if "MiniLM-L6" in model_name else 768
    
    @property
    def model(self):
        """Lazy load the model to save memory"""
        if self._model is None:
            logger.info(f"Loading embedding model: {self.model_name}")
            self._model = SentenceTransformer(self.model_name)
            logger.info("Model loaded successfully")
        return self._model
    
    def generate_embedding(self, text: str) -> List[float]:
        """Generate embedding for a single text"""
        try:
            embedding = self.model.encode(text, convert_to_numpy=True)
            return embedding.tolist()
        except Exception as e:
            logger.error(f"Error generating embedding: {e}")
            raise
    
    def generate_embeddings_batch(self, texts: List[str], batch_size: int = 32) -> List[List[float]]:
        """Generate embeddings for multiple texts efficiently"""
        try:
            embeddings = self.model.encode(
                texts, 
                convert_to_numpy=True,
                batch_size=batch_size,
                show_progress_bar=True
            )
            return embeddings.tolist()
        except Exception as e:
            logger.error(f"Error generating batch embeddings: {e}")
            raise
    
    def similarity(self, embedding1: List[float], embedding2: List[float]) -> float:
        """Calculate cosine similarity between two embeddings"""
        vec1 = np.array(embedding1)
        vec2 = np.array(embedding2)
        return float(np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2)))

# Singleton instance
embedding_service = EmbeddingService()
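The `similarity` helper implements standard cosine similarity. The same computation in plain Python, as an independent sanity check of the formula:

```python
import math

def cosine_similarity(a, b):
    # dot(a, b) / (|a| * |b|): 1.0 for identical directions, 0.0 for orthogonal
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
```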

Step 4: Document Chunking Strategy

Intelligent document chunking is crucial for RAG performance:

# services/chunking_service.py
from typing import List, Dict
import re
from langchain.text_splitter import RecursiveCharacterTextSplitter

class ChunkingService:
    """
    Handles intelligent document chunking with overlap for context preservation.
    """
    
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        """
        Args:
            chunk_size: Target size in characters
            chunk_overlap: Overlap between chunks to preserve context
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # RecursiveCharacterTextSplitter tries to split on natural boundaries
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=["\n\n", "\n", ". ", " ", ""],
            length_function=len,
        )
    
    def chunk_document(self, text: str, metadata: Dict = None) -> List[Dict]:
        """
        Chunk a document into smaller pieces with metadata.
        
        Returns:
            List of dicts with 'text', 'index', and 'metadata'
        """
        chunks = self.text_splitter.split_text(text)
        
        chunked_data = []
        for idx, chunk_text in enumerate(chunks):
            chunk_data = {
                'text': chunk_text.strip(),
                'index': idx,
                # Whitespace split is a rough token estimate; use the embedding
                # model's tokenizer if exact counts matter
                'token_count': len(chunk_text.split()),
                'metadata': metadata or {}
            }
            chunked_data.append(chunk_data)
        
        return chunked_data
    
    def chunk_with_semantic_splitting(self, text: str) -> List[str]:
        """
        Advanced: Split on semantic boundaries (paragraphs, sections)
        """
        # Split by double newlines (paragraphs)
        paragraphs = text.split('\n\n')
        
        chunks = []
        current_chunk = ""
        
        for para in paragraphs:
            para = para.strip()
            if not para:
                continue
            
            # If adding this paragraph exceeds chunk_size, save current chunk
            if len(current_chunk) + len(para) > self.chunk_size and current_chunk:
                chunks.append(current_chunk.strip())
                current_chunk = para
            else:
                current_chunk += "\n\n" + para if current_chunk else para
        
        # Add remaining text
        if current_chunk:
            chunks.append(current_chunk.strip())
        
        return chunks

chunking_service = ChunkingService()
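To see what `chunk_with_semantic_splitting` does without pulling in the class, here is the same greedy paragraph-packing loop as a standalone function, with `chunk_size` shrunk to 40 characters so the split is visible:

```python
def pack_paragraphs(text, chunk_size=40):
    # Greedy packing: start a new chunk when the next paragraph won't fit
    chunks, current = [], ""
    for para in text.split("\n\n"):
        para = para.strip()
        if not para:
            continue
        if current and len(current) + len(para) > chunk_size:
            chunks.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    if current:
        chunks.append(current)
    return chunks

doc = "First paragraph here.\n\nSecond paragraph.\n\nThird."
print(pack_paragraphs(doc))
```

The first two paragraphs fit within 40 characters together, so they share a chunk; the third starts a new one.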

Step 5: Document Ingestion Pipeline

Create a service to ingest and process documents:

# services/ingestion_service.py
from django.db import transaction
from typing import List, Dict
import logging
from ..models import Document, DocumentChunk
from .embedding_service import embedding_service
from .chunking_service import chunking_service
logger = logging.getLogger(__name__)

class IngestionService:
    """
    Handles document ingestion, chunking, and embedding generation.
    """
    
    @transaction.atomic
    def ingest_document(self, title: str, content: str, document_type: str, 
                       metadata: Dict = None, tags: List[str] = None) -> Document:
        """
        Ingest a single document with automatic chunking and embedding.
        
        Args:
            title: Document title
            content: Full document text
            document_type: Type classification (e.g., 'medical_record', 'policy')
            metadata: Additional metadata
            tags: List of tags for filtering
        
        Returns:
            Created Document instance with embedded chunks
        """
        try:
            # Create document
            document = Document.objects.create(
                title=title,
                content=content,
                document_type=document_type,
                metadata=metadata or {},
                tags=tags or []
            )
            
            logger.info(f"Created document: {document.id} - {title}")
            
            # Chunk the document
            chunks = chunking_service.chunk_document(content, metadata)
            logger.info(f"Generated {len(chunks)} chunks")
            
            # Generate embeddings for all chunks
            chunk_texts = [chunk['text'] for chunk in chunks]
            embeddings = embedding_service.generate_embeddings_batch(chunk_texts)
            
            # Create DocumentChunk instances
            chunk_objects = []
            for chunk_data, embedding in zip(chunks, embeddings):
                chunk_obj = DocumentChunk(
                    document=document,
                    chunk_text=chunk_data['text'],
                    chunk_index=chunk_data['index'],
                    embedding=embedding,
                    token_count=chunk_data['token_count']
                )
                chunk_objects.append(chunk_obj)
            
            # Bulk create for efficiency
            DocumentChunk.objects.bulk_create(chunk_objects)
            logger.info(f"Created {len(chunk_objects)} chunk embeddings")
            
            return document
            
        except Exception as e:
            logger.error(f"Error ingesting document: {e}")
            raise
    
    @transaction.atomic
    def ingest_batch(self, documents: List[Dict]) -> List[Document]:
        """
        Ingest multiple documents in batch.
        
        Args:
            documents: List of dicts with 'title', 'content', 'document_type', etc.
        """
        created_docs = []
        for doc_data in documents:
            try:
                doc = self.ingest_document(**doc_data)
                created_docs.append(doc)
            except Exception as e:
                logger.error(f"Failed to ingest document {doc_data.get('title')}: {e}")
                continue
        
        return created_docs
    
    @transaction.atomic
    def update_document(self, document_id: int, content: str) -> Document:
        """
        Update document content and regenerate embeddings.
        """
        document = Document.objects.get(id=document_id)
        
        # Delete old chunks
        document.chunks.all().delete()
        
        # Update content
        document.content = content
        document.save()
        
        # Regenerate chunks and embeddings
        chunks = chunking_service.chunk_document(content)
        chunk_texts = [chunk['text'] for chunk in chunks]
        embeddings = embedding_service.generate_embeddings_batch(chunk_texts)
        
        chunk_objects = [
            DocumentChunk(
                document=document,
                chunk_text=chunk_data['text'],
                chunk_index=chunk_data['index'],
                embedding=embedding,
                token_count=chunk_data['token_count']
            )
            for chunk_data, embedding in zip(chunks, embeddings)
        ]
        
        DocumentChunk.objects.bulk_create(chunk_objects)
        
        return document

ingestion_service = IngestionService()

Step 6: RAG Retrieval Service

The core RAG retrieval logic:

# services/retrieval_service.py
from typing import List, Dict, Optional
from pgvector.django import CosineDistance
from ..models import DocumentChunk, QueryHistory
from .embedding_service import embedding_service
import logging
logger = logging.getLogger(__name__)

class RetrievalService:
    """
    Handles semantic search and context retrieval for RAG.
    """
    
    def __init__(self, top_k: int = 5, similarity_threshold: float = 0.7):
        """
        Args:
            top_k: Number of chunks to retrieve
            similarity_threshold: Minimum cosine similarity (0-1); 0.7 is
                fairly strict for MiniLM-class models, so tune it on your data
        """
        self.top_k = top_k
        self.similarity_threshold = similarity_threshold
    
    def retrieve_context(self, query: str, document_type: Optional[str] = None,
                        tags: Optional[List[str]] = None,
                        save_history: bool = True) -> Dict:
        """
        Retrieve relevant context for a query using semantic search.
        
        Args:
            query: User query text
            document_type: Optional filter by document type
            tags: Optional filter by tags
            save_history: Whether to save query to history
        
        Returns:
            Dict with 'chunks', 'query_embedding', 'scores'
        """
        try:
            # Generate query embedding
            query_embedding = embedding_service.generate_embedding(query)
            
            # Build queryset with filters
            queryset = DocumentChunk.objects.filter(
                document__is_active=True
            ).select_related('document')
            
            if document_type:
                queryset = queryset.filter(document__document_type=document_type)
            
            if tags:
                # JSONField has no __overlap lookup (that's ArrayField-only);
                # OR together per-tag __contains checks for ANY-of semantics
                from django.db.models import Q
                tag_filter = Q()
                for tag in tags:
                    tag_filter |= Q(document__tags__contains=[tag])
                queryset = queryset.filter(tag_filter)
            
            # Perform vector similarity search
            similar_chunks = queryset.annotate(
                distance=CosineDistance('embedding', query_embedding)
            ).order_by('distance')[:self.top_k]
            
            # Filter by similarity threshold
            # Note: CosineDistance returns distance (lower is better)
            # Convert to similarity: similarity = 1 - distance
            results = []
            for chunk in similar_chunks:
                similarity = 1 - chunk.distance
                if similarity >= self.similarity_threshold:
                    results.append({
                        'chunk_id': chunk.id,
                        'document_id': chunk.document.id,
                        'document_title': chunk.document.title,
                        'text': chunk.chunk_text,
                        'similarity': similarity,
                        'metadata': chunk.document.metadata,
                        'chunk_index': chunk.chunk_index,
                    })
            
            # Save to query history
            if save_history:
                QueryHistory.objects.create(
                    query_text=query,
                    query_embedding=query_embedding,
                    retrieved_chunks=[r['chunk_id'] for r in results]
                )
            
            logger.info(f"Retrieved {len(results)} chunks for query: {query[:50]}...")
            
            return {
                'chunks': results,
                'query_embedding': query_embedding,
                'query': query
            }
            
        except Exception as e:
            logger.error(f"Error during retrieval: {e}")
            raise
    
    def hybrid_search(self, query: str, top_k: Optional[int] = None) -> List[Dict]:
        """
        Hybrid search combining vector similarity + keyword matching.
        Useful for queries with specific terms.
        """
        if top_k is None:
            top_k = self.top_k
        
        query_embedding = embedding_service.generate_embedding(query)
        
        # Vector search
        vector_results = DocumentChunk.objects.filter(
            document__is_active=True
        ).annotate(
            distance=CosineDistance('embedding', query_embedding)
        ).order_by('distance')[:top_k * 2]
        
        # Keyword search (full-text search in PostgreSQL)
        from django.contrib.postgres.search import SearchQuery, SearchRank, SearchVector
        
        search_vector = SearchVector('chunk_text', weight='A') + SearchVector('document__title', weight='B')
        search_query = SearchQuery(query)
        
        keyword_results = DocumentChunk.objects.filter(
            document__is_active=True
        ).annotate(
            rank=SearchRank(search_vector, search_query)
        ).filter(rank__gte=0.1).order_by('-rank')[:top_k * 2]
        
        # Combine and deduplicate
        combined_ids = set()
        combined_results = []
        
        for chunk in vector_results:
            if chunk.id not in combined_ids:
                combined_ids.add(chunk.id)
                similarity = 1 - chunk.distance
                combined_results.append({
                    'chunk_id': chunk.id,
                    'text': chunk.chunk_text,
                    'similarity': similarity,
                    'document_title': chunk.document.title,
                    'score_type': 'vector'
                })
        
        for chunk in keyword_results:
            if chunk.id not in combined_ids:
                combined_ids.add(chunk.id)
                combined_results.append({
                    'chunk_id': chunk.id,
                    'text': chunk.chunk_text,
                    'similarity': float(chunk.rank),
                    'document_title': chunk.document.title,
                    'score_type': 'keyword'
                })
        
        # Re-sort by score. Caveat: cosine similarity and ts_rank are not on
        # the same scale, so this ordering is approximate; rank-based fusion
        # is more principled when ordering matters
        combined_results.sort(key=lambda x: x['similarity'], reverse=True)
        
        return combined_results[:top_k]
    
    def get_similar_queries(self, query: str, top_k: int = 5) -> List[Dict]:
        """
        Find similar past queries for analytics or suggestion.
        """
        query_embedding = embedding_service.generate_embedding(query)
        
        similar_queries = QueryHistory.objects.annotate(
            distance=CosineDistance('query_embedding', query_embedding)
        ).order_by('distance')[:top_k]
        
        return [
            {
                'query': q.query_text,
                'similarity': 1 - q.distance,
                'created_at': q.created_at,
                'response': q.response
            }
            for q in similar_queries
        ]

retrieval_service = RetrievalService()
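`hybrid_search` sorts on raw scores, but cosine similarity and `ts_rank` live on different scales. A rank-based alternative commonly used for this problem is reciprocal rank fusion, which looks only at each result's position in its own list; a standalone sketch (not wired into the service above):

```python
def reciprocal_rank_fusion(ranked_lists, k=60):
    # Each list is ordered best-first; every appearance contributes
    # 1 / (k + rank), summed across lists (k=60 is the usual smoothing constant)
    scores = {}
    for results in ranked_lists:
        for rank, item in enumerate(results, start=1):
            scores[item] = scores.get(item, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)

vector_hits = ["c3", "c1", "c7"]   # ordered by cosine similarity
keyword_hits = ["c1", "c9"]        # ordered by full-text rank
print(reciprocal_rank_fusion([vector_hits, keyword_hits]))
```

"c1" wins because it appears near the top of both lists, even though it is first in neither.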

Step 7: Django REST API Views

Create API endpoints for the RAG system:

# views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from django.core.paginator import Paginator
from .services.ingestion_service import ingestion_service
from .services.retrieval_service import retrieval_service
from .models import Document, DocumentChunk
from .serializers import DocumentSerializer, ChunkSerializer

class DocumentIngestionView(APIView):
    """
    API endpoint for ingesting documents.
    POST /api/documents/ingest/
    """
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        """
        Ingest a new document.
        
        Body:
        {
            "title": "Patient Medical History",
            "content": "Full document text...",
            "document_type": "medical_record",
            "tags": ["cardiology", "patient-123"],
            "metadata": {"patient_id": "123", "date": "2026-01-01"}
        }
        """
        try:
            title = request.data.get('title')
            content = request.data.get('content')
            document_type = request.data.get('document_type', 'general')
            tags = request.data.get('tags', [])
            metadata = request.data.get('metadata', {})
            
            if not title or not content:
                return Response(
                    {'error': 'Title and content are required'},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            document = ingestion_service.ingest_document(
                title=title,
                content=content,
                document_type=document_type,
                tags=tags,
                metadata=metadata
            )
            
            serializer = DocumentSerializer(document)
            return Response({
                'message': 'Document ingested successfully',
                'document': serializer.data
            }, status=status.HTTP_201_CREATED)
            
        except Exception as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class BatchIngestionView(APIView):
    """
    API endpoint for batch document ingestion.
    POST /api/documents/ingest-batch/
    """
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        """
        Ingest multiple documents.
        
        Body:
        {
            "documents": [
                {"title": "...", "content": "...", "document_type": "..."},
                ...
            ]
        }
        """
        try:
            documents = request.data.get('documents', [])
            
            if not documents:
                return Response(
                    {'error': 'No documents provided'},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            created_docs = ingestion_service.ingest_batch(documents)
            
            return Response({
                'message': f'Successfully ingested {len(created_docs)} documents',
                'document_count': len(created_docs)
            }, status=status.HTTP_201_CREATED)
            
        except Exception as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class RAGQueryView(APIView):
    """
    API endpoint for RAG queries.
    POST /api/rag/query/
    """
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        """
        Perform RAG retrieval for a query.
        
        Body:
        {
            "query": "What are the symptoms of hypertension?",
            "document_type": "medical_record",  # optional
            "tags": ["cardiology"],  # optional
            "top_k": 5  # optional
        }
        """
        try:
            query = request.data.get('query')
            document_type = request.data.get('document_type')
            tags = request.data.get('tags')
            top_k = request.data.get('top_k', 5)
            
            if not query:
                return Response(
                    {'error': 'Query is required'},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            # Use a per-request service instance instead of mutating the
            # shared singleton's top_k; that attribute is shared across
            # requests and mutating it is not thread-safe
            from .services.retrieval_service import RetrievalService
            per_request_service = RetrievalService(top_k=top_k)
            
            results = per_request_service.retrieve_context(
                query=query,
                document_type=document_type,
                tags=tags
            )
            
            return Response({
                'query': query,
                'results': results['chunks'],
                'retrieved_count': len(results['chunks'])
            }, status=status.HTTP_200_OK)
            
        except Exception as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class HybridSearchView(APIView):
    """
    API endpoint for hybrid search.
    POST /api/rag/hybrid-search/
    """
    permission_classes = [IsAuthenticated]
    
    def post(self, request):
        """
        Perform hybrid search (vector + keyword).
        """
        try:
            query = request.data.get('query')
            top_k = request.data.get('top_k', 5)
            
            if not query:
                return Response(
                    {'error': 'Query is required'},
                    status=status.HTTP_400_BAD_REQUEST
                )
            
            results = retrieval_service.hybrid_search(query, top_k)
            
            return Response({
                'query': query,
                'results': results,
                'retrieved_count': len(results)
            }, status=status.HTTP_200_OK)
            
        except Exception as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class DocumentListView(APIView):
    """
    API endpoint to list documents.
    GET /api/documents/
    """
    permission_classes = [IsAuthenticated]
    
    def get(self, request):
        """
        List all documents with pagination and filtering.
        
        Query params:
        - page: Page number
        - page_size: Items per page
        - document_type: Filter by type
        - tags: Filter by tags (comma-separated)
        """
        try:
            page_num = int(request.query_params.get('page', 1))
            page_size = int(request.query_params.get('page_size', 20))
            document_type = request.query_params.get('document_type')
            tags = request.query_params.get('tags')
            
            queryset = Document.objects.filter(is_active=True)
            
            if document_type:
                queryset = queryset.filter(document_type=document_type)
            
            if tags:
                tag_list = [t.strip() for t in tags.split(',')]
                # JSONField has no __overlap lookup; OR per-tag __contains checks
                from django.db.models import Q
                tag_filter = Q()
                for tag in tag_list:
                    tag_filter |= Q(tags__contains=[tag])
                queryset = queryset.filter(tag_filter)
            
            queryset = queryset.order_by('-created_at')
            
            paginator = Paginator(queryset, page_size)
            page = paginator.get_page(page_num)
            
            serializer = DocumentSerializer(page, many=True)
            
            return Response({
                'count': paginator.count,
                'total_pages': paginator.num_pages,
                'current_page': page_num,
                'results': serializer.data
            }, status=status.HTTP_200_OK)
            
        except Exception as e:
            return Response(
                {'error': str(e)},
                status=status.HTTP_500_INTERNAL_SERVER_ERROR
            )

class DocumentDetailView(APIView):
    """
    API endpoint for document details.
    GET /api/documents/<id>/
    """
    permission_classes = [IsAuthenticated]
    
    def get(self, request, document_id):
        """Get document with all chunks."""
        try:
            document = Document.objects.prefetch_related('chunks').get(id=document_id)
            serializer = DocumentSerializer(document)
            return Response(serializer.data, status=status.HTTP_200_OK)
        except Document.DoesNotExist:
            return Response(
                {'error': 'Document not found'},
                status=status.HTTP_404_NOT_FOUND
            )
    
    def delete(self, request, document_id):
        """Soft delete a document."""
        try:
            document = Document.objects.get(id=document_id)
            document.is_active = False
            document.save()
            return Response(
                {'message': 'Document deleted successfully'},
                status=status.HTTP_200_OK
            )
        except Document.DoesNotExist:
            return Response(
                {'error': 'Document not found'},
                status=status.HTTP_404_NOT_FOUND
            )

Step 8: Serializers

# serializers.py
from rest_framework import serializers
from .models import Document, DocumentChunk

class ChunkSerializer(serializers.ModelSerializer):
    class Meta:
        model = DocumentChunk
        fields = ['id', 'chunk_text', 'chunk_index', 'token_count']

class DocumentSerializer(serializers.ModelSerializer):
    chunk_count = serializers.SerializerMethodField()
    chunks = ChunkSerializer(many=True, read_only=True)
    
    class Meta:
        model = Document
        fields = [
            'id', 'title', 'content', 'source', 'document_type',
            'tags', 'metadata', 'is_active', 'created_at', 'updated_at',
            'chunk_count', 'chunks'
        ]
    
    def get_chunk_count(self, obj):
        return obj.chunks.count()

Step 9: URL Configuration

# urls.py
from django.urls import path
from .views import (
    DocumentIngestionView,
    BatchIngestionView,
    RAGQueryView,
    HybridSearchView,
    DocumentListView,
    DocumentDetailView
)
urlpatterns = [
    # Ingestion endpoints
    path('documents/ingest/', DocumentIngestionView.as_view(), name='ingest-document'),
    path('documents/ingest-batch/', BatchIngestionView.as_view(), name='ingest-batch'),
    
    # Query endpoints
    path('rag/query/', RAGQueryView.as_view(), name='rag-query'),
    path('rag/hybrid-search/', HybridSearchView.as_view(), name='hybrid-search'),
    
    # Document management
    path('documents/', DocumentListView.as_view(), name='document-list'),
    path('documents/<int:document_id>/', DocumentDetailView.as_view(), name='document-detail'),
]

Step 10: Requirements & Dependencies

# requirements.txt
Django==5.0.1
djangorestframework==3.14.0
psycopg2-binary==2.9.9
pgvector==0.2.4
sentence-transformers==2.3.1
torch==2.1.2
transformers==4.36.2
langchain==0.1.0
numpy==1.26.3

Performance Optimization Tips

1. Vector Index Tuning

-- IVFFlat index (faster inserts, approximate search)
CREATE INDEX ON document_chunks 
USING ivfflat (embedding vector_cosine_ops) 
WITH (lists = 100);
-- HNSW index (better recall and query speed, slower to build)
-- Note: both index types are approximate; for exact search, drop the index
CREATE INDEX ON document_chunks 
USING hnsw (embedding vector_cosine_ops);
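Both index types trade recall for query speed, and pgvector exposes per-session settings to tune that trade-off. A quick sketch (the values here are starting points, not recommendations):

```sql
-- IVFFlat: probe more lists for better recall (default is 1)
SET ivfflat.probes = 10;

-- HNSW: widen the candidate list for better recall (default is 40)
SET hnsw.ef_search = 100;
```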

2. Batch Processing

# Process large documents in batches
def ingest_large_document(file_path, batch_size=100):
    with open(file_path, 'r') as f:
        content = f.read()
    
    chunks = chunking_service.chunk_document(content)
    
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i + batch_size]
        # Process batch...
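The slicing pattern above generalizes nicely. As a self-contained sketch, a small helper keeps the batching logic testable on its own; in the real pipeline each batch would feed the embedding service and a single `bulk_create` call (names borrowed from the earlier steps):

```python
def batched(items, batch_size=100):
    """Yield successive fixed-size batches from a list."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

# Sketch of how it slots into ingestion (hypothetical batch APIs):
# for batch in batched(chunks):
#     embeddings = embedding_service.generate_embeddings(batch)
#     DocumentChunk.objects.bulk_create(
#         DocumentChunk(chunk_text=text, embedding=emb, ...)
#         for text, emb in zip(batch, embeddings)
#     )
```

Batching both the embedding calls and the inserts matters: one `bulk_create` per hundred chunks is dramatically cheaper than a hundred individual `save()` calls.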

3. Caching Embeddings

# Use Redis for query embedding cache
from django.core.cache import cache
import hashlib

def get_cached_embedding(text):
    # Python's built-in hash() is salted per process, so it can't be used
    # as a shared cache key; use a stable digest instead
    cache_key = f"embedding:{hashlib.sha256(text.encode('utf-8')).hexdigest()}"
    embedding = cache.get(cache_key)
    
    if embedding is None:
        embedding = embedding_service.generate_embedding(text)
        cache.set(cache_key, embedding, timeout=3600)
    
    return embedding

Testing Your RAG System

# tests/test_rag.py
from django.test import TestCase
from ..services.ingestion_service import ingestion_service
from ..services.retrieval_service import retrieval_service

class RAGSystemTest(TestCase):
    def setUp(self):
        # Ingest test documents
        self.doc1 = ingestion_service.ingest_document(
            title="Hypertension Guidelines",
            content="Hypertension is defined as blood pressure >140/90...",
            document_type="medical_guideline"
        )
    
    def test_retrieval(self):
        results = retrieval_service.retrieve_context(
            query="What is high blood pressure?"
        )
        
        self.assertGreater(len(results['chunks']), 0)
        self.assertIn('similarity', results['chunks'][0])
    
    def test_filtering(self):
        results = retrieval_service.retrieve_context(
            query="treatment guidelines",
            document_type="medical_guideline"
        )
        
        for chunk in results['chunks']:
            self.assertEqual(chunk['document_type'], 'medical_guideline')

Production Deployment Checklist

Database

  • Enable pgvector extension
  • Create appropriate indexes
  • Set up connection pooling (pgBouncer)

Model Management

  • Download and cache Hugging Face models
  • Use GPU if available (CUDA)
  • Implement model versioning

Monitoring

  • Track query latency
  • Monitor similarity scores
  • Log retrieval failures

Security

  • Implement rate limiting
  • Sanitize and validate all user input
  • Encrypt embeddings at rest
  • Use HTTPS for all endpoints
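Rate limiting, at least, is nearly free with DRF's built-in throttling. A minimal settings sketch (the rates are placeholders you would tune per endpoint):

```python
# settings.py -- DRF throttling (sketch; rates are placeholders)
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_CLASSES': [
        'rest_framework.throttling.AnonRateThrottle',
        'rest_framework.throttling.UserRateThrottle',
    ],
    'DEFAULT_THROTTLE_RATES': {
        'anon': '20/minute',
        'user': '100/minute',
    },
}
```

For finer control, per-view `ScopedRateThrottle` lets you give the expensive RAG query endpoint a tighter budget than document listing.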

Scaling

  • Use Celery for async ingestion
  • Implement read replicas for queries
  • Consider vector database sharding

Real-World Use Cases

Healthcare Example

# Ingest patient records
ingestion_service.ingest_document(
    title="Patient Chart - John Doe",
    content="Patient presents with chest pain...",
    document_type="medical_record",
    tags=["cardiology", "emergency"],
    metadata={"patient_id": "P12345", "visit_date": "2026-01-15"}
)
# Query for similar cases
results = retrieval_service.retrieve_context(
    query="chest pain with elevated troponin",
    document_type="medical_record",
    tags=["cardiology"]
)

Legal Document Search

# Ingest contracts
ingestion_service.ingest_document(
    title="Software License Agreement",
    content="This agreement is made between...",
    document_type="contract",
    tags=["software", "licensing"]
)
# Find relevant clauses
results = retrieval_service.retrieve_context(
    query="liability limitations in software contracts"
)

Advanced Features to Add

  1. Re-ranking: Add a cross-encoder for better result ranking
  2. Query expansion: Use synonyms and related terms
  3. Multi-modal: Support images and PDFs with vision models
  4. Feedback loop: Learn from user interactions
  5. Version control: Track document changes
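For item 1, the re-ranking step itself is model-agnostic: retrieve a generous top-k with pgvector, then re-score with a stronger pairwise model. A minimal sketch with the scorer injected so the logic stands alone; in practice you might pass in the `predict` method of a sentence-transformers `CrossEncoder` (model name below is one common choice, not something used earlier in this post):

```python
def rerank(query, chunks, score_fn, top_k=5):
    """Re-order retrieved chunks by a pairwise relevance score.

    score_fn(query, chunk_text) -> float; higher means more relevant.
    """
    scored = sorted(chunks, key=lambda c: score_fn(query, c), reverse=True)
    return scored[:top_k]

# With sentence-transformers (assumed dependency, not shown above):
#   from sentence_transformers import CrossEncoder
#   ce = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')
#   rerank(q, chunks, lambda q, c: float(ce.predict([(q, c)])[0]))
```

A typical pattern is to over-retrieve (say, top 25 from pgvector) and let the cross-encoder pick the final 5, since pairwise scoring is much slower than vector search.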

Conclusion

You now have a production-ready, privacy-first RAG system that:

  • Runs entirely on your infrastructure
  • Scales to millions of documents
  • Provides fast semantic search
  • Maintains HIPAA/GDPR compliance

The beauty of this architecture is its flexibility — swap out embedding models, add custom chunking strategies, or integrate with any LLM of your choice.

Have questions about implementing RAG in your Django application? Drop a comment below!