# Article 2: Architecture and Stack Decisions

## Introduction

Before writing a single line of the AIOps engine, I spent time thinking about the layers. Getting the architecture wrong early creates technical debt that compounds — an AIOps system that's hard to extend means you stop extending it, and it atrophies into a static alerting layer you could have built with a webhook.

This article walks through the layered architecture of [simple-ai-agent](https://github.com/Htunn/simple-ai-agent), explains the technology choices, and covers the design decisions I'd make the same way again — and the ones I'd do differently.

## Table of Contents

1. [Layered Architecture Overview](#layered-architecture-overview)
2. [Channel Layer: Telegram and Slack](#channel-layer-telegram-and-slack)
3. [API Layer: FastAPI](#api-layer-fastapi)
4. [Business Logic Layer: Message Handling and Sessions](#business-logic-layer)
5. [AIOps Layer](#aiops-layer)
6. [MCP Layer: Tool Execution](#mcp-layer-tool-execution)
7. [Data Layer: PostgreSQL and Redis](#data-layer-postgresql-and-redis)
8. [Observability Layer](#observability-layer)
9. [Stack Decisions — What I'd Do the Same and Differently](#stack-decisions)

***

## Layered Architecture Overview

The architecture is deliberately layered. Each layer has a single responsibility and communicates with adjacent layers through defined interfaces:

```
┌─────────────────────────────────────────────────────┐
│                   Channel Layer                      │  Telegram / Slack adapters
├─────────────────────────────────────────────────────┤
│                     API Layer                        │  FastAPI, rate-limiter, webhooks
├─────────────────────────────────────────────────────┤
│                 Business Logic Layer                 │  Message handler, sessions, approvals
├────────────────────────┬────────────────────────────┤
│        AI Layer        │       AIOps Layer           │  LLM client | watchloop, rules, playbooks, RCA
├────────────────────────┴────────────────────────────┤
│                    MCP Layer                         │  MCP Manager → stdio + SSE transports
├─────────────────────────────────────────────────────┤
│                    Data Layer                        │  PostgreSQL + Redis
├─────────────────────────────────────────────────────┤
│               Observability Layer                    │  Prometheus metrics, structlog JSON, Grafana
└─────────────────────────────────────────────────────┘
```

The AI layer and AIOps layer sit side by side at the same tier because they're both consumers of the MCP layer and the data layer, but they operate independently. The AI layer handles conversational requests. The AIOps layer runs the background loop and remediation pipeline.

***

## Channel Layer: Telegram and Slack

`src/channels/` contains four files that matter:

* `base.py` — `BaseAdapter` abstract class defining the interface
* `telegram_adapter.py` — wraps `python-telegram-bot` webhook mode
* `slack_adapter.py` — wraps `slack_bolt`
* `router.py` — fan-out / fan-in: one inbound message → one handler → response routed back to originating channel

The key design constraint here is that the business logic layer never has to branch on which channel a message came from. `MessageHandler` receives a normalized `IncomingMessage` object with `user_id`, `channel_type`, `text`, and `metadata`. The channel adapters translate platform-specific webhook payloads into this common format.

This means the approval manager can send a message back through either channel using the same interface, regardless of where the original alert was triggered from.

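```python
# src/channels/base.py: the interface both adapters implement
from __future__ import annotations  # lets the annotation below name IncomingMessage lazily

from abc import ABC, abstractmethod

# IncomingMessage is the project's normalized message type (its import is omitted here)


class BaseAdapter(ABC):
    @abstractmethod
    async def send_message(self, recipient: str, text: str) -> None: ...

    @abstractmethod
    async def parse_update(self, payload: dict) -> IncomingMessage | None: ...
```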
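
To make the normalization concrete, here is a minimal sketch of what the common message type and a Telegram translation could look like. The four field names come from the description above; the dataclass shape, the `parse_telegram_update` helper, and what ends up in `metadata` are my assumptions for illustration, not the repository's actual code. The payload keys (`message`, `from`, `chat`, `text`) follow the Telegram Bot API `Update` object.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass(frozen=True)
class IncomingMessage:
    """Channel-agnostic message handed to the business logic layer."""
    user_id: str
    channel_type: str  # "telegram" or "slack"
    text: str
    metadata: dict = field(default_factory=dict)


def parse_telegram_update(payload: dict) -> Optional[IncomingMessage]:
    """Translate a Telegram webhook payload; return None for updates we ignore."""
    message = payload.get("message")
    if not message or "text" not in message:
        return None  # e.g. edited messages, stickers, photos
    sender = message.get("from")
    if not sender:
        return None  # the sender is optional in the Bot API
    return IncomingMessage(
        user_id=str(sender["id"]),
        channel_type="telegram",
        text=message["text"],
        # Hypothetical: keep the chat ID so a reply can be routed back.
        metadata={"chat_id": message["chat"]["id"]},
    )
```

A Slack adapter would produce the same `IncomingMessage` from a differently shaped payload, which is what keeps everything downstream channel-agnostic.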

### Why Telegram and Slack (not Discord)

I started with Discord, then removed it. Discord's bot permission model is fine for gaming servers but adds unnecessary complexity for an internal ops tool: role hierarchies, guild IDs, intent flags. Telegram bots are simpler to set up and the webhook API is clean. Slack is the de facto enterprise messaging platform.

I removed Discord to keep the adapter surface area minimal. If you need Discord, adding it is one new file implementing `BaseAdapter`.

***

## API Layer: FastAPI

`src/api/` has three files:

* `health.py` — `/health` and `/ready` endpoints
* `webhooks.py` — `/api/webhook/telegram`, `/api/webhook/slack`, `/api/alert/webhook`
* `middleware.py` — `slowapi` rate limiter setup

FastAPI was the right choice here for three reasons:

1. **Async-native.** The watch-loop runs as an async background task. The application needs to handle webhook requests while the watch-loop is polling Kubernetes concurrently. FastAPI + `asyncio` makes this natural.
2. **Pydantic validation.** All incoming webhook payloads go through Pydantic models. The Alertmanager receiver won't silently drop malformed alerts; FastAPI rejects them with a 422 response that lists the fields that failed validation.
3. **Lifespan context manager.** FastAPI's `lifespan` parameter gives clean startup/shutdown hooks for the database pool, Redis connection, and watch-loop task.

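```python
# src/main.py: lifespan manages all long-lived resources
import asyncio
from contextlib import asynccontextmanager, suppress

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: open connections first, then start the background loop
    await init_db()
    await redis.initialize()
    await mcp_manager.start_all()
    watchloop_task = asyncio.create_task(run_watchloop())

    yield  # Application runs

    # Shutdown: cancel the loop and wait for it before closing what it depends on
    watchloop_task.cancel()
    with suppress(asyncio.CancelledError):
        await watchloop_task
    await mcp_manager.stop_all()
    await redis.close()
    await close_db()
```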

### Rate Limiting

All endpoints are rate-limited per IP with `slowapi`. The default is 60 requests/minute, configurable via `RATE_LIMIT_PER_MINUTE`. This matters because Slack can retry webhook deliveries aggressively if your endpoint is slow or returns errors.

***

## Business Logic Layer

`src/services/` is the core of what the application does when a user sends a message:

### `message_handler.py` — Intent Detection

This is the most important file for reactive mode. Given a message like `"show me error pods in production"`, the handler needs to decide:

1. Is this a Kubernetes query? → route to `KubernetesHandler`
2. Is this a security scan request? → route to MCP security tools
3. Is this an approval/rejection? → route to `ApprovalManager`
4. Otherwise, send to the LLM with conversation context

Intent detection is not an ML classifier. It's keyword matching, checked in priority order:

```python
# Simplified from src/services/message_handler.py
async def handle(self, message: IncomingMessage) -> str:
    text = message.text.lower().strip()
    
    # Highest priority: approval responses
    if text.startswith(("approve ", "reject ")):
        return await self.approval_manager.handle_response(message)
    
    # Kubernetes commands (explicit prefix)
    if text.startswith("/k8s "):
        return await self.k8s_handler.handle(message)
    
    # Natural language Kubernetes queries
    if any(kw in text for kw in K8S_KEYWORDS):
        return await self.k8s_handler.handle(message)
    
    # Security scanning queries
    if any(kw in text for kw in SECURITY_KEYWORDS):
        return await self.llm.chat_with_tools(message, tools=SECURITY_TOOLS)
    
    # General LLM response
    return await self.llm.chat(message)
```

This is intentionally simple. I experimented with using the LLM itself for intent classification, but the latency was unacceptable for Kubernetes status queries, where the user expects a fast response. Keyword matching on well-defined prefixes costs effectively nothing (microseconds); an LLM classification call adds 500–2000ms.

### `session_manager.py` — Redis TTL Sessions

Each user gets a session stored in Redis with a configurable TTL. The session holds:

* Active channel (telegram or slack)
* Model preference
* Any pending context from multi-turn interactions

Sessions expire automatically when their TTL runs out, so no session cleanup job is needed.
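
The expiry behaviour is what removes the need for a cleanup job. The sketch below imitates it with an in-memory store so it runs without a Redis server. It is a stand-in for the idea, not the project's `session_manager.py`: the class name, the `session:` key prefix, and the injectable clock are all assumptions. With Redis itself, the equivalent is writing the key with an expiry (`SET key value EX ttl`).

```python
import json
import time
from typing import Callable, Optional


class TTLSessionStore:
    """In-memory stand-in for Redis keys written with an expiry."""

    def __init__(self, ttl_seconds: float, clock: Callable[[], float] = time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._data = {}  # key -> (expires_at, serialized session)

    def save(self, user_id: str, session: dict) -> None:
        # Every write resets the deadline, like SET ... EX does.
        key = f"session:{user_id}"
        self._data[key] = (self._clock() + self._ttl, json.dumps(session))

    def load(self, user_id: str) -> Optional[dict]:
        key = f"session:{user_id}"
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, raw = entry
        if self._clock() >= expires_at:
            # Lazy expiry: the key disappears the next time it is looked at.
            del self._data[key]
            return None
        return json.loads(raw)
```

Passing the clock in makes the expiry testable without sleeping. Redis takes care of this on the server side, which is why the application code needs no timer of its own.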

### `approval_manager.py` — Human-in-the-Loop Gate

I'll cover this in detail in [Article 5](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/aiops-101/aiops-101-playbooks-human-in-the-loop). The short version: when a playbook step has `risk: MEDIUM` or `risk: HIGH`, the executor calls `ApprovalManager.request()` which:

1. Generates a unique `approval_id`
2. Stores the pending action in Redis with a 5-minute TTL (`APPROVAL_TIMEOUT_SECONDS`)
3. Sends a formatted message to the configured notification channel
4. Returns an awaitable that resolves when the user approves or rejects, or when the TTL expires
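
The four steps above can be sketched with plain `asyncio`. This is a simplified, in-process illustration under my own assumptions: `ApprovalGate` and its method names are hypothetical, pending state lives in a dict rather than Redis, the notification step is left out, and I treat a timeout as a rejection. The real `ApprovalManager` is covered in Article 5.

```python
import asyncio
import uuid


class ApprovalGate:
    """Hypothetical in-process version of the request/resolve handshake."""

    def __init__(self, timeout_seconds: float = 300.0):
        self._timeout = timeout_seconds
        self._pending = {}  # approval_id -> asyncio.Future resolving to bool

    def open(self) -> str:
        """Register a pending approval and return its ID (needs a running loop)."""
        approval_id = uuid.uuid4().hex[:8]
        self._pending[approval_id] = asyncio.get_running_loop().create_future()
        return approval_id

    async def wait(self, approval_id: str) -> bool:
        """Block until resolved; a timeout counts as a rejection here."""
        future = self._pending[approval_id]
        try:
            return await asyncio.wait_for(future, timeout=self._timeout)
        except asyncio.TimeoutError:
            return False
        finally:
            self._pending.pop(approval_id, None)

    def resolve(self, approval_id: str, approved: bool) -> bool:
        """Called when the user replies; False for unknown or expired IDs."""
        future = self._pending.get(approval_id)
        if future is None or future.done():
            return False
        future.set_result(approved)
        return True
```

In this sketch the playbook executor would call `open()`, send the ID to the channel, then `await wait(...)`. The message handler's `approve <id>` branch would call `resolve(...)`.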

***

## AIOps Layer

`src/aiops/` and `src/monitoring/watchloop.py` are covered in depth in Articles 3–6. The quick reference:

| File                          | Responsibility                                                |
| ----------------------------- | ------------------------------------------------------------- |
| `src/monitoring/watchloop.py` | Background async task; polls Kubernetes every N seconds       |
| `src/aiops/rule_engine.py`    | Matches `ClusterEvent` objects against YAML-defined rules     |
| `src/aiops/playbooks.py`      | Ordered remediation steps with risk classification            |
| `src/aiops/rca_engine.py`     | Builds context, calls LLM, returns structured JSON RCA report |
| `src/aiops/log_analyzer.py`   | Pattern matching on pod/container logs                        |

The AIOps layer does not call the business logic layer directly. Communication happens at the data layer (shared Redis/PostgreSQL) and through the notification channel (which goes through the channel adapters).

***

## MCP Layer: Tool Execution

MCP (Model Context Protocol) is how the agent executes actions — listing Kubernetes resources, running security scans — without giving the LLM unrestricted access to a shell.

`src/mcp/` has:

* `mcp_manager.py` — lifecycle management (start/stop) and routing
* `base_transport.py` — abstract transport ABC
* `stdio_transport.py` — subprocess-based transport for local MCP servers
* `sse_transport.py` — HTTP SSE transport for cloud MCP servers

Two MCP servers are configured in `.mcp-config.json`:

```json
{
  "mcpServers": {
    "kubernetes": {
      "type": "stdio",
      "command": "python3",
      "args": ["scripts/mcp_server.py"],
      "description": "Kubernetes management tools via kubectl"
    },
    "simplePortChecker": {
      "type": "sse",
      "url": "https://mcp.simpleportchecker.com/mcp",
      "description": "Security scanning and port checking tools"
    }
  }
}
```

`MCPManager` maintains a `tool_registry` mapping tool names to server names. When the message handler or playbook executor calls `MCPManager.call_tool("list_pods", {"namespace": "production"})`, the manager looks up which server owns `list_pods` and dispatches via the correct transport.
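
Here is a rough sketch of that lookup-and-dispatch step. Only `tool_registry`, `call_tool`, `start_all`, and `list_pods` come from the text above. `ToolRouter`, the `Transport` protocol, `FakeTransport`, and the `scan_ports` tool name are hypothetical, and the fake transport stands in for the real stdio and SSE transports so the example runs offline.

```python
import asyncio
from typing import Any, Protocol


class Transport(Protocol):
    async def list_tools(self) -> list: ...
    async def call(self, tool: str, arguments: dict) -> Any: ...


class FakeTransport:
    """Stand-in for a stdio or SSE transport."""

    def __init__(self, name: str, tools: list):
        self.name, self._tools = name, tools

    async def list_tools(self) -> list:
        return list(self._tools)

    async def call(self, tool: str, arguments: dict) -> Any:
        return {"server": self.name, "tool": tool, "arguments": arguments}


class ToolRouter:
    def __init__(self, servers: dict):
        self._servers = servers      # server name -> transport
        self.tool_registry = {}      # tool name -> server name

    async def start_all(self) -> None:
        # Ask each server what it offers and remember who owns each tool.
        for server_name, transport in self._servers.items():
            for tool in await transport.list_tools():
                self.tool_registry[tool] = server_name

    async def call_tool(self, tool: str, arguments: dict) -> Any:
        server_name = self.tool_registry.get(tool)
        if server_name is None:
            raise KeyError(f"no MCP server provides tool {tool!r}")
        return await self._servers[server_name].call(tool, arguments)


async def main() -> None:
    router = ToolRouter({
        "kubernetes": FakeTransport("kubernetes", ["list_pods"]),
        "simplePortChecker": FakeTransport("simplePortChecker", ["scan_ports"]),
    })
    await router.start_all()
    print(await router.call_tool("list_pods", {"namespace": "production"}))


if __name__ == "__main__":
    asyncio.run(main())
```

The caller never names a server or a transport, which is what lets local and remote MCP servers sit behind the same call.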

### Why MCP Instead of Direct kubectl Subprocess

I could have called `subprocess.run(["kubectl", "get", "pods"])` directly. I chose MCP because:

1. **Defined tool contracts.** Each tool has a typed input schema. The LLM can't accidentally construct a destructive command by passing unexpected parameters.
2. **Extensibility.** Adding a new capability means adding a tool to the MCP server, not modifying the business logic.
3. **Testability.** The MCP transport is an interface I can mock in tests.
4. **Remote servers.** The SSE transport means I can use cloud-hosted MCP servers (`simpleportchecker.com`) alongside local ones without changing the calling code.
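
To illustrate the first point, here is a small sketch of checking arguments against a tool's input schema before anything is executed. MCP tool definitions describe their inputs with JSON Schema; the specific schema below, the `limit` parameter, and the `validate_arguments` helper are hypothetical. The helper covers only a small subset of JSON Schema and is no substitute for a real validator.

```python
from typing import Any

# Hypothetical input schema for a pod-listing tool.
LIST_PODS_SCHEMA = {
    "type": "object",
    "properties": {
        "namespace": {"type": "string"},
        "limit": {"type": "integer"},
    },
    "required": ["namespace"],
    "additionalProperties": False,
}

_PYTHON_TYPES = {"string": str, "integer": int, "boolean": bool}


def validate_arguments(schema: dict, arguments: dict) -> list:
    """Return a list of problems; an empty list means the call is acceptable.

    Handles only `required`, flat `properties` types, and
    `additionalProperties: false`.
    """
    problems = []
    properties = schema.get("properties", {})
    for name in schema.get("required", []):
        if name not in arguments:
            problems.append(f"missing required argument: {name}")
    for name, value in arguments.items():
        if name not in properties:
            if schema.get("additionalProperties") is False:
                problems.append(f"unexpected argument: {name}")
            continue
        type_name = properties[name]["type"]
        expected = _PYTHON_TYPES[type_name]
        # bool is a subclass of int in Python, so reject it for integers explicitly.
        wrong_bool = expected is int and isinstance(value, bool)
        if not isinstance(value, expected) or wrong_bool:
            problems.append(f"argument {name} must be of type {type_name}")
    return problems
```

If the LLM tries to pass an extra `command` argument, the call is rejected before it reaches the server, rather than being interpolated into a shell string.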

***

## Data Layer: PostgreSQL and Redis

### PostgreSQL 16

PostgreSQL stores four main tables:

* `users` — user profile, preferred model, channel configs
* `conversations` — session metadata per user
* `messages` — full conversation history (role, content, JSONB metadata)
* `cluster_events` — persisted `ClusterEvent` objects from the watch-loop

The conversation history table enables the context builder (`src/ai/context_builder.py`) to load the last N messages and send them as the conversation window to the LLM. This is how the agent "remembers" what was discussed earlier in a session.
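
As a rough illustration of that windowing step, the sketch below assembles the message list for one LLM call from stored history. It is not the code in `src/ai/context_builder.py`: `StoredMessage`, `build_context`, the `max_messages` default, and the role/content dict format are assumptions, and the history is passed in as a list instead of being queried from PostgreSQL.

```python
from dataclasses import dataclass


@dataclass
class StoredMessage:
    role: str      # "user" or "assistant"
    content: str


def build_context(history: list, new_text: str, system_prompt: str,
                  max_messages: int = 10) -> list:
    """Assemble the message list for one LLM call.

    `history` is assumed to be ordered oldest-first, as it would be after
    a query sorted by creation time.
    """
    # Guard against max_messages == 0: history[-0:] would return everything.
    recent = history[-max_messages:] if max_messages > 0 else []
    window = [{"role": "system", "content": system_prompt}]
    window += [{"role": m.role, "content": m.content} for m in recent]
    window.append({"role": "user", "content": new_text})
    return window
```

Capping the window by message count is the simplest policy. A token-based budget would be more precise, but it needs a tokenizer for whichever model is in use.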

All database access is async via `asyncpg` + `SQLAlchemy` with async sessions. Schema migrations are managed with Alembic (`alembic/`).

### Redis 7

Redis serves two distinct purposes:

1. **Session cache** — lightweight user session state, TTL-based expiry, sub-millisecond access
2. **Approval store** — pending remediation approvals, 5-minute TTL, `HSET` for atomicity

The session cache is a read-heavy, write-occasionally workload. The approval store is write-once, read-once (or TTL-expire). Both fit Redis's simple key-value model well.

I enabled AOF persistence on Redis so approvals survive a Redis restart during a live incident.

***

## Observability Layer

`src/monitoring/prometheus.py` registers Prometheus counters and histograms that the agent exposes at `/metrics`. Grafana reads from Prometheus and the dashboards in `config/grafana/` are provisioned automatically via Docker Compose volume mounts.

`structlog` produces JSON-formatted log output. Every log record includes `event`, `level`, `timestamp`, plus context fields like `user_id`, `channel`, `tool_name`, `approval_id`. This makes filtering in log aggregators trivial.

I'll cover the full observability setup in [Article 8](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/aiops-101/aiops-101-observability).

***

## Stack Decisions

### What I'd Do the Same

**FastAPI + asyncio**: The async model is non-negotiable when you have background tasks (watch-loop), long-running tool calls (MCP), and simultaneous webhook handling. Synchronous frameworks like Flask would require threading or separate processes for the watch-loop.

**Redis for approvals**: The TTL semantics of Redis are perfect for approval timeouts. If the approval key expires, the approval is gone. No cleanup jobs, no expired-approval handling code.

**Pydantic Settings** (`src/config.py`): Every environment variable goes through a Pydantic `BaseSettings` model. Configuration errors fail fast at startup with clear messages rather than `KeyError` exceptions buried in application code.

**MCP for tool execution**: Defining tools as explicit schemas rather than allowing LLM-generated shell commands is a security and reliability decision I'd make the same way every time.

### What I'd Do Differently

**Alembic migration discipline**: In the early iterations, I ran schema changes manually and added Alembic late. Setting up Alembic first, before writing any models, would have saved several migrations that existed just to fix the initial schema.

**Event sourcing for `ClusterEvent`**: Currently, cluster events are stored as rows with a `status` field. An event-sourced log (append-only, events with timestamps) would make replaying the incident timeline cleaner for the RCA engine. This is something I want to evolve toward.
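
To show what I mean, here is a minimal in-memory sketch of the append-only idea. Everything in it is hypothetical (the `EventRecord` fields, the `kind` values, the `EventLog` class), and a real version would be a PostgreSQL table that only ever receives inserts.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class EventRecord:
    """One immutable entry in the log."""
    resource: str   # e.g. "production/api"
    kind: str       # e.g. "detected", "remediation_started", "resolved"
    at: datetime
    detail: str = ""


class EventLog:
    def __init__(self) -> None:
        self._records = []

    def append(self, record: EventRecord) -> None:
        # Records are only ever added, never updated in place.
        self._records.append(record)

    def timeline(self, resource: str) -> list:
        """Everything that happened to one resource, oldest first."""
        matching = (r for r in self._records if r.resource == resource)
        return sorted(matching, key=lambda r: r.at)

    def current_status(self, resource: str) -> Optional[str]:
        """Status is derived by replaying the log, not stored as a column."""
        events = self.timeline(resource)
        return events[-1].kind if events else None
```

With this shape, the RCA engine could ask for `timeline(...)` and get the incident in order, instead of reconstructing it from rows whose `status` has already been overwritten.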

**Separate AIOps process**: The watch-loop runs as an async task inside the same FastAPI process. For a production-grade system running on a real cluster, I'd move the watch-loop to a separate process (or a separate Kubernetes Deployment) so that API traffic doesn't affect polling latency and vice versa. For my homelab, a shared process is fine.

***

**Next**: [Article 3 — The Watch-Loop: Continuous Cluster Health Polling](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/aiops-101/aiops-101-watch-loop)
