# Integration Patterns & Orchestration

## Table of Contents

* [Introduction](#introduction)
* [The Chatbot Integration Challenge](#the-chatbot-integration-challenge)
* [Orchestration vs Choreography](#orchestration-vs-choreography)
* [Implementing the Orchestrator Pattern](#implementing-the-orchestrator-pattern)
* [Parallel Service Calls with asyncio](#parallel-service-calls-with-asyncio)
* [Correlation IDs for Request Tracing](#correlation-ids-for-request-tracing)
* [Error Handling in Orchestration](#error-handling-in-orchestration)
* [Production Lessons Learned](#production-lessons-learned)
* [Best Practices](#best-practices)
* [Next Steps](#next-steps)

## Introduction

The Chatbot Service (port 4006) in my POS system has one job: answer user questions by aggregating data from 5 other services. Sounds simple, right? It wasn't.

A question like "What's my revenue today?" requires:

* Auth Service: Verify user and get permissions
* POS Core: Get all orders for the date
* Payment: Get payment totals and methods
* Inventory: Get product details and costs
* Restaurant: Get table assignments and covers

When I first implemented this, I made sequential HTTP calls, waiting for each response before making the next request. A single chatbot query took **2.3 seconds**. Users thought the system was broken.

The solution was **orchestration**: one service (the orchestrator) coordinates multiple services, making parallel calls when possible, and aggregating results. This pattern reduced response times from 2.3s to 380ms—a 6x improvement.

In this article, I'll share how I built the Chatbot Orchestrator, including parallel execution with `asyncio`, correlation IDs for distributed tracing, and the hard lessons I learned about orchestration vs choreography.

## The Chatbot Integration Challenge

The Chatbot Service needs to answer business questions that span multiple domains:

**Revenue Questions**:

* "What's my revenue today?" → POS Core + Payment
* "What's my best-selling product?" → POS Core + Inventory + Payment
* "What payment methods are customers using?" → Payment + POS Core

**Inventory Questions**:

* "What products are low in stock?" → Inventory
* "What's my inventory value?" → Inventory + Product costs

**Operational Questions**:

* "How many tables are occupied?" → Restaurant
* "What orders are pending?" → POS Core + Restaurant

Each question requires data from 2-5 services. The challenge: how do we coordinate these calls efficiently, reliably, and with proper error handling?
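To make the fan-out concrete, the question-to-service mapping above can be sketched as a routing table. The intent names and service keys here are illustrative, not the production router:

```python
# Hypothetical routing table: which services each chatbot intent fans out to.
INTENT_SERVICES = {
    "daily_revenue": ["pos", "payment"],
    "top_products": ["pos", "inventory", "payment"],
    "low_stock": ["inventory"],
    "table_status": ["restaurant"],
    "pending_orders": ["pos", "restaurant"],
}


def services_for(intent: str) -> list[str]:
    """Return the services an intent needs (empty list if unrecognized)."""
    return INTENT_SERVICES.get(intent, [])
```

A table like this also makes it easy to see, per question, which calls can run in parallel and which must wait on earlier results.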

## Orchestration vs Choreography

There are two patterns for integrating distributed services:

### Orchestration (Centralized)

One service (orchestrator) explicitly controls the workflow:

```
┌──────────────┐
│   Chatbot    │ ← Orchestrator (knows the workflow)
│ Orchestrator │
└───────┬──────┘
        │
        ├─────→ Auth Service
        ├─────→ POS Core
        ├─────→ Payment
        ├─────→ Inventory
        └─────→ Restaurant
```

**Pros**:

* Clear workflow visibility (all logic in one place)
* Easy to debug (centralized logs)
* Simple error handling and compensation

**Cons**:

* Single point of failure
* Can become a bottleneck
* Tight coupling to downstream services

### Choreography (Event-Driven)

Services react to events without central coordination:

```
Order Created Event
    │
    ├─────→ Inventory (reduce stock)
    ├─────→ Payment (charge customer)
    └─────→ Restaurant (notify kitchen)
```

**Pros**:

* Decoupled services
* No single point of failure
* Scales independently

**Cons**:

* Workflow is implicit (harder to understand)
* Difficult to debug (distributed logs)
* Complex error handling (compensating transactions)

**My decision**: Use **orchestration** for the Chatbot because:

1. Chatbot queries need synchronous responses (users wait for answers)
2. Workflow is complex (5 services, conditional logic)
3. I need centralized error handling and retry logic

I use **choreography** for background workflows like order processing (covered in the Event-Driven Architecture article).

## Implementing the Orchestrator Pattern

Here's the Chatbot Orchestrator that coordinates calls to 5 services:

```python
# services/chatbot_orchestrator.py
from typing import Optional, Dict, Any
from datetime import date, datetime
import httpx
import asyncio
import logging
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class ServiceEndpoint:
    """Configuration for a service endpoint."""
    name: str
    base_url: str
    timeout: float = 5.0


class ChatbotOrchestrator:
    """
    Orchestrates calls to multiple services to answer chatbot queries.
    Implements parallel execution, error handling, and correlation tracking.
    """
    
    def __init__(self):
        # Service registry
        self.services = {
            "auth": ServiceEndpoint("Auth", "http://localhost:4001", timeout=2.0),
            "pos": ServiceEndpoint("POS Core", "http://localhost:4002", timeout=5.0),
            "inventory": ServiceEndpoint("Inventory", "http://localhost:4003", timeout=3.0),
            "payment": ServiceEndpoint("Payment", "http://localhost:4004", timeout=5.0),
            "restaurant": ServiceEndpoint("Restaurant", "http://localhost:4005", timeout=3.0),
        }
    
    async def get_daily_revenue(
        self,
        tenant_id: str,
        query_date: date,
        correlation_id: str
    ) -> Dict[str, Any]:
        """
        Answer: "What's my revenue today?"
        Requires: POS Core (orders) + Payment (totals)
        """
        logger.info(
            f"[{correlation_id}] Getting daily revenue for {tenant_id} on {query_date}"
        )
        
        headers = {
            "x-tenant-id": tenant_id,
            "x-correlation-id": correlation_id
        }
        
        async with httpx.AsyncClient() as client:
            # Execute calls in parallel
            orders_task = client.get(
                f"{self.services['pos'].base_url}/orders",
                params={"date": query_date.isoformat()},
                headers=headers,
                timeout=self.services['pos'].timeout
            )
            
            payments_task = client.get(
                f"{self.services['payment'].base_url}/payments/daily-total",
                params={"date": query_date.isoformat()},
                headers=headers,
                timeout=self.services['payment'].timeout
            )
            
            # Wait for both to complete
            orders_resp, payments_resp = await asyncio.gather(
                orders_task,
                payments_task,
                return_exceptions=True
            )
            
            # Handle errors (raised exceptions and non-2xx responses alike)
            if isinstance(orders_resp, Exception) or orders_resp.status_code != 200:
                logger.error(f"[{correlation_id}] POS Core error: {orders_resp}")
                return {"error": "Failed to get orders", "revenue": 0}
            
            if isinstance(payments_resp, Exception) or payments_resp.status_code != 200:
                logger.error(f"[{correlation_id}] Payment error: {payments_resp}")
                return {"error": "Failed to get payment data", "revenue": 0}
            
            # Parse responses
            orders = orders_resp.json()
            payments = payments_resp.json()
            
            # Aggregate data
            result = {
                "date": query_date.isoformat(),
                "total_revenue": payments["total"],
                "order_count": len(orders),
                "payment_methods": payments["by_method"],
                "average_order_value": payments["total"] / len(orders) if orders else 0,
                "correlation_id": correlation_id
            }
            
            logger.info(
                f"[{correlation_id}] Revenue query completed: "
                f"${result['total_revenue']:.2f} from {result['order_count']} orders"
            )
            
            return result
    
    async def get_top_products(
        self,
        tenant_id: str,
        query_date: date,
        correlation_id: str,
        limit: int = 10
    ) -> Dict[str, Any]:
        """
        Answer: "What are my top-selling products today?"
        Requires: POS Core + Inventory + Payment
        """
        logger.info(
            f"[{correlation_id}] Getting top products for {tenant_id} on {query_date}"
        )
        
        headers = {
            "x-tenant-id": tenant_id,
            "x-correlation-id": correlation_id
        }
        
        async with httpx.AsyncClient() as client:
            # Step 1: Get orders (includes product IDs and quantities)
            orders_resp = await client.get(
                f"{self.services['pos'].base_url}/orders",
                params={"date": query_date.isoformat()},
                headers=headers,
                timeout=self.services['pos'].timeout
            )
            
            if orders_resp.status_code != 200:
                return {"error": "Failed to get orders"}
            
            orders = orders_resp.json()
            
            # Extract product IDs
            product_ids = self._extract_product_ids(orders)
            
            if not product_ids:
                return {"products": [], "message": "No sales today"}
            
            # Step 2: Get product details and payment info in parallel
            products_task = client.get(
                f"{self.services['inventory'].base_url}/products",
                params={"ids": ",".join(product_ids)},
                headers=headers,
                timeout=self.services['inventory'].timeout
            )
            
            payments_task = client.get(
                f"{self.services['payment'].base_url}/payments",
                params={"date": query_date.isoformat()},
                headers=headers,
                timeout=self.services['payment'].timeout
            )
            
            products_resp, payments_resp = await asyncio.gather(
                products_task,
                payments_task,
                return_exceptions=True
            )
            
            # Parse responses (the inventory endpoint is expected to return
            # a dict keyed by product ID); fall back to empty data on failure
            products = products_resp.json() if not isinstance(products_resp, Exception) else {}
            payments = payments_resp.json() if not isinstance(payments_resp, Exception) else []
            
            # Aggregate: Calculate revenue per product
            product_stats = self._calculate_product_stats(orders, products, payments)
            
            # Sort by revenue and take top N
            top_products = sorted(
                product_stats,
                key=lambda x: x["revenue"],
                reverse=True
            )[:limit]
            
            return {
                "products": top_products,
                "total_products": len(product_stats),
                "date": query_date.isoformat(),
                "correlation_id": correlation_id
            }
    
    async def get_restaurant_status(
        self,
        tenant_id: str,
        correlation_id: str
    ) -> Dict[str, Any]:
        """
        Answer: "How busy is my restaurant?"
        Requires: Restaurant + POS Core
        """
        logger.info(f"[{correlation_id}] Getting restaurant status for {tenant_id}")
        
        headers = {
            "x-tenant-id": tenant_id,
            "x-correlation-id": correlation_id
        }
        
        async with httpx.AsyncClient() as client:
            # Parallel calls
            tasks = {
                "tables": client.get(
                    f"{self.services['restaurant'].base_url}/tables/status",
                    headers=headers,
                    timeout=self.services['restaurant'].timeout
                ),
                "active_orders": client.get(
                    f"{self.services['pos'].base_url}/orders/active",
                    headers=headers,
                    timeout=self.services['pos'].timeout
                )
            }
            
            # Execute in parallel
            results = await asyncio.gather(
                *tasks.values(),
                return_exceptions=True
            )
            
            # Map results back to keys
            responses = dict(zip(tasks.keys(), results))
            
            # Parse (fall back to empty data if a service call failed)
            tables_resp = responses["tables"]
            orders_resp = responses["active_orders"]
            tables = tables_resp.json() if not isinstance(tables_resp, Exception) else []
            active_orders = orders_resp.json() if not isinstance(orders_resp, Exception) else []
            
            # Calculate stats
            total_tables = len(tables)
            occupied_tables = sum(1 for t in tables if t["status"] == "occupied")
            occupancy_rate = (occupied_tables / total_tables * 100) if total_tables > 0 else 0
            
            return {
                "total_tables": total_tables,
                "occupied_tables": occupied_tables,
                "occupancy_rate": f"{occupancy_rate:.1f}%",
                "active_orders": len(active_orders),
                "average_covers": (
                    sum(t.get("party_size", 0) for t in tables if t["status"] == "occupied")
                    / occupied_tables
                    if occupied_tables > 0 else 0
                ),
                "correlation_id": correlation_id
            }
    
    def _extract_product_ids(self, orders: list) -> list[str]:
        """Extract unique product IDs from orders."""
        product_ids = set()
        for order in orders:
            for item in order.get("items", []):
                product_ids.add(item["product_id"])
        return list(product_ids)
    
    def _calculate_product_stats(
        self,
        orders: list,
        products: dict,
        payments: list
    ) -> list[dict]:
        """Calculate sales statistics per product."""
        product_stats = {}
        
        for order in orders:
            for item in order.get("items", []):
                pid = item["product_id"]
                
                if pid not in product_stats:
                    product_stats[pid] = {
                        "product_id": pid,
                        "name": products.get(pid, {}).get("name", "Unknown"),
                        "sku": products.get(pid, {}).get("sku", "N/A"),
                        "quantity_sold": 0,
                        "revenue": 0.0
                    }
                
                product_stats[pid]["quantity_sold"] += item["quantity"]
                product_stats[pid]["revenue"] += item.get("subtotal", 0.0)
        
        return list(product_stats.values())
```

This orchestrator:

* Coordinates 5 services
* Executes parallel calls when dependencies allow
* Handles errors gracefully
* Tracks requests with correlation IDs
* Aggregates data from multiple sources

## Parallel Service Calls with asyncio

The key to fast orchestration is **parallel execution**. Python's `asyncio.gather()` makes this easy:

```python
# Sequential (slow - 2.3 seconds total)
async def slow_orchestration():
    orders = await get_orders()        # 800ms
    products = await get_products()    # 600ms
    payments = await get_payments()    # 900ms
    # Total: 2300ms

# Parallel (fast - 900ms total)
async def fast_orchestration():
    results = await asyncio.gather(
        get_orders(),      # 800ms
        get_products(),    # 600ms
        get_payments()     # 900ms
    )
    orders, products, payments = results
    # Total: max(800, 600, 900) = 900ms
```
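A stdlib-only demo, with `asyncio.sleep` standing in for the HTTP calls, shows that the wall time of a gathered batch tracks the slowest call rather than the sum:

```python
import asyncio
import time


async def fake_call(name: str, delay: float) -> str:
    # Stand-in for an HTTP request to a downstream service
    await asyncio.sleep(delay)
    return name


async def main() -> float:
    start = time.perf_counter()
    await asyncio.gather(
        fake_call("orders", 0.08),
        fake_call("products", 0.06),
        fake_call("payments", 0.09),
    )
    return time.perf_counter() - start


elapsed = asyncio.run(main())
# elapsed is close to 0.09s (the slowest call), well below the 0.23s sum
```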

### Handling Dependencies

Some calls depend on others. Use sequential then parallel:

```python
async def dependent_orchestration():
    # Step 1: Get orders first (needed for product IDs)
    orders = await get_orders()
    
    # Step 2: Extract product IDs
    product_ids = extract_product_ids(orders)
    
    # Step 3: Get products and payments in parallel
    products, payments = await asyncio.gather(
        get_products(product_ids),
        get_payments()
    )
    
    # Step 4: Aggregate
    return aggregate(orders, products, payments)
```

### Error Handling with gather()

Use `return_exceptions=True` so a failed call comes back as an exception object in the results list instead of raising and discarding the other responses:

```python
results = await asyncio.gather(
    get_orders(),
    get_products(),
    get_payments(),
    return_exceptions=True  # Don't fail if one service fails
)

# Check for errors
for i, result in enumerate(results):
    if isinstance(result, Exception):
        logger.error(f"Service {i} failed: {result}")
        # Use fallback data or skip this service
```

## Correlation IDs for Request Tracing

When a chatbot query touches 5 services, debugging failures is hard. **Correlation IDs** solve this:

```python
# middleware/correlation_middleware.py
from fastapi import Request
from uuid import uuid4
import contextvars

# Context variable for correlation ID
correlation_id_var = contextvars.ContextVar("correlation_id", default=None)


async def correlation_middleware(request: Request, call_next):
    """
    Extract or generate correlation ID and pass it through the request chain.
    """
    # Get correlation ID from header or generate new one
    correlation_id = request.headers.get("x-correlation-id") or str(uuid4())
    
    # Store in context (accessible by all async tasks)
    correlation_id_var.set(correlation_id)
    
    # Add to response headers
    response = await call_next(request)
    response.headers["x-correlation-id"] = correlation_id
    
    return response


def get_correlation_id() -> str:
    """Get current correlation ID from context."""
    return correlation_id_var.get() or str(uuid4())
```
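Why does a single `ContextVar` work across parallel calls? Each task spawned by `asyncio.gather` inherits a copy of the context current at the moment it is created, so every downstream call made within one request sees the same ID. A stdlib-only sketch:

```python
import asyncio
import contextvars

correlation_id_var = contextvars.ContextVar("correlation_id", default=None)


async def downstream_call(name: str) -> str:
    # Reads the ID set by the parent request handler
    return f"[{correlation_id_var.get()}] called {name}"


async def handle_request() -> list[str]:
    correlation_id_var.set("abc-123")  # normally done by the middleware
    return await asyncio.gather(
        downstream_call("pos"),
        downstream_call("payment"),
    )


results = asyncio.run(handle_request())
# → ["[abc-123] called pos", "[abc-123] called payment"]
```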

### Using Correlation IDs

```python
# In orchestrator
async def get_daily_revenue(tenant_id: str, query_date: date):
    correlation_id = get_correlation_id()
    
    logger.info(f"[{correlation_id}] Starting revenue query")
    
    # Pass correlation ID to downstream services
    headers = {
        "x-tenant-id": tenant_id,
        "x-correlation-id": correlation_id
    }
    
    async with httpx.AsyncClient() as client:
        response = await client.get(
            "http://localhost:4002/orders",
            headers=headers
        )
    
    logger.info(f"[{correlation_id}] Revenue query completed")
```

### Log Output with Correlation IDs

```
[abc-123] Starting revenue query
[abc-123] Calling POS Core service
[abc-123] Calling Payment service
[abc-123] POS Core responded in 234ms
[abc-123] Payment service responded in 189ms
[abc-123] Aggregating results
[abc-123] Revenue query completed

[def-456] Starting top products query
[def-456] Calling POS Core service
[def-456] ERROR: POS Core timeout after 5000ms
[def-456] Top products query failed
```

Now I can grep logs for `[abc-123]` to see the entire request lifecycle across all services.

## Error Handling in Orchestration

Orchestration error handling is critical. I learned this the hard way when the Payment Service went down and took the entire Chatbot offline.

### Pattern 1: Partial Success

Return partial results if some services fail:

```python
async def get_restaurant_status_resilient(tenant_id: str):
    """Return what we can, even if some services fail."""
    headers = {"x-tenant-id": tenant_id}
    
    results = await asyncio.gather(
        get_tables(headers),
        get_orders(headers),
        return_exceptions=True
    )
    
    tables, orders = results
    
    response = {}
    
    # Tables service succeeded
    if not isinstance(tables, Exception):
        response["tables"] = tables
        response["occupancy"] = calculate_occupancy(tables)
    else:
        response["tables_error"] = "Service unavailable"
    
    # Orders service succeeded
    if not isinstance(orders, Exception):
        response["active_orders"] = len(orders)
    else:
        response["orders_error"] = "Service unavailable"
    
    return response
```

### Pattern 2: Fallback Values

Use cached or default values when services fail:

```python
async def get_top_products_with_fallback(tenant_id: str, date: date):
    """Use cached data if services fail."""
    try:
        # Try to get live data
        return await get_top_products(tenant_id, date)
    except Exception as e:
        logger.error(f"Live query failed: {e}")
        
        # Fallback to yesterday's data
        cached = await cache.get(f"top_products:{tenant_id}")
        if cached:
            cached["from_cache"] = True
            cached["warning"] = "Using cached data from yesterday"
            return cached
        
        # Ultimate fallback
        return {"error": "Data unavailable", "products": []}
```

### Pattern 3: Timeout Configuration

Different services have different SLAs. Configure timeouts accordingly:

```python
service_timeouts = {
    "auth": 2.0,        # Fast, authentication is critical
    "inventory": 3.0,   # MongoDB, usually fast
    "pos": 5.0,         # PostgreSQL, complex queries
    "payment": 5.0,     # External payment gateway
    "restaurant": 3.0   # Small dataset
}

async def call_service(service_name: str, url: str, headers: dict):
    """Call service with configured timeout."""
    timeout = service_timeouts.get(service_name, 5.0)
    
    try:
        async with httpx.AsyncClient() as client:
            response = await client.get(url, headers=headers, timeout=timeout)
            return response.json()
    except httpx.TimeoutException:
        logger.error(f"{service_name} timeout after {timeout}s")
        raise
```

## Production Lessons Learned

### Lesson 1: Cascade Failures

When the Inventory Service slowed down (500ms → 3s), the Chatbot started timing out, backing up requests, and eventually crashing.

**Solution**: Implement circuit breaker (covered in next article) and aggressive timeouts:

```python
# Don't wait forever for slow services
AGGRESSIVE_TIMEOUT = 2.0

async def get_inventory_with_circuit_breaker():
    if circuit_breaker.is_open("inventory"):
        return {"error": "Service unavailable", "products": []}
    
    try:
        return await get_inventory(timeout=AGGRESSIVE_TIMEOUT)
    except Exception as e:
        circuit_breaker.record_failure("inventory")
        raise
```
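The `circuit_breaker` object above is assumed; the full pattern is covered in the next article, but a minimal in-memory sketch might look like this (thresholds and method names are illustrative):

```python
import time


class CircuitBreaker:
    """Minimal sketch: open a service's circuit after repeated failures,
    then allow a trial call through once the reset timeout elapses."""

    def __init__(self, failure_threshold: int = 5, reset_timeout: float = 30.0):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._failures: dict[str, int] = {}
        self._opened_at: dict[str, float] = {}

    def is_open(self, service: str) -> bool:
        opened = self._opened_at.get(service)
        if opened is None:
            return False
        if time.monotonic() - opened >= self.reset_timeout:
            # Half-open: let one trial call through
            del self._opened_at[service]
            self._failures[service] = 0
            return False
        return True

    def record_failure(self, service: str) -> None:
        self._failures[service] = self._failures.get(service, 0) + 1
        if self._failures[service] >= self.failure_threshold:
            self._opened_at[service] = time.monotonic()

    def record_success(self, service: str) -> None:
        self._failures[service] = 0
        self._opened_at.pop(service, None)
```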

### Lesson 2: Thundering Herd

During a busy lunch rush, 100 simultaneous chatbot queries hit every service at once, overwhelming them.

**Solution**: Rate limiting and request coalescing:

```python
from asyncio import Lock

# Coalesce identical requests
_request_locks = {}

async def get_daily_revenue_coalesced(tenant_id: str, date: date):
    """Multiple identical requests share one upstream call."""
    cache_key = f"{tenant_id}:{date}"
    
    # No await between check and set, so setdefault is race-free on a single
    # event loop; in production, evict stale locks so the dict can't grow
    # without bound
    lock = _request_locks.setdefault(cache_key, Lock())
    
    async with lock:
        # Check cache first
        cached = await cache.get(cache_key)
        if cached:
            return cached
        
        # Make actual call (other concurrent requests wait here)
        result = await get_daily_revenue(tenant_id, date)
        
        # Cache for 5 minutes
        await cache.set(cache_key, result, ttl=300)
        
        return result
```

### Lesson 3: Debugging Distributed Failures

A user reported "Chatbot not working." I had no idea which service failed or why.

**Solution**: Structured logging with correlation IDs (shown earlier) and response metadata:

```python
return {
    "data": result,
    "metadata": {
        "correlation_id": correlation_id,
        "service_calls": {
            "pos_core": {"status": "success", "duration_ms": 234},
            "payment": {"status": "success", "duration_ms": 189},
            "inventory": {"status": "timeout", "duration_ms": 3000}
        },
        "total_duration_ms": 3234
    }
}
```
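The per-service entries in that metadata come from timing each call. A small helper along these lines (the names and the simplified error classification are illustrative) can wrap every downstream request:

```python
import asyncio
import time


async def timed_call(name: str, coro):
    """Await one service call and return (result, metadata entry).
    Error classification is simplified for illustration."""
    start = time.perf_counter()
    try:
        result = await coro
        status = "success"
    except asyncio.TimeoutError:
        result, status = None, "timeout"
    except Exception:
        result, status = None, "error"
    duration_ms = int((time.perf_counter() - start) * 1000)
    return result, {"name": name, "status": status, "duration_ms": duration_ms}


async def fake_pos_call():
    # Stand-in for an httpx request to POS Core
    await asyncio.sleep(0.01)
    return {"orders": 3}


result, meta = asyncio.run(timed_call("pos_core", fake_pos_call()))
```

Gathering the metadata entries from each `timed_call` yields the `service_calls` section shown above.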

## Best Practices

1. **Use correlation IDs** for distributed tracing across services
2. **Execute in parallel** when dependencies allow (use `asyncio.gather()`)
3. **Configure timeouts** per service based on SLAs
4. **Handle partial failures** gracefully (don't fail entire response if one service is down)
5. **Implement circuit breakers** to prevent cascade failures
6. **Cache aggressively** (especially for expensive aggregations)
7. **Monitor orchestrator performance** (response time, failure rate, timeout rate)
8. **Log service call duration** to identify slow dependencies
9. **Use request coalescing** to prevent thundering herd
10. **Prefer orchestration for synchronous workflows** (user-facing queries)
11. **Prefer choreography for asynchronous workflows** (background processing)

## Next Steps

The Chatbot Orchestrator improved performance 6x, but it introduced new failure modes. When the Payment Service fails, should the Chatbot fail too? How do we handle transient errors vs permanent failures?

In the next article, **Resilience & Fault Tolerance**, we'll explore circuit breakers, retry strategies, and graceful degradation—the patterns that keep distributed systems running when things go wrong (and they will).

***

*This is part of the Software Architecture 101 series, where I share lessons learned building a production multi-tenant POS system with 6 microservices.*
