# Article 7: Wrapping Everything in a FastAPI Service

## Introduction

The previous six articles covered each component in isolation: pgvector setup, chunking, embeddings, retrieval, and generation. This article puts them all together into a running FastAPI service.

The service exposes three endpoint groups:

* `/ingest` — submit files or directories for ingestion
* `/query` — submit a question and receive a grounded answer
* `/health` — service status and dependency checks

Everything is async throughout: file loading, database access, embedding calls, and LLM calls. The application uses FastAPI's lifespan to start the embedding background worker and cleanly shut it down.

***

## Table of Contents

1. [Application Structure](#application-structure)
2. [Configuration and Settings](#configuration)
3. [FastAPI Lifespan and Dependency Injection](#lifespan)
4. [Ingestion Endpoints](#ingestion-endpoints)
5. [Query Endpoint with Streaming](#query-endpoint)
6. [Health Endpoint](#health-endpoint)
7. [Running the Service](#running)
8. [Docker Compose Setup](#docker-compose)
9. [What I Learned](#what-i-learned)

***

## Application Structure <a href="#application-structure" id="application-structure"></a>

```
src/
├── main.py                 # FastAPI app, lifespan, router registration
├── core/
│   ├── config.py           # Settings (Pydantic BaseSettings)
│   └── logging.py          # structlog configuration
├── db/
│   ├── base.py             # Async engine, session factory
│   └── models.py           # SQLAlchemy models
├── embeddings/
│   ├── base.py             # EmbeddingProvider ABC
│   ├── local.py            # sentence-transformers
│   └── github_models.py    # GitHub Models API
├── ingestion/
│   ├── loader.py           # File loader
│   ├── chunker.py          # Chunking strategies
│   └── pipeline.py         # Ingest orchestration + embedding worker
├── retrieval/
│   ├── models.py           # RetrievedChunk, RetrievalResult
│   ├── vector_search.py    # pgvector cosine search
│   ├── hybrid_search.py    # RRF fusion
│   └── retriever.py        # Retriever class
├── generation/
│   ├── models.py           # GenerationResult
│   ├── prompt_builder.py   # Prompt assembly
│   ├── llm_client.py       # GitHub Models API client
│   └── generator.py        # Generator class
└── api/
    ├── ingest.py           # /ingest routes
    ├── query.py            # /query routes
    └── health.py           # /health route
```

***

## Configuration and Settings <a href="#configuration" id="configuration"></a>

```python
# src/core/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
    
    # Database
    database_url: str = "postgresql+asyncpg://rag:secret@localhost:5432/rag_db"
    db_echo: bool = False
    
    # Embedding
    embedding_provider: str = "local"   # "local" | "github_models"
    embedding_model: str = "all-MiniLM-L6-v2"
    embedding_dimensions: int = 384
    
    # LLM
    llm_model: str = "gpt-4o"
    github_token: str = ""              # Required for github_models provider
    
    # Retrieval
    retrieval_strategy: str = "hybrid"  # "vector" | "hybrid"
    retrieval_limit: int = 5
    min_similarity: float = 0.3
    max_context_tokens: int = 6000
    
    # Ingestion
    corpus_root: str = "/corpus"        # Directory to scan for documents
    embedding_worker_interval: int = 30  # Seconds between embedding sweeps
    
    # App
    log_level: str = "INFO"
    host: str = "0.0.0.0"
    port: int = 8000

settings = Settings()
```

***

## FastAPI Lifespan and Dependency Injection <a href="#lifespan" id="lifespan"></a>

The lifespan function creates all shared objects at startup and tears them down on shutdown:

```python
# src/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from src.core.config import settings
from src.core.logging import configure_logging
from src.db.base import engine, AsyncSessionLocal
from src.embeddings.local import LocalEmbeddingProvider
from src.embeddings.github_models import GitHubModelsEmbeddingProvider
from src.retrieval.retriever import Retriever, RetrievalStrategy
from src.generation.llm_client import LLMClient
from src.generation.generator import Generator
from src.ingestion.pipeline import embedding_worker
from src.api import ingest, query, health
import asyncio, structlog

log = structlog.get_logger()

@asynccontextmanager
async def lifespan(app: FastAPI):
    configure_logging(settings.log_level)
    log.info("rag_service.starting")
    
    # Build embedding provider
    if settings.embedding_provider == "github_models":
        embedder = GitHubModelsEmbeddingProvider(model=settings.embedding_model)
    else:
        embedder = LocalEmbeddingProvider(model_name=settings.embedding_model)
    
    # Build retriever and generator
    retriever = Retriever(
        embedder=embedder,
        default_strategy=RetrievalStrategy(settings.retrieval_strategy),
        default_limit=settings.retrieval_limit,
        min_similarity=settings.min_similarity,
    )
    generator = Generator(
        llm=LLMClient(model=settings.llm_model),
        max_context_tokens=settings.max_context_tokens,
    )
    
    # Attach to app state for route access
    app.state.embedder   = embedder
    app.state.retriever  = retriever
    app.state.generator  = generator
    app.state.db_factory = AsyncSessionLocal
    
    # Start embedding background worker
    worker_task = asyncio.create_task(
        embedding_worker(embedder, intervals_seconds=settings.embedding_worker_interval),
        name="embedding_worker",
    )
    app.state.worker_task = worker_task
    
    log.info("rag_service.ready", provider=settings.embedding_provider, model=settings.llm_model)
    yield
    
    # Shutdown
    worker_task.cancel()
    try:
        await worker_task
    except asyncio.CancelledError:
        pass
    await engine.dispose()
    log.info("rag_service.stopped")

app = FastAPI(title="RAG Service", lifespan=lifespan)
app.include_router(ingest.router)
app.include_router(query.router)
app.include_router(health.router)
```
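
The start, cancel, and await steps used for the worker in `lifespan` can be tried in isolation. The sketch below is only an illustration: `periodic_worker` is a hypothetical stand-in for `embedding_worker` (whose real body lives in the ingestion pipeline), and the short sleep stands in for the time the app spends serving requests.

```python
import asyncio


async def periodic_worker(interval_seconds: float, sweeps: list[int]) -> None:
    """Hypothetical stand-in for embedding_worker: one 'sweep' per interval."""
    n = 0
    while True:
        n += 1
        sweeps.append(n)  # a real worker would embed pending chunks here
        await asyncio.sleep(interval_seconds)


async def run_with_worker(run_for: float) -> tuple[list[int], bool]:
    sweeps: list[int] = []
    task = asyncio.create_task(periodic_worker(0.01, sweeps), name="demo_worker")

    await asyncio.sleep(run_for)  # stands in for the app's serving phase

    # Same shutdown steps as in lifespan(): cancel, then await the task
    # so the cancellation is actually delivered before moving on.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return sweeps, task.cancelled()


if __name__ == "__main__":
    sweeps, cancelled = asyncio.run(run_with_worker(0.05))
    print(f"sweeps completed: {len(sweeps)}, cancelled cleanly: {cancelled}")
```

Awaiting the task after `cancel()` matters: without it, shutdown could move on to `engine.dispose()` while the worker is still in the middle of a sweep.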

### Dependency Injection for Database Sessions

```python
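# src/db/base.py (dependency)
from collections.abc import AsyncIterator

from fastapi import Request
from sqlalchemy.ext.asyncio import AsyncSession

async def get_db(request: Request) -> AsyncIterator[AsyncSession]:
    # One session per request; it is closed when the dependency exits.
    async with request.app.state.db_factory() as session:
        yield session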
```

Routes use `Depends(get_db)` to get a session scoped to the request lifetime.

***

## Ingestion Endpoints <a href="#ingestion-endpoints" id="ingestion-endpoints"></a>

```python
# src/api/ingest.py
from fastapi import APIRouter, Depends, BackgroundTasks, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from pathlib import Path
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.base import get_db
from src.ingestion.pipeline import ingest_file, scan_corpus
import uuid, structlog

log = structlog.get_logger()
router = APIRouter(prefix="/ingest", tags=["ingestion"])

class IngestFileRequest(BaseModel):
    file_path: str    # Path relative to corpus_root, or absolute

class IngestDirectoryRequest(BaseModel):
    directory: str    # Path to scan recursively

@router.post("/file")
async def ingest_single_file(
    body: IngestFileRequest,
    db: AsyncSession = Depends(get_db),
):
    """Ingest a single file synchronously (chunks created; embedding is async)."""
    from src.core.config import settings

    corpus_root = Path(settings.corpus_root)
    path = Path(body.file_path)
    if not path.is_absolute():
        path = corpus_root / path  # relative paths are resolved against corpus_root
    if not path.is_file():
        raise HTTPException(status_code=404, detail=f"File not found: {body.file_path}")

    chunks_created, skipped = await ingest_file(path, corpus_root, db)
    
    return {"status": "ok", "chunks_created": chunks_created, "skipped": bool(skipped)}

@router.post("/directory")
async def ingest_directory(
    body: IngestDirectoryRequest,
    background_tasks: BackgroundTasks,
    request: Request,
):
    """
    Scan a directory and ingest all documents as a background job.
    Returns a job ID immediately; the ID appears in the completion log line,
    and overall progress is visible via /ingest/stats.
    """
    from src.db.base import AsyncSessionLocal

    directory = Path(body.directory)
    if not directory.is_dir():
        raise HTTPException(status_code=400, detail=f"Not a directory: {body.directory}")
    
    job_id = str(uuid.uuid4())
    
    async def run_ingestion():
        async with AsyncSessionLocal() as db:
            files = scan_corpus(directory)
            total_chunks = 0
            for f in files:
                try:
                    chunks, _ = await ingest_file(f, directory, db)
                    total_chunks += chunks
                except Exception as exc:
                    await db.rollback()  # keep the shared session usable for the next file
                    log.error("ingest.file.error", file=str(f), error=str(exc))
            log.info("ingest.directory.done", job_id=job_id, files=len(files), chunks=total_chunks)
    
    background_tasks.add_task(run_ingestion)
    
    return JSONResponse(
        {"status": "accepted", "job_id": job_id, "directory": body.directory},
        status_code=202,
    )

@router.get("/stats")
async def ingestion_stats(db: AsyncSession = Depends(get_db)):
    """Return document and chunk counts."""
    from sqlalchemy import text
    result = await db.execute(text("""
        SELECT
            (SELECT count(*) FROM documents) AS documents,
            (SELECT count(*) FROM chunks)    AS chunks,
            (SELECT count(*) FROM chunks WHERE embedding IS NULL) AS pending_embeddings
    """))
    row = result.one()
    return {"documents": row.documents, "chunks": row.chunks, "pending_embeddings": row.pending_embeddings}
```
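
The request model describes `file_path` as "relative to corpus_root, or absolute". One way to turn that into a single resolved path is sketched below. `resolve_corpus_path` is a hypothetical helper, not part of the pipeline, and the containment check (rejecting paths that end up outside the corpus root) is an extra assumption on my part rather than something the endpoints above enforce.

```python
from pathlib import Path


def resolve_corpus_path(file_path: str, corpus_root: Path) -> Path:
    """Resolve file_path against corpus_root and reject paths that escape it."""
    root = corpus_root.resolve()
    candidate = Path(file_path)
    if not candidate.is_absolute():
        candidate = root / candidate  # relative paths live under the corpus root

    resolved = candidate.resolve()    # collapses ".." segments and symlinks
    if not resolved.is_relative_to(root):
        raise ValueError(f"{file_path} is outside {root}")
    return resolved


if __name__ == "__main__":
    root = Path("/corpus")
    print(resolve_corpus_path("docs/intro.md", root))
    try:
        resolve_corpus_path("../etc/passwd", root)
    except ValueError as exc:
        print(f"rejected: {exc}")
```

In a route, the `ValueError` would map naturally onto an `HTTPException` with a 400 status, in the same way the directory endpoint rejects non-directories.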

***

## Query Endpoint with Streaming <a href="#query-endpoint" id="query-endpoint"></a>

```python
# src/api/query.py
from fastapi import APIRouter, Depends, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pydantic import BaseModel, Field
from sqlalchemy.ext.asyncio import AsyncSession
from src.db.base import get_db
from src.generation.models import GenerationResult
import json, structlog

log = structlog.get_logger()
router = APIRouter(prefix="/query", tags=["query"])

class QueryRequest(BaseModel):
    question: str = Field(..., min_length=3, max_length=2000)
    limit: int = Field(5, ge=1, le=20)
    stream: bool = False
    strategy: str | None = None    # Override default retrieval strategy

@router.post("")
async def query(
    body: QueryRequest,
    request: Request,
    db: AsyncSession = Depends(get_db),
):
    retriever = request.app.state.retriever
    generator = request.app.state.generator
    
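    # Step 1: Retrieve relevant chunks
    from fastapi import HTTPException
    from src.retrieval.retriever import RetrievalStrategy
    try:
        strategy = RetrievalStrategy(body.strategy) if body.strategy else None
    except ValueError:
        # An unknown strategy name is a client error, not a server error
        raise HTTPException(status_code=422, detail=f"Unknown retrieval strategy: {body.strategy}")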
    retrieval = await retriever.retrieve(
        query=body.question,
        db=db,
        limit=body.limit,
        strategy=strategy,
    )
    
    if body.stream:
        return _stream_response(generator, retrieval, body.question)
    
    # Step 2: Generate answer
    result: GenerationResult = await generator.generate(retrieval)
    
    return {
        "question":  result.question,
        "answer":    result.answer,
        "sources":   [
            {
                "index":      s.index,
                "file_path":  s.file_path,
                "title":      s.title,
                "similarity": round(s.similarity, 4),
            }
            for s in result.sources
        ],
        "meta": {
            "model":              result.model_used,
            "retrieval_strategy": result.retrieval_strategy,
            "chunks_used":        result.chunks_used,
            "context_tokens":     result.context_tokens,
            "latency_ms":         round(result.latency_ms, 1),
        }
    }

def _stream_response(generator, retrieval, question: str) -> StreamingResponse:
    """Return a Server-Sent Events stream."""
    from src.generation.prompt_builder import build_prompt, extract_sources, SYSTEM_PROMPT
    
    async def event_stream():
        user_message, included_chunks = build_prompt(question, retrieval)
        sources = extract_sources(included_chunks)
        
        # First SSE event: sources metadata
        yield f"data: {json.dumps({'type': 'sources', 'sources': [{'index': s.index, 'file_path': s.file_path, 'title': s.title} for s in sources]})}\n\n"
        
        # Stream answer tokens
        async for token in generator.llm.stream(
            system_prompt=SYSTEM_PROMPT,
            user_message=user_message,
        ):
            yield f"data: {json.dumps({'type': 'token', 'content': token})}\n\n"
        
        # Final event: done signal
        yield f"data: {json.dumps({'type': 'done'})}\n\n"
    
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
```
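
On the client side, the stream produced by `_stream_response` is a sequence of `data: {...}` lines, each carrying one JSON object with a `type` of `sources`, `token`, or `done`. The sketch below shows one way to reassemble the answer from those lines. The function names are hypothetical, and it assumes every event fits on a single `data:` line, which holds for the events emitted above but not for SSE in general.

```python
import json
from typing import Iterable, Iterator


def iter_sse_events(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the decoded JSON payload of each `data:` line; skip blank separators."""
    for line in lines:
        line = line.rstrip("\n")
        if line.startswith("data:"):
            yield json.loads(line[len("data:"):].strip())


def collect_answer(lines: Iterable[str]) -> tuple[list[dict], str]:
    """Return (sources, full answer text) from a stream of SSE lines."""
    sources: list[dict] = []
    tokens: list[str] = []
    for event in iter_sse_events(lines):
        if event["type"] == "sources":
            sources = event["sources"]
        elif event["type"] == "token":
            tokens.append(event["content"])
        elif event["type"] == "done":
            break
    return sources, "".join(tokens)


if __name__ == "__main__":
    # Hand-written sample in the same shape the endpoint emits
    sample = [
        'data: {"type": "sources", "sources": [{"index": 1, "file_path": "a.md", "title": "A"}]}\n',
        "\n",
        'data: {"type": "token", "content": "Hello"}\n',
        "\n",
        'data: {"type": "token", "content": " world"}\n',
        "\n",
        'data: {"type": "done"}\n',
        "\n",
    ]
    print(collect_answer(sample))
```

Any HTTP client that can iterate over response lines can feed `collect_answer`; the parsing itself needs nothing beyond the standard library.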

### Example Query Request and Response

```bash
# Non-streaming query
curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"question": "How does the watch-loop detect CrashLoopBackOff pods?"}'
```

```json
{
  "question": "How does the watch-loop detect CrashLoopBackOff pods?",
  "answer": "The watch-loop detects CrashLoopBackOff pods by polling the Kubernetes pods API every 30 seconds and checking each pod's container statuses. Specifically, it calls `get_pod_status()` and examines the `waiting.reason` field — if it equals `CrashLoopBackOff` and the restart count exceeds the configured threshold, a `ClusterEvent` is emitted. According to [1], the detection function also captures the restart count and last exit code in the event metadata for use by the RCA engine.",
  "sources": [
    {"index": 1, "file_path": "artificial-intelligence/aiops-101/aiops-101-watch-loop.md", "title": "AIOps 101: The Watch-Loop", "similarity": 0.8923},
    {"index": 2, "file_path": "artificial-intelligence/aiops-101/aiops-101-rule-engine.md", "title": "AIOps 101: The Rule Engine", "similarity": 0.7814}
  ],
  "meta": {
    "model": "gpt-4o",
    "retrieval_strategy": "hybrid",
    "chunks_used": 2,
    "context_tokens": 743,
    "latency_ms": 7421.3
  }
}
```

***

## Health Endpoint <a href="#health-endpoint" id="health-endpoint"></a>

```python
# src/api/health.py
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse
from sqlalchemy import text
import structlog

log = structlog.get_logger()
router = APIRouter(tags=["health"])

@router.get("/health")
async def health(request: Request):
    checks = {}
    overall_ok = True
    
    # Database connectivity
    try:
        async with request.app.state.db_factory() as db:
            await db.execute(text("SELECT 1"))
        checks["database"] = {"status": "ok"}
    except Exception as e:
        checks["database"] = {"status": "error", "detail": str(e)}
        overall_ok = False
    
    # pgvector extension
    try:
        async with request.app.state.db_factory() as db:
            result = await db.execute(text("SELECT extversion FROM pg_extension WHERE extname='vector'"))
            row = result.fetchone()
            if row:
                checks["pgvector"] = {"status": "ok", "version": row[0]}
            else:
                checks["pgvector"] = {"status": "error", "detail": "extension not found"}
                overall_ok = False
    except Exception as e:
        checks["pgvector"] = {"status": "error", "detail": str(e)}
        overall_ok = False
    
    # Embedding worker
    worker = getattr(request.app.state, "worker_task", None)
    if worker and not worker.done():
        checks["embedding_worker"] = {"status": "running"}
    else:
        checks["embedding_worker"] = {"status": "stopped"}
        overall_ok = False
    
    # Ingestion stats
    try:
        async with request.app.state.db_factory() as db:
            result = await db.execute(text(
                "SELECT count(*) FROM chunks WHERE embedding IS NULL"
            ))
            pending = result.scalar()
        checks["pending_embeddings"] = {"count": pending}
    except Exception:
        pass
    
    return JSONResponse(
        {"status": "ok" if overall_ok else "degraded", "checks": checks},
        status_code=200 if overall_ok else 503,
    )

@router.get("/")
async def root():
    return {"service": "rag-service", "docs": "/docs"}
```
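
The aggregation rule in the handler (any failing check turns the whole response into `degraded` with a 503) can also be written as a small pure function, which makes it easy to test without a database. This is a sketch only: `summarize_checks` is a hypothetical helper, and it treats the `error` and `stopped` statuses as failures, mirroring the places where the handler sets `overall_ok = False`.

```python
FAILING_STATUSES = {"error", "stopped"}


def summarize_checks(checks: dict[str, dict]) -> tuple[str, int]:
    """Map individual check results to (overall status, HTTP status code)."""
    # Entries without a "status" key (such as pending_embeddings) are informational.
    healthy = all(
        check.get("status") not in FAILING_STATUSES for check in checks.values()
    )
    return ("ok", 200) if healthy else ("degraded", 503)


if __name__ == "__main__":
    checks = {
        "database": {"status": "ok"},
        "pgvector": {"status": "ok", "version": "0.7.0"},  # version is illustrative
        "embedding_worker": {"status": "stopped"},
        "pending_embeddings": {"count": 12},
    }
    print(summarize_checks(checks))
```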

***

## Running the Service <a href="#running" id="running"></a>

### Development

```bash
# Start PostgreSQL with pgvector
docker compose up -d postgres

# Run Alembic migrations
alembic upgrade head

# Start the service with hot reload
uvicorn src.main:app --host 0.0.0.0 --port 8000 --reload

# Ingest the git-book corpus
curl -X POST http://localhost:8000/ingest/directory \
  -H "Content-Type: application/json" \
  -d '{"directory": "/path/to/git-book"}'

# Check ingestion progress
curl http://localhost:8000/ingest/stats

# Query
curl -X POST http://localhost:8000/query \
  -H "Content-Type: application/json" \
  -d '{"question": "How do I set up a Kubernetes ingress with TLS?"}'
```

### Environment Variables

```bash
# .env
DATABASE_URL=postgresql+asyncpg://rag:secret@localhost:5432/rag_db
EMBEDDING_PROVIDER=local
EMBEDDING_MODEL=all-MiniLM-L6-v2
EMBEDDING_DIMENSIONS=384
LLM_MODEL=gpt-4o
GITHUB_TOKEN=ghp_...
RETRIEVAL_STRATEGY=hybrid
CORPUS_ROOT=/corpus
LOG_LEVEL=INFO
```

***

## Docker Compose Setup <a href="#docker-compose" id="docker-compose"></a>

```yaml
# config/docker-compose.yml
services:
  postgres:
    image: pgvector/pgvector:pg16
    environment:
      POSTGRES_USER:     rag
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB:       rag_db
    ports:
      - "5432:5432"   # lets a locally run uvicorn reach the database at localhost:5432
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U rag -d rag_db"]
      interval: 10s
      timeout: 5s
      retries: 5

  rag-service:
    build: .
    ports:
      - "8000:8000"
    environment:
      DATABASE_URL:       postgresql+asyncpg://rag:${POSTGRES_PASSWORD}@postgres:5432/rag_db
      EMBEDDING_PROVIDER: local
      LLM_MODEL:          gpt-4o
      GITHUB_TOKEN:       ${GITHUB_TOKEN}
      CORPUS_ROOT:        /corpus
    volumes:
      - ${CORPUS_PATH}:/corpus:ro   # Mount the markdown corpus read-only
    depends_on:
      postgres:
        condition: service_healthy

volumes:
  pgdata:
```

```dockerfile
# Dockerfile
FROM python:3.12-slim

WORKDIR /app

# Install uv for fast dependency resolution
RUN pip install uv

COPY pyproject.toml .
RUN uv pip install --system --no-cache .

COPY . .

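# sentence-transformers downloads models on first use, so pre-download at build time.
# HF_HOME keeps the cache under /app, where the non-root user created below can read it.
ENV HF_HOME=/app/.cache/huggingface
RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('all-MiniLM-L6-v2')"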

# Non-root user
RUN useradd -m -u 1000 app && chown -R app /app
USER app

CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

Pre-downloading the model at Docker build time avoids the roughly 5-second download the first time the model is loaded, which keeps startup time predictable.

***

## What I Learned <a href="#what-i-learned" id="what-i-learned"></a>

**The streaming response header `X-Accel-Buffering: no` is essential behind nginx.** Without it, nginx buffers SSE events and the client sees them in one burst at the end rather than as a stream. I lost two hours debugging "why doesn't streaming work" before finding this header.

**Background tasks in FastAPI have a gotcha with database sessions.** If a background task tries to reuse the request-scoped database session, it will fail because the session is closed when the response is sent. The `run_ingestion` function opens its own session via `AsyncSessionLocal()` instead of using the request session.
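
The failure mode is easy to reproduce without FastAPI or a database. In the sketch below, `FakeSession` is a hypothetical stand-in for `AsyncSession` that only tracks whether it is open, and `simulate_request` walks through the order described above: the dependency is torn down first, then the deferred work runs.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for AsyncSession that only tracks open/closed state."""

    def __init__(self) -> None:
        self.closed = False

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.closed = True

    async def execute(self, statement: str) -> str:
        if self.closed:
            raise RuntimeError("session is closed")
        return f"ran: {statement}"


async def get_db():
    # Same shape as the real dependency: the session closes when the generator exits.
    async with FakeSession() as session:
        yield session


async def simulate_request() -> tuple[str, str]:
    dep = get_db()
    request_session = await dep.__anext__()  # the framework enters the dependency

    async def bad_task() -> str:             # reuses the request-scoped session
        return await request_session.execute("INSERT ...")

    async def good_task() -> str:            # opens its own session, like run_ingestion
        async with FakeSession() as own_session:
            return await own_session.execute("INSERT ...")

    await dep.aclose()                       # dependency torn down before the task runs

    try:
        bad = await bad_task()
    except RuntimeError as exc:
        bad = f"error: {exc}"
    good = await good_task()
    return bad, good


if __name__ == "__main__":
    print(asyncio.run(simulate_request()))
```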

**`/ingest/stats` is more useful than I expected.** I check `pending_embeddings` regularly — it tells me how far behind the embedding worker is. If the worker is struggling (high pending count that's not decreasing), it's usually because the local model is CPU-bound and I have 20+ files queued. Adding a metric for this (`PENDING_EMBEDDINGS = Gauge(...)`) was a natural next step.
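
The "high pending count that's not decreasing" check I do by eye can be sketched as a small function over successive `pending_embeddings` readings. Everything here is an assumption for illustration: the name `is_backlog_stalled`, the default threshold of 100, and the rule of comparing the newest reading against the oldest.

```python
def is_backlog_stalled(samples: list[int], min_pending: int = 100) -> bool:
    """Return True when the backlog is large and has not shrunk across the samples.

    samples: successive pending_embeddings readings, oldest first.
    min_pending: hypothetical threshold below which a backlog is not worth flagging.
    """
    if len(samples) < 2:
        return False  # a single reading says nothing about the trend
    newest, oldest = samples[-1], samples[0]
    return newest >= min_pending and newest >= oldest


if __name__ == "__main__":
    print(is_backlog_stalled([500, 480, 450]))  # shrinking backlog
    print(is_backlog_stalled([500, 500, 510]))  # large and not shrinking
```

Readings could come from polling `/ingest/stats` once per worker interval; the same value is what a gauge metric would export.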

**The `/docs` FastAPI auto-docs page is genuinely useful for a personal tool.** Since it's just me using this service, the Swagger UI at `/docs` serves as both documentation and a testing interface. I don't need a separate frontend for most tasks.

***

## Wrapping Up the Series

This completes the RAG 101 series. In seven articles, we've built a complete RAG system from scratch:

1. [What is RAG?](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-what-is-rag) — Motivation and end-to-end concept
2. [pgvector Setup](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-pgvector-setup) — PostgreSQL extension, schema, HNSW index
3. [Chunking Strategies](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-chunking) — Markdown-aware splitting with sentence fallback
4. [Embeddings](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-embeddings) — Local and API-based embedding, batching, background worker
5. [Retrieval](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-retrieval) — Vector search, full-text search, RRF hybrid fusion
6. [Generation](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/rag-101/rag-101-generation) — Prompt construction, grounding constraint, streaming
7. **FastAPI Service** (this article) — Putting it all together

The full stack: Python 3.12, FastAPI, PostgreSQL 16, pgvector, sentence-transformers, SQLAlchemy 2 async, GitHub Models API.
