Article 7: Wrapping Everything in a FastAPI Service
Introduction
The previous six articles covered each piece in isolation: the overall RAG concept, pgvector setup, chunking, embeddings, retrieval, and generation. This article puts them all together into a running FastAPI service.
The service exposes three endpoint groups:
/ingest – submit files or directories for ingestion
/query – submit a question and receive a grounded answer
/health – service status and dependency checks
Everything is async throughout: file loading, database access, embedding calls, and LLM calls. The application uses FastAPI's lifespan to start the embedding background worker and cleanly shut it down.
The lifespan function creates all shared objects at startup and tears them down on shutdown.
Dependency Injection for Database Sessions
Routes use Depends(get_db) to get a session scoped to the request lifetime.
Ingestion Endpoints
Query Endpoint with Streaming
Example Query Request and Response
Health Endpoint
Running the Service
Development
Environment Variables
Docker Compose Setup
Pre-downloading the model at Docker build time avoids the 5-second download delay on first request – important for keeping startup time predictable.
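One way to bake the model into the image at build time – a sketch only, since the model name and file layout here are assumptions rather than the service's actual configuration:

```dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Download the embedding model at build time so the first request
# doesn't pay the download cost (model name is an assumption).
RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')"
COPY src/ src/
CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

The model ends up cached inside the image layer, so restarts and rescheduled containers skip the download entirely.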
What I Learned
The streaming response header X-Accel-Buffering: no is essential behind nginx. Without it, nginx buffers SSE events and the client sees them in one burst at the end rather than as a stream. I lost two hours debugging "why doesn't streaming work" before finding this header.
Background tasks in FastAPI have a gotcha with database sessions. If a background task tries to reuse the request-scoped database session, it will fail because the session is closed when the response is sent. The run_ingestion function opens its own session via AsyncSessionLocal() instead of using the request session.
/ingest/stats is more useful than I expected. I check pending_embeddings regularly – it tells me how far behind the embedding worker is. If the worker is struggling (high pending count that's not decreasing), it's usually because the local model is CPU-bound and I have 20+ files queued. Adding a metric for this (PENDING_EMBEDDINGS = Gauge(...)) was a natural next step.
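With prometheus_client, that gauge can be sketched like this (the metric name and the `record_pending` helper are assumptions, not the service's actual wiring):

```python
from prometheus_client import REGISTRY, Gauge

PENDING_EMBEDDINGS = Gauge(
    "rag_pending_embeddings",
    "Chunks still waiting for the embedding worker",
)


def record_pending(count: int) -> None:
    # Called wherever /ingest/stats computes the pending count,
    # e.g. after the SELECT count(*) ... WHERE embedding IS NULL query.
    PENDING_EMBEDDINGS.set(count)
```

A gauge fits here because the pending count can go up as well as down; a counter would only ever increase.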
The /docs FastAPI auto-docs page is genuinely useful for a personal tool. Since it's just me using this service, the Swagger UI at /docs serves as both documentation and a testing interface. I don't need a separate frontend for most tasks.
Wrapping Up the Series
This completes the RAG 101 series. In seven articles, we've built a complete RAG system from scratch:
What is RAG? – Motivation and end-to-end concept
pgvector Setup – PostgreSQL extension, schema, HNSW index
# Non-streaming query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "How does the watch-loop detect CrashLoopBackOff pods?"}'
{
  "question": "How does the watch-loop detect CrashLoopBackOff pods?",
  "answer": "The watch-loop detects CrashLoopBackOff pods by polling the Kubernetes pods API every 30 seconds and checking each pod's container statuses. Specifically, it calls `get_pod_status()` and examines the `waiting.reason` field – if it equals `CrashLoopBackOff` and the restart count exceeds the configured threshold, a `ClusterEvent` is emitted. According to [1], the detection function also captures the restart count and last exit code in the event metadata for use by the RCA engine.",
  "sources": [
    {"index": 1, "file_path": "artificial-intelligence/aiops-101/aiops-101-watch-loop.md", "title": "AIOps 101: The Watch-Loop", "similarity": 0.8923},
    {"index": 2, "file_path": "artificial-intelligence/aiops-101/aiops-101-rule-engine.md", "title": "AIOps 101: The Rule Engine", "similarity": 0.7814}
  ],
  "meta": {
    "model": "gpt-4o",
    "retrieval_strategy": "hybrid",
    "chunks_used": 2,
    "context_tokens": 743,
    "latency_ms": 7421.3
  }
}
# src/api/health.py
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from sqlalchemy import text
import structlog

log = structlog.get_logger()
router = APIRouter(tags=["health"])


@router.get("/health")
async def health(request: Request):
    checks = {}
    overall_ok = True

    # Database connectivity
    try:
        async with request.app.state.db_factory() as db:
            await db.execute(text("SELECT 1"))
            checks["database"] = {"status": "ok"}
    except Exception as e:
        checks["database"] = {"status": "error", "detail": str(e)}
        overall_ok = False

    # pgvector extension
    try:
        async with request.app.state.db_factory() as db:
            result = await db.execute(text("SELECT extversion FROM pg_extension WHERE extname='vector'"))
            row = result.fetchone()
            if row:
                checks["pgvector"] = {"status": "ok", "version": row[0]}
            else:
                checks["pgvector"] = {"status": "error", "detail": "extension not found"}
                overall_ok = False
    except Exception as e:
        checks["pgvector"] = {"status": "error", "detail": str(e)}
        overall_ok = False

    # Embedding worker
    worker = getattr(request.app.state, "worker_task", None)
    if worker and not worker.done():
        checks["embedding_worker"] = {"status": "running"}
    else:
        checks["embedding_worker"] = {"status": "stopped"}
        overall_ok = False

    # Ingestion stats
    try:
        async with request.app.state.db_factory() as db:
            result = await db.execute(text(
                "SELECT count(*) FROM chunks WHERE embedding IS NULL"
            ))
            pending = result.scalar()
            checks["pending_embeddings"] = {"count": pending}
    except Exception:
        pass

    return JSONResponse(
        {"status": "ok" if overall_ok else "degraded", "checks": checks},
        status_code=200 if overall_ok else 503,
    )


@router.get("/")
async def root():
    return {"service": "rag-service", "docs": "/docs"}
# Start PostgreSQL with pgvector
docker compose up -d postgres
# Run Alembic migrations
alembic upgrade head
# Start the service with hot reload
uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload
# Ingest the git-book corpus
curl -X POST http://localhost:8000/ingest/directory \
-H "Content-Type: application/json" \
-d '{"directory": "/path/to/git-book"}'
# Check ingestion progress
curl http://localhost:8000/ingest/stats
# Query
curl -X POST http://localhost:8000/query \
-H "Content-Type: application/json" \
-d '{"question": "How do I set up a Kubernetes ingress with TLS?"}'