Article 2: Architecture and Stack Decisions

Introduction

Before writing a single line of the AIOps engine, I spent time thinking about the layers. Getting the architecture wrong early creates technical debt that compounds — an AIOps system that's hard to extend means you stop extending it, and it atrophies into a static alerting layer you could have built with a webhook.

This article walks through the layered architecture of simple-ai-agent, explains the technology choices, and covers the design decisions I'd make the same way again — and the ones I'd do differently.

Layered Architecture Overview

The architecture is deliberately layered. Each layer has a single responsibility and communicates with adjacent layers through defined interfaces:

The AI layer and AIOps layer sit side by side at the same tier because they're both consumers of the MCP layer and the data layer, but they operate independently. The AI layer handles conversational requests. The AIOps layer runs the background loop and remediation pipeline.


Channel Layer: Telegram and Slack

src/channels/ contains four files that matter:

  • base.py — BaseAdapter abstract class defining the interface

  • telegram_adapter.py — wraps python-telegram-bot webhook mode

  • slack_adapter.py — wraps slack_bolt

  • router.py — fan-out / fan-in: one inbound message → one handler → response routed back to originating channel

The key design constraint here is that the business logic layer has no knowledge of which channel a message came from. MessageHandler receives a normalized IncomingMessage object with user_id, channel_type, text, and metadata. The channel adapters do translation from platform-specific webhook payloads to this common format.
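As a sketch of that contract, using the class and field names from the description above (the payload handling below is illustrative, not the repository's actual code):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any


@dataclass
class IncomingMessage:
    """Channel-agnostic message handed to the business logic layer."""
    user_id: str
    channel_type: str  # "telegram" or "slack"
    text: str
    metadata: dict[str, Any] = field(default_factory=dict)


class BaseAdapter(ABC):
    """Interface every channel adapter implements."""

    @abstractmethod
    def to_incoming(self, payload: dict) -> IncomingMessage:
        """Translate a platform-specific webhook payload into the common format."""

    @abstractmethod
    async def send(self, user_id: str, text: str) -> None:
        """Deliver a reply back through this channel."""


class TelegramAdapter(BaseAdapter):
    def to_incoming(self, payload: dict) -> IncomingMessage:
        # Telegram webhook updates nest the message under "message".
        msg = payload["message"]
        return IncomingMessage(
            user_id=str(msg["from"]["id"]),
            channel_type="telegram",
            text=msg.get("text", ""),
            metadata={"chat_id": msg["chat"]["id"]},
        )

    async def send(self, user_id: str, text: str) -> None:
        ...  # would call the Telegram Bot API here
```

Because MessageHandler only ever sees IncomingMessage, a new channel is one new subclass, not a change to the business logic.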

This means the approval manager can send a message back through either channel using the same interface, regardless of where the original alert was triggered from.

Why Telegram and Slack (not Discord)

I started with Discord, then removed it. Discord's bot permission model is fine for gaming servers but adds unnecessary complexity for an internal ops tool — role hierarchies, guild IDs, intent flags. Telegram bots are simpler to set up and the webhook API is clean. Slack is the de facto enterprise messaging platform.

I removed Discord to keep the adapter surface area minimal. If you need Discord, adding it is one new file implementing BaseAdapter.


API Layer: FastAPI

src/api/ has three files:

  • health.py — /health and /ready endpoints

  • webhooks.py — /api/webhook/telegram, /api/webhook/slack, /api/alert/webhook

  • middleware.py — slowapi rate limiter setup

FastAPI was the right choice here for three reasons:

  1. Async-native. The watch-loop runs as an async background task. The application needs to handle webhook requests while the watch-loop is polling Kubernetes concurrently. FastAPI + asyncio makes this natural.

  2. Pydantic validation. All incoming webhook payloads go through Pydantic models. The Alertmanager receiver won't silently drop malformed alerts; it raises a 422 with details.

  3. Lifespan context manager. FastAPI's lifespan parameter gives clean startup/shutdown hooks for the database pool, Redis connection, and watch-loop task.
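A minimal sketch of that lifespan pattern, with the watch-loop reduced to a placeholder and resource setup elided (FastAPI accepts an async context manager like this via its `lifespan` parameter):

```python
import asyncio
from contextlib import asynccontextmanager


async def watch_loop() -> None:
    # Placeholder: poll Kubernetes every N seconds (covered in Article 3).
    while True:
        await asyncio.sleep(30)


@asynccontextmanager
async def lifespan(app):
    # Startup: this is where the database pool and Redis connection
    # would be opened before the watch-loop is launched.
    task = asyncio.create_task(watch_loop())
    try:
        yield
    finally:
        # Shutdown: cancel the watch-loop cleanly before the process exits.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

# app = FastAPI(lifespan=lifespan)
```

Startup and shutdown live in one function, so nothing can start without a matching teardown path.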

Rate Limiting

All endpoints are rate-limited with slowapi (per-IP). The default is 60 requests/minute, configurable via RATE_LIMIT_PER_MINUTE. This matters because Slack can retry webhook deliveries aggressively if your endpoint is slow or errors.


Business Logic Layer

src/services/ is the core of what the application does when a user sends a message:

message_handler.py — Intent Detection

This is the most important file for reactive mode. Given a message like "show me error pods in production", the handler needs to decide:

  1. Is this a Kubernetes query? → route to KubernetesHandler

  2. Is this a security scan request? → route to MCP security tools

  3. Is this an approval/rejection? → route to ApprovalManager

  4. Otherwise, send to the LLM with conversation context

Intent detection is not an ML classifier. It's keyword matching with priority order:
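A minimal sketch of the idea (the intents mirror the routing list above; the keyword lists themselves are illustrative, not the repository's actual ones):

```python
# Priority-ordered (intent, keywords) pairs; the first match wins,
# matching the routing order described above.
INTENT_RULES: list[tuple[str, tuple[str, ...]]] = [
    ("kubernetes", ("pod", "pods", "deployment", "namespace", "node")),
    ("security",   ("scan", "cve", "vulnerability")),
    ("approval",   ("approve", "reject", "deny")),
]


def detect_intent(text: str) -> str:
    words = text.lower().split()
    for intent, keywords in INTENT_RULES:
        if any(kw in words for kw in keywords):
            return intent
    return "llm"  # fall through to the LLM with conversation context
```

So "show me error pods in production" routes to the Kubernetes handler without ever touching the LLM.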

This is intentionally simple. I experimented with using the LLM itself for intent classification, but the latency was unacceptable for Kubernetes status queries where the user expects a fast response. Keyword matching on well-defined prefixes is 0ms; an LLM classification call is 500–2000ms.

session_manager.py — Redis TTL Sessions

Each user gets a session stored in Redis with a configurable TTL. The session holds:

  • Active channel (telegram or slack)

  • Model preference

  • Any pending context from multi-turn interactions

Sessions expire automatically. There's no session cleanup job needed.

approval_manager.py — Human-in-the-Loop Gate

I'll cover this in detail in Article 5. The short version: when a playbook step has risk: MEDIUM or risk: HIGH, the executor calls ApprovalManager.request() which:

  1. Generates a unique approval_id

  2. Stores the pending action in Redis with a 5-minute TTL (APPROVAL_TIMEOUT_SECONDS)

  3. Sends a formatted message to the configured notification channel

  4. Returns an awaitable that resolves when the user approves/rejects or the TTL expires
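Those four steps can be sketched with an in-memory store and an asyncio future; the real implementation keeps pending approvals in Redis with a TTL so they survive restarts, and everything here other than ApprovalManager.request is an assumed name:

```python
import asyncio
import uuid


class ApprovalManager:
    """In-memory sketch of the approval gate; the real store is Redis."""

    def __init__(self, notify, timeout_seconds: float = 300.0):
        self.notify = notify            # coroutine that posts to the channel
        self.timeout = timeout_seconds  # mirrors APPROVAL_TIMEOUT_SECONDS
        self.pending: dict[str, asyncio.Future] = {}

    async def request(self, action: str) -> str:
        approval_id = uuid.uuid4().hex[:8]               # 1. unique id
        fut = asyncio.get_running_loop().create_future()
        self.pending[approval_id] = fut                  # 2. store pending action
        await self.notify(f"Approve '{action}'? id={approval_id}")  # 3. notify
        try:
            return await asyncio.wait_for(fut, self.timeout)  # 4. await decision
        except asyncio.TimeoutError:
            return "expired"
        finally:
            self.pending.pop(approval_id, None)

    def resolve(self, approval_id: str, decision: str) -> None:
        # Called by the message handler when the user replies approve/reject.
        fut = self.pending.get(approval_id)
        if fut and not fut.done():
            fut.set_result(decision)
```

The executor just awaits request() and branches on the result, so the approval flow reads like straight-line code.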


AIOps Layer

src/aiops/ and src/monitoring/watchloop.py are covered in depth in Articles 3–6. The quick reference:

  • src/monitoring/watchloop.py — background async task; polls Kubernetes every N seconds

  • src/aiops/rule_engine.py — matches ClusterEvent objects against YAML-defined rules

  • src/aiops/playbooks.py — ordered remediation steps with risk classification

  • src/aiops/rca_engine.py — builds context, calls the LLM, returns a structured JSON RCA report

  • src/aiops/log_analyzer.py — pattern matching on pod/container logs

The AIOps layer does not call the business logic layer directly. Communication happens at the data layer (shared Redis/PostgreSQL) and through the notification channel (which goes through the channel adapters).


MCP Layer: Tool Execution

MCP (Model Context Protocol) is how the agent executes actions — listing Kubernetes resources, running security scans — without giving the LLM unrestricted access to a shell.

src/mcp/ has:

  • mcp_manager.py — lifecycle management (start/stop) and routing

  • base_transport.py — abstract transport ABC

  • stdio_transport.py — subprocess-based transport for local MCP servers

  • sse_transport.py — HTTP SSE transport for cloud MCP servers

Two MCP servers are configured in .mcp-config.json:
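The file itself isn't reproduced here; as an illustration of the shape such a config takes (the field names and server entries below are assumptions, not the repository's actual file):

```json
{
  "servers": {
    "kubernetes": {
      "transport": "stdio",
      "command": ["python", "-m", "..."]
    },
    "portchecker": {
      "transport": "sse",
      "url": "https://simpleportchecker.com/..."
    }
  }
}
```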

MCPManager maintains a tool_registry mapping tool names to server names. When the message handler or playbook executor calls MCPManager.call_tool("list_pods", {"namespace": "production"}), the manager looks up which server owns list_pods and dispatches via the correct transport.

Why MCP Instead of Direct kubectl Subprocess

I could have called subprocess.run(["kubectl", "get", "pods"]) directly. I chose MCP because:

  1. Defined tool contracts. Each tool has a typed input schema. The LLM can't accidentally construct a destructive command by passing unexpected parameters.

  2. Extensibility. Adding a new capability means adding a tool to the MCP server, not modifying the business logic.

  3. Testability. The MCP transport is an interface I can mock in tests.

  4. Remote servers. The SSE transport means I can use cloud-hosted MCP servers (simpleportchecker.com) alongside local ones without changing the calling code.


Data Layer: PostgreSQL and Redis

PostgreSQL 16

PostgreSQL stores four main tables:

  • users — user profile, preferred model, channel configs

  • conversations — session metadata per user

  • messages — full conversation history (role, content, JSONB metadata)

  • cluster_events — persisted ClusterEvent objects from the watch-loop

The conversation history table enables the context builder (src/ai/context_builder.py) to load the last N messages and send them as the conversation window to the LLM. This is how the agent "remembers" what was discussed earlier in a session.
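The windowing itself is simple; a sketch under the assumption that the query returns (role, content) rows ordered oldest-first (the table and column names come from the schema above; the function name is illustrative):

```python
# The rows would come from something like:
#   SELECT role, content FROM messages
#   WHERE conversation_id = $1 ORDER BY created_at
def build_conversation_window(
    rows: list[tuple[str, str]], max_messages: int = 20
) -> list[dict]:
    """Shape the most recent N (role, content) rows into a chat window."""
    return [
        {"role": role, "content": content}
        for role, content in rows[-max_messages:]
    ]
```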

All database access is async via asyncpg + SQLAlchemy with async sessions. Schema migrations are managed with Alembic (alembic/).

Redis 7

Redis serves two distinct purposes:

  1. Session cache — lightweight user session state, TTL-based expiry, sub-millisecond access

  2. Approval store — pending remediation approvals, 5-minute TTL, HSET for atomicity

The session cache is a read-heavy, write-occasionally workload. The approval store is write-once, read-once (or TTL-expire). Both fit Redis's simple key-value model well.

I enabled AOF persistence on Redis so approvals survive a Redis restart during a live incident.


Observability Layer

src/monitoring/prometheus.py registers Prometheus counters and histograms that the agent exposes at /metrics. Grafana reads from Prometheus and the dashboards in config/grafana/ are provisioned automatically via Docker Compose volume mounts.

structlog produces JSON-formatted log output. Every log record includes event, level, timestamp, plus context fields like user_id, channel, tool_name, approval_id. This makes filtering in log aggregators trivial.

I'll cover the full observability setup in Article 8.


Stack Decisions

What I'd Do the Same

FastAPI + asyncio: The async model is non-negotiable when you have background tasks (watch-loop), long-running tool calls (MCP), and simultaneous webhook handling. Synchronous frameworks like Flask would require threading or separate processes for the watch-loop.

Redis for approvals: The TTL semantics of Redis are perfect for approval timeouts. If the approval key expires, the approval is gone. No cleanup jobs, no expired-approval handling code.

Pydantic Settings (src/config.py): Every environment variable goes through a Pydantic BaseSettings model. Configuration errors fail fast at startup with clear messages rather than KeyError exceptions buried in application code.

MCP for tool execution: Defining tools as explicit schemas rather than allowing LLM-generated shell commands is a security and reliability decision I'd make the same way every time.

What I'd Do Differently

Alembic migration discipline: In the early iteration, I ran schema changes manually and added Alembic late. Setting up Alembic as the very first thing before writing any models would have saved several migrations that existed just to fix the initial schema.

Event sourcing for ClusterEvent: Currently, cluster events are stored as rows with a status field. An event-sourced log (append-only, events with timestamps) would make replaying the incident timeline cleaner for the RCA engine. This is something I want to evolve toward.

Separate AIOps process: The watch-loop runs as an async task inside the same FastAPI process. For a production-grade system running on a real cluster, I'd move the watch-loop to a separate process (or a separate Kubernetes Deployment) so that API traffic doesn't affect polling latency and vice versa. For my homelab, shared process is fine.


Next: Article 3 — The Watch-Loop: Continuous Cluster Health Polling
