# Part 3: Orchestrating Agents with OpenAI

*Part of the* [*Multi Agent Orchestration 101 Series*](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101)

## Why OpenAI Function Calling Is Perfect for Multi-Agent Work

I switched from prompt-engineering my tool calls (parsing `TOOL: name args` from raw text) to OpenAI function calling when I noticed my agents were hallucinating tool names. The model would invent tools that didn't exist, or pass arguments in the wrong format. Function calling enforces a contract: the model either emits a structured JSON object that matches a schema you define, or falls back to a plain text response. No more parsing.

For multi-agent systems, this constraint is valuable. Each agent has a precise interface. Coordination becomes reliable.
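To make the contract concrete, here is the shape of a function-calling schema — just JSON Schema describing the arguments the model is allowed to emit. The tool name and fields below are hypothetical, purely for illustration:

```python
# A hypothetical tool schema in the shape OpenAI function calling expects.
# The model can only emit arguments that validate against "parameters".
get_time_schema = {
    "name": "get_current_time",
    "description": "Return the current time in a given IANA timezone.",
    "parameters": {
        "type": "object",
        "properties": {
            "timezone": {
                "type": "string",
                "description": "IANA timezone name, e.g. 'Europe/London'.",
            }
        },
        "required": ["timezone"],
    },
}
```

Whatever the model sends back for this tool is guaranteed to be JSON with a `timezone` string — not free text you have to regex apart.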

***

## Prerequisites

```bash
pip install "openai>=1.14.0" python-dotenv aiosqlite
```

```bash
export OPENAI_API_KEY="sk-..."
```

Use `python-dotenv` and a `.env` file in practice — never hardcode keys.
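A small fail-fast helper pairs well with `load_dotenv()` — this is my addition, not part of the series' code, and the function name is made up:

```python
import os

# In practice, call `from dotenv import load_dotenv; load_dotenv()` once
# at startup so the .env file is read into os.environ before this runs.


def require_api_key(var: str = "OPENAI_API_KEY") -> str:
    """Fail fast with a clear error instead of a deep stack trace later."""
    value = os.environ.get(var)
    if not value:
        raise RuntimeError(f"{var} is not set — did you load your .env file?")
    return value
```

Calling this once at startup turns a missing key into one obvious error message rather than an authentication failure buried in an async traceback.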

***

## Replacing `_think()` with an OpenAI Call

Taking the `AgentV2` base from Part 2, here is a concrete OpenAI agent:

```python
# openai_agent.py
from __future__ import annotations
import json
import os
from openai import AsyncOpenAI
from agent_v2 import AgentV2, Message
from dispatcher import ToolDispatcher

client = AsyncOpenAI(api_key=os.environ["OPENAI_API_KEY"])


class OpenAIAgent(AgentV2):
    model: str = "gpt-4o-mini"

    def __init__(
        self,
        name: str,
        system_prompt: str,
        dispatcher: ToolDispatcher,
        model: str = "gpt-4o-mini",
        persist: bool = False,
    ) -> None:
        super().__init__(
            name=name,
            system_prompt=system_prompt,
            dispatcher=dispatcher,
            persist=persist,
        )
        self.model = model

    async def _think(self) -> str:
        """Call OpenAI with current memory and available tools."""
        messages = [{"role": "system", "content": self.system_prompt}]
        for msg in self.memory:
            if msg.role == "tool":
                messages.append({
                    "role": "tool",
                    "tool_call_id": msg.metadata.get("tool_call_id", "unknown"),
                    "content": msg.content,
                })
            else:
                messages.append({"role": msg.role, "content": msg.content})

        response = await client.chat.completions.create(
            model=self.model,
            messages=messages,
            tools=[
                {"type": "function", "function": schema}
                for schema in self.dispatcher.schemas()
            ],
            tool_choice="auto",
        )

        choice = response.choices[0]

        # Model wants to call a tool. This series handles one call per
        # turn; if the model emits parallel tool calls, each would need
        # its own tool-result message in history.
        if choice.finish_reason == "tool_calls":
            tool_call = choice.message.tool_calls[0]
            name = tool_call.function.name
            args = tool_call.function.arguments  # already a JSON string

            # Store the assistant message with tool_calls so history is valid
            self.memory.append(
                Message(
                    role="assistant",
                    content="",
                    metadata={"tool_calls": [tool_call.model_dump()]},
                )
            )
            # Return in the format AgentV2.run() expects
            return f"TOOL: {name} {args}"

        # Plain response
        content = choice.message.content or ""
        return f"DONE: {content}"
```

One important detail: when the model emits a `tool_calls` finish, you must append that assistant message (with `tool_calls` in metadata) to history **before** the tool result. OpenAI's API validates message ordering strictly.
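A minimal sketch of the ordering the API expects — the IDs and tool name here are made up for illustration:

```python
# The assistant message carrying tool_calls must come BEFORE its tool
# result, and the tool message must echo the matching tool_call_id.
messages = [
    {"role": "user", "content": "What is 2 + 2?"},
    {
        "role": "assistant",
        "content": None,
        "tool_calls": [
            {
                "id": "call_abc123",  # made-up ID for illustration
                "type": "function",
                "function": {"name": "calculator", "arguments": '{"expr": "2 + 2"}'},
            }
        ],
    },
    {"role": "tool", "tool_call_id": "call_abc123", "content": "4"},
]
```

Send a `tool` message without its preceding `tool_calls` assistant message, or with a mismatched ID, and the API rejects the whole request.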

***

## The Supervisor Pattern

The most useful multi-agent pattern with OpenAI is the **supervisor**: one agent that breaks down a goal and delegates subtasks to specialised worker agents.

```
User
  │
  ▼
Supervisor Agent
  ├── Worker A (e.g., web search)
  ├── Worker B (e.g., code generation)
  └── Worker C (e.g., summarisation)
```

The supervisor's tools are the worker agents themselves. From the supervisor's perspective, calling a worker is just another tool call.

```python
# supervisor.py
from __future__ import annotations
import asyncio
import json
from openai_agent import OpenAIAgent
from dispatcher import ToolDispatcher
from tools import ToolDefinition


def make_worker_tool(worker: OpenAIAgent) -> ToolDefinition:
    """Wrap an agent as a tool the supervisor can call."""

    async def _call(args: str) -> str:
        parsed = json.loads(args)
        return await worker.run(parsed["task"])

    return ToolDefinition(
        name=worker.name,
        description=f"Delegate a task to the '{worker.name}' specialist agent.",
        parameters={
            "type": "object",
            "properties": {
                "task": {
                    "type": "string",
                    "description": "The task description to send to this agent.",
                }
            },
            "required": ["task"],
        },
        fn=_call,
    )
```

Now build the full system:

```python
# run_supervisor.py
import asyncio
from openai_agent import OpenAIAgent
from dispatcher import ToolDispatcher
from supervisor import make_worker_tool
from tools import shell_tool, kv_set_tool, kv_get_tool


async def main() -> None:
    # --- Worker agents ---
    shell_worker = OpenAIAgent(
        name="shell_worker",
        system_prompt=(
            "You are a shell execution agent. "
            "Run the shell commands needed to complete the given task. "
            "Return a concise summary of what you did and the output."
        ),
        dispatcher=ToolDispatcher([shell_tool]),
        model="gpt-4o-mini",
    )

    memory_worker = OpenAIAgent(
        name="memory_worker",
        system_prompt=(
            "You are a memory agent. "
            "Store and retrieve information using the kv_set and kv_get tools. "
            "Always confirm what you stored or retrieved."
        ),
        dispatcher=ToolDispatcher([kv_set_tool, kv_get_tool]),
        model="gpt-4o-mini",
    )

    # --- Supervisor ---
    supervisor_tools = ToolDispatcher(
        [make_worker_tool(shell_worker), make_worker_tool(memory_worker)]
    )

    supervisor = OpenAIAgent(
        name="supervisor",
        system_prompt=(
            "You are an orchestration agent. "
            "Break down the user's goal into subtasks and delegate each to the "
            "appropriate specialist: 'shell_worker' for commands, "
            "'memory_worker' for storing and retrieving data. "
            "Combine the results and return a final answer."
        ),
        dispatcher=supervisor_tools,
        model="gpt-4o",  # stronger model for planning
    )

    result = await supervisor.run(
        "Find out the current Python version on this machine, "
        "then store it under the key 'python_version'."
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
```

When I run this on my machine, the supervisor calls `shell_worker` with `python3 --version`, gets back the version string, then calls `memory_worker` to store it. The final response confirms both steps. **The supervisor never ran a shell command directly** — it delegated.

***

## Parallel Agent Execution

Sequential delegation is simple but slow. When subtasks are independent, run workers in parallel with `asyncio.gather`:

```python
# parallel_delegation.py
import asyncio
from openai_agent import OpenAIAgent
from dispatcher import ToolDispatcher
from tools import shell_tool


async def run_parallel_workers(
    workers: list[OpenAIAgent], tasks: list[str]
) -> list[str]:
    """Run each worker on its task simultaneously."""
    return await asyncio.gather(
        *[w.run(t) for w, t in zip(workers, tasks)]
    )


# Example: three workers check three different things at the same time
async def main() -> None:
    tasks = [
        "Check disk usage with df -h",
        "Show top 5 processes by CPU with ps aux --sort=-%cpu | head -6",
        "Show memory usage with free -h",
    ]

    workers = [
        OpenAIAgent(
            name=f"worker_{i}",
            system_prompt="Execute the given shell command and return the output.",
            dispatcher=ToolDispatcher([shell_tool]),
        )
        for i in range(len(tasks))
    ]

    results = await run_parallel_workers(workers, tasks)
    for task, result in zip(tasks, results):
        print(f"\n[{task[:40]}...]\n{result}")


if __name__ == "__main__":
    asyncio.run(main())
```

I use this pattern extensively when a supervisor has to gather information from multiple sources before synthesising an answer.

***

## Handling Errors and Retries

Agents fail in production: models return malformed JSON, tools time out, API rate limits kick in. A simple retry wrapper:

```python
# retry.py
import asyncio
import logging
from openai_agent import OpenAIAgent

logger = logging.getLogger(__name__)


async def run_with_retry(
    agent: OpenAIAgent,
    task: str,
    max_attempts: int = 3,
    delay: float = 2.0,
) -> str:
    for attempt in range(1, max_attempts + 1):
        try:
            return await agent.run(task)
        except Exception as exc:
            logger.warning(
                "Agent '%s' failed on attempt %d/%d: %s",
                agent.name, attempt, max_attempts, exc,
            )
            if attempt < max_attempts:
                await asyncio.sleep(delay * 2 ** (attempt - 1))  # exponential back-off: 2 s, 4 s, ...

    return f"Agent '{agent.name}' failed after {max_attempts} attempts."
```

For rate limiting specifically, catch `openai.RateLimitError` and sleep for longer. I use `tenacity` in production for more sophisticated retry policies, but the pattern above is enough to understand the concept.
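Before reaching for `tenacity`, you can also make the back-off schedule explicit and testable with a tiny stdlib helper. This one is my addition (the name is made up) and uses the scheme commonly called "full jitter", which spreads retries out so parallel agents don't all hammer the API at the same instant:

```python
import random


def backoff_delay(attempt: int, base: float = 2.0, cap: float = 60.0) -> float:
    """Exponential back-off with full jitter.

    Returns a random delay in [0, min(cap, base * 2**attempt)), so the
    upper bound doubles each attempt but retries stay de-synchronised.
    """
    return random.uniform(0, min(cap, base * (2 ** attempt)))
```

Swap this into `run_with_retry` in place of a fixed sleep and concurrent workers stop retrying in lock-step.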

***

## Key Takeaways

* OpenAI function calling enforces a tool interface contract — no more parsing free text
* The supervisor pattern: strong model for planning, cheaper models for execution
* Worker agents are just tools from the supervisor's perspective
* `asyncio.gather` makes parallel delegation straightforward
* Retry with exponential back-off covers most transient failures

***

## Up Next

[Part 4: Claude Multi-Agent Workflow](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-4-claude-multi-agent-workflow) — the same patterns using Anthropic's tool use API, plus Claude's extended thinking for supervisor-level planning.
