# Part 2: Giving Agents Tools and Memory

*Part of the* [*Multi Agent Orchestration 101 Series*](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101)

## The Tool Problem I Kept Hitting

After getting my first agent loop working, I ran into a pattern that kept repeating: the agent would say it wanted to do something, but there was no clean way to map that decision to actual Python code. I was writing `if "search" in response` conditions everywhere. It was brittle and hard to extend.

The fix is **structured tool definitions**. This is the same mechanism that OpenAI's function calling and Anthropic's tool use are built on at the API level. Understanding it from first principles means you will know exactly what those APIs are doing when you start using them in Parts 3 and 4.

***

## What Is a Tool?

A tool is a named, callable unit of work that an agent can invoke. It has three parts:

1. **A schema** — what the tool is called, what it does, what parameters it accepts
2. **An implementation** — the actual Python function
3. **A dispatcher** — code that maps a name + arguments to a function call

This separation is important. The schema is what you give to the LLM (or use in your rule-based `_think()` logic). The implementation and dispatcher are pure Python that the LLM never sees.

***

## Defining Tools with JSON Schema

I use Python `dataclass` + `dict` for tool definitions. It is verbose but explicit — you can see exactly what will be sent to an LLM later.

```python
# tools.py
from __future__ import annotations
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters: dict[str, Any]  # JSON Schema object
    fn: Callable[..., Awaitable[str]]

    def to_dict(self) -> dict[str, Any]:
        """Schema representation sent to OpenAI / Anthropic."""
        return {
            "name": self.name,
            "description": self.description,
            "parameters": self.parameters,
        }
```

Here are two concrete tools I use in my own projects:

```python
# --- Tool 1: run a shell command (carefully!) ---

async def run_shell(command: str) -> str:
    proc = await asyncio.create_subprocess_shell(
        command,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        return f"Error (exit {proc.returncode}): {stderr.decode().strip()}"
    return stdout.decode().strip()


shell_tool = ToolDefinition(
    name="run_shell",
    description="Execute a shell command and return stdout. Use carefully.",
    parameters={
        "type": "object",
        "properties": {
            "command": {
                "type": "string",
                "description": "The shell command to execute",
            }
        },
        "required": ["command"],
    },
    fn=lambda args: run_shell(json.loads(args)["command"]),
)


# --- Tool 2: simple in-memory key/value store ---

_store: dict[str, str] = {}


async def kv_set(key: str, value: str) -> str:
    _store[key] = value
    return f"Stored '{key}'."


async def kv_get(key: str) -> str:
    if key not in _store:
        return f"Key '{key}' not found."
    return _store[key]


kv_set_tool = ToolDefinition(
    name="kv_set",
    description="Store a value under a key for later retrieval.",
    parameters={
        "type": "object",
        "properties": {
            "key": {"type": "string"},
            "value": {"type": "string"},
        },
        "required": ["key", "value"],
    },
    fn=lambda args: kv_set(**json.loads(args)),
)

kv_get_tool = ToolDefinition(
    name="kv_get",
    description="Retrieve a previously stored value by key.",
    parameters={
        "type": "object",
        "properties": {
            "key": {"type": "string"},
        },
        "required": ["key"],
    },
    fn=lambda args: kv_get(**json.loads(args)),
)
```
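One caveat on `run_shell`: `create_subprocess_shell` hands the whole string to `/bin/sh`, so anything the model emits — including an injected `; rm -rf ...` style payload — gets full shell interpretation. A stricter variant (an illustrative sketch, my own naming) tokenises with `shlex` and skips the shell entirely:

```python
# safe_shell.py -- a stricter alternative to run_shell (illustrative sketch)
import asyncio
import shlex


async def run_exec(command: str) -> str:
    """Run a command without invoking /bin/sh.

    shlex.split tokenises the string, and create_subprocess_exec passes
    the tokens straight to the OS -- no pipes, globs, or `;` chaining.
    """
    argv = shlex.split(command)
    if not argv:
        return "Error: empty command"
    proc = await asyncio.create_subprocess_exec(
        *argv,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, stderr = await proc.communicate()
    if proc.returncode != 0:
        return f"Error (exit {proc.returncode}): {stderr.decode().strip()}"
    return stdout.decode().strip()


if __name__ == "__main__":
    print(asyncio.run(run_exec("echo hello")))  # hello
```

The trade-off is that pipes and redirection stop working — which, for an agent-facing tool, is usually the point. An allowlist of permitted binaries is the natural next step up.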

***

## The Tool Dispatcher

The dispatcher maps a tool name to its `ToolDefinition` and calls it safely:

```python
# dispatcher.py
from __future__ import annotations
from tools import ToolDefinition


class ToolDispatcher:
    def __init__(self, tools: list[ToolDefinition]) -> None:
        self._registry: dict[str, ToolDefinition] = {t.name: t for t in tools}

    def schemas(self) -> list[dict]:
        """Return all schemas — this is what you pass to OpenAI / Claude."""
        return [t.to_dict() for t in self._registry.values()]

    async def call(self, name: str, args: str) -> str:
        if name not in self._registry:
            return f"Unknown tool: '{name}'"
        try:
            return await self._registry[name].fn(args)
        except Exception as exc:
            return f"Tool error: {exc}"
```

Agents now hold a `ToolDispatcher` instead of a raw dict. The key benefit: `dispatcher.schemas()` returns exactly the JSON array that OpenAI and Anthropic expect for their `tools` parameter. **No translation needed when you wire in the real API.**
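To see the round trip end to end, here is a standalone snippet — the classes are pared-down copies of `ToolDefinition` and `ToolDispatcher` above, repeated so it runs on its own — that registers one echo tool, inspects `schemas()`, and dispatches a call with JSON-string arguments:

```python
# dispatcher_demo.py -- standalone round trip (classes repeated in brief)
import asyncio
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class ToolDefinition:
    name: str
    description: str
    parameters: dict[str, Any]
    fn: Callable[..., Awaitable[str]]

    def to_dict(self) -> dict[str, Any]:
        return {"name": self.name, "description": self.description,
                "parameters": self.parameters}


class ToolDispatcher:
    def __init__(self, tools: list[ToolDefinition]) -> None:
        self._registry = {t.name: t for t in tools}

    def schemas(self) -> list[dict]:
        return [t.to_dict() for t in self._registry.values()]

    async def call(self, name: str, args: str) -> str:
        if name not in self._registry:
            return f"Unknown tool: '{name}'"
        try:
            return await self._registry[name].fn(args)
        except Exception as exc:
            return f"Tool error: {exc}"


async def echo(text: str) -> str:
    return f"echo: {text}"


echo_tool = ToolDefinition(
    name="echo",
    description="Return the input text, prefixed.",
    parameters={"type": "object",
                "properties": {"text": {"type": "string"}},
                "required": ["text"]},
    fn=lambda args: echo(**json.loads(args)),
)


async def main() -> None:
    dispatcher = ToolDispatcher([echo_tool])
    # schemas() is already the shape the `tools` API parameter wants
    print(json.dumps(dispatcher.schemas()))
    print(await dispatcher.call("echo", '{"text": "hi"}'))  # echo: hi
    print(await dispatcher.call("missing", "{}"))           # Unknown tool: 'missing'


if __name__ == "__main__":
    asyncio.run(main())
```

Note that arguments travel as a JSON string, not as kwargs — that matches what LLM APIs hand back, and each tool's `fn` lambda owns the decoding.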

***

## Short-Term vs Long-Term Memory

Part 1's `memory` list is short-term memory — it lives in RAM and resets when the process exits. That is fine for a single request, but falls apart when:

* An agent needs context from a previous session
* Two agents need to share state without passing messages
* You want to inspect what an agent was "thinking" after the fact

I use two patterns depending on the use case.

### Pattern A: Shared Context Dict (in-process)

For agents in the same process that need a shared scratchpad:

```python
# shared_context.py
from __future__ import annotations
from threading import Lock
from typing import Any


class SharedContext:
    """Thread-safe dictionary shared across all agents in one process."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = Lock()

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def snapshot(self) -> dict[str, Any]:
        with self._lock:
            return dict(self._data)
```
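Here is the scratchpad in action — several writer threads hitting the same `SharedContext` while the main thread takes a consistent snapshot (a standalone sketch; the class is repeated from above so it runs on its own):

```python
# shared_context_demo.py -- SharedContext under concurrent writers
from __future__ import annotations
from threading import Lock, Thread
from typing import Any


class SharedContext:
    def __init__(self) -> None:
        self._data: dict[str, Any] = {}
        self._lock = Lock()

    def set(self, key: str, value: Any) -> None:
        with self._lock:
            self._data[key] = value

    def get(self, key: str, default: Any = None) -> Any:
        with self._lock:
            return self._data.get(key, default)

    def snapshot(self) -> dict[str, Any]:
        with self._lock:
            return dict(self._data)


ctx = SharedContext()


def writer(agent_id: int) -> None:
    # Each "agent" records its result under its own key.
    ctx.set(f"result_{agent_id}", agent_id * 10)


threads = [Thread(target=writer, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# snapshot() copies under the lock, so later writes can't mutate the copy.
snap = ctx.snapshot()
print(sorted(snap))  # ['result_0', 'result_1', ..., 'result_7']
```

The `snapshot()` copy is what makes after-the-fact inspection safe: you can log or serialise it while agents keep writing.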

### Pattern B: SQLite for Persistence

When I need history to survive restarts, I use SQLite via `aiosqlite`. It's a single file, zero infrastructure, and fast enough for local development and small production workloads.

```python
# memory_store.py
from __future__ import annotations
import json
import time
import aiosqlite
from agent import Message

DB_PATH = "agent_memory.db"


async def init_db() -> None:
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            """
            CREATE TABLE IF NOT EXISTS messages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                agent_name TEXT NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                metadata TEXT DEFAULT '{}',
                created_at REAL NOT NULL
            )
            """
        )
        await db.commit()


async def save_message(agent_name: str, msg: Message) -> None:
    async with aiosqlite.connect(DB_PATH) as db:
        await db.execute(
            "INSERT INTO messages (agent_name, role, content, metadata, created_at) "
            "VALUES (?, ?, ?, ?, ?)",
            (agent_name, msg.role, msg.content, json.dumps(msg.metadata), time.time()),
        )
        await db.commit()


async def load_messages(agent_name: str, last_n: int = 20) -> list[Message]:
    async with aiosqlite.connect(DB_PATH) as db:
        async with db.execute(
            "SELECT role, content, metadata FROM messages "
            "WHERE agent_name = ? ORDER BY created_at DESC LIMIT ?",
            (agent_name, last_n),
        ) as cursor:
            rows = await cursor.fetchall()

    return [
        Message(role=r[0], content=r[1], metadata=json.loads(r[2]))
        for r in reversed(rows)
    ]
```

To use it, call `load_messages` at agent startup to restore context, and `save_message` after every new message.
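If you want to poke at the schema without the async machinery, the same round trip works with the stdlib's synchronous `sqlite3` — a throwaway sketch with an in-memory database and a minimal stand-in for Part 1's `Message`:

```python
# sqlite_roundtrip.py -- synchronous sanity check of the messages schema
import json
import sqlite3
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Message:  # minimal stand-in for Part 1's Message
    role: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


db = sqlite3.connect(":memory:")
db.execute(
    """
    CREATE TABLE IF NOT EXISTS messages (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        agent_name TEXT NOT NULL,
        role TEXT NOT NULL,
        content TEXT NOT NULL,
        metadata TEXT DEFAULT '{}',
        created_at REAL NOT NULL
    )
    """
)

# Save two messages with strictly increasing timestamps...
for i, msg in enumerate([Message("user", "hi"), Message("agent", "hello")]):
    db.execute(
        "INSERT INTO messages (agent_name, role, content, metadata, created_at) "
        "VALUES (?, ?, ?, ?, ?)",
        ("demo", msg.role, msg.content, json.dumps(msg.metadata), time.time() + i),
    )
db.commit()

# ...then load the last N back: DESC + LIMIT, reversed into chronological order.
rows = db.execute(
    "SELECT role, content, metadata FROM messages "
    "WHERE agent_name = ? ORDER BY created_at DESC LIMIT ?",
    ("demo", 20),
).fetchall()
restored = [Message(r[0], r[1], json.loads(r[2])) for r in reversed(rows)]
print([(m.role, m.content) for m in restored])  # [('user', 'hi'), ('agent', 'hello')]
```

The `DESC ... LIMIT` then `reversed()` dance is the same trick `load_messages` uses: grab the most recent N efficiently, then flip them back into reading order.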

***

## Wiring It Together: An Agent with Tools and Persistence

Here is an updated `Agent` base class that incorporates the dispatcher and optional persistence:

```python
# agent_v2.py
from __future__ import annotations
from dataclasses import dataclass, field

from dispatcher import ToolDispatcher
from memory_store import load_messages, save_message
from agent import Message


@dataclass
class AgentV2:
    name: str
    system_prompt: str
    dispatcher: ToolDispatcher
    persist: bool = False

    memory: list[Message] = field(default_factory=list)

    async def restore_memory(self) -> None:
        if self.persist:
            self.memory = await load_messages(self.name)

    async def run(self, user_message: str) -> str:
        msg = Message(role="user", content=user_message)
        self.memory.append(msg)
        if self.persist:
            await save_message(self.name, msg)

        for _ in range(10):
            raw = await self._think()

            if raw.startswith("DONE:"):
                final = raw[5:].strip()
                done_msg = Message(role="agent", content=final)
                self.memory.append(done_msg)
                if self.persist:
                    await save_message(self.name, done_msg)
                return final

            if raw.startswith("TOOL:"):
                _, name, args = raw.split(" ", 2)
                result = await self.dispatcher.call(name, args)
                tool_msg = Message(role="tool", content=result, metadata={"tool": name})
                self.memory.append(tool_msg)
                if self.persist:
                    await save_message(self.name, tool_msg)
                continue

            return raw

        return "Max iterations reached."

    async def _think(self) -> str:
        raise NotImplementedError
```
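The `TOOL:`/`DONE:` lines are a plain-text protocol, and the loop above leans on `str.split(" ", 2)` to parse them. A quick check of how it carves a tool line apart — and why tool names must not contain spaces:

```python
import json

raw = 'TOOL: kv_set {"key": "plan", "value": "step 1"}'

# maxsplit=2 keeps the JSON payload intact even though it contains spaces
prefix, name, args = raw.split(" ", 2)
print(prefix)            # TOOL:
print(name)              # kv_set
print(json.loads(args))  # {'key': 'plan', 'value': 'step 1'}
```

A malformed line with fewer than three fields would raise `ValueError` at the unpack — worth guarding once a real model starts producing these strings instead of your rule-based stub.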

***

## Sharing Context Between Two Agents

Here is a minimal complete example: two agents share a `SharedContext`. One writes a plan, the other executes it.

```python
# two_agents_shared.py
import asyncio
from agent_v2 import AgentV2
from dispatcher import ToolDispatcher
from shared_context import SharedContext
from tools import kv_set_tool, kv_get_tool

ctx = SharedContext()


class PlannerAgent(AgentV2):
    async def _think(self) -> str:
        # In a real system this is an LLM call
        last = self.memory[-1].content
        plan = f"Step 1: process '{last}'. Step 2: summarise."
        ctx.set("plan", plan)
        return "DONE: plan stored"


class ExecutorAgent(AgentV2):
    async def _think(self) -> str:
        plan = ctx.get("plan")
        if not plan:
            return "DONE: no plan found"
        return f"DONE: executed — {plan}"


async def main() -> None:
    dispatcher = ToolDispatcher([kv_set_tool, kv_get_tool])

    planner = PlannerAgent(name="planner", system_prompt="", dispatcher=dispatcher)
    executor = ExecutorAgent(name="executor", system_prompt="", dispatcher=dispatcher)

    await planner.run("build feature X")
    result = await executor.run("go")
    print(result)
    # executed — Step 1: process 'build feature X'. Step 2: summarise.


if __name__ == "__main__":
    asyncio.run(main())
```

***

## Key Takeaways

* Separate the tool schema (what the LLM sees) from the implementation (Python)
* A `ToolDispatcher` with `.schemas()` makes LLM integration trivial later
* Short-term memory = list; long-term memory = SQLite (or Redis for scale)
* `SharedContext` is the simplest way to share state in a single process

***

## Up Next

[Part 3: OpenAI Multi-Agent Workflow](https://blog.htunnthuthu.com/ai-and-machine-learning/artificial-intelligence/multi-agent-orchestration-101/part-3-openai-multi-agent-workflow) — replacing the stub `_think()` with real OpenAI function calling, and building a supervisor that delegates to worker agents.
