# Part 4: Async Programming and FastAPI

## Introduction

The `ansible-inspec` server runs compliance jobs in the background while still serving API requests. Without async, every long-running job would block the HTTP server entirely. Python's `asyncio` plus FastAPI is the combination I landed on — FastAPI handles routing and validation, `asyncio` manages concurrent I/O, and `BackgroundTasks` dispatches jobs without blocking the caller.

This part covers async programming from first principles up to a realistic FastAPI server pattern.

***

## Sync vs Async — The Core Difference

```python
import time
import asyncio

# Synchronous — each sleep blocks the entire thread
def sync_check(host: str, delay: float = 1.0) -> str:
    time.sleep(delay)        # blocks everything
    return f"{host}: ok"

# Asynchronous — sleep yields control back to the event loop
async def async_check(host: str, delay: float = 1.0) -> str:
    await asyncio.sleep(delay)   # yields; other coroutines can run
    return f"{host}: ok"
```

**Running three sync checks** takes 3 seconds (sequential).\
**Running three async checks** with `asyncio.gather` takes \~1 second (concurrent).

```python
import asyncio
import time

async def main() -> None:
    hosts = ["web-01", "web-02", "db-01"]

    # Sequential — 3 × 1s = ~3s
    start = time.perf_counter()
    for host in hosts:
        result = await async_check(host, delay=1.0)
        print(result)
    print(f"Sequential: {time.perf_counter() - start:.2f}s")

    # Concurrent — all run at the same time → ~1s
    start = time.perf_counter()
    results = await asyncio.gather(*[async_check(h, delay=1.0) for h in hosts])
    print(results)
    print(f"Concurrent: {time.perf_counter() - start:.2f}s")

asyncio.run(main())
```
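
The speed-up only appears when each coroutine actually yields. As a minimal sketch (the names `blocking_check`, `yielding_check`, and `timed` are made up for this illustration), the following compares `time.sleep` and `asyncio.sleep` inside the same `asyncio.gather` call:

```python
import asyncio
import time

async def blocking_check(host: str, delay: float = 0.2) -> str:
    time.sleep(delay)            # blocks the event loop; no other coroutine can run
    return f"{host}: ok"

async def yielding_check(host: str, delay: float = 0.2) -> str:
    await asyncio.sleep(delay)   # yields to the event loop while waiting
    return f"{host}: ok"

async def timed(check, hosts: list[str]) -> float:
    """Run `check` for every host via gather and return the elapsed seconds."""
    start = time.perf_counter()
    await asyncio.gather(*[check(h) for h in hosts])
    return time.perf_counter() - start

async def compare() -> tuple[float, float]:
    hosts = ["web-01", "web-02", "db-01"]
    blocked = await timed(blocking_check, hosts)   # roughly 3 x 0.2s
    yielded = await timed(yielding_check, hosts)   # roughly 0.2s
    return blocked, yielded

blocked, yielded = asyncio.run(compare())
print(f"time.sleep inside gather:    {blocked:.2f}s")
print(f"asyncio.sleep inside gather: {yielded:.2f}s")
```

Marking a function `async` does not make it concurrent by itself; a blocking call inside it still stalls every other coroutine on the loop.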

***

## `async/await` Fundamentals

### Coroutine vs function

```python
def regular() -> int:
    return 42

async def coroutine() -> int:
    return 42

# A coroutine function returns a coroutine object when called — NOT the value
print(regular())     # 42
print(coroutine())   # <coroutine object coroutine at 0x...>
                     # must be awaited or passed to asyncio.run();
                     # left un-awaited, Python emits a
                     # "RuntimeWarning: coroutine ... was never awaited"
```
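
To get the value out, the coroutine object has to be driven by the event loop. A small sketch, with `answer` as an illustrative name:

```python
import asyncio

async def answer() -> int:
    return 42

async def main() -> int:
    coro = answer()                    # nothing has run yet
    assert asyncio.iscoroutine(coro)   # it is only a coroutine object so far
    value = await coro                 # runs the body and produces the value
    return value

print(asyncio.run(main()))   # 42
```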

### `await` — yield until ready

```python
import asyncio
import httpx

async def fetch_profile_metadata(profile_name: str) -> dict:
    url = f"https://supermarket.chef.io/api/v1/cookbooks/{profile_name}"
    async with httpx.AsyncClient() as client:
        response = await client.get(url, timeout=10.0)
        response.raise_for_status()
        return response.json()

async def main() -> None:
    data = await fetch_profile_metadata("linux-baseline")
    print(data.get("name"))

asyncio.run(main())
```

### `asyncio.gather` — run concurrently

```python
import asyncio

async def check_host(host: str) -> dict:
    await asyncio.sleep(0.5)   # simulate SSH round-trip
    return {"host": host, "reachable": True}

async def check_all(hosts: list[str]) -> list[dict]:
    # All checks run concurrently
    return await asyncio.gather(*[check_host(h) for h in hosts])

results = asyncio.run(check_all(["web-01", "web-02", "db-01"]))
print(results)
# [{'host': 'web-01', ...}, {'host': 'web-02', ...}, {'host': 'db-01', ...}]
```
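
`asyncio.gather` returns results in the order the awaitables were passed in, not the order they finished. The sketch below assumes artificial delays and a `finished` list added purely to record completion order:

```python
import asyncio

async def check_host(host: str, delay: float, finished: list[str]) -> str:
    await asyncio.sleep(delay)
    finished.append(host)    # record the order in which checks complete
    return host

async def main() -> tuple[list[str], list[str]]:
    finished: list[str] = []
    # The first host is the slowest, so it completes last
    results = await asyncio.gather(
        check_host("web-01", 0.3, finished),
        check_host("web-02", 0.2, finished),
        check_host("db-01", 0.1, finished),
    )
    return results, finished

results, finished = asyncio.run(main())
print(results)    # ['web-01', 'web-02', 'db-01']  (input order)
print(finished)   # ['db-01', 'web-02', 'web-01']  (completion order)
```

This is why zipping the input hosts with the results is safe.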

### Error handling in `gather`

```python
import asyncio

async def risky(host: str) -> dict:
    if host == "db-01":
        raise ConnectionRefusedError(f"Cannot reach {host}")
    return {"host": host, "status": "ok"}

async def safe_check_all(hosts: list[str]) -> list[dict | Exception]:
    # return_exceptions=True — exceptions become values, not raises
    results = await asyncio.gather(
        *[risky(h) for h in hosts],
        return_exceptions=True,
    )
    return results

async def main() -> None:
    hosts = ["web-01", "web-02", "db-01"]
    results = await safe_check_all(hosts)
    for host, result in zip(hosts, results):
        if isinstance(result, Exception):
            print(f"{host}: FAILED — {result}")
        else:
            print(f"{host}: {result['status']}")

asyncio.run(main())
```
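
For contrast, here is a sketch of the default behaviour (`return_exceptions=False`), where the first exception propagates to the caller. `strict_check_all` is a hypothetical name; `risky` is repeated so the block runs on its own:

```python
import asyncio

async def risky(host: str) -> dict:
    if host == "db-01":
        raise ConnectionRefusedError(f"Cannot reach {host}")
    return {"host": host, "status": "ok"}

async def strict_check_all(hosts: list[str]) -> list[dict]:
    # Default return_exceptions=False: the first exception is raised here
    return await asyncio.gather(*[risky(h) for h in hosts])

async def main() -> str:
    try:
        await strict_check_all(["web-01", "web-02", "db-01"])
    except ConnectionRefusedError as exc:
        return f"aborted: {exc}"
    return "all hosts ok"

print(asyncio.run(main()))   # aborted: Cannot reach db-01
```

In this mode the results of the successful hosts are lost to the caller, and the remaining awaitables are not cancelled automatically, which is why `return_exceptions=True` tends to suit per-host reporting better.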

### Timeout with `asyncio.wait_for`

```python
import asyncio

async def slow_check(host: str) -> dict:
    await asyncio.sleep(10)   # simulate very slow host
    return {"host": host, "status": "ok"}

async def check_with_timeout(host: str, timeout: float = 5.0) -> dict:
    try:
        return await asyncio.wait_for(slow_check(host), timeout=timeout)
    except asyncio.TimeoutError:
        return {"host": host, "status": "timeout"}
```
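
On timeout, `asyncio.wait_for` cancels the wrapped coroutine, which gives it a chance to clean up. A sketch with a short timeout, where the `log` list is an assumption added to make the cancellation visible:

```python
import asyncio

async def slow_check(host: str, log: list[str]) -> dict:
    try:
        await asyncio.sleep(10)   # simulate a very slow host
        return {"host": host, "status": "ok"}
    except asyncio.CancelledError:
        log.append(f"{host}: cancelled, cleaning up")   # e.g. close a connection
        raise                     # re-raise so the cancellation completes

async def main() -> tuple[dict, list[str]]:
    log: list[str] = []
    try:
        result = await asyncio.wait_for(slow_check("db-01", log), timeout=0.1)
    except asyncio.TimeoutError:
        result = {"host": "db-01", "status": "timeout"}
    return result, log

result, log = asyncio.run(main())
print(result)   # {'host': 'db-01', 'status': 'timeout'}
print(log)      # ['db-01: cancelled, cleaning up']
```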

### `asyncio.Queue` — worker pool pattern

The job executor in `ansible-inspec` uses a queue of pending jobs and a pool of worker coroutines:

```python
import asyncio
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Job:
    job_id: str
    profile: str
    hosts: list[str]
    created_at: datetime = field(default_factory=datetime.now)

async def worker(queue: asyncio.Queue, worker_id: int) -> None:
    while True:
        job: Job = await queue.get()
        try:
            print(f"[worker-{worker_id}] Starting job {job.job_id} on {job.hosts}")
            await asyncio.sleep(1)   # simulate compliance check
            print(f"[worker-{worker_id}] Finished job {job.job_id}")
        finally:
            # Always mark the job done; otherwise a failing job
            # would leave queue.join() waiting forever
            queue.task_done()

async def run_job_executor(jobs: list[Job], concurrency: int = 3) -> None:
    queue: asyncio.Queue = asyncio.Queue()

    for job in jobs:
        await queue.put(job)

    workers = [
        asyncio.create_task(worker(queue, i))
        for i in range(concurrency)
    ]

    await queue.join()   # wait for all jobs to complete

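    for w in workers:
        w.cancel()       # shut down workers
    # Wait until the cancellations have been processed
    await asyncio.gather(*workers, return_exceptions=True)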

asyncio.run(run_job_executor([
    Job("j1", "linux-baseline", ["web-01"]),
    Job("j2", "ssh-baseline", ["web-02"]),
    Job("j3", "docker-cis", ["db-01"]),
    Job("j4", "nginx-baseline", ["lb-01"]),
]))
```
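
When the full list of jobs is known up front, an `asyncio.Semaphore` is a lighter way to cap concurrency. This is a sketch of that alternative rather than what `ansible-inspec` does; `run_all`, `run_one`, and the `peak` counter are illustrative:

```python
import asyncio

async def run_all(job_ids: list[str], concurrency: int = 3) -> tuple[list[str], int]:
    limit = asyncio.Semaphore(concurrency)
    running = 0
    peak = 0   # highest number of jobs observed running at once

    async def run_one(job_id: str) -> str:
        nonlocal running, peak
        async with limit:              # at most `concurrency` bodies run at once
            running += 1
            peak = max(peak, running)
            await asyncio.sleep(0.1)   # simulate compliance check
            running -= 1
            return f"{job_id}: done"

    results = await asyncio.gather(*[run_one(j) for j in job_ids])
    return results, peak

results, peak = asyncio.run(run_all(["j1", "j2", "j3", "j4", "j5"], concurrency=2))
print(results)
print(f"peak concurrency: {peak}")   # peak concurrency: 2
```

The queue-based pool is the better fit when jobs keep arriving while the workers are running; the semaphore version is shorter when the batch is fixed.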

***

## CPU-bound Work in Async Code

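`asyncio` runs all coroutines on a single thread, so it can't parallelise CPU-bound work. Offload CPU-heavy functions to a `ProcessPoolExecutor`, which sidesteps the GIL by using separate processes. For blocking I/O calls (a subprocess, a DB driver without async support), `asyncio.to_thread` (3.9+) is enough: it keeps the event loop responsive but does not speed up CPU-bound code: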

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

def parse_profile_sync(content: str) -> list[dict]:
    """Stand-in for CPU-intensive profile parsing; runs in a separate process."""
    import re
    controls = re.findall(r'control\s+"([\w-]+)"', content)
    return [{"id": c} for c in controls]

async def parse_profile_async(content: str) -> list[dict]:
    loop = asyncio.get_running_loop()
    # A pool per call keeps the example short; starting processes is
    # expensive, so long-running code should create one pool and reuse it
    with ProcessPoolExecutor() as pool:
        return await loop.run_in_executor(pool, parse_profile_sync, content)

# For I/O-bound blocking code (subprocess, DB driver without async support)
async def run_legacy_sync() -> str:
    import subprocess
    result = await asyncio.to_thread(
        subprocess.check_output, ["ansible", "--version"]
    )
    return result.decode()
```
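
To see that `asyncio.to_thread` keeps the loop free, the sketch below runs a heartbeat coroutine alongside a blocking call. `blocking_scan` and `heartbeat` are made-up names, and `time.sleep` stands in for the blocking work:

```python
import asyncio
import time

def blocking_scan(seconds: float) -> str:
    time.sleep(seconds)   # stands in for a blocking driver or subprocess call
    return "scan complete"

async def heartbeat(ticks: list[float], interval: float = 0.05) -> None:
    while True:
        ticks.append(time.perf_counter())
        await asyncio.sleep(interval)

async def main() -> tuple[str, int]:
    ticks: list[float] = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking function runs in a worker thread; the loop keeps ticking
    result = await asyncio.to_thread(blocking_scan, 0.3)
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)   # let the cancel finish
    return result, len(ticks)

result, tick_count = asyncio.run(main())
print(result, f"({tick_count} heartbeats while scanning)")
```

Calling `blocking_scan(0.3)` directly inside `main` would record a single heartbeat at most, because the loop could not switch to the `heartbeat` task until the call returned.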

***

## FastAPI Server

FastAPI is an async-first web framework that auto-generates OpenAPI docs from Pydantic models and type annotations. `ansible-inspec` uses it for the REST API at `localhost:8080`.

### Minimal server

```python
# lib/ansible_inspec/server/app.py
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI(
    title="ansible-inspec API",
    version="0.2.12",
    description="Compliance testing via Ansible + InSpec",
)

class HealthResponse(BaseModel):
    status: str
    version: str

@app.get("/health", response_model=HealthResponse)
async def health() -> HealthResponse:
    return HealthResponse(status="ok", version="0.2.12")
```

Run with:

```bash
uvicorn ansible_inspec.server.app:app --host 0.0.0.0 --port 8080 --reload
```

### Job templates endpoint

```python
import uuid
from datetime import datetime, timezone

from fastapi import FastAPI, HTTPException, status
from pydantic import BaseModel, Field

app = FastAPI()

# In-memory store (replace with Prisma DB in production)
_templates: dict[str, dict] = {}

class JobTemplateCreate(BaseModel):
    name: str
    profile: str
    timeout: int = Field(default=300, ge=10, le=3600)
    supermarket: bool = False

class JobTemplateResponse(JobTemplateCreate):
    id: str
    created_at: str

@app.post(
    "/api/v1/templates",
    response_model=JobTemplateResponse,
    status_code=status.HTTP_201_CREATED,
)
async def create_template(payload: JobTemplateCreate) -> JobTemplateResponse:
    if payload.name in _templates:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail=f"Template '{payload.name}' already exists",
        )

    template_id = str(uuid.uuid4())
    template = {
        **payload.model_dump(),
        "id": template_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    _templates[payload.name] = template
    return JobTemplateResponse(**template)

@app.get("/api/v1/templates", response_model=list[JobTemplateResponse])
async def list_templates() -> list[JobTemplateResponse]:
    return [JobTemplateResponse(**t) for t in _templates.values()]

@app.get("/api/v1/templates/{name}", response_model=JobTemplateResponse)
async def get_template(name: str) -> JobTemplateResponse:
    if name not in _templates:
        raise HTTPException(status_code=404, detail=f"Template '{name}' not found")
    return JobTemplateResponse(**_templates[name])

@app.delete("/api/v1/templates/{name}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_template(name: str) -> None:
    if name not in _templates:
        raise HTTPException(status_code=404, detail=f"Template '{name}' not found")
    del _templates[name]
```

### Background job execution

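`ansible-inspec` schedules compliance jobs with `BackgroundTasks`. FastAPI runs the task after the response has been sent, so the API returns `202 Accepted` immediately and the client polls the job's status endpoint: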

```python
import uuid
import asyncio
from datetime import datetime, timezone
from fastapi import FastAPI, BackgroundTasks, HTTPException
from pydantic import BaseModel

app = FastAPI()

_jobs: dict[str, dict] = {}

class ExecuteRequest(BaseModel):
    template_name: str
    hosts: list[str]

class JobStatusResponse(BaseModel):
    job_id: str
    status: str
    started_at: str | None = None
    finished_at: str | None = None

async def _run_job(job_id: str, template_name: str, hosts: list[str]) -> None:
    """Background coroutine — runs the compliance check."""
    _jobs[job_id]["status"] = "running"
    _jobs[job_id]["started_at"] = datetime.now(timezone.utc).isoformat()

    try:
        # Simulate concurrent per-host checks
        async def check_one(host: str) -> dict:
            await asyncio.sleep(2)   # real: shell out to ansible-playbook
            return {"host": host, "passed": 5, "failed": 0}

        results = await asyncio.gather(*[check_one(h) for h in hosts])
        _jobs[job_id]["status"] = "success"
        _jobs[job_id]["results"] = results
    except Exception as exc:
        _jobs[job_id]["status"] = "failed"
        _jobs[job_id]["error"] = str(exc)
    finally:
        _jobs[job_id]["finished_at"] = datetime.now(timezone.utc).isoformat()

@app.post("/api/v1/jobs", response_model=JobStatusResponse,
          status_code=202)
async def submit_job(
    payload: ExecuteRequest,
    background_tasks: BackgroundTasks,
) -> JobStatusResponse:
    job_id = str(uuid.uuid4())
    _jobs[job_id] = {"job_id": job_id, "status": "pending"}

    background_tasks.add_task(
        _run_job, job_id, payload.template_name, payload.hosts
    )

    return JobStatusResponse(job_id=job_id, status="pending")

@app.get("/api/v1/jobs/{job_id}", response_model=dict)
async def get_job_status(job_id: str) -> dict:
    if job_id not in _jobs:
        raise HTTPException(status_code=404, detail="Job not found")
    return _jobs[job_id]
```
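
On the client side, the submit-then-poll flow can be sketched without any HTTP at all. The helper below is hypothetical: `wait_for_job` takes any async callable that returns the job dict, and `make_fake_status` stands in for repeated `GET /api/v1/jobs/{job_id}` calls:

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATES = {"success", "failed"}

async def wait_for_job(
    get_status: Callable[[str], Awaitable[dict]],
    job_id: str,
    interval: float = 0.05,
    timeout: float = 2.0,
) -> dict:
    """Poll get_status(job_id) until the job reaches a terminal state."""
    async def poll() -> dict:
        while True:
            job = await get_status(job_id)
            if job["status"] in TERMINAL_STATES:
                return job
            await asyncio.sleep(interval)

    # Give up with asyncio.TimeoutError if the job never finishes
    return await asyncio.wait_for(poll(), timeout=timeout)

def make_fake_status(states: list[str]) -> Callable[[str], Awaitable[dict]]:
    """Stand-in for the status endpoint: reports each state in turn."""
    remaining = iter(states)

    async def get_status(job_id: str) -> dict:
        return {"job_id": job_id, "status": next(remaining)}

    return get_status

fake = make_fake_status(["pending", "running", "success"])
print(asyncio.run(wait_for_job(fake, "abc")))
# {'job_id': 'abc', 'status': 'success'}
```

In a real client, `get_status` would wrap an `httpx.AsyncClient.get` call like the one shown earlier.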

### Dependency injection with `Depends`

FastAPI's `Depends` wires shared resources (DB sessions, auth, config) into route handlers:

```python
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

app = FastAPI()
security = HTTPBearer()

def verify_token(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> str:
    token = credentials.credentials
    # jwt.decode(token, settings.auth.jwt_secret, ...) — simplified
    if token != "dev-token":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
        )
    return token

@app.get("/api/v1/protected")
async def protected_route(token: str = Depends(verify_token)) -> dict:
    return {"message": "Authenticated", "token_prefix": token[:8]}
```
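
If a static token is compared directly, as in the simplified check above, a constant-time comparison is a cheap hardening step. This is a sketch only; `token_matches` and `EXPECTED_TOKEN` are illustrative names, and a real deployment would validate a JWT instead:

```python
import secrets

EXPECTED_TOKEN = "dev-token"   # placeholder; load from configuration in real code

def token_matches(candidate: str, expected: str = EXPECTED_TOKEN) -> bool:
    # compare_digest is designed so that timing does not reveal
    # where the two values first differ
    return secrets.compare_digest(candidate.encode(), expected.encode())

print(token_matches("dev-token"))   # True
print(token_matches("dev-tokem"))   # False
```

Inside `verify_token`, the `token != "dev-token"` test could then become `not token_matches(token)`.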

### Lifespan — startup and shutdown hooks

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup — runs before the server accepts requests
    print("Starting ansible-inspec server...")
    # await db.connect()
    # await job_queue.start()

    yield  # server runs here

    # Shutdown — runs when server stops
    print("Shutting down...")
    # await db.disconnect()

app = FastAPI(lifespan=lifespan)
```
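
Because `lifespan` is an ordinary async context manager, its ordering can be checked without starting a server. In this sketch a `SimpleNamespace` stands in for the FastAPI instance, and the `events` list is an assumption used only to record what ran:

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace

@asynccontextmanager
async def lifespan(app):
    app.events.append("startup")        # e.g. await db.connect()
    try:
        yield                           # the server would handle requests here
    finally:
        app.events.append("shutdown")   # runs even if the body raises

async def main() -> list[str]:
    app = SimpleNamespace(events=[])    # stand-in for the FastAPI instance
    async with lifespan(app):
        app.events.append("serving")
    return app.events

print(asyncio.run(main()))   # ['startup', 'serving', 'shutdown']
```

Wrapping the `yield` in `try`/`finally` is a design choice that keeps cleanup from being skipped when an exception reaches the context manager.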

### Router organisation

For larger projects, split routes into separate files:

```python
# lib/ansible_inspec/server/routes/templates.py
from fastapi import APIRouter

router = APIRouter(prefix="/api/v1/templates", tags=["templates"])

# An empty path keeps the route at /api/v1/templates (no trailing-slash redirect)
@router.get("")
async def list_templates() -> list[dict]:
    return []

# lib/ansible_inspec/server/routes/jobs.py
from fastapi import APIRouter

router = APIRouter(prefix="/api/v1/jobs", tags=["jobs"])

@router.post("")
async def submit_job() -> dict:
    return {"job_id": "abc"}

# lib/ansible_inspec/server/app.py
from fastapi import FastAPI
from .routes import templates, jobs

app = FastAPI()
app.include_router(templates.router)
app.include_router(jobs.router)
```

***

## Error Handling

```python
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI()

class AppError(Exception):
    def __init__(self, message: str, code: int = 500) -> None:
        super().__init__(message)   # keeps str(exc) and tracebacks informative
        self.message = message
        self.code = code

@app.exception_handler(AppError)
async def app_error_handler(request: Request, exc: AppError) -> JSONResponse:
    return JSONResponse(
        status_code=exc.code,
        content={"detail": exc.message},
    )

@app.get("/fail")
async def fail() -> dict:
    raise AppError("Something went wrong", code=503)
```

***

## Summary

| Concept             | Key point                                            |
| ------------------- | ---------------------------------------------------- |
| `async/await`       | `await` yields control; doesn't block the event loop |
| `asyncio.gather`    | Run multiple coroutines concurrently                 |
| `asyncio.wait_for`  | Timeout wrapper for any coroutine                    |
| `asyncio.Queue`     | Worker pool for job queues                           |
| `asyncio.to_thread` | Run blocking sync code without blocking the loop     |
| FastAPI routing     | Type-annotated + Pydantic = auto OpenAPI docs        |
| `BackgroundTasks`   | Non-blocking job dispatch                            |
| `Depends`           | Dependency injection for auth, DB, config            |
| `lifespan`          | Async startup/shutdown hooks                         |

***

## What's Next

[Part 5](https://blog.htunnthuthu.com/getting-started/programming/python-101/python-101-part-5) covers `pytest`, async test fixtures, and how to build a robust CI-checked test suite — the same approach used in `ansible-inspec`'s `tests/` directory.
