# Part 6: Building AI-Powered APIs with FastAPI

## From Script to Service

Every AI project I've built started as a Python script. A few `httpx.post()` calls, some `print()` statements, and a proof of concept that runs once. The hard part was always the same: turning that script into a service that other systems (or users) can call reliably.

FastAPI is where I do that work. It gives me async by default (essential for I/O-bound LLM calls), automatic request/response validation via Pydantic, and OpenAPI documentation without extra effort. This article walks through the patterns I use to build AI-powered APIs — from endpoint design to streaming responses and cost control.

***

## Designing AI Endpoints

The first question for any AI-powered endpoint: **what's the contract?** LLM responses are non-deterministic, but the API contract should be predictable.

Here's the core endpoint from my RAG service:

```python
# src/ai_engineer/main.py
import time
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI

from ai_engineer.config import settings
from ai_engineer.db.engine import init_db, close_db
from ai_engineer.models import QuestionRequest, AnswerResponse
from ai_engineer.retrieval.search import semantic_search
from ai_engineer.prompts.templates import PromptBuilder, RAGPromptInput
from ai_engineer.llm.factory import create_llm_provider


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    await init_db()
    app.state.llm = create_llm_provider()
    yield
    await close_db()


app = FastAPI(title="AI Engineer Service", lifespan=lifespan)


@app.post("/ask", response_model=AnswerResponse)
async def ask_question(request: QuestionRequest) -> AnswerResponse:
    """Answer a question using RAG: retrieve context, then generate."""
    start = time.monotonic()

    # Step 1: Retrieve relevant context
    chunks = await semantic_search(
        query=request.question,
        top_k=5,
        min_similarity=0.3,
    )

    if not chunks:
        return AnswerResponse(
            answer="I don't have information about this in my knowledge base.",
            sources=[],
            model=settings.llm_model,
            tokens_used=0,
            latency_ms=round((time.monotonic() - start) * 1000, 1),
        )

    # Step 2: Build prompt
    prompt_input = RAGPromptInput(
        question=request.question,
        context_chunks=chunks,
    )
    messages = PromptBuilder.build_rag_prompt(prompt_input)

    # Step 3: Generate answer
    llm = app.state.llm
    answer = await llm.generate(
        messages[-1]["content"],
        max_tokens=request.max_tokens,
        temperature=0.1,
    )

    elapsed_ms = (time.monotonic() - start) * 1000

    # Step 4: Build response
    sources = [
        {"title": c["title"], "content_preview": c["content"][:200], "similarity_score": c["similarity"]}
        for c in chunks
    ]

    return AnswerResponse(
        answer=answer,
        sources=sources,
        model=settings.llm_model,
        tokens_used=0,  # Will track properly in Part 8
        latency_ms=round(elapsed_ms, 1),
    )
```

### The Request/Response Contract

```python
# src/ai_engineer/models.py
from pydantic import BaseModel, Field


class QuestionRequest(BaseModel):
    question: str = Field(
        ...,
        min_length=1,
        max_length=2000,
        description="The question to answer",
        examples=["How do I set up pgvector with PostgreSQL?"],
    )
    max_tokens: int = Field(
        default=512,
        ge=1,
        le=4096,
        description="Maximum tokens in the response",
    )
    include_sources: bool = Field(
        default=True,
        description="Whether to include source references",
    )


class Source(BaseModel):
    title: str
    content_preview: str = Field(..., max_length=200)
    similarity_score: float = Field(..., ge=0.0, le=1.0)


class AnswerResponse(BaseModel):
    answer: str
    sources: list[Source]
    model: str
    tokens_used: int
    latency_ms: float


class ErrorResponse(BaseModel):
    error: str
    detail: str | None = None


class HealthResponse(BaseModel):
    status: str
    model: str
    embedding_model: str
```

Things I learned about AI API design:

1. **Always return metadata.** `model`, `tokens_used`, and `latency_ms` are essential for debugging and cost tracking. Every response should tell the caller what happened behind the scenes.
2. **Return sources separately.** Don't embed source references in the answer text. Returning them as structured data lets the frontend render them however it wants.
3. **Set sensible defaults.** `max_tokens=512` is enough for most answers. Making the caller specify it every time is busywork.
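Worth emphasizing: Pydantic enforces this contract before the endpoint body ever runs, and FastAPI turns validation failures into 422 responses automatically. A quick standalone check of the field constraints (assuming Pydantic v2; the trimmed model mirrors the one above):

```python
from pydantic import BaseModel, Field, ValidationError


class QuestionRequest(BaseModel):
    question: str = Field(..., min_length=1, max_length=2000)
    max_tokens: int = Field(default=512, ge=1, le=4096)


# An empty question and an out-of-range max_tokens both fail before
# any endpoint logic runs; FastAPI would report these in a 422 body.
try:
    QuestionRequest(question="", max_tokens=10_000)
except ValidationError as e:
    failed = sorted({err["loc"][0] for err in e.errors()})
    print(failed)  # ['max_tokens', 'question']
```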

***

## Streaming Responses with SSE

LLM responses can take several seconds. Returning the complete response in one shot means the user stares at a loading spinner for the whole generation. Streaming with Server-Sent Events (SSE) gives the user feedback immediately:

```python
# src/ai_engineer/routes/streaming.py
import json
from collections.abc import AsyncGenerator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse
import httpx

from ai_engineer.config import settings
from ai_engineer.models import QuestionRequest

router = APIRouter()


async def stream_llm_response(
    prompt: str,
    max_tokens: int = 512,
) -> AsyncGenerator[str, None]:
    """Stream tokens from the LLM as Server-Sent Events."""
    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            "https://models.inference.ai.azure.com/chat/completions",
            headers={
                "Authorization": f"Bearer {settings.llm_api_key}",
                "Content-Type": "application/json",
            },
            json={
                "model": settings.llm_model,
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": max_tokens,
                "temperature": 0.1,
                "stream": True,
            },
        ) as response:
            response.raise_for_status()

            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue

                data = line[6:]  # Strip "data: " prefix
                if data == "[DONE]":
                    yield f"data: {json.dumps({'done': True})}\n\n"
                    break

                try:
                    chunk = json.loads(data)
                    delta = chunk["choices"][0].get("delta", {})
                    content = delta.get("content", "")
                    if content:
                        yield f"data: {json.dumps({'token': content})}\n\n"
                except (json.JSONDecodeError, KeyError, IndexError):
                    continue


@router.post("/ask/stream")
async def ask_question_stream(request: QuestionRequest) -> StreamingResponse:
    """Stream an answer token by token via Server-Sent Events."""
    # Retrieve context (same as non-streaming)
    from ai_engineer.retrieval.search import semantic_search
    from ai_engineer.prompts.templates import PromptBuilder, RAGPromptInput

    chunks = await semantic_search(query=request.question, top_k=5)

    if not chunks:
        async def no_results() -> AsyncGenerator[str, None]:
            # Build the message outside the f-string: a backslash escape
            # inside an f-string expression is a syntax error before 3.12.
            message = "I don't have information about this in my knowledge base."
            yield f"data: {json.dumps({'token': message})}\n\n"
            yield f"data: {json.dumps({'done': True})}\n\n"

        return StreamingResponse(
            no_results(),
            media_type="text/event-stream",
        )

    prompt_input = RAGPromptInput(question=request.question, context_chunks=chunks)
    messages = PromptBuilder.build_rag_prompt(prompt_input)

    return StreamingResponse(
        stream_llm_response(messages[-1]["content"], request.max_tokens),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
        },
    )
```

### Consuming the Stream (Client Side)

```python
# scripts/test_stream.py
import asyncio
import json

import httpx


async def consume_stream():
    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/ask/stream",
            json={"question": "What is the difference between IVFFlat and HNSW indexes?"},
        ) as response:
            async for line in response.aiter_lines():
                if not line.startswith("data: "):
                    continue
                data = json.loads(line[6:])
                if data.get("done"):
                    print("\n[Done]")
                    break
                print(data.get("token", ""), end="", flush=True)


asyncio.run(consume_stream())
```

Output appears token by token:

```
IVFFlat and HNSW are both approximate nearest neighbor (ANN) indexes
in pgvector, but they work differently...
[Done]
```

***

## Async Patterns for Concurrent LLM Calls

When a single request needs multiple independent LLM calls (e.g., generating an answer while separately scoring how well the retrieved context supports it), I use `asyncio.gather` to run them concurrently:

```python
import asyncio
from ai_engineer.llm.base import LLMProvider


async def ask_with_confidence(
    question: str,
    context: str,
    provider: LLMProvider,
) -> dict:
    """Generate an answer and a confidence assessment in parallel."""

    answer_prompt = f"Context:\n{context}\n\nQuestion: {question}"
    confidence_prompt = (
        f"Rate how well this context can answer the question on a scale of 0-100.\n\n"
        f"Context:\n{context}\n\nQuestion: {question}\n\n"
        f"Respond with ONLY a number between 0 and 100."
    )

    # Run both calls concurrently — saves ~50% wall time
    answer, confidence_raw = await asyncio.gather(
        provider.generate(answer_prompt, temperature=0.1),
        provider.generate(confidence_prompt, temperature=0.0, max_tokens=10),
    )

    # Parse confidence score
    try:
        confidence = int(confidence_raw.strip()) / 100.0
    except ValueError:
        confidence = 0.5  # Default if parsing fails

    return {
        "answer": answer,
        "confidence": confidence,
    }
```
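To see why `gather` matters here, a small self-contained sketch with a fake provider (a hypothetical stand-in; the sleep simulates network latency):

```python
import asyncio
import time


async def fake_generate(prompt: str) -> str:
    # Hypothetical stand-in for an LLM call: ~100ms of simulated latency
    await asyncio.sleep(0.1)
    return "42" if prompt.startswith("Rate") else "An answer."


async def main() -> None:
    start = time.monotonic()
    answer, confidence = await asyncio.gather(
        fake_generate("Context... Question: ..."),
        fake_generate("Rate how well this context..."),
    )
    elapsed = time.monotonic() - start
    # Both calls overlap, so elapsed is ~0.1s rather than ~0.2s sequential
    print(answer, confidence)  # An answer. 42


asyncio.run(main())
```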

### Handling Multiple Independent Requests

When you need to embed multiple texts and search across multiple indexes:

```python
async def multi_source_search(
    query: str,
    sources: list[str],
    provider,  # EmbeddingProvider
) -> list[dict]:
    """Search multiple sources concurrently."""
    # Embed the query once
    query_embedding = (await provider.embed([query]))[0]

    # Search all sources concurrently
    search_tasks = [
        search_source(query_embedding, source)
        for source in sources
    ]
    results_per_source = await asyncio.gather(*search_tasks)

    # Merge and re-rank
    all_results = []
    for source_results in results_per_source:
        all_results.extend(source_results)

    # Sort by similarity, return top-k
    all_results.sort(key=lambda r: r["similarity"], reverse=True)
    return all_results[:10]
```
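One caveat with plain `asyncio.gather`: if any source raises, the first exception propagates and every other result is lost. Passing `return_exceptions=True` keeps the partial results. A minimal sketch, with a hypothetical stand-in for the `search_source` helper assumed above:

```python
import asyncio


async def search_source(query: str, source: str) -> list[dict]:
    # Hypothetical stand-in for a real per-source search;
    # "broken" simulates one source failing.
    if source == "broken":
        raise RuntimeError(f"source {source} unavailable")
    return [{"source": source, "similarity": 0.9}]


async def main() -> None:
    results = await asyncio.gather(
        *(search_source("pgvector", s) for s in ["docs", "broken", "blog"]),
        return_exceptions=True,  # failures come back as exception objects
    )
    # Keep results from the sources that succeeded, skip the failures
    merged = [r for rs in results if not isinstance(rs, BaseException) for r in rs]
    print([m["source"] for m in merged])  # ['docs', 'blog']


asyncio.run(main())
```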

***

## Rate Limiting and Cost Control

LLM APIs charge per token, and a runaway loop or burst of traffic can generate a surprising bill. I build rate limiting into every AI service:

```python
# src/ai_engineer/middleware/rate_limit.py
import time
from collections import defaultdict

from fastapi import Request, HTTPException


class TokenBucketRateLimiter:
    """Simple in-memory rate limiter using token bucket algorithm."""

    def __init__(
        self,
        requests_per_minute: int = 30,
        burst_size: int = 5,
    ) -> None:
        self._rate = requests_per_minute / 60.0  # tokens per second
        self._burst = burst_size
        self._buckets: dict[str, dict] = defaultdict(
            lambda: {"tokens": burst_size, "last_update": time.monotonic()}
        )

    def allow_request(self, key: str) -> bool:
        """Check if a request should be allowed."""
        now = time.monotonic()
        bucket = self._buckets[key]

        # Refill tokens based on elapsed time
        elapsed = now - bucket["last_update"]
        bucket["tokens"] = min(
            self._burst,
            bucket["tokens"] + elapsed * self._rate,
        )
        bucket["last_update"] = now

        if bucket["tokens"] >= 1.0:
            bucket["tokens"] -= 1.0
            return True
        return False


rate_limiter = TokenBucketRateLimiter(requests_per_minute=30)


async def check_rate_limit(request: Request) -> None:
    """FastAPI dependency for rate limiting."""
    # Use client IP as the rate limit key
    client_ip = request.client.host if request.client else "unknown"
    if not rate_limiter.allow_request(client_ip):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded. Please wait before making more requests.",
        )
```
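A quick sanity check of the bucket arithmetic, using the same refill formula but a hand-fed clock so the behavior is deterministic (a standalone sketch, not the class above):

```python
# Standalone sketch of the token-bucket refill math,
# driven by a hand-fed clock instead of time.monotonic().
rate = 30 / 60.0   # 0.5 tokens per second (30 requests/minute)
burst = 5
tokens, last = float(burst), 0.0


def allow(now: float) -> bool:
    global tokens, last
    tokens = min(burst, tokens + (now - last) * rate)
    last = now
    if tokens >= 1.0:
        tokens -= 1.0
        return True
    return False


# A burst of 6 requests at t=0: the first 5 pass, the 6th is rejected
results = [allow(0.0) for _ in range(6)]
print(results)     # [True, True, True, True, True, False]

# Two seconds later one token has refilled (0.5 tokens/s * 2s)
print(allow(2.0))  # True
```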

### Cost Tracking

```python
# src/ai_engineer/middleware/cost.py
from dataclasses import dataclass, field
import time


# Pricing per 1M tokens (approximate, check current pricing)
PRICING = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-sonnet-4-20250514": {"input": 3.00, "output": 15.00},
}


@dataclass
class RequestCost:
    model: str
    input_tokens: int
    output_tokens: int
    latency_ms: float
    timestamp: float = field(default_factory=time.time)

    @property
    def cost_usd(self) -> float:
        prices = PRICING.get(self.model, {"input": 5.0, "output": 15.0})
        input_cost = (self.input_tokens / 1_000_000) * prices["input"]
        output_cost = (self.output_tokens / 1_000_000) * prices["output"]
        return input_cost + output_cost


class CostTracker:
    """Track cumulative costs across requests."""

    def __init__(self) -> None:
        self._requests: list[RequestCost] = []

    def record(self, cost: RequestCost) -> None:
        self._requests.append(cost)

    @property
    def total_cost_usd(self) -> float:
        return sum(r.cost_usd for r in self._requests)

    @property
    def total_requests(self) -> int:
        return len(self._requests)

    def summary(self) -> dict:
        if not self._requests:
            return {"total_cost_usd": 0, "total_requests": 0}
        return {
            "total_cost_usd": round(self.total_cost_usd, 6),
            "total_requests": self.total_requests,
            "avg_latency_ms": round(
                sum(r.latency_ms for r in self._requests) / len(self._requests), 1
            ),
            "total_input_tokens": sum(r.input_tokens for r in self._requests),
            "total_output_tokens": sum(r.output_tokens for r in self._requests),
        }
```
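As a sanity check on the formula: at the gpt-4o-mini rates listed above, a request with 1,000 input tokens and 500 output tokens costs well under a tenth of a cent:

```python
# Worked example of the cost_usd formula for gpt-4o-mini
# ($0.15 per 1M input tokens, $0.60 per 1M output tokens)
input_cost = (1_000 / 1_000_000) * 0.15   # 0.00015
output_cost = (500 / 1_000_000) * 0.60    # 0.0003
print(round(input_cost + output_cost, 6))  # 0.00045
```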

I add a `/metrics` endpoint that returns the cost summary:

```python
@app.get("/metrics")
async def metrics() -> dict:
    """Return cost and usage metrics."""
    return app.state.cost_tracker.summary()
```

***

## Retry Logic

LLM APIs fail. Networks time out. Rate limits get hit. Retry logic is essential:

```python
# src/ai_engineer/llm/retry.py
import asyncio
import logging

import httpx

logger = logging.getLogger(__name__)

RETRYABLE_STATUS_CODES = {429, 500, 502, 503, 504}


async def call_with_retry(
    client: httpx.AsyncClient,
    method: str,
    url: str,
    *,
    max_retries: int = 3,
    base_delay: float = 1.0,
    **kwargs,
) -> httpx.Response:
    """Make an HTTP request with exponential backoff retry."""
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            response = await client.request(method, url, **kwargs)

            if response.status_code not in RETRYABLE_STATUS_CODES:
                return response

            # Retryable HTTP error
            if attempt < max_retries:
                # Respect Retry-After header if present
                retry_after = response.headers.get("retry-after")
                if retry_after:
                    delay = float(retry_after)
                else:
                    delay = base_delay * (2 ** attempt)

                logger.warning(
                    "Retryable HTTP %d from %s, retrying in %.1fs (attempt %d/%d)",
                    response.status_code,
                    url,
                    delay,
                    attempt + 1,
                    max_retries,
                )
                await asyncio.sleep(delay)
            else:
                response.raise_for_status()

        except httpx.TimeoutException as e:
            last_exception = e
            if attempt < max_retries:
                delay = base_delay * (2 ** attempt)
                logger.warning(
                    "Timeout calling %s, retrying in %.1fs (attempt %d/%d)",
                    url,
                    delay,
                    attempt + 1,
                    max_retries,
                )
                await asyncio.sleep(delay)
            else:
                raise

    raise last_exception or RuntimeError("Retries exhausted")
```
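With the defaults above (`base_delay=1.0`, `max_retries=3`) and no `Retry-After` header, the backoff delays grow exponentially:

```python
base_delay, max_retries = 1.0, 3

# One delay per retried attempt (attempts 0, 1, 2); the final attempt
# either succeeds or raises, so there is no fourth delay.
delays = [base_delay * (2 ** attempt) for attempt in range(max_retries)]
print(delays)  # [1.0, 2.0, 4.0]
```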

***

## Putting It All Together

Here's the complete application wiring:

```python
# src/ai_engineer/main.py
from contextlib import asynccontextmanager
from collections.abc import AsyncGenerator

from fastapi import FastAPI, Depends

from ai_engineer.config import settings
from ai_engineer.db.engine import init_db, close_db
from ai_engineer.models import QuestionRequest, AnswerResponse, HealthResponse
from ai_engineer.middleware.rate_limit import check_rate_limit
from ai_engineer.middleware.cost import CostTracker
from ai_engineer.llm.factory import create_llm_provider
from ai_engineer.routes.streaming import router as streaming_router


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
    await init_db()
    app.state.llm = create_llm_provider()
    app.state.cost_tracker = CostTracker()
    yield
    await close_db()


app = FastAPI(title="AI Engineer Service", lifespan=lifespan)
app.include_router(streaming_router, tags=["streaming"])


@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    return HealthResponse(
        status="ok",
        model=settings.llm_model,
        embedding_model=settings.embedding_model,
    )


@app.post(
    "/ask",
    response_model=AnswerResponse,
    dependencies=[Depends(check_rate_limit)],
)
async def ask_question(request: QuestionRequest) -> AnswerResponse:
    # ... (implementation from earlier)
    ...


@app.get("/metrics")
async def metrics() -> dict:
    return app.state.cost_tracker.summary()
```

```bash
# Run the service
uv run uvicorn ai_engineer.main:app --reload --host 0.0.0.0 --port 8000

# Test it
curl -X POST http://localhost:8000/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "How do I set up pgvector?"}'
```

***

## Key Takeaways

1. **Define contracts first.** Pydantic request/response models are the API contract. Design them before writing the endpoint logic.
2. **Stream long responses.** Users tolerate 5 seconds of streaming tokens. They don't tolerate 5 seconds of a loading spinner.
3. **Use `asyncio.gather` for independent operations.** When you need multiple LLM calls or searches, run them concurrently.
4. **Build rate limiting and cost tracking from day one.** These are harder to add later, and a runaway script calling your API can generate real costs.
5. **Retry with exponential backoff.** LLM APIs are not 100% reliable. Handle transient failures gracefully.

***

**Previous:** [**Part 5 — Prompt Engineering for Production Systems**](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/ai-engineer-101/part-5-prompt-engineering)

**Next:** [**Part 7 — Evaluating and Testing AI Systems**](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/ai-engineer-101/part-7-evaluating-ai-systems)
