Article 2: Architecture and Stack Decisions
Introduction
Before writing a single line of the AIOps engine, I spent time thinking about the layers. Getting the architecture wrong early creates technical debt that compounds: an AIOps system that's hard to extend means you stop extending it, and it atrophies into a static alerting layer you could have built with a webhook.
This article walks through the layered architecture of simple-ai-agent, explains the technology choices, and covers the design decisions I'd make the same way again, plus the ones I'd do differently.
Layered Architecture Overview
The architecture is deliberately layered. Each layer has a single responsibility and communicates with adjacent layers through defined interfaces:
The AI layer and AIOps layer sit side by side at the same tier because they're both consumers of the MCP layer and the data layer, but they operate independently. The AI layer handles conversational requests. The AIOps layer runs the background loop and remediation pipeline.
Channel Layer: Telegram and Slack
src/channels/ contains four files that matter:

- base.py - BaseAdapter abstract class defining the interface
- telegram_adapter.py - wraps python-telegram-bot in webhook mode
- slack_adapter.py - wraps slack_bolt
- router.py - fan-out / fan-in: one inbound message → one handler → response routed back to the originating channel
The key design constraint here is that the business logic layer has no knowledge of which channel a message came from. MessageHandler receives a normalized IncomingMessage object with user_id, channel_type, text, and metadata. The channel adapters do translation from platform-specific webhook payloads to this common format.
This means the approval manager can send a message back through either channel using the same interface, regardless of where the original alert was triggered from.
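The normalized shape can be sketched as a small dataclass. The four field names come from the description above; the types and example values are assumptions:

```python
from dataclasses import dataclass, field

# Sketch of the normalized message; field names follow the article,
# types and example values are assumptions.
@dataclass
class IncomingMessage:
    user_id: str
    channel_type: str        # "telegram" or "slack"
    text: str
    metadata: dict = field(default_factory=dict)

# A channel adapter would build one of these from a webhook payload:
msg = IncomingMessage(user_id="42", channel_type="telegram",
                      text="show me error pods")
```

The business logic only ever sees this type, never a Telegram or Slack payload.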
Why Telegram and Slack (not Discord)
I started with Discord, then removed it. Discord's bot permission model is fine for gaming servers but adds unnecessary complexity for an internal ops tool: role hierarchies, guild IDs, intent flags. Telegram bots are simpler to set up and the webhook API is clean. Slack is the de facto enterprise messaging platform.
I removed Discord to keep the adapter surface area minimal. If you need Discord, adding it is one new file implementing BaseAdapter.
API Layer: FastAPI
src/api/ has three files:

- health.py - /health and /ready endpoints
- webhooks.py - /api/webhook/telegram, /api/webhook/slack, /api/alert/webhook
- middleware.py - slowapi rate limiter setup
FastAPI was the right choice here for three reasons:
1. Async-native. The watch-loop runs as an async background task. The application needs to handle webhook requests while the watch-loop is polling Kubernetes concurrently. FastAPI + asyncio makes this natural.
2. Pydantic validation. All incoming webhook payloads go through Pydantic models. The Alertmanager receiver won't silently drop malformed alerts; it raises a 422 with details.
3. Lifespan context manager. FastAPI's lifespan parameter gives clean startup/shutdown hooks for the database pool, Redis connection, and watch-loop task.
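FastAPI's lifespan is just an async context manager wrapped around the serving phase. A stdlib-only sketch of the same startup/shutdown shape (watch_loop here is a stand-in for the real poller, not the project's code):

```python
import asyncio
from contextlib import asynccontextmanager

# Stand-in for the real Kubernetes poller.
async def watch_loop(state: dict) -> None:
    while True:
        state["polls"] = state.get("polls", 0) + 1
        await asyncio.sleep(0.01)

@asynccontextmanager
async def lifespan(state: dict):
    # Startup: open pools/connections, launch the background task.
    task = asyncio.create_task(watch_loop(state))
    try:
        yield state
    finally:
        # Shutdown: cancel the watch-loop and wait for it to exit.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

async def main() -> dict:
    state: dict = {}
    async with lifespan(state):
        await asyncio.sleep(0.05)  # the app would serve requests here
    return state

state = asyncio.run(main())
```

With FastAPI, the same function is passed as `FastAPI(lifespan=...)` and the framework drives the enter/exit around the server's lifetime.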
Rate Limiting
All endpoints are rate-limited with slowapi (per-IP). The default is 60 requests/minute, configurable via RATE_LIMIT_PER_MINUTE. This matters because Slack can retry webhook deliveries aggressively if your endpoint is slow or errors.
Business Logic Layer
src/services/ is the core of what the application does when a user sends a message:
message_handler.py: Intent Detection

This is the most important file for reactive mode. Given a message like "show me error pods in production", the handler needs to decide:
1. Is this a Kubernetes query? → route to KubernetesHandler
2. Is this a security scan request? → route to MCP security tools
3. Is this an approval/rejection? → route to ApprovalManager
4. Otherwise, send to the LLM with conversation context
Intent detection is not an ML classifier. It's keyword matching with priority order.
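A minimal sketch of the idea, with hypothetical keyword sets (the real prefixes live in message_handler.py):

```python
# Hypothetical keyword sets, checked in priority order; the real
# prefixes are defined in message_handler.py.
INTENT_RULES = [
    ("kubernetes", ("pod", "deployment", "namespace", "node")),
    ("security", ("scan", "cve", "port")),
    ("approval", ("approve", "reject")),
]

def detect_intent(text: str) -> str:
    lowered = text.lower()
    for intent, keywords in INTENT_RULES:
        if any(kw in lowered for kw in keywords):
            return intent
    return "llm"  # no match: fall through to the conversational LLM

print(detect_intent("show me error pods in production"))  # kubernetes
```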
This is intentionally simple. I experimented with using the LLM itself for intent classification, but the latency was unacceptable for Kubernetes status queries where the user expects a fast response. Keyword matching on well-defined prefixes is effectively 0 ms; an LLM classification call is 500–2000 ms.
session_manager.py: Redis TTL Sessions

Each user gets a session stored in Redis with a configurable TTL. The session holds:
- Active channel (telegram or slack)
- Model preference
- Any pending context from multi-turn interactions
Sessions expire automatically. There's no session cleanup job needed.
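The underlying pattern is just a SET with an expiry. Here is a runnable sketch with a minimal in-memory stand-in for the Redis client; the real code presumably uses redis.asyncio with the same SET/GET/EX commands, and the session fields are illustrative:

```python
import json
import time

class FakeRedis:
    """Minimal stand-in for a Redis client's SET/GET with EX (seconds TTL)."""
    def __init__(self):
        self._data = {}

    def set(self, key, value, ex):
        self._data[key] = (value, time.monotonic() + ex)

    def get(self, key):
        entry = self._data.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:  # real Redis evicts it for us
            del self._data[key]
            return None
        return value

SESSION_TTL = 1  # seconds; the real value comes from configuration

def save_session(r, user_id, session):
    r.set(f"session:{user_id}", json.dumps(session), ex=SESSION_TTL)

def load_session(r, user_id):
    raw = r.get(f"session:{user_id}")
    return json.loads(raw) if raw else None

r = FakeRedis()
save_session(r, "42", {"channel": "telegram", "model": "default"})
assert load_session(r, "42")["channel"] == "telegram"
time.sleep(1.1)
assert load_session(r, "42") is None  # expired - no cleanup job needed
```

With real Redis the eviction happens server-side, which is exactly why no cleanup job is needed.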
approval_manager.py: Human-in-the-Loop Gate

I'll cover this in detail in Article 5. The short version: when a playbook step has risk: MEDIUM or risk: HIGH, the executor calls ApprovalManager.request() which:
1. Generates a unique approval_id
2. Stores the pending action in Redis with a 5-minute TTL (APPROVAL_TIMEOUT_SECONDS)
3. Sends a formatted message to the configured notification channel
4. Returns an awaitable that resolves when the user approves/rejects or the TTL expires
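The awaitable mechanics can be sketched with an in-memory asyncio.Future standing in for the Redis-backed store; names and signatures here are illustrative, not the project's actual API:

```python
import asyncio
import uuid

class ApprovalManager:
    """In-memory sketch; the real store is Redis with a TTL."""
    def __init__(self, timeout: float = 300):
        self.timeout = timeout
        self.pending = {}  # approval_id -> Future

    async def request(self, action: str) -> bool:
        approval_id = uuid.uuid4().hex[:8]
        fut = asyncio.get_running_loop().create_future()
        self.pending[approval_id] = fut
        # The real code formats and sends a message to Telegram/Slack here.
        try:
            return await asyncio.wait_for(fut, timeout=self.timeout)
        except asyncio.TimeoutError:
            return False  # TTL expired: treated as a rejection
        finally:
            self.pending.pop(approval_id, None)

    def resolve(self, approval_id: str, approved: bool) -> None:
        fut = self.pending.get(approval_id)
        if fut is not None and not fut.done():
            fut.set_result(approved)

async def demo() -> bool:
    mgr = ApprovalManager(timeout=1)
    task = asyncio.create_task(mgr.request("restart deployment"))
    await asyncio.sleep(0)                 # let request() register its future
    approval_id = next(iter(mgr.pending))  # normally parsed from the user's reply
    mgr.resolve(approval_id, True)         # simulate an "approve" click
    return await task

approved = asyncio.run(demo())
```

The executor simply awaits request() and branches on the boolean; timeout and rejection look the same from its point of view.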
AIOps Layer
src/aiops/ and src/monitoring/watchloop.py are covered in depth in Articles 3–6. The quick reference:
- src/monitoring/watchloop.py - background async task; polls Kubernetes every N seconds
- src/aiops/rule_engine.py - matches ClusterEvent objects against YAML-defined rules
- src/aiops/playbooks.py - ordered remediation steps with risk classification
- src/aiops/rca_engine.py - builds context, calls the LLM, returns a structured JSON RCA report
- src/aiops/log_analyzer.py - pattern matching on pod/container logs
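To make the rule-engine idea concrete, here is a hypothetical rule shape; the real schema is covered in the later articles, so treat every key name below as an assumption:

```yaml
# Hypothetical shape only - every key name here is an assumption.
- name: crashloop-restart
  match:
    reason: CrashLoopBackOff
    namespace: production
  playbook: restart-pod   # ordered steps with per-step risk live in playbooks.py
```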
The AIOps layer does not call the business logic layer directly. Communication happens at the data layer (shared Redis/PostgreSQL) and through the notification channel (which goes through the channel adapters).
MCP Layer: Tool Execution
MCP (Model Context Protocol) is how the agent executes actions, such as listing Kubernetes resources or running security scans, without giving the LLM unrestricted access to a shell.
src/mcp/ has:
- mcp_manager.py - lifecycle management (start/stop) and routing
- base_transport.py - the abstract transport base class
- stdio_transport.py - subprocess-based transport for local MCP servers
- sse_transport.py - HTTP SSE transport for cloud MCP servers
Two MCP servers are configured in .mcp-config.json:
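The actual listing isn't reproduced here; based on the two transports, a plausible shape for the file (server names and keys are assumptions, not the real config) would be:

```json
{
  "servers": {
    "kubernetes": {
      "transport": "stdio",
      "command": "kubernetes-mcp-server"
    },
    "portchecker": {
      "transport": "sse",
      "url": "https://simpleportchecker.com/mcp"
    }
  }
}
```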
MCPManager maintains a tool_registry mapping tool names to server names. When the message handler or playbook executor calls MCPManager.call_tool("list_pods", {"namespace": "production"}), the manager looks up which server owns list_pods and dispatches via the correct transport.
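A stripped-down sketch of that routing, in-memory and with a fake transport in place of the stdio/SSE implementations (names are illustrative):

```python
import asyncio

class FakeTransport:
    """Stand-in for a stdio/SSE transport; echoes the dispatched call."""
    def __init__(self, server_name):
        self.server_name = server_name

    async def call(self, tool, args):
        return {"server": self.server_name, "tool": tool, "args": args}

class MCPManager:
    """Sketch of the routing idea: tool name -> owning server -> transport."""
    def __init__(self, transports):
        self.transports = transports   # server name -> transport
        self.tool_registry = {}        # tool name -> server name

    def register(self, server, tools):
        for tool in tools:
            self.tool_registry[tool] = server

    async def call_tool(self, name, args):
        server = self.tool_registry[name]  # which server owns this tool?
        return await self.transports[server].call(name, args)

async def demo():
    mgr = MCPManager({"kubernetes": FakeTransport("kubernetes")})
    mgr.register("kubernetes", ["list_pods", "get_events"])
    return await mgr.call_tool("list_pods", {"namespace": "production"})

result = asyncio.run(demo())
```

Callers never know (or care) which transport carried the request; that indirection is what makes the SSE/stdio split work.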
Why MCP Instead of Direct kubectl Subprocess
I could have called subprocess.run(["kubectl", "get", "pods"]) directly. I chose MCP because:
1. Defined tool contracts. Each tool has a typed input schema. The LLM can't accidentally construct a destructive command by passing unexpected parameters.
2. Extensibility. Adding a new capability means adding a tool to the MCP server, not modifying the business logic.
3. Testability. The MCP transport is an interface I can mock in tests.
4. Remote servers. The SSE transport means I can use cloud-hosted MCP servers (simpleportchecker.com) alongside local ones without changing the calling code.
Data Layer: PostgreSQL and Redis
PostgreSQL 16
PostgreSQL stores four main tables:
- users - user profile, preferred model, channel configs
- conversations - session metadata per user
- messages - full conversation history (role, content, JSONB metadata)
- cluster_events - persisted ClusterEvent objects from the watch-loop
The conversation history table enables the context builder (src/ai/context_builder.py) to load the last N messages and send them as the conversation window to the LLM. This is how the agent "remembers" what was discussed earlier in a session.
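A runnable sketch of that "last N messages" query, using sqlite3 in place of PostgreSQL/asyncpg; table and column names follow the description above, the sample rows are invented:

```python
import sqlite3

# sqlite3 stands in for PostgreSQL here; the query shape is the point.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE messages ("
    "id INTEGER PRIMARY KEY, conversation_id INTEGER, role TEXT, content TEXT)"
)
conn.executemany(
    "INSERT INTO messages (conversation_id, role, content) VALUES (?, ?, ?)",
    [
        (1, "user", "show me error pods"),
        (1, "assistant", "3 pods in CrashLoopBackOff"),
        (1, "user", "restart the first one"),
    ],
)

def last_n_messages(conn, conversation_id, n=2):
    cur = conn.execute(
        "SELECT role, content FROM messages WHERE conversation_id = ? "
        "ORDER BY id DESC LIMIT ?",
        (conversation_id, n),
    )
    return list(reversed(cur.fetchall()))  # oldest-first for the LLM window

window = last_n_messages(conn, 1)
```

The DESC-then-reverse trick is what keeps the window bounded at N while still handing the LLM messages in chronological order.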
All database access is async via asyncpg + SQLAlchemy with async sessions. Schema migrations are managed with Alembic (alembic/).
Redis 7
Redis serves two distinct purposes:
1. Session cache - lightweight user session state, TTL-based expiry, sub-millisecond access
2. Approval store - pending remediation approvals, 5-minute TTL, HSET for atomicity
The session cache is a read-heavy, write-occasionally workload. The approval store is write-once, read-once (or TTL-expire). Both fit Redis's simple key-value model well.
I enabled AOF persistence on Redis so approvals survive a Redis restart during a live incident.
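In redis.conf terms that is two directives (everysec is the default fsync policy and a reasonable durability/latency middle ground):

```
appendonly yes
appendfsync everysec
```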
Observability Layer
src/monitoring/prometheus.py registers Prometheus counters and histograms that the agent exposes at /metrics. Grafana reads from Prometheus and the dashboards in config/grafana/ are provisioned automatically via Docker Compose volume mounts.
structlog produces JSON-formatted log output. Every log record includes event, level, timestamp, plus context fields like user_id, channel, tool_name, approval_id. This makes filtering in log aggregators trivial.
I'll cover the full observability setup in Article 8.
Stack Decisions
What I'd Do the Same
FastAPI + asyncio: The async model is non-negotiable when you have background tasks (watch-loop), long-running tool calls (MCP), and simultaneous webhook handling. Synchronous frameworks like Flask would require threading or separate processes for the watch-loop.
Redis for approvals: The TTL semantics of Redis are perfect for approval timeouts. If the approval key expires, the approval is gone. No cleanup jobs, no expired-approval handling code.
Pydantic Settings (src/config.py): Every environment variable goes through a Pydantic BaseSettings model. Configuration errors fail fast at startup with clear messages rather than KeyError exceptions buried in application code.
MCP for tool execution: Defining tools as explicit schemas rather than allowing LLM-generated shell commands is a security and reliability decision I'd make the same way every time.
What I'd Do Differently
Alembic migration discipline: In the early iteration, I ran schema changes manually and added Alembic late. Setting up Alembic as the very first thing before writing any models would have saved several migrations that existed just to fix the initial schema.
Event sourcing for ClusterEvent: Currently, cluster events are stored as rows with a status field. An event-sourced log (append-only, events with timestamps) would make replaying the incident timeline cleaner for the RCA engine. This is something I want to evolve toward.
Separate AIOps process: The watch-loop runs as an async task inside the same FastAPI process. For a production-grade system running on a real cluster, I'd move the watch-loop to a separate process (or a separate Kubernetes Deployment) so that API traffic doesn't affect polling latency and vice versa. For my homelab, shared process is fine.
Next: Article 3, The Watch-Loop: Continuous Cluster Health Polling