# Article 5: Playbooks and Human-in-the-Loop Approvals

## Introduction

The rule engine decides that something needs to happen. Playbooks define exactly what "something" is — and whether a human needs to sign off before any action is taken.

When I first thought about automated remediation, the obvious approach was: event fires, playbook runs, problem solved. I rejected that design immediately. The failure mode of "playbook runs at the wrong time" in production is much worse than the failure mode of "playbook requires approval and takes 2 extra minutes". This is why simple-ai-agent has a built-in approval gate: every action with non-trivial consequence must go through a human's `approve` before execution.

This article covers `src/aiops/playbooks.py` and `src/services/approval_manager.py`.

***

## Table of Contents

1. [The Problem with Full Automation](#the-problem-with-full-automation)
2. [Playbook Structure](#playbook-structure)
3. [Risk Classification](#risk-classification)
4. [The Approval Manager](#the-approval-manager)
5. [Chat-Native Approve and Reject](#chat-native-approve-and-reject)
6. [Playbook Executor Implementation](#playbook-executor-implementation)
7. [Real Playbooks from the Project](#real-playbooks-from-the-project)
8. [What I Learned](#what-i-learned)

***

## The Problem with Full Automation

I've seen the following happen in production at workplaces where I've worked:

* An automation script restarts a pod that's in `CrashLoopBackOff` because of a broken config. The script restarts it 40 times before anyone notices.
* A memory-limit increase playbook fires during a traffic spike and raises limits so high that other pods get OOMKilled by the higher-than-available allocations.
* A scale-up playbook fires based on a false alert and spins up 10 replicas of a service that was already overloaded at the API gateway layer — doing nothing except wasting money.

I don't want an agent that acts without context. I want an agent that does the analysis, presents its findings and proposed action, and waits for me to say yes.

That's the design: **LLM does the thinking, human does the approving, playbook does the executing**.

***

## Playbook Structure

Playbooks are defined in `config/playbooks.yml`. Each playbook is a sequence of steps.

```yaml
# config/playbooks.yml
playbooks:

  restart-crashed-pod:
    description: "Delete the crashed pod and let the Deployment recreate it"
    risk: medium
    steps:
      - action: "k8s:delete_pod"
        params:
          grace_period_seconds: 0
      - action: "k8s:wait_for_ready"
        params:
          timeout_seconds: 120
    rollback:
      - action: "notify"
        message: "Restart playbook failed. Manual intervention required."

  increase-memory-limit:
    description: "Patch the container memory limit by +512Mi"
    risk: high
    steps:
      - action: "k8s:get_deployment"
      - action: "k8s:patch_memory_limit"
        params:
          delta_mi: 512
          max_limit_gi: 4
    rollback:
      - action: "k8s:revert_patch"
      - action: "notify"
        message: "Memory limit patch failed. Reverted. Manual check required."

  scale-down-zero-replicas-fix:
    description: "Scale deployment to 1 replica if it's currently at 0"
    risk: medium
    steps:
      - action: "k8s:scale_deployment"
        params:
          replicas: 1
    rollback:
      - action: "notify"
        message: "Scale-up failed. Check deployment spec manually."
```

### Step Actions

Each step `action` maps to a function in the MCP tool layer. The pattern is `layer:function_name`:

| Action                   | Description                                           |
| ------------------------ | ----------------------------------------------------- |
| `k8s:delete_pod`         | Delete a pod by name (triggers Deployment recreation) |
| `k8s:wait_for_ready`     | Poll until pod is Running/Ready or timeout            |
| `k8s:patch_memory_limit` | Patch the Deployment container memory limit           |
| `k8s:get_deployment`     | Fetch current Deployment spec into execution context  |
| `k8s:scale_deployment`   | Set replica count on a Deployment                     |
| `notify`                 | Send message to configured notification channel       |

***

## Risk Classification

Every playbook has a `risk` level. This controls whether the playbook executes immediately or requires approval.

| Risk Level | Execution Behavior                                                                      |
| ---------- | --------------------------------------------------------------------------------------- |
| `low`      | Execute immediately, notify after                                                       |
| `medium`   | Post approval request to chat channel, hold execution, 10-minute timeout                |
| `high`     | Warn only — provide runbook steps, do not execute automatically under any circumstances |

The `high` risk level never executes. The playbook executor posts what it *would* do if executed — the steps, the parameters, the expected outcome — so the operator can do it manually with full context. This is intentional. High-risk operations on production infrastructure should never be delegated to an automated system.

```python
# src/aiops/playbooks.py
from enum import Enum

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"

async def execute_with_risk_gate(
    playbook: Playbook,
    context: ExecutionContext,
    approval_manager: ApprovalManager,
    notifier: Notifier,
) -> PlaybookResult:
    match playbook.risk:
        case RiskLevel.LOW:
            return await run_steps(playbook, context)
        
        case RiskLevel.MEDIUM:
            approval_id = await approval_manager.request_approval(
                playbook=playbook,
                context=context,
                timeout_seconds=600,   # 10 minutes
            )
            await notifier.send_approval_request(approval_id, playbook, context)
            
            # Block and wait for approval
            approved = await approval_manager.wait_for_decision(approval_id)
            if approved:
                return await run_steps(playbook, context)
            else:
                return PlaybookResult.rejected(approval_id)
        
        case RiskLevel.HIGH:
            await notifier.send_high_risk_warning(playbook, context)
            return PlaybookResult.manual_required(playbook.name)
```

***

## The Approval Manager

The approval manager stores pending approval requests in Redis with a TTL. When approval times out, the request expires automatically — no cleanup needed.

```python
# src/services/approval_manager.py
import json
import asyncio
import uuid
from dataclasses import dataclass
from datetime import datetime
from redis.asyncio import Redis

@dataclass
class ApprovalRequest:
    id: str
    playbook_name: str
    namespace: str
    resource_name: str
    event_type: str
    created_at: str
    timeout_seconds: int

class ApprovalManager:
    PREFIX = "approval:"
    
    def __init__(self, redis: Redis):
        self.redis = redis
        # Maps approval_id -> asyncio.Event for waiting
        self._pending: dict[str, asyncio.Event] = {}
        self._decisions: dict[str, bool] = {}
    
    async def request_approval(
        self,
        playbook: Playbook,
        context: ExecutionContext,
        timeout_seconds: int,
    ) -> str:
        approval_id = str(uuid.uuid4())[:8]  # Short ID for easy typing in chat
        
        request = ApprovalRequest(
            id=approval_id,
            playbook_name=playbook.name,
            namespace=context.event.namespace,
            resource_name=context.event.resource_name,
            event_type=context.event.event_type,
            created_at=datetime.utcnow().isoformat(),
            timeout_seconds=timeout_seconds,
        )
        
        key = f"{self.PREFIX}{approval_id}"
        await self.redis.setex(
            key,
            timeout_seconds,
            json.dumps(request.__dict__),
        )
        
        self._pending[approval_id] = asyncio.Event()
        return approval_id
    
    async def wait_for_decision(self, approval_id: str) -> bool:
        event = self._pending.get(approval_id)
        if not event:
            return False
        
        try:
            await asyncio.wait_for(event.wait(), timeout=600.0)
            return self._decisions.get(approval_id, False)
        except asyncio.TimeoutError:
            await self._cleanup(approval_id)
            return False  # Timeout = implicit rejection
    
    async def resolve(self, approval_id: str, approved: bool) -> bool:
        """Called when user types 'approve <id>' or 'reject <id>'."""
        key = f"{self.PREFIX}{approval_id}"
        exists = await self.redis.exists(key)
        if not exists:
            return False   # Not found or already expired
        
        event = self._pending.get(approval_id)
        if not event:
            return False
        
        self._decisions[approval_id] = approved
        await self.redis.delete(key)
        event.set()  # Unblocks wait_for_decision
        return True
    
    async def _cleanup(self, approval_id: str) -> None:
        self._pending.pop(approval_id, None)
        self._decisions.pop(approval_id, None)
```

### Why 8-Character IDs

When an approval request is posted to Telegram, the user needs to type something like:

```
approve a3f7b2c1
```

Short IDs matter. A full UUID is impractical to type on a phone. Eight characters from `uuid4()` give enough uniqueness for the small number of concurrent pending approvals a homelab cluster generates.

***

## Chat-Native Approve and Reject

The approval workflow integrates with the chat adapter at the intent detection layer. When the bot receives a message, the intent detector checks for approval commands before any other processing:

```python
# src/conversation/intent.py
import re

APPROVE_PATTERN = re.compile(r'^approve\s+([a-f0-9]{6,8})$', re.IGNORECASE)
REJECT_PATTERN  = re.compile(r'^reject\s+([a-f0-9]{6,8})$',  re.IGNORECASE)

async def handle_approval_commands(
    text: str,
    sender_id: str,
    approval_manager: ApprovalManager,
    notifier: Notifier,
) -> bool:
    """Returns True if the message was an approval command (consumed)."""
    
    if m := APPROVE_PATTERN.match(text.strip()):
        approval_id = m.group(1)
        ok = await approval_manager.resolve(approval_id, approved=True)
        if ok:
            await notifier.send_text(f"Approved {approval_id}. Executing playbook.")
        else:
            await notifier.send_text(f"No pending approval found for ID {approval_id}.")
        return True
    
    if m := REJECT_PATTERN.match(text.strip()):
        approval_id = m.group(1)
        ok = await approval_manager.resolve(approval_id, approved=False)
        if ok:
            await notifier.send_text(f"Rejected {approval_id}. Playbook cancelled.")
        else:
            await notifier.send_text(f"No pending approval found for ID {approval_id}.")
        return True
    
    return False
```

This runs before any LLM call. If the message is `approve a3f7b2c1`, no LLM is invoked — the approval resolves directly. This keeps approval latency low and avoids confusing the LLM with approval syntax.

***

## Playbook Executor Implementation

The executor runs steps sequentially. If any step fails, it runs the rollback steps and stops.

```python
# src/aiops/playbooks.py
from dataclasses import dataclass, field
from typing import Any

@dataclass
class ExecutionContext:
    event: ClusterEvent
    state: dict[str, Any] = field(default_factory=dict)  # Shared between steps

@dataclass 
class StepResult:
    action: str
    success: bool
    output: Any = None
    error: str | None = None

async def run_steps(playbook: Playbook, context: ExecutionContext) -> PlaybookResult:
    results: list[StepResult] = []
    
    for step in playbook.steps:
        log.info("playbook.step.start", 
                 playbook=playbook.name, 
                 action=step.action,
                 resource=context.event.resource_name)
        
        try:
            output = await dispatch_step(step, context)
            results.append(StepResult(action=step.action, success=True, output=output))
            
        except Exception as exc:
            log.error("playbook.step.failed",
                      playbook=playbook.name,
                      action=step.action,
                      error=str(exc))
            results.append(StepResult(action=step.action, success=False, error=str(exc)))
            
            # Attempt rollback
            await run_rollback(playbook.rollback, context)
            return PlaybookResult(
                playbook_name=playbook.name,
                success=False,
                step_results=results,
                failure_reason=str(exc),
            )
    
    return PlaybookResult(
        playbook_name=playbook.name,
        success=True,
        step_results=results,
    )
```

### Step Dispatch

Step actions are dispatched by prefix:

```python
async def dispatch_step(step: Step, context: ExecutionContext) -> Any:
    layer, fn_name = step.action.split(":", 1)
    
    match layer:
        case "k8s":
            return await dispatch_kubernetes_action(fn_name, step.params, context)
        case "notify":
            return await dispatch_notify_action(step.message, context)
        case _:
            raise ValueError(f"Unknown action layer: {layer}")
```

Kubernetes actions are dispatched through the MCP client layer rather than calling `kubernetes-asyncio` directly. This keeps the playbook executor decoupled from Kubernetes API details and reuses the same tool surface exposed to the LLM.

***

## Real Playbooks from the Project

The playbooks I actually run in `simple-ai-agent`:

```yaml
playbooks:

  restart-crashed-pod:
    description: "Delete a pod in CrashLoopBackOff and let the Deployment recreate it"
    risk: medium
    steps:
      - action: "k8s:delete_pod"
        params:
          grace_period_seconds: 30
      - action: "k8s:wait_for_ready"
        params:
          timeout_seconds: 120
          expected_count: 1
    rollback:
      - action: "notify"
        message: "Pod restart failed or did not become Ready within 120s. Check manually."

  scale-to-minimum:
    description: "Scale a zero-replica Deployment back to its minimum (1 replica)"
    risk: medium
    steps:
      - action: "k8s:scale_deployment"
        params:
          replicas: 1
    rollback:
      - action: "notify"
        message: "Failed to scale deployment. Check spec and namespace."
```

I only have two enabled playbooks. This is intentional. More playbooks means more surface area for mistakes. I add a new playbook only when I've seen the manual remediation steps more than three times.

***

## What I Learned

**Approval timeout should be long enough to actually respond.** I started with 5 minutes. I missed several approvals because I was away from my phone. 10 minutes is better for a personal homelab. In a production team context you might want 15–30 minutes with escalation.

**The approval message must include everything needed to make the decision.** My earliest approval messages looked like: `Approval required: restart-crashed-pod (ID: a3f7)`. That's useless. The operator needs: which pod, which namespace, what event triggered this, what the proposed action is, and what the rollback is. I reformatted to:

```
⚠️ Approval Required [ID: a3f7]
─────────────────────────────
Playbook:   restart-crashed-pod
Namespace:  production
Resource:   api-server-abc123
Trigger:    CrashLoopBackOff (restarts: 7)
Action:     Delete pod (grace: 30s), wait for Ready
Rollback:   Notify on failure
─────────────────────────────
Reply: approve a3f7 or reject a3f7
Expires in: 10 minutes
```

**Reject must also be meaningful.** When I reject a playbook, the agent logs the rejection with a timestamp and includes it in the next RCA context if the same event fires again. "This event was manually rejected 20 minutes ago" is useful signal.

**Don't gate `LOW` risk actions with approval.** Early on I gated everything. This turned the agent into an answering machine — it asked permission to do things that had no meaningful failure mode. `notify` actions and read-only operations should be `LOW` risk and execute without approval.

***

**Next**: [Article 6 — LLM-Powered Root Cause Analysis](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/aiops-101/aiops-101-rca-engine)
