# Event-Driven Architecture Basics

## Table of Contents

* [Introduction](#introduction)
* [Understanding Events in Distributed Systems](#understanding-events-in-distributed-systems)
* [Event Types in the POS System](#event-types-in-the-pos-system)
* [Implementing an In-Process Event Bus](#implementing-an-in-process-event-bus)
* [Publisher-Subscriber Pattern](#publisher-subscriber-pattern)
* [Event Versioning for Multi-Tenancy](#event-versioning-for-multi-tenancy)
* [Production Lessons Learned](#production-lessons-learned)
* [Best Practices](#best-practices)
* [Next Steps](#next-steps)

## Introduction

When building my multi-tenant POS system with 6 microservices, I quickly learned that direct service-to-service HTTP calls created tight coupling. Every time inventory changed, I had to update the POS Core service manually. When a payment completed, I needed to notify multiple services. This tight coupling made changes risky and deployments complicated.

Event-driven architecture (EDA) changed everything. Instead of services calling each other directly, they publish events when something important happens and subscribe to events they care about. This decoupling allowed services to evolve independently while maintaining consistency across the system.

In this article, I'll share how I implemented event-driven patterns in my POS system, starting with a simple in-process event bus and evolving to handle multi-tenant scenarios. We'll use Python to build practical, production-ready event infrastructure.

## Understanding Events in Distributed Systems

Events represent **facts** about things that have happened in your system. Unlike commands (which request an action), events are notifications about completed state changes.

Key characteristics of events:

* **Immutable**: Events describe the past and cannot be changed
* **Named in past tense**: `OrderPlaced`, `PaymentCompleted`, `InventoryUpdated`
* **Contain sufficient context**: Include all data needed by subscribers
* **Fire-and-forget**: Publishers don't wait for subscriber responses

In my POS system, I identified three categories of events:

1. **Domain Events**: Business-meaningful changes (OrderPlaced, ProductOutOfStock)
2. **Integration Events**: Cross-service notifications (InventoryStockChanged, PaymentProcessed)
3. **System Events**: Infrastructure concerns (ServiceStarted, CacheInvalidated)

## Event Types in the POS System

Let me show you the core events flowing through my 6-microservice architecture:

```python
# domain/events.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, Dict, Any
from uuid import uuid4

@dataclass
class DomainEvent:
    """Base class for all domain events."""
    event_id: str = field(default_factory=lambda: str(uuid4()))
    event_type: str = ""
    tenant_id: str = ""
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    version: int = 1
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.event_type:
            self.event_type = self.__class__.__name__


# Inventory Service Events
@dataclass
class InventoryStockChanged(DomainEvent):
    """Published when product stock levels change."""
    product_id: str = ""
    sku: str = ""
    previous_quantity: int = 0
    new_quantity: int = 0
    change_reason: str = ""  # 'sale', 'restock', 'adjustment'
    changed_by: str = ""

    def is_out_of_stock(self) -> bool:
        return self.new_quantity == 0

    def is_low_stock(self, threshold: int = 10) -> bool:
        return 0 < self.new_quantity <= threshold


# Payment Service Events
@dataclass
class PaymentCompleted(DomainEvent):
    """Published when payment processing succeeds."""
    payment_id: str = ""
    order_id: str = ""
    amount: float = 0.0  # note: prefer Decimal for real money to avoid float rounding
    currency: str = "USD"
    payment_method: str = ""  # 'card', 'cash', 'mobile'
    transaction_id: str = ""


@dataclass
class PaymentFailed(DomainEvent):
    """Published when payment processing fails."""
    payment_id: str = ""
    order_id: str = ""
    amount: float = 0.0
    failure_reason: str = ""
    retry_count: int = 0


# POS Core Service Events
@dataclass
class OrderPlaced(DomainEvent):
    """Published when a new order is created."""
    order_id: str = ""
    customer_id: Optional[str] = None
    total_amount: float = 0.0
    items: list = field(default_factory=list)  # line-item dicts
    order_type: str = "dine_in"  # 'dine_in', 'takeout', 'delivery'


@dataclass
class OrderCompleted(DomainEvent):
    """Published when order fulfillment is complete."""
    order_id: str = ""
    completion_time: datetime = field(default_factory=datetime.utcnow)
    fulfilled_by: str = ""


# Restaurant Service Events
@dataclass
class TableStatusChanged(DomainEvent):
    """Published when table status changes."""
    table_id: str = ""
    previous_status: str = ""
    new_status: str = ""  # 'available', 'occupied', 'reserved'
    party_size: Optional[int] = None
```

This event structure gives me everything I need:

* **Tenant isolation**: Every event carries `tenant_id`
* **Traceability**: Unique `event_id` and timestamp
* **Versioning**: Support for event schema evolution
* **Type safety**: Python dataclasses with validation
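To see what the base class buys you, here's a condensed, self-contained sketch (it duplicates the dataclasses above so it runs on its own):

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict
from uuid import uuid4

# Condensed copy of the base class above, just enough to show the free fields.
@dataclass
class DomainEvent:
    event_id: str = field(default_factory=lambda: str(uuid4()))
    event_type: str = ""
    tenant_id: str = ""
    occurred_at: datetime = field(default_factory=datetime.utcnow)
    version: int = 1
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        if not self.event_type:
            self.event_type = self.__class__.__name__

@dataclass
class OrderPlaced(DomainEvent):
    order_id: str = ""

event = OrderPlaced(tenant_id="tenant-42", order_id="ord-1")
# event_type, event_id, occurred_at, and version are all populated automatically
```

Callers only pass the business fields; identity and timing metadata come for free from the base class.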

## Implementing an In-Process Event Bus

For services that need internal event handling (before implementing a message broker), I built a simple in-process event bus:

```python
# infrastructure/event_bus.py
from typing import Dict, List, Callable, Type
from collections import defaultdict
import asyncio
import logging
from domain.events import DomainEvent

logger = logging.getLogger(__name__)


class EventBus:
    """Simple in-process event bus for pub/sub within a single service."""
    
    def __init__(self):
        self._handlers: Dict[Type[DomainEvent], List[Callable]] = defaultdict(list)
        self._middleware: List[Callable] = []
    
    def subscribe(self, event_type: Type[DomainEvent], handler: Callable):
        """Subscribe a handler to an event type."""
        self._handlers[event_type].append(handler)
        logger.info(f"Subscribed {handler.__name__} to {event_type.__name__}")
    
    def add_middleware(self, middleware: Callable):
        """Add middleware that runs for every event."""
        self._middleware.append(middleware)
    
    async def publish(self, event: DomainEvent):
        """
        Publish an event to all subscribed handlers.
        Runs middleware first, then handlers in parallel.
        """
        logger.info(
            f"Publishing event: {event.event_type} "
            f"[id={event.event_id}, tenant={event.tenant_id}]"
        )
        
        # Run middleware
        for middleware in self._middleware:
            try:
                await middleware(event)
            except Exception as e:
                logger.error(f"Middleware error: {e}", exc_info=True)
        
        # Get handlers for this exact event type (subclasses won't match)
        handlers = self._handlers.get(type(event), [])
        
        if not handlers:
            logger.warning(f"No handlers for event type: {event.event_type}")
            return
        
        # Execute all handlers in parallel
        tasks = [self._run_handler(handler, event) for handler in handlers]
        await asyncio.gather(*tasks, return_exceptions=True)
    
    async def _run_handler(self, handler: Callable, event: DomainEvent):
        """Run a single handler with error isolation."""
        try:
            if asyncio.iscoroutinefunction(handler):
                await handler(event)
            else:
                handler(event)
            logger.debug(f"Handler {handler.__name__} completed for {event.event_id}")
        except Exception as e:
            logger.error(
                f"Handler {handler.__name__} failed for event {event.event_id}: {e}",
                exc_info=True
            )


# Global event bus instance
event_bus = EventBus()
```

This simple event bus gives me:

* **Async support**: Handlers run concurrently
* **Error isolation**: One handler failure doesn't affect others
* **Middleware**: Cross-cutting concerns (logging, metrics)
* **Type safety**: Handlers subscribe to specific event types
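A minimal end-to-end run of the bus looks like this. I'm using a condensed stand-in for `EventBus` so the snippet is self-contained, but the wiring is the same:

```python
import asyncio
from dataclasses import dataclass

@dataclass
class PingEvent:
    message: str = "ping"

class MiniBus:
    """Condensed stand-in for the EventBus above, just enough to demo the wiring."""
    def __init__(self):
        self._handlers = {}

    def subscribe(self, event_type, handler):
        self._handlers.setdefault(event_type, []).append(handler)

    async def publish(self, event):
        handlers = self._handlers.get(type(event), [])
        # Run all handlers concurrently; exceptions don't cancel siblings
        await asyncio.gather(*(h(event) for h in handlers), return_exceptions=True)

received = []

async def audit(event: PingEvent):
    received.append(f"audit:{event.message}")

async def notify(event: PingEvent):
    received.append(f"notify:{event.message}")

bus = MiniBus()
bus.subscribe(PingEvent, audit)
bus.subscribe(PingEvent, notify)
asyncio.run(bus.publish(PingEvent("hello")))
# Both subscribers receive the same published event
```

Each subscriber gets its own copy of the call; the publisher never learns (or cares) what the handlers did with it.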

## Publisher-Subscriber Pattern

Here's how I use the event bus in the Inventory Service:

```python
# services/inventory_service.py
from typing import Optional
from domain.events import InventoryStockChanged
from infrastructure.event_bus import event_bus
import logging

logger = logging.getLogger(__name__)


class InventoryService:
    """Manages product inventory and publishes stock change events."""
    
    def __init__(self, db_session):
        self.db = db_session
    
    async def update_stock(
        self,
        tenant_id: str,
        product_id: str,
        quantity_change: int,
        reason: str,
        changed_by: str
    ) -> bool:
        """
        Update product stock and publish event.
        Returns True if successful.
        """
        # Get current stock
        product = await self.db.get_product(tenant_id, product_id)
        if not product:
            logger.warning(f"Product not found: {product_id}")
            return False
        
        previous_quantity = product.quantity
        new_quantity = previous_quantity + quantity_change
        
        # Validate stock levels
        if new_quantity < 0:
            logger.error(f"Insufficient stock for {product_id}: {previous_quantity}")
            return False
        
        # Update database
        await self.db.update_product_quantity(
            tenant_id=tenant_id,
            product_id=product_id,
            quantity=new_quantity
        )
        
        # Publish event AFTER successful database update
        event = InventoryStockChanged(
            tenant_id=tenant_id,
            product_id=product_id,
            sku=product.sku,
            previous_quantity=previous_quantity,
            new_quantity=new_quantity,
            change_reason=reason,
            changed_by=changed_by
        )
        
        await event_bus.publish(event)
        
        logger.info(
            f"Stock updated for {product.sku}: "
            f"{previous_quantity} -> {new_quantity} ({reason})"
        )
        
        return True


# Event handlers in other parts of the system
async def handle_low_stock_alert(event: InventoryStockChanged):
    """Send alert when stock is low."""
    if event.is_low_stock(threshold=10):
        logger.warning(
            f"LOW STOCK ALERT: {event.sku} has only {event.new_quantity} units "
            f"(tenant: {event.tenant_id})"
        )
        # Could send email, Slack notification, etc.


async def handle_out_of_stock(event: InventoryStockChanged):
    """Update product availability when out of stock."""
    if event.is_out_of_stock():
        logger.error(f"OUT OF STOCK: {event.sku} (tenant: {event.tenant_id})")
        # Could disable product in POS, trigger reorder, etc.


async def update_chatbot_cache(event: InventoryStockChanged):
    """Invalidate chatbot product cache when stock changes."""
    # Assumes redis_client is the service's async Redis client, configured elsewhere
    cache_key = f"products:{event.tenant_id}:{event.product_id}"
    await redis_client.delete(cache_key)
    logger.debug(f"Invalidated cache: {cache_key}")


# Subscribe handlers to events
event_bus.subscribe(InventoryStockChanged, handle_low_stock_alert)
event_bus.subscribe(InventoryStockChanged, handle_out_of_stock)
event_bus.subscribe(InventoryStockChanged, update_chatbot_cache)
```

Key patterns I learned:

* **Publish after persistence**: Only publish events after database commits succeed
* **Multiple subscribers**: Different services handle the same event differently
* **Fire and forget**: Publishers don't wait for subscriber responses
* **Idempotency**: Handlers should handle duplicate events gracefully
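The "publish after persistence" rule is easiest to see with a toy example. The session here is a fake stand-in, not a real database API:

```python
import asyncio

class FakeSession:
    """Minimal stand-in for a DB session (hypothetical, for illustration only)."""
    def __init__(self, fail=False):
        self.fail = fail
        self.committed = False

    async def commit(self):
        if self.fail:
            raise RuntimeError("db error")
        self.committed = True

    async def rollback(self):
        self.committed = False

published = []

async def save_then_publish(session, event):
    """Publish only after the commit succeeds; a rollback means no event."""
    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    published.append(event)  # reached only on the happy path

async def main():
    await save_then_publish(FakeSession(), "OrderPlaced")
    try:
        await save_then_publish(FakeSession(fail=True), "OrderPlaced")
    except RuntimeError:
        pass  # rolled back, so no event was published

asyncio.run(main())
# published contains exactly one event: the committed transaction's
```

Note this still leaves a small window where the commit succeeds but the publish fails; the transactional outbox pattern closes that gap, at the cost of extra infrastructure.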

## Event Versioning for Multi-Tenancy

As my POS system evolved, I needed to change event schemas without breaking existing subscribers. Here's my versioning strategy:

```python
# domain/events_v2.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
import logging

from domain.events import DomainEvent, InventoryStockChanged as InventoryStockChangedV1

logger = logging.getLogger(__name__)


@dataclass
class InventoryStockChangedV2(InventoryStockChangedV1):
    """
    Version 2: Added warehouse location and batch number tracking.
    
    Migration story:
    - Some tenants needed warehouse location tracking for multi-location inventory
    - Added optional fields to avoid breaking existing consumers
    """
    warehouse_id: Optional[str] = None
    batch_number: Optional[str] = None
    expiry_date: Optional[datetime] = None
    
    def __post_init__(self):
        super().__post_init__()
        self.version = 2


# Event upcasting: Convert old events to new format
def upcast_inventory_event(event: InventoryStockChangedV1) -> InventoryStockChangedV2:
    """Convert V1 events to V2 for unified handling."""
    if event.version == 2:
        return event  # Already V2
    
    # Upcast V1 to V2
    return InventoryStockChangedV2(
        event_id=event.event_id,
        tenant_id=event.tenant_id,
        product_id=event.product_id,
        sku=event.sku,
        previous_quantity=event.previous_quantity,
        new_quantity=event.new_quantity,
        change_reason=event.change_reason,
        changed_by=event.changed_by,
        occurred_at=event.occurred_at,
        metadata=event.metadata,
        # New fields default to None
        warehouse_id=None,
        batch_number=None,
        expiry_date=None
    )


# Versioned event handler
async def handle_inventory_changed(event: DomainEvent):
    """Handle any version of InventoryStockChanged."""
    # Upcast to latest version
    event_v2 = upcast_inventory_event(event)
    
    # Now handle with V2 schema
    if event_v2.warehouse_id:
        logger.info(f"Stock changed in warehouse: {event_v2.warehouse_id}")
    
    if event_v2.expiry_date and event_v2.expiry_date < datetime.utcnow():
        logger.warning(f"Expired inventory detected: {event_v2.sku}")
```

This versioning approach lets me:

* **Add new fields** without breaking existing code
* **Upcast old events** to new schema for unified processing
* **Support multiple tenants** with different event versions
* **Evolve schemas** incrementally
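Here's the upcasting flow end to end, condensed into a self-contained sketch (class names shortened for brevity):

```python
from dataclasses import dataclass
from typing import Optional

# Condensed V1/V2 pair to show the upcasting flow end to end.
@dataclass
class StockChangedV1:
    product_id: str
    new_quantity: int
    version: int = 1

@dataclass
class StockChangedV2(StockChangedV1):
    warehouse_id: Optional[str] = None
    version: int = 2  # redeclared so new instances default to V2

def upcast(event):
    """Return the event as V2, filling new fields with safe defaults."""
    if isinstance(event, StockChangedV2):
        return event  # already latest version
    return StockChangedV2(
        product_id=event.product_id,
        new_quantity=event.new_quantity,
        warehouse_id=None,  # unknown for events emitted before V2 existed
    )

old = StockChangedV1(product_id="p-1", new_quantity=3)
new = upcast(old)
# new is a V2 event; the added field is None because the old event never had it
```

Handlers then only ever deal with the latest schema, regardless of which version was originally published.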

## Production Lessons Learned

### Lesson 1: Event Ordering Matters

I discovered ordering issues when processing rapid inventory updates. A sale reducing stock by 5 units followed immediately by a restock of 10 units could arrive out of order, causing incorrect stock levels.

**Solution**: Add sequence numbers to events:

```python
@dataclass
class InventoryStockChanged(DomainEvent):
    sequence_number: int = 0  # Monotonically increasing per product
    
# In the service
async def update_stock(self, ...):
    sequence = await self.db.get_next_sequence(tenant_id, product_id)
    event.sequence_number = sequence
    await event_bus.publish(event)
```
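On the consumer side, the sequence number lets a handler drop stale or duplicate events. A minimal sketch, where an in-memory dict stands in for whatever store your service actually uses:

```python
# Highest sequence processed per (tenant_id, product_id); in-memory for illustration
last_seen = {}

def should_process(tenant_id: str, product_id: str, sequence_number: int) -> bool:
    """Return True only for events newer than anything already processed."""
    key = (tenant_id, product_id)
    if sequence_number <= last_seen.get(key, 0):
        return False  # stale or duplicate; skip it
    last_seen[key] = sequence_number
    return True
```

A handler that checks this first can safely receive the restock-then-sale pair in either order without corrupting stock levels.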

### Lesson 2: Idempotent Event Handlers

During a deployment, some events were published twice. Handlers that weren't idempotent caused duplicate notifications and incorrect calculations.

**Solution**: Track processed events:

```python
async def handle_payment_completed(event: PaymentCompleted):
    """Idempotent payment handler."""
    # Check if already processed
    if await redis_client.exists(f"processed:payment:{event.event_id}"):
        logger.info(f"Event already processed: {event.event_id}")
        return
    
    # Process event
    await send_payment_confirmation_email(event)
    
    # Mark as processed (TTL 24 hours)
    await redis_client.setex(
        f"processed:payment:{event.event_id}",
        86400,
        "1"
    )
```

### Lesson 3: Event Monitoring

Without visibility into event flow, debugging production issues was painful. I added event telemetry:

```python
async def event_monitoring_middleware(event: DomainEvent):
    """Middleware to track event metrics."""
    # Increment event counter
    await metrics.increment(
        f"events.published.{event.event_type}",
        tags={"tenant": event.tenant_id}
    )
    
    # Log to centralized logging
    logger.info(
        "Event published",
        extra={
            "event_id": event.event_id,
            "event_type": event.event_type,
            "tenant_id": event.tenant_id,
            "timestamp": event.occurred_at.isoformat()
        }
    )

event_bus.add_middleware(event_monitoring_middleware)
```

## Best Practices

Based on my production experience with the POS system:

1. **Events are facts, not commands**
   * Name events in past tense: `OrderPlaced`, not `PlaceOrder`
   * Events describe what happened, not what should happen
2. **Include sufficient context**
   * Events should contain all data subscribers need
   * Avoid forcing subscribers to make additional queries
3. **Publish after persistence**
   * Only publish events after database transactions commit
   * Prevents inconsistencies if rollback occurs
4. **Design for failure**
   * Make handlers idempotent (can process same event multiple times safely)
   * Implement retry logic with exponential backoff
   * Use dead-letter queues for permanently failed events
5. **Version your events**
   * Add version field to all events
   * Use optional fields for schema evolution
   * Implement upcasting for backward compatibility
6. **Monitor event flow**
   * Track event publishing and consumption rates
   * Alert on event processing delays
   * Log event lifecycle for debugging
7. **Tenant isolation**
   * Always include `tenant_id` in events
   * Validate tenant context in subscribers
   * Separate event streams per tenant if needed
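For point 4, a retry wrapper with exponential backoff can be sketched like this (the delays are illustrative; a real system would hand permanently failing events to a dead-letter queue):

```python
import asyncio
import random

async def with_retries(handler, event, max_attempts=5, base_delay=0.5):
    """Run a handler, retrying failures with exponential backoff plus jitter."""
    for attempt in range(max_attempts):
        try:
            return await handler(event)
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of retries: route to a dead-letter queue here
            # 0.5s, 1s, 2s, ... plus a little jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```

In practice you'd wrap fragile handlers before subscribing them, for example with a small decorator that closes over `with_retries`, so the event bus itself stays unaware of retry policy.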

## Next Steps

In this article, we explored event-driven architecture basics using an in-process event bus. This pattern works well for:

* Internal service communication
* Decoupling components within a service
* Small to medium traffic volumes

For production systems with higher scale, you'll want to introduce a message broker (RabbitMQ, Kafka, AWS SNS/SQS). The event patterns we've learned here translate directly to those systems.

In the next article, we'll explore **Caching & Session Management**, where events play a crucial role in cache invalidation strategies across our distributed POS system.

***

*This is part of the Software Architecture 101 series, where I share lessons learned building a production multi-tenant POS system with 6 microservices.*
