# Part 5: Production Patterns

*Part of the* [*Multi Agent Orchestration 101 Series*](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101)

## The Gap Between "It Works" and "It Works in Production"

The first version of my multi-agent system worked great in development. It fell apart in production within a day. The failures weren't dramatic — no crashes, no errors — just silent misbehaviour: agents looping indefinitely, costs spiking without warning, no way to trace why an agent made a particular decision.

This part covers the things I had to add to make the system trustworthy. None of it is glamorous, but all of it is necessary.

***

## 1. Structured Logging with Trace IDs

The biggest debugging problem with multi-agent systems is attribution: when something goes wrong, which agent caused it? A trace ID threads through every log line in a single request, making it possible to reconstruct the full chain of agent decisions.

```python
# tracing.py
from __future__ import annotations
import logging
import uuid
from contextvars import ContextVar

trace_id: ContextVar[str] = ContextVar("trace_id", default="no-trace")


def new_trace() -> str:
    tid = uuid.uuid4().hex[:8]
    trace_id.set(tid)
    return tid


class TraceFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        record.trace_id = trace_id.get()
        return True


def configure_logging() -> None:
    handler = logging.StreamHandler()
    handler.addFilter(TraceFilter())
    formatter = logging.Formatter(
        "%(asctime)s [%(trace_id)s] %(levelname)s %(name)s — %(message)s"
    )
    handler.setFormatter(formatter)
    # force=True replaces any handlers other libraries may have installed;
    # without it, basicConfig is a silent no-op if the root logger is configured
    logging.basicConfig(level=logging.INFO, handlers=[handler], force=True)
```

Add logging calls to the `AgentV2.run()` loop:

```python
import logging
from tracing import new_trace, trace_id

logger = logging.getLogger(__name__)


async def run(self, user_message: str) -> str:
    tid = trace_id.get()
    logger.info("agent=%s starting task=%r trace=%s", self.name, user_message[:60], tid)

    # ... existing loop logic ...

    logger.info("agent=%s finished trace=%s", self.name, tid)
    return result
```

At the entry point, call `new_trace()` once per request:

```python
from tracing import new_trace, configure_logging

configure_logging()

async def handle_request(goal: str) -> str:
    new_trace()
    return await supervisor.run(goal)
```

Now every log line from every agent in a request shares the same 8-character trace ID. When a request goes wrong, `grep 'a3f2b1c9' app.log` shows the complete picture.
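The reason for `ContextVar` rather than a module-level global is that it isolates concurrent requests: each asyncio task gets its own copy of the context, so two requests handled in parallel never see each other's trace ID. A minimal self-contained sketch (independent of `tracing.py` above) demonstrating the isolation:

```python
import asyncio
import uuid
from contextvars import ContextVar

trace_id: ContextVar[str] = ContextVar("trace_id", default="no-trace")


async def handle(results: dict) -> None:
    # Each task sets its own trace ID; ContextVar keeps them isolated
    tid = uuid.uuid4().hex[:8]
    trace_id.set(tid)
    await asyncio.sleep(0)  # yield so the two tasks interleave
    results[tid] = trace_id.get()  # still our own ID, not the other task's


async def main() -> dict:
    results: dict = {}
    await asyncio.gather(handle(results), handle(results))
    return results


results = asyncio.run(main())
print(results)  # two entries; each task read back only its own ID
```

A plain global would be overwritten by whichever request set it last; `ContextVar` is what makes the trace ID safe under concurrency.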

***

## 2. Token Budget and Cost Tracking

I had a billing surprise in the first week of running my agent system. The supervisor was occasionally getting into a reasoning loop, making 15–20 LLM calls per request instead of the expected 3–5. At GPT-4o pricing, that adds up fast.

Two mitigations: a hard iteration cap (already in Part 1's loop), and per-request token tracking.

```python
# cost_tracker.py
from __future__ import annotations
from dataclasses import dataclass, field


# Prices per 1M tokens as of early 2025 — update these as pricing changes
TOKEN_COSTS: dict[str, dict[str, float]] = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-3-5-haiku-20241022": {"input": 0.80, "output": 4.00},
    "claude-3-7-sonnet-20250219": {"input": 3.00, "output": 15.00},
}


@dataclass
class UsageRecord:
    model: str
    input_tokens: int
    output_tokens: int

    @property
    def cost_usd(self) -> float:
        # Unknown models are silently priced at $0 — add new models to TOKEN_COSTS
        prices = TOKEN_COSTS.get(self.model, {"input": 0.0, "output": 0.0})
        return (
            self.input_tokens * prices["input"] / 1_000_000
            + self.output_tokens * prices["output"] / 1_000_000
        )


@dataclass
class CostTracker:
    records: list[UsageRecord] = field(default_factory=list)

    def record(self, model: str, input_tokens: int, output_tokens: int) -> None:
        self.records.append(UsageRecord(model, input_tokens, output_tokens))

    @property
    def total_cost_usd(self) -> float:
        return sum(r.cost_usd for r in self.records)

    @property
    def total_tokens(self) -> int:
        return sum(r.input_tokens + r.output_tokens for r in self.records)

    def summary(self) -> str:
        return (
            f"{len(self.records)} calls — "
            f"{self.total_tokens:,} tokens — "
            f"${self.total_cost_usd:.4f}"
        )
```

Pass a shared `CostTracker` to all agents and record after every API call:

```python
# In OpenAIAgent._think():
response = await client.chat.completions.create(...)
self.cost_tracker.record(
    model=self.model,
    input_tokens=response.usage.prompt_tokens,
    output_tokens=response.usage.completion_tokens,
)
```

Log the summary at the end of every request:

```python
logger.info("request finished — %s", cost_tracker.summary())
# request finished — 7 calls — 12,341 tokens — $0.0032
```
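As a sanity check on the arithmetic: a request that consumes 5,000 input and 1,000 output tokens on `gpt-4o-mini` should cost $0.00075 + $0.0006 = $0.00135 at the prices in the table above:

```python
# Per-1M-token prices for gpt-4o-mini, from the TOKEN_COSTS table above
INPUT_PRICE, OUTPUT_PRICE = 0.15, 0.60

input_tokens, output_tokens = 5_000, 1_000
cost = (
    input_tokens * INPUT_PRICE / 1_000_000
    + output_tokens * OUTPUT_PRICE / 1_000_000
)
print(f"${cost:.5f}")  # $0.00135
```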

Add a hard limit if you want cost-based circuit breaking:

```python
MAX_COST_USD = 0.50

async def run(self, user_message: str) -> str:
    for _ in range(10):
        if self.cost_tracker.total_cost_usd > MAX_COST_USD:
            return "Aborted: cost budget exceeded."
        # ... rest of loop
```

***

## 3. Rate Limiting

Both OpenAI and Anthropic have per-minute and per-day token limits. When you run parallel agents (Part 3), you can hit rate limits quickly.

A simple token bucket limiter:

```python
# rate_limiter.py
from __future__ import annotations
import asyncio
import time


class TokenBucketLimiter:
    """
    Spaces API calls evenly: at most one call every 60/calls_per_minute seconds.
    (Despite the name, this is a fixed-interval limiter, not a bursty token
    bucket — simpler, and sufficient for a single agent process.)
    Adjust `calls_per_minute` to stay under your tier's limits.
    """

    def __init__(self, calls_per_minute: int = 20) -> None:
        self._interval = 60.0 / calls_per_minute
        self._last_call: float = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            wait = self._interval - (now - self._last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_call = time.monotonic()
```

Use it in `_think()` before every API call:

```python
await self.limiter.acquire()
response = await client.chat.completions.create(...)
```

For production systems I recommend the `limits` library, or `slowapi` if you serve over FastAPI, but this limiter is enough for a standalone agent process.
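To see the spacing in action, here is a self-contained run (the limiter class is repeated inline so the snippet stands alone) with an exaggerated 600 calls per minute, i.e. one call every 0.1 s:

```python
import asyncio
import time


class TokenBucketLimiter:
    # Same limiter as rate_limiter.py above
    def __init__(self, calls_per_minute: int = 20) -> None:
        self._interval = 60.0 / calls_per_minute
        self._last_call: float = 0.0
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            now = time.monotonic()
            wait = self._interval - (now - self._last_call)
            if wait > 0:
                await asyncio.sleep(wait)
            self._last_call = time.monotonic()


async def main() -> float:
    limiter = TokenBucketLimiter(calls_per_minute=600)  # 0.1 s between calls
    start = time.monotonic()
    for _ in range(4):  # 4 acquisitions => 3 enforced gaps
        await limiter.acquire()
    return time.monotonic() - start


elapsed = asyncio.run(main())
print(f"4 calls took {elapsed:.2f}s")  # roughly 0.3s: three 0.1s gaps
```

The first acquisition goes through immediately; every subsequent one waits out the remainder of the interval.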

***

## 4. Graceful Degradation

When a worker agent fails, the supervisor shouldn't crash — it should handle the failure and either retry, use a fallback, or return a partial result.

```python
# graceful.py
import asyncio
import logging
from openai_agent import OpenAIAgent

logger = logging.getLogger(__name__)


async def safe_run(
    agent: OpenAIAgent,
    task: str,
    fallback: str = "Worker unavailable.",
    timeout: float = 30.0,
) -> str:
    try:
        return await asyncio.wait_for(agent.run(task), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("agent=%s timed out after %.1fs", agent.name, timeout)
        return fallback
    except Exception as exc:
        logger.error("agent=%s raised %s: %s", agent.name, type(exc).__name__, exc)
        return fallback
```

In the supervisor's `make_worker_tool`, wrap the inner call:

```python
async def _call(args: str) -> str:
    parsed = json.loads(args)
    return await safe_run(worker, parsed["task"])
```

Now a crashing worker returns a fallback message to the supervisor, which can decide how to proceed (retry, skip, or escalate).
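To exercise the timeout path end to end, here is a self-contained run with a stub worker that hangs. The stub stands in for a real `OpenAIAgent` (anything with a `name` and an async `run()` works), so `graceful.py`'s agent-specific import is dropped:

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def safe_run(
    agent,
    task: str,
    fallback: str = "Worker unavailable.",
    timeout: float = 30.0,
) -> str:
    # Same wrapper as graceful.py above, minus the OpenAIAgent type hint
    try:
        return await asyncio.wait_for(agent.run(task), timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("agent=%s timed out after %.1fs", agent.name, timeout)
        return fallback
    except Exception as exc:
        logger.error("agent=%s raised %s: %s", agent.name, type(exc).__name__, exc)
        return fallback


class HangingWorker:
    # Illustrative stub: an "agent" whose run() never returns
    name = "stuck_worker"

    async def run(self, task: str) -> str:
        await asyncio.sleep(3600)
        return "never reached"


result = asyncio.run(safe_run(HangingWorker(), "list files", timeout=0.1))
print(result)  # Worker unavailable.
```

`asyncio.wait_for` cancels the hung coroutine on timeout, so the stuck worker doesn't keep running in the background either.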

***

## 5. Health Checks and Readiness

If you expose your agent system as a service (FastAPI, for example), add a health check that verifies the LLM APIs are reachable:

```python
# health.py
import asyncio
import os
from anthropic import AsyncAnthropic
from openai import AsyncOpenAI

openai_client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])
anthropic_client = AsyncAnthropic(api_key=os.environ["ANTHROPIC_API_KEY"])


async def check_openai() -> bool:
    try:
        await openai_client.models.list()
        return True
    except Exception:
        return False


async def check_anthropic() -> bool:
    try:
        # Minimal call — 1 token, cheapest model
        await anthropic_client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=1,
            messages=[{"role": "user", "content": "hi"}],
        )
        return True
    except Exception:
        return False


async def health_status() -> dict[str, bool]:
    openai_ok, anthropic_ok = await asyncio.gather(check_openai(), check_anthropic())
    return {"openai": openai_ok, "anthropic": anthropic_ok}
```
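How you surface this depends on your framework; the usual convention is HTTP 200 when every dependency check passes and 503 (Service Unavailable) otherwise. A framework-agnostic sketch (the helper name is mine, not from the series):

```python
def readiness_code(status: dict[str, bool]) -> int:
    # 200 if every dependency check passed, 503 (Service Unavailable) otherwise
    return 200 if all(status.values()) else 503


print(readiness_code({"openai": True, "anthropic": True}))   # 200
print(readiness_code({"openai": True, "anthropic": False}))  # 503
```

In FastAPI you would return this from a `/healthz` route alongside the `health_status()` dict itself, so orchestrators and humans get the same answer.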

***

## 6. The Mistakes I Made (So You Don't Have To)

**Mistake 1: Trusting the agent to stop itself.** Without a hard iteration cap, an agent can loop indefinitely. I now always enforce `for _ in range(max_iter)` and return a clear message when the cap is hit.

**Mistake 2: Not trimming memory.** After enough turns the context window fills up, and older conversation turns stop fitting — requests start failing, or old turns get dropped, silently. The agent starts forgetting earlier parts of the task. I trim memory proactively (see Part 1's `_trim_memory` pattern).
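A trimming sketch in the spirit of Part 1's `_trim_memory` (the exact helper lives in Part 1; this version assumes memory is a list of chat messages and keeps the system prompt plus the most recent turns):

```python
def trim_memory(messages: list[dict], max_turns: int = 20) -> list[dict]:
    # Keep the system prompt (if present) and the last `max_turns` messages;
    # everything in between is dropped before it can silently fall out of context
    system = [m for m in messages if m["role"] == "system"][:1]
    rest = [m for m in messages if m["role"] != "system"]
    return system + rest[-max_turns:]


history = [{"role": "system", "content": "You are a worker."}]
history += [{"role": "user", "content": f"turn {i}"} for i in range(50)]
trimmed = trim_memory(history, max_turns=20)
print(len(trimmed))  # 21: the system prompt plus the 20 most recent turns
```

A production version would also summarise the dropped turns rather than discard them outright, but even this crude cut beats silent truncation.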

**Mistake 3: Passing all tools to all agents.** Early on I gave every agent every tool to "keep things flexible". The model would occasionally call the wrong tool for the wrong reason. Tight tool lists = more predictable behaviour. Workers should only have the tools they need.

**Mistake 4: No timeout on worker calls.** A shell command that hangs blocks the whole agent loop. Always wrap worker calls with `asyncio.wait_for`.

**Mistake 5: Logging everything in development, nothing in production.** I had verbose logging locally but stripped it all out for production to "keep things clean". Then I had a billing anomaly and no way to investigate. Log at `INFO` in production, `DEBUG` in development.

***

## Putting It All Together

Here is the final entry point that applies all the patterns from this series:

```python
# main.py
import asyncio
import logging
from tracing import new_trace, configure_logging
from cost_tracker import CostTracker
from rate_limiter import TokenBucketLimiter
from claude_agent import ClaudeAgent
from openai_agent import OpenAIAgent
from dispatcher import ToolDispatcher
from supervisor import make_worker_tool
from tools import shell_tool, kv_set_tool, kv_get_tool
from graceful import safe_run

configure_logging()
logger = logging.getLogger(__name__)


async def run_system(goal: str) -> str:
    new_trace()
    tracker = CostTracker()
    limiter = TokenBucketLimiter(calls_per_minute=20)

    shell_worker = OpenAIAgent(
        name="shell_worker",
        system_prompt="Execute shell commands concisely.",
        dispatcher=ToolDispatcher([shell_tool]),
        model="gpt-4o-mini",
        cost_tracker=tracker,
        limiter=limiter,
    )

    memory_worker = OpenAIAgent(
        name="memory_worker",
        system_prompt="Store and retrieve data.",
        dispatcher=ToolDispatcher([kv_set_tool, kv_get_tool]),
        model="gpt-4o-mini",
        cost_tracker=tracker,
        limiter=limiter,
    )

    supervisor = ClaudeAgent(
        name="supervisor",
        system_prompt=(
            "Orchestrate subtasks using shell_worker and memory_worker. "
            "Think carefully before delegating. Return a concise final answer."
        ),
        dispatcher=ToolDispatcher(
            [make_worker_tool(shell_worker), make_worker_tool(memory_worker)]
        ),
        model="claude-3-7-sonnet-20250219",
        use_extended_thinking=True,
        thinking_budget=4000,
        cost_tracker=tracker,
        limiter=limiter,
    )

    result = await supervisor.run(goal)

    logger.info("completed — %s", tracker.summary())
    return result


if __name__ == "__main__":
    import sys
    goal = " ".join(sys.argv[1:]) or "Check Python version and store it."
    print(asyncio.run(run_system(goal)))
```

***

## Key Takeaways

* Trace IDs make multi-agent debugging tractable
* Track tokens and cost per request — surprises are expensive
* Rate limit proactively before hitting API caps
* `asyncio.wait_for` and fallback strings prevent a single worker from blocking everything
* Tight tool lists per agent = more predictable behaviour
* Log at INFO in production, always

***

## Series Complete

You now have a multi-agent system built from first principles:

| Part                                                                                                                                                | What You Built                                    |
| --------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------- |
| [1](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-1-building-agents-from-scratch) | Agent loop + message bus in pure Python           |
| [2](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-2-tools-and-memory)             | Tool dispatcher + short/long-term memory          |
| [3](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-3-openai-multi-agent-workflow)  | OpenAI function calling + supervisor pattern      |
| [4](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-4-claude-multi-agent-workflow)  | Claude tool use + extended thinking               |
| [5](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-5-production-patterns)          | Tracing, cost tracking, rate limiting, resilience |

The same patterns extend to more sophisticated frameworks (LangGraph, AutoGen, CrewAI). The difference is that now you understand what those frameworks are doing under the hood — which means you can debug them when they break.
