# Event Sourcing

## The Audit Trail Problem That Led Me Here

The POS system needed an audit trail. Every time an order was modified — items added, discount applied, order voided — the client wanted to know who changed what, and when, and what the order looked like before the change.

The obvious approach: a `change_log` table with old and new values. I built it. It worked for a while. But it duplicated my domain logic — every UPDATE to the orders table needed a corresponding INSERT into the log. Eventually the log and the actual order fell out of sync during a transaction that failed partway through.

Event sourcing solves this differently: instead of storing the current state and maintaining a separate log, **the log is the state**. I store every event that happened to an order (OrderCreated, ItemAdded, DiscountApplied, OrderVoided), and the current state is computed by replaying those events. There is no duplication because the audit trail and the state are the same thing.

## Table of Contents

* [What Is Event Sourcing?](#what-is-event-sourcing)
* [Events as the Source of Truth](#events-as-the-source-of-truth)
* [Aggregates and Event Application](#aggregates-and-event-application)
* [The Event Store](#the-event-store)
* [Practical Example: Order Aggregate](#practical-example-order-aggregate)
* [Rebuilding State From Events](#rebuilding-state-from-events)
* [Snapshots for Performance](#snapshots-for-performance)
* [Projections and Read Models](#projections-and-read-models)
* [Event Sourcing and CQRS](#event-sourcing-and-cqrs)
* [When to Use Event Sourcing](#when-to-use-event-sourcing)
* [Lessons Learned](#lessons-learned)

***

## What Is Event Sourcing?

In conventional persistence, you store the **current state** of an entity and overwrite it on every update:

```sql
-- Conventional: current state overwritten
UPDATE orders SET status = 'voided', voided_reason = 'Customer cancelled' WHERE id = 42;
```

In event sourcing, you store **what happened** as an append-only log of events:

```sql
-- Event sourcing: new event appended, nothing deleted or overwritten
INSERT INTO order_events (order_id, event_type, payload, occurred_at)
VALUES (42, 'OrderVoided', '{"reason": "Customer cancelled", "authorised_by": 5}', NOW());
```

Current state is derived by reading and replaying all events:

{% @mermaid/diagram content="graph LR
E1[OrderCreated<br/>t=0]
E2[ItemAdded<br/>t=1]
E3[ItemAdded<br/>t=2]
E4[DiscountApplied<br/>t=3]
E5[OrderConfirmed<br/>t=4]

E1 --> E2 --> E3 --> E4 --> E5

E5 -->|Replay all events| STATE["Current State<br/>Order #42<br/>Status: Confirmed<br/>Total: 450 THB<br/>Discount: 10%<br/>2 items"]" %}
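Mechanically, that replay is a left fold over the event list. A minimal self-contained sketch with toy event tuples (not the POS system's real classes; the item prices are made up to reproduce the diagram's 450 THB total):

```python
from functools import reduce

# Each event is (event_type, payload); state is a plain dict
events = [
    ("OrderCreated", {}),
    ("ItemAdded", {"quantity": 2, "unit_price": 150.0}),
    ("ItemAdded", {"quantity": 1, "unit_price": 200.0}),
    ("DiscountApplied", {"percent": 10}),
    ("OrderConfirmed", {}),
]

def apply(state, event):
    event_type, payload = event
    if event_type == "OrderCreated":
        return {"status": "open", "items": [], "discount": 0}
    if event_type == "ItemAdded":
        return {**state, "items": state["items"] + [payload]}
    if event_type == "DiscountApplied":
        return {**state, "discount": payload["percent"]}
    if event_type == "OrderConfirmed":
        return {**state, "status": "confirmed"}
    return state  # unknown events are ignored

state = reduce(apply, events, {})
subtotal = sum(i["quantity"] * i["unit_price"] for i in state["items"])
total = subtotal * (100 - state["discount"]) / 100
# state["status"] == "confirmed", total == 450.0, matching the diagram
```

The fold never mutates an event; it only builds new state from them, which is exactly why the same list can be replayed any number of times with the same result.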

***

## Events as the Source of Truth

Events have three properties that make them the ideal source of truth:

1. **Immutability:** Events are facts that have already happened. `OrderCreated` happened; it cannot un-happen. Events are never updated or deleted.
2. **Completeness:** The full history of every state transition is preserved. There is no "lost update" — every change is a record.
3. **Semantic richness:** `DiscountApplied` carries far more meaning than `total = 405`. It records intent, not just result.

***

## Aggregates and Event Application

The central concept in event-sourced domain models is the **aggregate** — an object that records what events happened and applies them to update its internal state.

```python
# orders/domain/aggregate_base.py

from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any

@dataclass
class DomainEvent:
    aggregate_id: int
    occurred_at: datetime
    event_type: str
    payload: dict[str, Any]

class Aggregate(ABC):
    def __init__(self, id: int | None = None):
        self.id = id
        self.version = 0
        self._pending_events: list[DomainEvent] = []

    def _apply(self, event: DomainEvent) -> None:
        """Apply an event to update internal state."""
        handler = getattr(self, f"_on_{event.event_type.lower()}", None)
        if handler:
            handler(event.payload)
        self.version += 1

    def _record(self, event_type: str, payload: dict) -> None:
        """Record a new event — apply it and stage it for persistence."""
        event = DomainEvent(
            aggregate_id=self.id or 0,
            occurred_at=datetime.utcnow(),
            event_type=event_type,
            payload=payload
        )
        self._apply(event)
        self._pending_events.append(event)

    def pop_events(self) -> list[DomainEvent]:
        events = list(self._pending_events)
        self._pending_events.clear()
        return events
```
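The `getattr`-based dispatch in `_apply` is easier to see in isolation. A condensed, self-contained toy (the `Lamp` aggregate and `MiniAggregate` base are illustrative, not part of the POS domain):

```python
from datetime import datetime, timezone

class MiniAggregate:
    """Condensed stand-in for the Aggregate base class above."""
    def __init__(self):
        self.version = 0
        self._pending: list[tuple] = []

    def _apply(self, event_type: str, payload: dict) -> None:
        # Same convention as Aggregate._apply: route to _on_<event_type>
        handler = getattr(self, f"_on_{event_type.lower()}", None)
        if handler:
            handler(payload)
        self.version += 1

    def _record(self, event_type: str, payload: dict) -> None:
        self._apply(event_type, payload)
        self._pending.append((event_type, payload, datetime.now(timezone.utc)))

class Lamp(MiniAggregate):
    def __init__(self):
        super().__init__()
        self.on = False

    def toggle(self) -> None:
        self._record("Toggled", {})

    def _on_toggled(self, payload: dict) -> None:
        self.on = not self.on

lamp = Lamp()
for _ in range(3):
    lamp.toggle()
# lamp.on is True, lamp.version == 3, three events staged in lamp._pending
```

The naming convention is the whole trick: adding a new event type means adding one `_on_*` method, and replay picks it up automatically.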

***

## Practical Example: Order Aggregate

```python
# orders/domain/order_aggregate.py

from decimal import Decimal
from dataclasses import dataclass, field
from .aggregate_base import Aggregate

@dataclass
class OrderItem:
    product_id: int
    name: str
    quantity: int
    unit_price: Decimal

class OrderAggregate(Aggregate):
    def __init__(self):
        super().__init__()
        self.tenant_id: str = ""
        self.cashier_id: int = 0
        self.items: list[OrderItem] = []
        self.status: str = ""
        self.discount_percent: Decimal = Decimal("0")
        self.voided_reason: str | None = None

    @property
    def subtotal(self) -> Decimal:
        # unit_price is already a Decimal; seed sum() with Decimal("0") so
        # an empty order still returns a Decimal rather than int 0
        return sum((i.unit_price * i.quantity for i in self.items), Decimal("0"))

    @property
    def total(self) -> Decimal:
        return self.subtotal * (1 - self.discount_percent / 100)

    # --- Command methods: validate and record events ---

    def create(self, tenant_id: str, cashier_id: int) -> None:
        if self.status:
            raise ValueError("Order already initialised")
        self._record("OrderCreated", {
            "tenant_id": tenant_id,
            "cashier_id": cashier_id,
        })

    def add_item(self, product_id: int, name: str, quantity: int, unit_price: float) -> None:
        if self.status != "open":
            raise ValueError("Cannot add items to a non-open order")
        if quantity <= 0:
            raise ValueError("Quantity must be positive")
        self._record("ItemAdded", {
            "product_id": product_id,
            "name": name,
            "quantity": quantity,
            "unit_price": unit_price,
        })

    def apply_discount(self, percent: float, authorised_by: int) -> None:
        if self.status != "open":
            raise ValueError("Cannot discount a non-open order")
        if not (0 < percent <= 100):
            raise ValueError("Discount must be between 0 and 100")
        self._record("DiscountApplied", {
            "percent": percent,
            "authorised_by": authorised_by,
        })

    def confirm(self) -> None:
        if not self.items:
            raise ValueError("Cannot confirm an empty order")
        if self.status != "open":
            raise ValueError("Order is not open")
        self._record("OrderConfirmed", {"total": float(self.total)})

    def void(self, reason: str, authorised_by: int) -> None:
        if self.status == "voided":
            raise ValueError("Order is already voided")
        self._record("OrderVoided", {
            "reason": reason,
            "authorised_by": authorised_by,
        })

    # --- Event application methods: update state from events ---

    def _on_ordercreated(self, payload: dict) -> None:
        self.tenant_id = payload["tenant_id"]
        self.cashier_id = payload["cashier_id"]
        self.status = "open"

    def _on_itemadded(self, payload: dict) -> None:
        self.items.append(OrderItem(
            product_id=payload["product_id"],
            name=payload["name"],
            quantity=payload["quantity"],
            unit_price=Decimal(str(payload["unit_price"])),
        ))

    def _on_discountapplied(self, payload: dict) -> None:
        self.discount_percent = Decimal(str(payload["percent"]))

    def _on_orderconfirmed(self, payload: dict) -> None:
        self.status = "confirmed"

    def _on_ordervoided(self, payload: dict) -> None:
        self.status = "voided"
        self.voided_reason = payload["reason"]
```
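Tracing the `subtotal` and `total` properties by hand with the numbers from the diagram earlier (the individual item prices are made up; only the 450 THB total and 10% discount come from the diagram):

```python
from decimal import Decimal

# Two ItemAdded events, as (quantity, unit_price)
items = [(2, Decimal("150")), (1, Decimal("200"))]

# subtotal: seed sum() with Decimal("0") so an empty list still yields a Decimal
subtotal = sum((qty * price for qty, price in items), Decimal("0"))

discount_percent = Decimal("10")
total = subtotal * (1 - discount_percent / 100)
# subtotal == 500, total == 450 (THB), matching the diagram's confirmed state
```

Using `Decimal` throughout keeps the money arithmetic exact; the float payload values are converted at the aggregate boundary in `_on_itemadded`.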

***

## The Event Store

The event store is an append-only table. Nothing is ever updated or deleted.

```sql
-- Event store schema
CREATE TABLE order_events (
    id           SERIAL PRIMARY KEY,
    aggregate_id INTEGER     NOT NULL,
    tenant_id    VARCHAR(50) NOT NULL,
    event_type   VARCHAR(100) NOT NULL,
    payload      JSONB        NOT NULL,
    version      INTEGER      NOT NULL,
    occurred_at  TIMESTAMPTZ DEFAULT NOW(),
    
    UNIQUE (aggregate_id, version)  -- No duplicate versions per aggregate
);

CREATE INDEX idx_order_events_aggregate ON order_events (aggregate_id, version);
```
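That `UNIQUE (aggregate_id, version)` constraint doubles as an optimistic concurrency check: two writers that both loaded the same version will both try to append the same next version, and the second insert fails. A sketch of the mechanism using SQLite in place of PostgreSQL:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE order_events (
        id           INTEGER PRIMARY KEY AUTOINCREMENT,
        aggregate_id INTEGER NOT NULL,
        event_type   TEXT    NOT NULL,
        payload      TEXT    NOT NULL,
        version      INTEGER NOT NULL,
        UNIQUE (aggregate_id, version)
    )
""")

insert = ("INSERT INTO order_events (aggregate_id, event_type, payload, version) "
          "VALUES (?, ?, ?, ?)")
conn.execute(insert, (42, "OrderCreated", "{}", 1))

# A second writer that also believes it is producing version 1:
try:
    conn.execute(insert, (42, "ItemAdded", "{}", 1))
    conflict = False
except sqlite3.IntegrityError:
    conflict = True  # the constraint rejected the concurrent append
# conflict is True
```

The losing writer can then reload the aggregate, replay the new events, and retry its command against the current state.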

```python
# orders/infrastructure/event_store.py

import json

from sqlalchemy import text
from sqlalchemy.orm import Session

from ..domain.aggregate_base import DomainEvent
from ..domain.order_aggregate import OrderAggregate

class OrderEventStore:
    def __init__(self, db: Session):
        self._db = db

    def save(self, aggregate: OrderAggregate) -> None:
        events = aggregate.pop_events()
        # aggregate.version already counts the staged events, so the first
        # staged event gets version (current version - pending count + 1)
        base_version = aggregate.version - len(events)
        for offset, event in enumerate(events, start=1):
            self._db.execute(
                text("""
                    INSERT INTO order_events (aggregate_id, tenant_id, event_type, payload, version)
                    VALUES (:agg_id, :tenant_id, :type, :payload, :version)
                """),
                {
                    "agg_id": aggregate.id,
                    "tenant_id": aggregate.tenant_id,
                    "type": event.event_type,
                    "payload": json.dumps(event.payload),
                    "version": base_version + offset,
                }
            )
        self._db.commit()

    def load(self, aggregate_id: int) -> OrderAggregate | None:
        rows = self._db.execute(
            text(
                "SELECT event_type, payload, occurred_at FROM order_events "
                "WHERE aggregate_id = :id ORDER BY version"
            ),
            {"id": aggregate_id}
        ).fetchall()

        if not rows:
            return None

        order = OrderAggregate()
        order.id = aggregate_id
        for row in rows:
            event = DomainEvent(
                aggregate_id=aggregate_id,
                occurred_at=row.occurred_at,  # the stored timestamp, not load time
                event_type=row.event_type,
                payload=json.loads(row.payload)
            )
            order._apply(event)
        return order
```

***

## Rebuilding State From Events

Because the state is derived from events, I can reconstruct any past state of an order by replaying events up to a specific version:

```python
def load_at_version(self, aggregate_id: int, version: int) -> OrderAggregate:
    rows = self._db.execute(
        text("""
            SELECT event_type, payload FROM order_events
            WHERE aggregate_id = :id AND version <= :version
            ORDER BY version
        """),
        {"id": aggregate_id, "version": version}
    ).fetchall()

    order = OrderAggregate()
    order.id = aggregate_id
    for row in rows:
        # ... apply event ...
        pass
    return order
```

This is how I debug production incidents: "what did order #1234 look like at version 3, before the discount was applied?"
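In miniature, loading at a version is just a fold that stops early. A toy sketch with plain tuples and hypothetical prices:

```python
events = [
    (1, "OrderCreated", {}),
    (2, "ItemAdded", {"quantity": 2, "unit_price": 150.0}),
    (3, "ItemAdded", {"quantity": 1, "unit_price": 200.0}),
    (4, "DiscountApplied", {"percent": 10}),
]

def total_at(events, version):
    """Fold events up to and including `version`, then compute the total."""
    subtotal, discount = 0.0, 0
    for v, event_type, payload in events:
        if v > version:
            break
        if event_type == "ItemAdded":
            subtotal += payload["quantity"] * payload["unit_price"]
        elif event_type == "DiscountApplied":
            discount = payload["percent"]
    return subtotal * (100 - discount) / 100

# total_at(events, 3) == 500.0 (before the discount)
# total_at(events, 4) == 450.0 (after it)
```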

***

## Snapshots for Performance

For aggregates with thousands of events, replaying all events on every load is slow. Snapshots solve this:

```python
# Periodically save a snapshot of the aggregate's current state
def save_snapshot(self, aggregate: OrderAggregate) -> None:
    snapshot = {
        "tenant_id": aggregate.tenant_id,
        "cashier_id": aggregate.cashier_id,
        "status": aggregate.status,
        "items": [vars(i) for i in aggregate.items],
        "discount_percent": str(aggregate.discount_percent),
    }
    self._db.execute(
        text("""
            INSERT INTO order_snapshots (aggregate_id, version, state)
            VALUES (:id, :version, :state)
            ON CONFLICT (aggregate_id) DO UPDATE SET version = :version, state = :state
        """),
        {"id": aggregate.id, "version": aggregate.version, "state": json.dumps(snapshot)}
    )

# Load: start from snapshot, then apply only newer events
def load(self, aggregate_id: int) -> OrderAggregate:
    snapshot_row = self._db.execute(
        "SELECT version, state FROM order_snapshots WHERE aggregate_id = :id",
        {"id": aggregate_id}
    ).first()

    start_version = 0
    order = OrderAggregate()
    order.id = aggregate_id

    if snapshot_row:
        start_version = snapshot_row.version
        state = json.loads(snapshot_row.state)
        # Restore from snapshot
        order.tenant_id = state["tenant_id"]
        order.status = state["status"]
        # ... restore other fields ...
        order.version = start_version

    # Apply only events after the snapshot
    rows = self._db.execute(
        text("""
            SELECT event_type, payload FROM order_events
            WHERE aggregate_id = :id AND version > :v
            ORDER BY version
        """),
        {"id": aggregate_id, "v": start_version}
    ).fetchall()
    # ... apply remaining events ...
    return order
```
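The shape of the optimisation is easiest to see with a toy counter: restore the folded state from the snapshot, then apply only the tail. The event counts here are illustrative:

```python
events = [{"type": "Incremented", "by": n} for n in range(1, 1001)]  # 1,000 events

# A snapshot taken at version 900 holds the state folded up to that point
snapshot = {"version": 900, "value": sum(range(1, 901))}

# Restore from the snapshot, then apply only the 100 events after it
value, version = snapshot["value"], snapshot["version"]
for event in events[version:]:
    value += event["by"]
    version += 1
# value == 500500 and version == 1000: identical to a full replay,
# but only a tenth of the events were applied
```

The snapshot is a pure optimisation: it can be deleted and rebuilt from the event stream at any time, so it never needs the same durability guarantees as the events themselves.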

***

## Projections and Read Models

Events in the store are the write-side truth. The read side is built by **projectors** — processes that consume events and maintain denormalised read models.

```python
# orders/projections/order_summary_projector.py

class OrderSummaryProjector:
    """
    Maintains a denormalised order_summaries table
    for fast list/reporting queries.
    """
    def __init__(self, db):
        self._db = db

    def on_ordercreated(self, aggregate_id: int, payload: dict):
        self._db.execute(
            """
            INSERT INTO order_summaries (id, tenant_id, cashier_id, status, item_count, total, created_at)
            VALUES (:id, :tid, :cashier, 'open', 0, 0, NOW())
            """,
            {"id": aggregate_id, "tid": payload["tenant_id"], "cashier": payload["cashier_id"]}
        )

    def on_itemadded(self, aggregate_id: int, payload: dict):
        self._db.execute(
            """
            UPDATE order_summaries
            SET item_count = item_count + 1, total = total + :subtotal
            WHERE id = :id
            """,
            {"id": aggregate_id, "subtotal": payload["quantity"] * payload["unit_price"]}
        )

    def on_orderconfirmed(self, aggregate_id: int, payload: dict):
        self._db.execute(
            "UPDATE order_summaries SET status = 'confirmed', total = :total WHERE id = :id",
            {"id": aggregate_id, "total": payload["total"]}
        )
```

Projections can be rebuilt at any time by replaying the event stream from the start. This is a powerful capability during schema migrations or bug fixes in the read model.
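A rebuild is the same fold pointed at an empty read model. A condensed in-memory sketch of the projector above, with a dict standing in for the `order_summaries` table:

```python
def rebuild_summary(events):
    """Fold the event stream into a fresh order summary from scratch."""
    summary = {}
    for event_type, payload in events:
        if event_type == "OrderCreated":
            summary = {"status": "open", "item_count": 0, "total": 0.0}
        elif event_type == "ItemAdded":
            summary["item_count"] += 1
            summary["total"] += payload["quantity"] * payload["unit_price"]
        elif event_type == "OrderConfirmed":
            summary["status"] = "confirmed"
            summary["total"] = payload["total"]
    return summary

stream = [
    ("OrderCreated", {}),
    ("ItemAdded", {"quantity": 2, "unit_price": 150.0}),
    ("ItemAdded", {"quantity": 1, "unit_price": 200.0}),
    ("OrderConfirmed", {"total": 450.0}),
]
# rebuild_summary(stream) → {"status": "confirmed", "item_count": 2, "total": 450.0}
```

In production the equivalent move is: truncate the read table, replay the stream through the projector, and swap it in; the write side is untouched throughout.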

***

## Event Sourcing and CQRS

Event sourcing and CQRS are independent patterns that work exceptionally well together:

* **CQRS** separates command handlers (writes) from query handlers (reads)
* **Event sourcing** gives the write side an append-only store with full history
* **Projections** connect the two: events from the write side build the read models that queries use

Used together, they provide audit trails, temporal queries, and independently optimised read and write paths.

***

## When to Use Event Sourcing

Event sourcing is a good fit when:

* **Audit trail is a requirement** — who changed what and when, with the ability to reconstruct past state
* **Temporal queries are needed** — "what did the inventory look like at midnight?"
* **Undo/redo functionality** — events can be applied or reversed
* **Complex domain with many state transitions** — the event log makes transitions explicit
* **Analytics and process mining** — event streams are a natural input for analytics pipelines
* **Compliance in regulated domains** — immutable event logs satisfy many audit requirements

Avoid event sourcing when:

* The system is simple CRUD with no audit requirement
* The team is unfamiliar with the pattern and the system is under time pressure
* The query patterns are simple and a current-state model serves them well
* Storage growth and replay performance have not been thought through

***

## Lessons Learned

* **The audit trail problem is the best entry point into event sourcing.** If a stakeholder asks "can we see who changed this order?", that is the signal.
* **Events should be named from the domain's perspective, not the technical perspective.** `OrderVoided`, not `StatusUpdated`. `DiscountApplied`, not `DiscountPercentSet`.
* **Immutability is non-negotiable.** If you start deleting or updating events, you lose the history guarantee that makes event sourcing valuable.
* **Start without projections, add them when read performance demands it.** Replaying 50 events per request is fine. Replaying 5,000 is not.
* **Upcasting is the migration path.** When an event's structure needs to change, write an upcaster that transforms old event payloads to the new format on read — never modify existing events.
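
A minimal upcaster sketch (the V1/V2 payload shapes are hypothetical, not from the POS system):

```python
# Hypothetical migration: V1 DiscountApplied events stored {"discount": 10},
# V2 stores {"percent": 10, "authorised_by": ...}
def upcast_discount_applied(payload: dict) -> dict:
    if "percent" in payload:      # already V2, pass through untouched
        return payload
    return {"percent": payload["discount"], "authorised_by": None}

UPCASTERS = {"DiscountApplied": upcast_discount_applied}

def upcast(event_type: str, payload: dict) -> dict:
    """Applied on read; the stored events themselves are never modified."""
    upcaster = UPCASTERS.get(event_type)
    return upcaster(payload) if upcaster else payload

# upcast("DiscountApplied", {"discount": 10}) → {"percent": 10, "authorised_by": None}
```

Because the transformation happens at read time, old and new payload shapes can coexist in the store indefinitely, and the append-only guarantee is preserved.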
