# Observability & Monitoring Architecture

## Table of Contents

* [Introduction](#introduction)
* [The Debugging Nightmare](#the-debugging-nightmare)
* [Three Pillars of Observability](#three-pillars-of-observability)
* [Structured Logging with Correlation IDs](#structured-logging-with-correlation-ids)
* [Distributed Tracing](#distributed-tracing)
* [Metrics Collection](#metrics-collection)
* [Health Checks](#health-checks)
* [Alerting Strategy](#alerting-strategy)
* [Production Incident Investigation](#production-incident-investigation)
* [Best Practices](#best-practices)
* [Conclusion](#conclusion)

## Introduction

"The Chatbot is slow." That was the bug report from a customer. No error message. No specifics. Just "slow."

In my monolithic POS system, debugging was easy: check the logs, find the slow query, optimize it. But in my distributed system with 6 microservices, "slow" could mean:

* Auth Service taking 2s to validate JWT?
* Inventory Service timing out on MongoDB query?
* Payment Service waiting on external gateway?
* Network latency between services?
* All of the above?

I spent **4 hours** debugging that single issue because I lacked proper **observability**. I had logs, but they were scattered across 6 services with no way to correlate them. I had no visibility into request flow across services.

After that painful experience, I built a comprehensive observability architecture. Now, I can diagnose most issues in **under 5 minutes**.

In this final article of the series, I'll share how I implemented structured logging, distributed tracing, metrics, and health checks: the foundation of an observable distributed system.

## The Debugging Nightmare

Here's what debugging looked like before observability:

```bash
# Chatbot Service logs
2024-01-15 14:23:45 INFO Request received
2024-01-15 14:23:47 INFO Calling POS Core
2024-01-15 14:24:12 ERROR Request timeout

# POS Core logs (different server)
2024-01-15 14:23:48 INFO Query started
2024-01-15 14:24:11 INFO Query completed (23s)

# Payment Service logs (yet another server)
2024-01-15 14:23:50 INFO Payment gateway call
2024-01-15 14:24:08 ERROR Gateway timeout
```

Questions I couldn't answer:

* Which Chatbot request caused the timeout?
* Was it the same request that hit POS Core?
* Did the Payment Service failure cause the timeout?
* Which tenant was affected?

I had logs, but no **context** linking them together.

## Three Pillars of Observability

Modern observability rests on three pillars:

1. **Logs**: Discrete events ("Payment processed", "Query failed")
2. **Metrics**: Aggregated measurements (request rate, error rate, latency)
3. **Traces**: Request flow across services (full lifecycle of a request)

Let me show you how I implemented each.

## Structured Logging with Correlation IDs

### Before: Unstructured Logs

```python
# Bad logging
logger.info("User logged in")
logger.error("Payment failed")
```

Problems:

* Hard to parse programmatically
* No context (which user? which payment?)
* Can't filter by tenant or request

### After: Structured Logging

```python
# infrastructure/logging_config.py
import json
import logging
import os
from contextvars import ContextVar
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4

import jwt  # PyJWT
from fastapi import Request

# Assumption: the JWT signing key is read from an environment variable;
# substitute however your service loads its configuration.
SECRET_KEY = os.environ.get("JWT_SECRET_KEY", "")

# Context variables for request-scoped data
correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)
tenant_id_var: ContextVar[Optional[str]] = ContextVar("tenant_id", default=None)
user_id_var: ContextVar[Optional[str]] = ContextVar("user_id", default=None)


class StructuredFormatter(logging.Formatter):
    """JSON formatter for structured logging."""

    def __init__(self, service_name: str = "chatbot"):
        super().__init__()
        self.service_name = service_name

    def format(self, record: logging.LogRecord) -> str:
        """Format log record as JSON."""
        # Use the time the record was created, not the time it is formatted
        created = datetime.fromtimestamp(record.created, tz=timezone.utc)
        log_data = {
            "timestamp": created.isoformat(timespec="milliseconds").replace("+00:00", "Z"),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "service": self.service_name,

            # Context from ContextVars
            "correlation_id": correlation_id_var.get(),
            "tenant_id": tenant_id_var.get(),
            "user_id": user_id_var.get(),
        }

        # Add exception info if present
        if record.exc_info:
            log_data["exception"] = {
                "type": record.exc_info[0].__name__,
                "message": str(record.exc_info[1]),
                "traceback": self.formatException(record.exc_info)
            }

        # Add extra fields
        if hasattr(record, "extra_fields"):
            log_data.update(record.extra_fields)

        # default=str keeps one non-serializable extra field from losing the whole log line
        return json.dumps(log_data, default=str)


def setup_logging(service_name: str):
    """Configure structured logging for a service."""
    handler = logging.StreamHandler()
    handler.setFormatter(StructuredFormatter(service_name))

    logger = logging.getLogger()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    return logger


# Middleware to extract context from requests
async def logging_middleware(request: Request, call_next):
    """Extract correlation ID and tenant ID from request."""
    # Get or generate correlation ID
    correlation_id = request.headers.get("x-correlation-id") or str(uuid4())
    correlation_id_var.set(correlation_id)

    # Extract tenant ID
    tenant_id = request.headers.get("x-tenant-id")
    tenant_id_var.set(tenant_id)

    # Extract user ID from JWT if present
    if token := request.headers.get("Authorization"):
        try:
            # Assumption: tokens are signed with HS256; PyJWT 2.x requires
            # the accepted algorithms to be listed explicitly.
            payload = jwt.decode(
                token.removeprefix("Bearer "),
                SECRET_KEY,
                algorithms=["HS256"],
            )
            user_id_var.set(payload.get("user_id"))
        except jwt.PyJWTError:
            # An invalid token only means the log line has no user_id;
            # rejecting the request is the auth layer's job.
            pass

    # Process request
    response = await call_next(request)

    # Add correlation ID to response
    response.headers["x-correlation-id"] = correlation_id

    return response
```
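The correlation ID only helps if every downstream call carries it. Here is a minimal sketch of building outbound headers from the context variables. `outbound_headers` is a hypothetical helper that is not part of the original code, and the header names simply mirror the ones the middleware reads.

```python
from contextvars import ContextVar
from typing import Dict, Optional

# Stand-ins for the context variables defined in logging_config.py
correlation_id_var: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)
tenant_id_var: ContextVar[Optional[str]] = ContextVar("tenant_id", default=None)


def outbound_headers(extra: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Build headers for a downstream call from the current request context.

    Hypothetical helper: unset values are skipped so that no header is
    ever sent with the literal string "None".
    """
    headers: Dict[str, str] = dict(extra or {})
    correlation_id = correlation_id_var.get()
    tenant_id = tenant_id_var.get()
    if correlation_id is not None:
        headers["x-correlation-id"] = correlation_id
    if tenant_id is not None:
        headers["x-tenant-id"] = tenant_id
    return headers
```

Passing `headers=outbound_headers()` on each `httpx` call would let the next service's middleware pick up the same correlation ID instead of generating a new one.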

### Usage

```python
# services/chatbot_service.py
import logging
import time

logger = logging.getLogger(__name__)

async def process_query(query: str):
    """Process chatbot query with structured logging."""
    started = time.perf_counter()
    logger.info(
        "Processing query",
        extra={"extra_fields": {
            "query_length": len(query),
            "query_type": "revenue"
        }}
    )

    try:
        result = await get_revenue_data()

        logger.info(
            "Query completed",
            extra={"extra_fields": {
                "result_count": len(result),
                # Measured duration rather than a hard-coded value
                "duration_ms": round((time.perf_counter() - started) * 1000)
            }}
        )

        return result

    except Exception:
        logger.error(
            "Query failed",
            exc_info=True,
            extra={"extra_fields": {
                "query": query
            }}
        )
        raise
```

### Log Output

```json
{
  "timestamp": "2024-01-15T14:23:45.123Z",
  "level": "INFO",
  "logger": "chatbot_service",
  "message": "Processing query",
  "service": "chatbot",
  "correlation_id": "abc-123-def",
  "tenant_id": "acme_corp",
  "user_id": "user_456",
  "query_length": 25,
  "query_type": "revenue"
}
```

Now I can:

* **Filter by tenant**: `grep '"tenant_id": "acme_corp"' logs.json` (note the space after the colon, which is how `json.dumps` writes it by default)
* **Track requests**: `grep '"correlation_id": "abc-123-def"' */logs.json`
* **Parse programmatically**: Load JSON into log aggregation tool
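For the "parse programmatically" case, here is a small sketch of rebuilding one request's timeline from JSON log lines. The `timeline` function and the sample lines are illustrative assumptions. In practice the lines would come from each service's log file or from a log aggregation tool.

```python
import json
from typing import Dict, Iterable, List


def timeline(lines: Iterable[str], correlation_id: str) -> List[Dict]:
    """Return the log entries for one request, oldest first."""
    entries = []
    for line in lines:
        try:
            entry = json.loads(line)
        except json.JSONDecodeError:
            continue  # Skip lines that are not JSON (e.g. startup banners)
        if isinstance(entry, dict) and entry.get("correlation_id") == correlation_id:
            entries.append(entry)
    # ISO 8601 UTC timestamps in a single format sort correctly as strings
    return sorted(entries, key=lambda e: e.get("timestamp", ""))


# Made-up sample lines in the format shown above
sample = [
    '{"timestamp": "2024-01-15T14:23:46.000Z", "service": "pos_core", '
    '"correlation_id": "abc-123-def", "message": "Query started"}',
    '{"timestamp": "2024-01-15T14:23:45.123Z", "service": "chatbot", '
    '"correlation_id": "abc-123-def", "message": "Processing query"}',
    '{"timestamp": "2024-01-15T14:23:45.500Z", "service": "chatbot", '
    '"correlation_id": "other", "message": "Processing query"}',
    "not json",
]

for entry in timeline(sample, "abc-123-def"):
    print(entry["timestamp"], entry["service"], entry["message"])
```

Matching on the parsed field avoids the whitespace and partial-match pitfalls of `grep`.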

## Distributed Tracing

Distributed tracing shows the **full lifecycle** of a request across all services.

### Tracing Implementation

```python
# infrastructure/tracing.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List, Dict
from uuid import uuid4

from fastapi import Request, Response

@dataclass
class Span:
    """A span represents a unit of work in a trace."""
    span_id: str = field(default_factory=lambda: str(uuid4()))
    trace_id: str = ""
    parent_span_id: Optional[str] = None
    service_name: str = ""
    operation_name: str = ""
    start_time: datetime = field(default_factory=datetime.utcnow)
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None
    tags: Dict[str, str] = field(default_factory=dict)
    logs: List[Dict] = field(default_factory=list)
    error: bool = False
    
    def finish(self):
        """Mark span as finished and calculate duration."""
        self.end_time = datetime.utcnow()
        self.duration_ms = (self.end_time - self.start_time).total_seconds() * 1000
    
    def add_tag(self, key: str, value: str):
        """Add metadata to span."""
        self.tags[key] = value
    
    def add_log(self, message: str, **fields):
        """Add a log entry to span."""
        self.logs.append({
            "timestamp": datetime.utcnow().isoformat(),
            "message": message,
            **fields
        })
    
    def mark_error(self, error: Exception):
        """Mark span as errored."""
        self.error = True
        self.add_tag("error.type", type(error).__name__)
        self.add_tag("error.message", str(error))


class Tracer:
    """Simple distributed tracing implementation."""
    
    def __init__(self, service_name: str):
        self.service_name = service_name
        # Grows without bound in this simple version; flush or cap it in a long-running service
        self.spans: List[Span] = []
    
    def start_span(
        self,
        operation_name: str,
        trace_id: Optional[str] = None,
        parent_span_id: Optional[str] = None
    ) -> Span:
        """Start a new span."""
        span = Span(
            trace_id=trace_id or str(uuid4()),
            parent_span_id=parent_span_id,
            service_name=self.service_name,
            operation_name=operation_name
        )
        self.spans.append(span)
        return span
    
    def export_spans(self) -> List[Dict]:
        """Export spans for visualization."""
        return [
            {
                "span_id": span.span_id,
                "trace_id": span.trace_id,
                "parent_span_id": span.parent_span_id,
                "service": span.service_name,
                "operation": span.operation_name,
                "start": span.start_time.isoformat(),
                "duration_ms": span.duration_ms,
                "tags": span.tags,
                "error": span.error
            }
            for span in self.spans
        ]


# Global tracer instance
tracer = Tracer("chatbot")


# Tracing middleware
async def tracing_middleware(request: Request, call_next):
    """Create root span for incoming request."""
    # Extract trace context from headers
    trace_id = request.headers.get("x-trace-id") or str(uuid4())
    parent_span_id = request.headers.get("x-parent-span-id")
    
    # Start root span
    span = tracer.start_span(
        operation_name=f"{request.method} {request.url.path}",
        trace_id=trace_id,
        parent_span_id=parent_span_id
    )
    
    span.add_tag("http.method", request.method)
    span.add_tag("http.url", str(request.url))
    span.add_tag("tenant_id", request.headers.get("x-tenant-id", "unknown"))
    
    try:
        # Process request
        response: Response = await call_next(request)
    except Exception as e:
        span.mark_error(e)
        raise
    else:
        span.add_tag("http.status_code", str(response.status_code))

        if response.status_code >= 400:
            span.mark_error(Exception(f"HTTP {response.status_code}"))

        # Add trace ID to response. Doing this here (not in `finally`) avoids
        # touching `response` when call_next raised and it was never assigned.
        response.headers["x-trace-id"] = trace_id
        return response
    finally:
        # Always close the span, whether the request succeeded or failed
        span.finish()
```

### Tracing Service Calls

```python
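# services/chatbot_orchestrator.py
from datetime import date

import httpx

from infrastructure.logging_config import correlation_id_var
from infrastructure.tracing import tracer

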
async def get_daily_revenue(tenant_id: str, date: date):
    """Get daily revenue with distributed tracing."""
    trace_id = correlation_id_var.get()  # Use correlation ID as trace ID
    
    # Create parent span for orchestration
    parent_span = tracer.start_span("get_daily_revenue", trace_id=trace_id)
    parent_span.add_tag("tenant_id", tenant_id)
    parent_span.add_tag("date", date.isoformat())
    
    try:
        # Call POS Core
        pos_span = tracer.start_span(
            "call_pos_core",
            trace_id=trace_id,
            parent_span_id=parent_span.span_id
        )
        
        try:
            orders = await call_pos_core(
                tenant_id,
                date,
                trace_id=trace_id,
                parent_span_id=pos_span.span_id
            )
            pos_span.add_tag("order_count", str(len(orders)))
        except Exception as e:
            pos_span.mark_error(e)
            raise
        finally:
            pos_span.finish()
        
        # Call Payment Service
        payment_span = tracer.start_span(
            "call_payment_service",
            trace_id=trace_id,
            parent_span_id=parent_span.span_id
        )
        
        try:
            payments = await call_payment_service(
                tenant_id,
                date,
                trace_id=trace_id,
                parent_span_id=payment_span.span_id
            )
            payment_span.add_tag("total_revenue", str(payments["total"]))
        except Exception as e:
            payment_span.mark_error(e)
            raise
        finally:
            payment_span.finish()
        
        # Aggregate
        result = aggregate(orders, payments)
        parent_span.add_tag("result_revenue", str(result["total"]))
        
        return result
        
    finally:
        parent_span.finish()


async def call_pos_core(tenant_id: str, date: date, trace_id: str, parent_span_id: str):
    """Call POS Core with tracing headers."""
    headers = {
        "x-tenant-id": tenant_id,
        "x-trace-id": trace_id,
        "x-parent-span-id": parent_span_id
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://localhost:4002/orders",
            params={"date": date.isoformat()},
            headers=headers
        )
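        # Surface 4xx/5xx as exceptions so the caller's span is marked as errored
        response.raise_for_status()
        return response.json()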
```
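The repeated `try / except / finally` around each span can be folded into a context manager. This is a sketch under assumptions, not the original implementation: `traced` and `finished_spans` are hypothetical names, and `Span` here is a trimmed-down stand-in so the example runs on its own.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from typing import Dict, Iterator, List, Optional
from uuid import uuid4


@dataclass
class Span:
    """Trimmed-down stand-in for the Span class above."""
    operation_name: str
    trace_id: str
    parent_span_id: Optional[str] = None
    span_id: str = field(default_factory=lambda: str(uuid4()))
    tags: Dict[str, str] = field(default_factory=dict)
    error: bool = False
    duration_ms: Optional[float] = None
    _started: float = field(default_factory=time.perf_counter)

    def finish(self) -> None:
        self.duration_ms = (time.perf_counter() - self._started) * 1000

    def mark_error(self, error: Exception) -> None:
        self.error = True
        self.tags["error.type"] = type(error).__name__


finished_spans: List[Span] = []


@contextmanager
def traced(operation_name: str, trace_id: str, parent: Optional[Span] = None) -> Iterator[Span]:
    """Open a span, mark it as errored on exception, and always finish it."""
    span = Span(operation_name, trace_id, parent.span_id if parent else None)
    try:
        yield span
    except Exception as exc:
        span.mark_error(exc)
        raise
    finally:
        span.finish()
        finished_spans.append(span)


# Usage: nesting the blocks wires up the parent/child relationship
with traced("get_daily_revenue", trace_id="abc-123-def") as parent:
    with traced("call_pos_core", parent.trace_id, parent) as child:
        child.tags["order_count"] = "42"
```

A plain `with` block also works inside `async` functions, so the awaited service call can sit in the body of the block.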

### Trace Visualization

A single chatbot query generates this trace:

```
Trace ID: abc-123-def
├─ [Chatbot] GET /query (500ms total)
│  ├─ [Chatbot] get_daily_revenue (480ms)
│  │  ├─ [POS Core] get_orders (200ms) ✓
│  │  ├─ [Payment] get_payments (180ms) ✓
│  │  └─ [Inventory] get_products (95ms) ✓
```

Now I can see:

* Which service is slowest (POS Core: 200ms)
* Which calls are parallel vs sequential
* Where errors occurred
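A tree like the one above can be produced from the dictionaries that `export_spans()` returns. The following is a rough sketch. `render_trace` is a hypothetical helper, and the sample spans (including the error on the Payment span) are made up for illustration.

```python
from collections import defaultdict
from typing import Dict, List, Optional


def render_trace(spans: List[Dict]) -> List[str]:
    """Render exported spans as an indented tree, one line per span."""
    known_ids = {s["span_id"] for s in spans}
    children: Dict[Optional[str], List[Dict]] = defaultdict(list)
    for span in spans:
        parent = span.get("parent_span_id")
        # A span whose parent is missing (e.g. it lives in another service's
        # export) is treated as a root so it still shows up
        children[parent if parent in known_ids else None].append(span)

    lines: List[str] = []

    def walk(parent_id: Optional[str], depth: int) -> None:
        for span in sorted(children[parent_id], key=lambda s: s["start"]):
            flag = " ERROR" if span.get("error") else ""
            lines.append(
                f"{'  ' * depth}[{span['service']}] {span['operation']} "
                f"({span['duration_ms']:.0f}ms){flag}"
            )
            walk(span["span_id"], depth + 1)

    walk(None, 0)
    return lines


# Made-up spans in the shape produced by export_spans()
spans = [
    {"span_id": "a", "parent_span_id": None, "service": "Chatbot",
     "operation": "get_daily_revenue", "start": "2024-01-15T14:23:45.000",
     "duration_ms": 480.0, "error": False},
    {"span_id": "b", "parent_span_id": "a", "service": "POS Core",
     "operation": "get_orders", "start": "2024-01-15T14:23:45.010",
     "duration_ms": 200.0, "error": False},
    {"span_id": "c", "parent_span_id": "a", "service": "Payment",
     "operation": "get_payments", "start": "2024-01-15T14:23:45.215",
     "duration_ms": 180.0, "error": True},
]

print("\n".join(render_trace(spans)))
```

To see a cross-service trace, the spans exported by every service would need to be collected in one place first. A tracing backend normally does that collection.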

## Metrics Collection

Metrics answer questions like:

* How many requests per second?
* What's the average response time?
* What's the error rate?

### Metrics Implementation

```python
# infrastructure/metrics.py
from collections import defaultdict
from datetime import datetime
from typing import Dict, List

from fastapi import Request

class MetricsCollector:
    """Simple in-memory metrics collector."""
    
    def __init__(self):
        self.counters: Dict[str, int] = defaultdict(int)
        self.gauges: Dict[str, float] = {}
        self.histograms: Dict[str, List[float]] = defaultdict(list)
    
    async def increment(self, metric_name: str, value: int = 1, tags: dict = None):
        """Increment a counter."""
        key = self._build_key(metric_name, tags)
        self.counters[key] += value
    
    async def gauge(self, metric_name: str, value: float, tags: dict = None):
        """Set a gauge value."""
        key = self._build_key(metric_name, tags)
        self.gauges[key] = value
    
    async def histogram(self, metric_name: str, value: float, tags: dict = None):
        """Record a histogram value."""
        key = self._build_key(metric_name, tags)
        self.histograms[key].append(value)
        
        # Keep only last 1000 values
        if len(self.histograms[key]) > 1000:
            self.histograms[key] = self.histograms[key][-1000:]
    
    def _build_key(self, metric_name: str, tags: dict = None) -> str:
        """Build metric key with tags."""
        if not tags:
            return metric_name
        
        tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
        return f"{metric_name}[{tag_str}]"
    
    def get_percentile(self, metric_name: str, percentile: float, tags: dict = None) -> float:
        """Calculate percentile for histogram."""
        key = self._build_key(metric_name, tags)
        values = sorted(self.histograms.get(key, []))
        
        if not values:
            return 0.0
        
        index = int(len(values) * percentile / 100)
        return values[min(index, len(values) - 1)]


# Global metrics instance
metrics = MetricsCollector()


# Metrics middleware
async def metrics_middleware(request: Request, call_next):
    """Collect metrics for each request."""
    start_time = datetime.utcnow()
    
    # Increment request counter
    await metrics.increment(
        "http.requests",
        tags={
            "method": request.method,
            "path": request.url.path,
            "tenant": request.headers.get("x-tenant-id", "unknown")
        }
    )
    
    try:
        response = await call_next(request)
        
        # Calculate duration
        duration_ms = (datetime.utcnow() - start_time).total_seconds() * 1000
        
        # Record response time
        await metrics.histogram(
            "http.response_time_ms",
            duration_ms,
            tags={
                "method": request.method,
                "path": request.url.path,
                "status": str(response.status_code)
            }
        )
        
        # Increment status code counter
        await metrics.increment(
            "http.responses",
            tags={
                "status": str(response.status_code),
                "tenant": request.headers.get("x-tenant-id", "unknown")
            }
        )
        
        return response
        
    except Exception as e:
        # Increment error counter
        await metrics.increment(
            "http.errors",
            tags={
                "error_type": type(e).__name__,
                "tenant": request.headers.get("x-tenant-id", "unknown")
            }
        )
        raise
```

### Service-Level Metrics

```python
# Track business metrics
async def process_payment(order_id: str, amount: float):
    """Process payment with metrics."""
    await metrics.increment("payments.attempted")
    
    try:
        result = await payment_gateway.charge(order_id, amount)
        
        await metrics.increment("payments.succeeded")
        await metrics.histogram("payments.amount", amount)
        
        return result
        
    except Exception as e:
        await metrics.increment("payments.failed")
        raise


# Track cache hit rate
async def get_from_cache(key: str):
    """Get from cache and track hit rate."""
    data = await redis.get(key)
    
    # Compare against None so an empty cached value still counts as a hit
    if data is not None:
        await metrics.increment("cache.hits")
    else:
        await metrics.increment("cache.misses")
    
    return data
```
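The two counters only become useful once they are turned into a ratio. Here is a small sketch. `hit_rate` is a hypothetical helper, and the counter names match the ones incremented above.

```python
from typing import Mapping, Optional


def hit_rate(counters: Mapping[str, int]) -> Optional[float]:
    """Return hits / (hits + misses), or None when there is no traffic yet."""
    hits = counters.get("cache.hits", 0)
    misses = counters.get("cache.misses", 0)
    total = hits + misses
    if total == 0:
        return None  # Avoid dividing by zero and reporting a misleading 0%
    return hits / total


print(hit_rate({"cache.hits": 70, "cache.misses": 30}))  # 0.7
```

Returning `None` for "no data" keeps a freshly started service from looking like it has a 0% hit rate.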

### Metrics Dashboard

```python
# api/metrics_endpoint.py
from fastapi import APIRouter

from infrastructure.metrics import metrics

router = APIRouter()

@router.get("/metrics")
async def get_metrics():
    """Expose metrics for monitoring."""
    return {
        "counters": dict(metrics.counters),
        "gauges": dict(metrics.gauges),
        # Histograms are stored per tag combination, so report percentiles
        # for each recorded series. The bare metric name has no samples of
        # its own and would always report 0.0.
        "percentiles": {
            key: {
                "p50": metrics.get_percentile(key, 50),
                "p95": metrics.get_percentile(key, 95),
                "p99": metrics.get_percentile(key, 99)
            }
            for key in metrics.histograms
        }
    }
```
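Because the middleware records response times under tagged keys such as `http.response_time_ms[method=GET,path=/query,status=200]`, an overall percentile for a metric needs the tagged series merged first. The sketch below uses a hypothetical `merged_percentile` helper with the same index rule as `get_percentile`, and made-up sample values.

```python
from typing import Dict, List


def merged_percentile(
    histograms: Dict[str, List[float]], metric_name: str, percentile: float
) -> float:
    """Percentile over every tagged series of one metric."""
    values: List[float] = []
    for key, samples in histograms.items():
        # Keys look like "name" or "name[tag=value,...]"
        if key == metric_name or key.startswith(metric_name + "["):
            values.extend(samples)
    if not values:
        return 0.0
    values.sort()
    # Same index rule as MetricsCollector.get_percentile
    index = int(len(values) * percentile / 100)
    return values[min(index, len(values) - 1)]


# Made-up samples: three fast requests and one timeout
histograms = {
    "http.response_time_ms[method=GET,path=/query,status=200]": [120.0, 150.0, 180.0],
    "http.response_time_ms[method=GET,path=/query,status=500]": [5000.0],
    "payments.amount": [19.99],
}

print(merged_percentile(histograms, "http.response_time_ms", 50))  # 180.0
print(merged_percentile(histograms, "http.response_time_ms", 99))  # 5000.0
```

With so few samples the high percentiles are dominated by a single slow request, which is one reason to look at p50 and p95 together.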

## Health Checks

Health checks tell you if a service is healthy and ready to receive traffic.

```python
# api/health.py
from datetime import datetime
from enum import Enum

import httpx
from fastapi import APIRouter, status
from fastapi.responses import JSONResponse

router = APIRouter()


class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"


@router.get("/health")
async def health_check():
    """
    Basic health check (liveness probe).
    Returns 200 if service is running.
    """
    return {"status": "ok"}


@router.get("/health/ready")
async def readiness_check():
    """
    Readiness check (can service handle traffic?).
    Checks dependencies: database, Redis, downstream services.
    """
    checks = {}
    overall_status = HealthStatus.HEALTHY
    
    # Check database
    try:
        await db.execute("SELECT 1")
        checks["database"] = {"status": HealthStatus.HEALTHY}
    except Exception as e:
        checks["database"] = {
            "status": HealthStatus.UNHEALTHY,
            "error": str(e)
        }
        overall_status = HealthStatus.UNHEALTHY
    
    # Check Redis
    try:
        await redis.ping()
        checks["redis"] = {"status": HealthStatus.HEALTHY}
    except Exception as e:
        checks["redis"] = {
            "status": HealthStatus.DEGRADED,
            "error": str(e)
        }
        # Redis failure is degraded, not unhealthy
        if overall_status == HealthStatus.HEALTHY:
            overall_status = HealthStatus.DEGRADED
    
    # Check downstream services
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(
                "http://localhost:4002/health",
                timeout=2.0
            )
        pos_core_ok = response.status_code == 200
        checks["pos_core"] = {
            "status": HealthStatus.HEALTHY if pos_core_ok else HealthStatus.DEGRADED
        }
    except Exception as e:
        pos_core_ok = False
        checks["pos_core"] = {
            "status": HealthStatus.DEGRADED,
            "error": str(e)
        }
    # A failing downstream service degrades this service, whether it
    # answered with a non-200 status or did not answer at all
    if not pos_core_ok and overall_status == HealthStatus.HEALTHY:
        overall_status = HealthStatus.DEGRADED
    
    # Return appropriate status code
    status_code = {
        HealthStatus.HEALTHY: status.HTTP_200_OK,
        HealthStatus.DEGRADED: status.HTTP_200_OK,  # Still accept traffic
        HealthStatus.UNHEALTHY: status.HTTP_503_SERVICE_UNAVAILABLE
    }[overall_status]
    
    return JSONResponse(
        status_code=status_code,
        content={
            "status": overall_status,
            "checks": checks,
            "timestamp": datetime.utcnow().isoformat()
        }
    )
```
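One gap in the readiness check above is that the database and Redis calls have no timeout, so a hung dependency could hang the probe itself. Here is a sketch of bounding each check with `asyncio.wait_for`. `run_check` is a hypothetical helper, and the three fake checks stand in for the real database, Redis, and POS Core calls.

```python
import asyncio
from typing import Awaitable, Callable, Dict


async def run_check(
    check: Callable[[], Awaitable[None]], timeout_s: float = 2.0
) -> Dict[str, str]:
    """Run one dependency check, treating a hang as a failure."""
    try:
        await asyncio.wait_for(check(), timeout=timeout_s)
        return {"status": "healthy"}
    except asyncio.TimeoutError:
        return {"status": "unhealthy", "error": f"timed out after {timeout_s}s"}
    except Exception as exc:
        return {"status": "unhealthy", "error": str(exc)}


async def main() -> Dict[str, Dict[str, str]]:
    # Fake checks standing in for db.execute, redis.ping, and the HTTP call
    async def fast() -> None:
        await asyncio.sleep(0)

    async def hung() -> None:
        await asyncio.sleep(10)

    async def broken() -> None:
        raise ConnectionError("connection refused")

    # Checks run concurrently, so the probe takes as long as the slowest one
    names = ["database", "redis", "pos_core"]
    results = await asyncio.gather(
        run_check(fast), run_check(hung, timeout_s=0.05), run_check(broken)
    )
    return dict(zip(names, results))


report = asyncio.run(main())
print(report)
```

Mapping these per-check results onto healthy, degraded, and unhealthy would follow the same rules as the endpoint above.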

## Alerting Strategy

Alerts notify you when things go wrong. But too many alerts lead to alert fatigue, where real problems get lost in the noise.

### Key Alerts

```python
# monitoring/alerts.py
class AlertRule:
    """Configuration for an alert rule."""
    
    def __init__(
        self,
        name: str,
        condition: str,
        threshold: float,
        window_seconds: int,
        severity: str
    ):
        self.name = name
        self.condition = condition
        self.threshold = threshold
        self.window_seconds = window_seconds
        self.severity = severity  # 'critical', 'warning', 'info'


# Alert rules
ALERT_RULES = [
    # Critical: Service is down
    AlertRule(
        name="ServiceDown",
        condition="health_check_failed",
        threshold=3,  # 3 consecutive failures
        window_seconds=60,
        severity="critical"
    ),
    
    # Critical: Error rate spike
    AlertRule(
        name="HighErrorRate",
        condition="error_rate > threshold",
        threshold=0.05,  # 5% error rate
        window_seconds=300,  # 5 minutes
        severity="critical"
    ),
    
    # Warning: High latency
    AlertRule(
        name="HighLatency",
        condition="p95_latency > threshold",
        threshold=1000,  # 1 second
        window_seconds=300,
        severity="warning"
    ),
    
    # Warning: Cache hit rate low
    AlertRule(
        name="LowCacheHitRate",
        condition="cache_hit_rate < threshold",
        threshold=0.7,  # 70%
        window_seconds=600,
        severity="warning"
    ),
    
    # Critical: Circuit breaker opened
    AlertRule(
        name="CircuitBreakerOpen",
        condition="circuit_breaker_state == OPEN",
        threshold=1,
        window_seconds=0,  # Immediate
        severity="critical"
    )
]
```
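The rules above are only configuration. Something still has to evaluate them against current metric values. The sketch below shows one way to do that for the threshold-style rules. `ThresholdRule` and `firing` are hypothetical, the condition strings are replaced by comparison functions, and the time windows and consecutive-failure counting are deliberately left out.

```python
import operator
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class ThresholdRule:
    """Simplified stand-in for AlertRule: one metric, one comparison."""
    name: str
    metric: str
    compare: Callable[[float, float], bool]
    threshold: float
    severity: str


RULES = [
    ThresholdRule("HighErrorRate", "error_rate", operator.gt, 0.05, "critical"),
    ThresholdRule("HighLatency", "p95_latency", operator.gt, 1000, "warning"),
    ThresholdRule("LowCacheHitRate", "cache_hit_rate", operator.lt, 0.7, "warning"),
]


def firing(rules: List[ThresholdRule], snapshot: Dict[str, float]) -> List[str]:
    """Return "severity:name" for each rule whose condition holds."""
    alerts = []
    for rule in rules:
        value = snapshot.get(rule.metric)
        if value is None:
            continue  # No data for this metric: stay silent rather than guess
        if rule.compare(value, rule.threshold):
            alerts.append(f"{rule.severity}:{rule.name}")
    return alerts


snapshot = {"error_rate": 0.02, "p95_latency": 2100, "cache_hit_rate": 0.9}
print(firing(RULES, snapshot))  # ['warning:HighLatency']
```

Using comparison functions instead of condition strings avoids having to parse or `eval` the rule text.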

## Production Incident Investigation

Let me show you how observability helped debug a real incident.

### Incident: Slow Chatbot (Resolved in 4 minutes)

**Step 1: Check metrics dashboard**

```
http.response_time_ms[path=/query]:
- p50: 450ms (normal: 150ms) ← 3x slower
- p95: 2100ms (normal: 380ms) ← 5x slower

Error rate: 2% (normal: 0.1%)
```

**Step 2: Check logs for recent errors**

```bash
$ grep '"level": "ERROR"' chatbot/logs.json | tail -5

{
  "correlation_id": "xyz-789",
  "message": "Timeout calling POS Core",
  "duration_ms": 5000,
  "tenant_id": "restaurant_abc"
}
```

**Step 3: Follow correlation ID across services**

```bash
$ grep '"correlation_id": "xyz-789"' */logs.json

chatbot: "Calling POS Core" (14:23:45)
pos_core: "Query started" (14:23:46)
pos_core: "Slow query detected: 4.8s" (14:23:51)
chatbot: "Timeout" (14:23:51)
```

**Step 4: Check POS Core traces**

```
Trace xyz-789:
├─ [POS Core] GET /orders (4800ms) ← SLOW
│  └─ [PostgreSQL] SELECT * FROM orders... (4750ms) ← Root cause
```

**Step 5: Check database**

```sql
-- Found: no index covering the (tenant_id, created_at) filter on orders
CREATE INDEX idx_orders_created_at ON orders(tenant_id, created_at);
```

**Total time: 4 minutes**

Without observability, this would've taken hours.

## Best Practices

1. **Use correlation IDs** to track requests across services
2. **Log structured JSON** for programmatic parsing
3. **Include context** in every log (`tenant_id`, `user_id`, `correlation_id`)
4. **Trace expensive operations** (database queries, external APIs)
5. **Collect business metrics** not just technical metrics
6. **Monitor error rates and latency** (SLI metrics)
7. **Set up meaningful alerts** (avoid alert fatigue)
8. **Test observability** (can you debug issues in production?)
9. **Aggregate logs centrally** (ELK, Splunk, CloudWatch)
10. **Use visualization tools** (Grafana, Datadog) for metrics and traces

## Conclusion

Observability transformed my debugging experience from hours of frustration to minutes of focused investigation. The key is building observability **into** your architecture from day one, not bolting it on later.

The three pillars work together:

* **Logs** tell you what happened
* **Metrics** tell you how much/how fast
* **Traces** tell you where time was spent

Tied together by correlation IDs, they give you end-to-end visibility into a request as it moves through your distributed system.

This completes the Software Architecture 101 series. We've covered:

1. Introduction to Software Architecture
2. Modular Monolith Architecture
3. Multi-Tenant Architecture Patterns
4. Service Layer Architecture
5. API Design & Contracts
6. Authentication & Authorization
7. Data Architecture Patterns
8. Event-Driven Architecture
9. Caching & Session Management
10. Integration & Orchestration Patterns
11. Resilience & Fault Tolerance
12. Observability & Monitoring ← You are here

Thank you for following along. I hope these lessons from my production POS system help you build better distributed architectures.

***

*This is part of the Software Architecture 101 series, where I shared lessons learned building a production multi-tenant POS system with 6 microservices: Auth (4001), POS Core (4002), Inventory (4003), Payment (4004), Restaurant (4005), and Chatbot (4006).*
