# Article 3: The Watch-Loop — Background Cluster Polling

## Introduction

The watch-loop is the heartbeat of the AIOps engine. Everything else — the rule engine, playbooks, RCA — is reactive: it processes events fed to it. The watch-loop is the source of those events. It runs forever, independently of user interaction, polling the Kubernetes API at a configurable interval and converting cluster state into `ClusterEvent` objects that the rest of the system can act on.

This article covers the implementation in `src/monitoring/watchloop.py` and `src/k8s/client.py` from [simple-ai-agent](https://github.com/Htunn/simple-ai-agent).

## Table of Contents

1. [Why a Poll-Based Watch-Loop?](#why-a-poll-based-watch-loop)
2. [ClusterEvent: The Common Data Model](#clusterevent-the-common-data-model)
3. [What We Watch For](#what-we-watch-for)
4. [The Watch-Loop Implementation](#the-watch-loop-implementation)
5. [Kubernetes API Client](#kubernetes-api-client)
6. [Error Containment and Graceful Degradation](#error-containment-and-graceful-degradation)
7. [Configuration](#configuration)
8. [What I Learned from Running This](#what-i-learned-from-running-this)

***

## Why a Poll-Based Watch-Loop?

Kubernetes has a watch API (`kubectl get pods --watch`) that streams events as they happen. This is more efficient than polling — instead of asking "what's the state now?" every 30 seconds, you receive changes immediately.

I chose polling for a few reasons specific to my use case:

1. **Homelab reliability.** My cluster runs on hardware that occasionally has network hiccups. A long-running watch stream that drops silently leaves me with no events and no error. A polling loop with error logging fails visibly.
2. **Simplicity.** The watch stream requires reconnection logic, bookmark handling, and careful error handling to avoid missing events. A polling loop is a `while True:` with a `sleep`. In a personal project, I'll take the simpler implementation.
3. **Idempotency.** Each poll observes the current state. If I missed a `CrashLoopBackOff` event because the watch connection dropped, the pod is still in `CrashLoopBackOff` on the next poll and I'll catch it then. With a push-based watch, missed events are missed permanently.

For a large production cluster with thousands of pods, the polling overhead would be meaningful. For a homelab with tens of pods, a 30-second poll against the local Kubernetes API costs essentially nothing.

***

## ClusterEvent: The Common Data Model

Before writing any detection logic, I defined the data model that everything downstream consumes. This is the contract between the watch-loop and the rule engine.

```python
# src/aiops/rule_engine.py (simplified)
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any

class EventType(str, Enum):
    CRASH_LOOP = "crash_loop"
    OOM_KILLED = "oom_killed"
    NOT_READY_NODE = "not_ready_node"
    REPLICATION_FAILURE = "replication_failure"
    ALERTMANAGER_ALERT = "alertmanager_alert"

class Severity(str, Enum):
    CRITICAL = "critical"
    HIGH = "high"
    MEDIUM = "medium"
    LOW = "low"
    INFO = "info"

@dataclass
class ClusterEvent:
    event_type: EventType
    severity: Severity
    resource_name: str
    namespace: str
    message: str
    timestamp: datetime = field(default_factory=datetime.utcnow)
    metadata: dict[str, Any] = field(default_factory=dict)
    raw_data: dict[str, Any] = field(default_factory=dict)
```

The `metadata` and `raw_data` fields carry enough context for the RCA engine to work with. A `CRASH_LOOP` event includes the restart count, the last termination reason, and recent log lines. An `OOM_KILLED` event includes the memory limit, peak usage, and restart history.

Every `ClusterEvent` is persisted to PostgreSQL so the RCA engine can query event history when building its analysis.
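One practical wrinkle with persistence: the enum, datetime, and dict fields need flattening into driver-friendly values before the INSERT. A sketch of what that step could look like — the `event_to_row` helper and the column layout are mine, not the repo's actual schema:

```python
import json
from typing import Any

def event_to_row(event: Any) -> dict[str, Any]:
    """Flatten a ClusterEvent into plain column values for an INSERT.

    Illustrative only: enums become their string values, the timestamp
    becomes ISO-8601, and dicts become JSON text for JSONB columns.
    """
    return {
        "event_type": event.event_type.value,     # str, not Enum
        "severity": event.severity.value,
        "resource_name": event.resource_name,
        "namespace": event.namespace,
        "message": event.message,
        "timestamp": event.timestamp.isoformat(),
        "metadata": json.dumps(event.metadata),   # JSONB column
        "raw_data": json.dumps(event.raw_data),
    }
```

Keeping the flattening in one helper also means the RCA engine's history queries see a stable row shape even if the dataclass grows fields later.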

***

## What We Watch For

The current watch-loop detects four event classes from the Kubernetes API:

### 1. CrashLoopBackOff

```python
# A pod is in CrashLoopBackOff if:
# - container state is "waiting" with reason "CrashLoopBackOff"
# - OR restart count > threshold (configurable, default 5)
```

This is the most common operational event in my clusters. It usually means the application crashed on startup — misconfigured environment variable, failed database migration, missing dependency. The watch-loop detects it within one polling interval (30 seconds at the default setting).

### 2. OOMKilled

```python
# A container was OOMKilled if:
# - last termination reason is "OOMKilled"
# - the container has restarted at least once since last seen
```

OOMKilled is different from CrashLoopBackOff — the kernel killed the container because it exceeded its memory limit. The fix is different (increase the limit or fix the memory leak) and the RCA approach is different (look at memory usage trends, not application crash logs).
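A detector for this follows the same pure-function pattern as `detect_crash_loops` (shown later). This is my sketch, not the repo's code — it returns simple tuples instead of `ClusterEvent` objects, and it omits the "restarted since last seen" comparison, which needs state across polls:

```python
from typing import Any

def detect_oom_kills_sketch(pods: list[Any]) -> list[tuple[str, str, str]]:
    """Return (namespace, pod, container) triples for OOM-killed containers.

    Reads the V1Pod attribute path the kubernetes client exposes:
    status.container_statuses[].last_state.terminated.reason
    """
    hits: list[tuple[str, str, str]] = []
    for pod in pods:
        if not pod.status or not pod.status.container_statuses:
            continue
        for c in pod.status.container_statuses:
            last = c.last_state.terminated if c.last_state else None
            if last and last.reason == "OOMKilled" and c.restart_count >= 1:
                hits.append((pod.metadata.namespace, pod.metadata.name, c.name))
    return hits
```

Because the function only reads attributes, it can be unit-tested with hand-built objects rather than a live cluster.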

### 3. NotReady Nodes

```python
# A node is NotReady if:
# - node condition type "Ready" has status "False" or "Unknown"
# - the condition has persisted for > grace_period (default 120s)
```

Node issues are more severe than pod issues — they affect all workloads running on the node. The grace period prevents false positives from brief maintenance operations or node restarts.
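The grace-period check can lean on the `last_transition_time` field that Kubernetes stamps on every node condition. A hedged sketch (my function name; the real detector emits a `ClusterEvent` rather than a bool):

```python
from datetime import datetime, timedelta
from typing import Any

def node_not_ready_sketch(node: Any, now: datetime, grace_seconds: int = 120) -> bool:
    """True if the node's Ready condition has been False/Unknown past the grace period.

    Uses the V1NodeCondition fields: type, status, last_transition_time.
    """
    for cond in node.status.conditions or []:
        if cond.type != "Ready":
            continue
        if cond.status not in ("False", "Unknown"):
            return False
        # Only alert once the unhealthy state has outlasted the grace period
        return (now - cond.last_transition_time) > timedelta(seconds=grace_seconds)
    return False  # No Ready condition reported — nothing actionable yet
```

Passing `now` in explicitly keeps the function pure and makes the grace-period boundary trivial to test.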

### 4. Zero-Replica Deployments

```python
# A deployment has a replication failure if:
# - desired replicas > 0 (not intentionally scaled to zero)
# - available replicas == 0
# - the condition has persisted for > grace_period (default 60s)
```

This catches deployments where all pods are either pending or terminating — often a resource pressure situation or a node affinity/taint issue.
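The core predicate here is tiny; the subtlety is that `available_replicas` is `None` (not `0`) when nothing is available. A sketch of the check — my function name, and the grace-period persistence (first-seen timestamps across polls) is omitted:

```python
from typing import Any

def zero_replica_failure_sketch(dep: Any) -> bool:
    """True if a deployment wants replicas but has none available.

    Reads the V1Deployment fields spec.replicas and status.available_replicas.
    """
    desired = dep.spec.replicas or 0
    available = dep.status.available_replicas or 0  # None when nothing is up
    return desired > 0 and available == 0
```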

***

## The Watch-Loop Implementation

The watch-loop is an async function that runs as a background task from `src/main.py`. Here's the structure:

```python
# src/monitoring/watchloop.py (simplified)
import asyncio
import structlog
from datetime import datetime

log = structlog.get_logger()

async def run_watchloop(
    k8s_client: KubernetesClient,
    rule_engine: RuleEngine,
    settings: Settings,
) -> None:
    """Main watch-loop: poll → detect → emit events."""
    log.info("watchloop.started", interval=settings.K8S_WATCHLOOP_INTERVAL)
    
    while True:
        start = datetime.utcnow()
        
        try:
            events = await poll_cluster(k8s_client, settings)
            
            for event in events:
                await rule_engine.process(event)
            
            # Prometheus metrics, defined elsewhere in the module
            WATCHLOOP_RUNS.inc()
            WATCHLOOP_EVENTS.observe(len(events))
            
        except asyncio.CancelledError:
            # Graceful shutdown — propagate the cancellation
            log.info("watchloop.cancelled")
            raise
        
        except Exception as exc:
            # Never let an unhandled exception kill the watch-loop
            WATCHLOOP_ERRORS.inc()
            log.error("watchloop.error", error=str(exc), exc_info=True)
        
        elapsed = (datetime.utcnow() - start).total_seconds()
        sleep_time = max(0, settings.K8S_WATCHLOOP_INTERVAL - elapsed)
        await asyncio.sleep(sleep_time)


async def poll_cluster(
    client: KubernetesClient,
    settings: Settings,
) -> list[ClusterEvent]:
    """Single poll pass: check pods and nodes, return detected events."""
    events: list[ClusterEvent] = []
    
    # Fetch in parallel — don't wait for pods before checking nodes
    pods_task = asyncio.create_task(client.list_pods_all_namespaces())
    nodes_task = asyncio.create_task(client.list_nodes())
    deployments_task = asyncio.create_task(client.list_deployments_all_namespaces())
    
    pods, nodes, deployments = await asyncio.gather(
        pods_task, nodes_task, deployments_task
    )
    
    events.extend(detect_crash_loops(pods))
    events.extend(detect_oom_kills(pods))
    events.extend(detect_not_ready_nodes(nodes))
    events.extend(detect_replication_failures(deployments))
    
    return events
```

### Detection Functions

Each detection function is a pure function: given a list of Kubernetes API objects, return a list of `ClusterEvent` objects. They're independently testable.

```python
def detect_crash_loops(pods: list[V1Pod]) -> list[ClusterEvent]:
    events = []
    for pod in pods:
        if not pod.status or not pod.status.container_statuses:
            continue
        for container in pod.status.container_statuses:
            if not container.state or not container.state.waiting:
                continue
            if container.state.waiting.reason == "CrashLoopBackOff":
                events.append(ClusterEvent(
                    event_type=EventType.CRASH_LOOP,
                    severity=Severity.CRITICAL,
                    resource_name=pod.metadata.name,
                    namespace=pod.metadata.namespace,
                    message=(
                        f"Container '{container.name}' is in CrashLoopBackOff "
                        f"({container.restart_count} restarts)"
                    ),
                    metadata={
                        "container": container.name,
                        "restart_count": container.restart_count,
                        "node": pod.spec.node_name,
                    },
                    raw_data=pod.to_dict(),
                ))
    return events
```

### Deduplication

Naively, the same `CrashLoopBackOff` pod would fire a `ClusterEvent` every 30 seconds indefinitely. The rule engine handles deduplication with a cooldown window — but the watch-loop also tracks which events it has already seen using a simple in-memory `set` of `(event_type, namespace, resource_name)` tuples, reset periodically.

```python
# Simplified dedup logic in watch-loop state
_seen_events: set[tuple[EventType, str, str]] = set()
_seen_events_reset_at: datetime = datetime.utcnow()
DEDUP_WINDOW_SECONDS = 300  # Reset seen events every 5 minutes

def _is_new_event(event: ClusterEvent) -> bool:
    global _seen_events, _seen_events_reset_at
    
    now = datetime.utcnow()
    if (now - _seen_events_reset_at).total_seconds() > DEDUP_WINDOW_SECONDS:
        _seen_events = set()
        _seen_events_reset_at = now
    
    key = (event.event_type, event.namespace, event.resource_name)
    if key in _seen_events:
        return False
    _seen_events.add(key)
    return True
```

***

## Kubernetes API Client

`src/k8s/client.py` wraps the `kubernetes-asyncio` library:

```python
# src/k8s/client.py (simplified)
from kubernetes_asyncio import client, config
from kubernetes_asyncio.client import ApiClient, V1Deployment, V1Node, V1Pod
import structlog

log = structlog.get_logger()

class KubernetesClient:
    def __init__(self):
        self._api: ApiClient | None = None
    
    async def initialize(self) -> None:
        try:
            # Try in-cluster config first (running inside Kubernetes)
            await config.load_incluster_config()
            log.info("k8s.config", source="incluster")
        except config.ConfigException:
            # Fall back to kubeconfig file (running locally or in Docker)
            await config.load_kube_config()
            log.info("k8s.config", source="kubeconfig")
        
        self._api = ApiClient()
    
    async def list_pods_all_namespaces(self) -> list[V1Pod]:
        v1 = client.CoreV1Api(self._api)
        result = await v1.list_pod_for_all_namespaces()
        return result.items
    
    async def list_nodes(self) -> list[V1Node]:
        v1 = client.CoreV1Api(self._api)
        result = await v1.list_node()
        return result.items
    
    async def list_deployments_all_namespaces(self) -> list[V1Deployment]:
        apps_v1 = client.AppsV1Api(self._api)
        result = await apps_v1.list_deployment_for_all_namespaces()
        return result.items
    
    async def get_pod_logs(self, name: str, namespace: str, tail_lines: int = 100) -> str:
        v1 = client.CoreV1Api(self._api)
        return await v1.read_namespaced_pod_log(
            name=name,
            namespace=namespace,
            tail_lines=tail_lines,
        )
```

The client tries in-cluster config first (for when the agent runs inside Kubernetes) and falls back to kubeconfig (for development or Docker Compose). In my homelab, the agent runs in Docker Compose with the kubeconfig mounted at `./data/kube/config`.
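For reference, the Compose wiring for that fallback looks something like this — the service name, container path, and `KUBECONFIG` variable are illustrative, so check the repo's `docker-compose.yml` for the real values:

```yaml
services:
  agent:
    build: .
    volumes:
      # Mount the homelab kubeconfig read-only where load_kube_config() can find it
      - ./data/kube/config:/root/.kube/config:ro
    environment:
      - KUBECONFIG=/root/.kube/config
```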

***

## Error Containment and Graceful Degradation

The watch-loop has three error scenarios to handle correctly:

### 1. Kubernetes API Unavailable

If the Kubernetes API is down (cluster restart, network issue), the poll will fail. The watch-loop logs the error with full context, increments the error counter, and sleeps until the next interval. It does not crash. It does not stop.

```python
# Inside the watch-loop's `while True:` body
try:
    events = await poll_cluster(k8s_client, settings)
except Exception as exc:
    WATCHLOOP_ERRORS.inc()
    log.error("watchloop.poll_failed", error=str(exc), exc_info=True)
    # Sleep the full interval rather than retrying immediately
    await asyncio.sleep(settings.K8S_WATCHLOOP_INTERVAL)
    continue
```

### 2. Rule Engine Failure

If the rule engine fails to process an event (bug in rule matching, unexpected event shape), the watch-loop catches the exception per-event and continues processing the remaining events.

### 3. Application Shutdown

When FastAPI shuts down, the lifespan context manager cancels the watch-loop task. The `asyncio.CancelledError` is re-raised (not swallowed), allowing the task to exit cleanly. The watch-loop logs `watchloop.cancelled` on shutdown.
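The cancellation handshake can be sketched without FastAPI at all — what matters is that the task re-raises `CancelledError` and the shutdown side awaits it. The names below are illustrative stand-ins, not the repo's code:

```python
import asyncio

async def stub_watchloop(log_lines: list[str]) -> None:
    """Stand-in for run_watchloop: loop until cancelled, then exit cleanly."""
    try:
        while True:
            await asyncio.sleep(3600)  # pretend to poll
    except asyncio.CancelledError:
        log_lines.append("watchloop.cancelled")  # the shutdown log line
        raise  # re-raise so the awaiting side sees a clean cancellation

async def shutdown_demo() -> list[str]:
    log_lines: list[str] = []
    task = asyncio.create_task(stub_watchloop(log_lines))
    await asyncio.sleep(0)  # let the task start and park in its sleep
    task.cancel()           # what the lifespan teardown does on shutdown
    try:
        await task
    except asyncio.CancelledError:
        pass                # expected: the task exited via cancellation
    return log_lines
```

Swallowing the `CancelledError` inside the loop instead of re-raising would leave the task "running" forever from the event loop's perspective, which is exactly the shutdown hang this pattern avoids.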

***

## Configuration

```bash
# .env
K8S_WATCHLOOP_ENABLED=true           # Set false to disable entirely
K8S_WATCHLOOP_INTERVAL=30            # Poll interval in seconds
AUTO_REMEDIATION_ENABLED=false       # true = skip approvals for LOW-risk only
AIOPS_NOTIFICATION_CHANNEL=telegram:YOUR_CHAT_ID
```

The notification channel format is `<channel_type>:<id>` — e.g., `telegram:123456789` or `slack:C123456789`. The watch-loop uses this to route AIOps notifications to the right chat destination.
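Parsing that format is a one-liner with `str.partition`; a hypothetical helper (`parse_channel` is my name for it, not the repo's):

```python
def parse_channel(value: str) -> tuple[str, str]:
    """Split '<channel_type>:<id>' into its two parts.

    partition() splits on the first ':' only, so IDs containing
    colons survive intact.
    """
    channel_type, sep, chat_id = value.partition(":")
    if not sep or not channel_type or not chat_id:
        raise ValueError(f"expected '<channel_type>:<id>', got {value!r}")
    return channel_type, chat_id
```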

Setting `AUTO_REMEDIATION_ENABLED=true` only skips approvals for `LOW` risk playbook steps. `MEDIUM` and `HIGH` risk steps always require explicit approval regardless.

***

## What I Learned from Running This

**Poll interval matters less than you think for most events.** A 30-second detection delay is usually fine. By the time Kubernetes marks a pod as `CrashLoopBackOff`, it has already gone through its restart backoff. Another 30 seconds doesn't meaningfully change the incident timeline.

**The dedup window needs to survive RCA time.** The RCA engine takes 5–15 seconds to run (LLM call). If the dedup window resets while RCA is running, the same event can fire twice before the first remediation completes. I learned this the hard way by getting duplicate approval requests for the same pod. The 5-minute dedup window is the fix.

**Kubernetes API calls fail more than you'd expect.** Even in a healthy cluster, the API server occasionally returns 429s or 503s under load. `kubernetes-asyncio` has retry logic, but the watch-loop needs its own outer retry/continue behavior. A single failed poll should not disrupt the loop — just log it and try again.

**Don't watch every namespace by default.** In a cluster with many namespaces including system namespaces, watching everything generates noise. Consider filtering to application namespaces:

```python
EXCLUDED_NAMESPACES = {"kube-system", "kube-public", "kube-node-lease", "monitoring"}

def detect_crash_loops(pods: list[V1Pod]) -> list[ClusterEvent]:
    events = []
    for pod in pods:
        if pod.metadata.namespace in EXCLUDED_NAMESPACES:
            continue
        # ... rest of detection
```

***

**Next**: [Article 4 — The Rule Engine: Turning Events into Actionable Alerts](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/aiops-101/aiops-101-rule-engine)
