# Data Architecture Patterns

It was 11 PM on a Friday when I realized we had a serious problem. A customer's order showed "Mohinga - 3500 MMK" but our Inventory Service said the current price was 4000 MMK. The order was placed two weeks ago, but now we couldn't tell what the customer actually paid because we'd joined live data from the Inventory Service.

That's when I learned data architecture isn't just about choosing databases—it's about **data ownership, consistency models, and accepting trade-offs** between perfect accuracy and system independence.

## The Problem: Shared Database Chaos

Initially, all services shared one PostgreSQL database:

```python
# All services accessing the same database - BAD
from shared_db import SessionLocal, Product, Order

def create_order(product_id: str):
    db = SessionLocal()
    
    # Order Service reading Product table (owned by Inventory Service)
    product = db.query(Product).filter(Product.id == product_id).first()
    
    order = Order(
        product_id=product.id,
        product_name=product.name,  # ← Reading another service's data
        price=product.price,         # ← Will change when Inventory updates it
    )
    db.add(order)
    db.commit()
```

**Problems we hit:**

1. **Tight coupling**: Can't deploy Inventory Service without coordinating with Order Service
2. **Schema conflicts**: Inventory wants to change `Product.price` type, breaks Order queries
3. **Performance**: Inventory's slow product search queries contend with order creation for the same database connections, I/O, and locks
4. **No clear ownership**: Who owns the `products` table?

## Database-Per-Service Pattern

Each service owns its database and data:

{% @mermaid/diagram content="graph TB
subgraph "Auth Service - Port 4001"
    AUTH[Auth API]
    AUTH_DB[(PostgreSQL<br/>users, roles, permissions)]
end

subgraph "POS Core - Port 4002"
    CORE[POS Core API]
    CORE_DB[(PostgreSQL<br/>orders, sales, transactions)]
end

subgraph "Inventory - Port 4003"
    INV[Inventory API]
    INV_DB[(MongoDB<br/>products, stock, categories)]
end

subgraph "Payment - Port 4004"
    PAY[Payment API]
    PAY_DB[(PostgreSQL<br/>payments, refunds)]
end

subgraph "Restaurant - Port 4005"
    REST[Restaurant API]
    REST_DB[(PostgreSQL<br/>menus, tables)]
end

AUTH --> AUTH_DB
CORE --> CORE_DB
INV --> INV_DB
PAY --> PAY_DB
REST --> REST_DB" %}

### Why Different Databases?

**PostgreSQL for transactional services:**

* Auth Service: ACID for user data
* POS Core: Strong consistency for financial records
* Payment Service: Transactional integrity
* Restaurant Service: Relational menu structure

**MongoDB for Inventory Service:**

```python
# Flexible product schema - why we chose MongoDB
{
    "_id": "prod-123",
    "tenant_id": "restaurant-1",
    "name": {
        "en": "Mohinga",
        "my": "မုန့်ဟင်းခါး"  # Multi-language support
    },
    "price": 3500,
    "variants": [  # Flexible nested structure
        {
            "size": "small",
            "price": 3000,
            "stock": 10
        },
        {
            "size": "large",
            "price": 4000,
            "stock": 5,
            "options": ["extra_noodles", "extra_fish_cake"]  # Dynamic options
        }
    ],
    "nutritional_info": {  # Schema varies by product type
        "calories": 400,
        "protein": "15g"
    },
    "seasonal": True  # Fields added as needed
}
```

Product schemas vary wildly across restaurants—some need nutritional info, some need halal certification, some need multiple photos. MongoDB's flexibility was essential.
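
One way to live with that variety is to validate only the fields every product must have and let everything else pass through untouched. The following is a minimal sketch, not our production code: the `validate_product` helper and the `REQUIRED_FIELDS` set are assumptions chosen for illustration, and the second product is made-up sample data.

```python
from typing import Any, Dict

# Assumed core fields; the real required set depends on your product model.
REQUIRED_FIELDS = {"_id": str, "tenant_id": str, "name": dict, "price": (int, float)}


def validate_product(doc: Dict[str, Any]) -> Dict[str, Any]:
    """Check the shared core of a product document; ignore optional extras."""
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in doc:
            raise ValueError(f"missing required field: {field}")
        if not isinstance(doc[field], expected_type):
            raise ValueError(f"wrong type for field: {field}")
    return doc


# Two products with different optional fields both pass the same check.
mohinga = validate_product({
    "_id": "prod-123",
    "tenant_id": "restaurant-1",
    "name": {"en": "Mohinga"},
    "price": 3500,
    "nutritional_info": {"calories": 400},
})
tea = validate_product({
    "_id": "prod-456",
    "tenant_id": "restaurant-2",
    "name": {"en": "Tea"},
    "price": 1000,
    "halal_certified": True,
})
```

The point of the design is that adding a new optional attribute for one restaurant requires no schema migration and no change to the validation code.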

## Data Duplication: The Necessary Evil

When POS Core creates an order, it **duplicates** product data:

```python
# pos_core/services/order_service.py
from typing import Optional
from pydantic import BaseModel
import httpx

class ProductSnapshot(BaseModel):
    """Snapshot of product at order time"""
    product_id: str
    name: str
    price: float
    category: str

class OrderItem(BaseModel):
    id: str
    tenant_id: str
    order_id: str
    # Duplicated product data (not a foreign key!)
    product_snapshot: ProductSnapshot
    quantity: int
    subtotal: float

async def create_order_item(
    tenant_id: str,
    order_id: str,
    product_id: str,
    quantity: int
) -> OrderItem:
    # Fetch product data from Inventory Service
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"http://inventory-service:4003/api/products/{product_id}",
            headers={"x-tenant-id": tenant_id},
            timeout=5.0
        )
        response.raise_for_status()  # fail fast instead of snapshotting an error body
        product = response.json()
    
    # Create snapshot - data duplicated at this moment
    snapshot = ProductSnapshot(
        product_id=product["id"],
        name=product["name"]["en"],  # English name
        price=product["price"],
        category=product["category"]
    )
    
    order_item = OrderItem(
        id=generate_id(),
        tenant_id=tenant_id,
        order_id=order_id,
        product_snapshot=snapshot,  # ← Duplicated, not referenced
        quantity=quantity,
        subtotal=snapshot.price * quantity
    )
    
    await save_order_item(order_item)
    return order_item
```

**Why duplicate?**

1. **Historical accuracy**: Order shows what customer paid, not current price
2. **Service independence**: POS Core can still read and display existing orders if Inventory Service is down
3. **Performance**: No join across services at query time
4. **Auditability**: Legal requirement to preserve order details exactly as sold

**Trade-off**: Data might be "stale" (product renamed, but old orders show old name). This is acceptable for our business requirements.
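
The effect of the snapshot can be seen in isolation with a few lines of plain Python. This is a sketch under stated assumptions: the `catalog` dict stands in for the Inventory Service, and `PriceSnapshot` and `take_snapshot` are hypothetical names, not part of the services above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PriceSnapshot:
    """Immutable copy of the product fields an order needs to keep."""
    product_id: str
    name: str
    price: int  # MMK


# Stand-in for the Inventory Service's live catalog (hypothetical data).
catalog = {"prod-123": {"name": "Mohinga", "price": 3500}}


def take_snapshot(product_id: str) -> PriceSnapshot:
    """Copy the current product fields by value, not by reference."""
    product = catalog[product_id]
    return PriceSnapshot(product_id, product["name"], product["price"])


order_line = take_snapshot("prod-123")   # order placed at 3500 MMK
catalog["prod-123"]["price"] = 4000      # Inventory raises the price later

print(order_line.price)                  # still the price the customer paid
print(catalog["prod-123"]["price"])      # current catalog price
```

Freezing the dataclass is a deliberate choice: once an order is placed, nothing should be able to rewrite what was sold.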

## Eventual Consistency

When product stock changes, other services don't see the new value instantly:

```python
# inventory/services/stock_service.py
from datetime import datetime, timezone
from events import EventBus, InventoryStockChanged

class StockService:
    def __init__(self, event_bus: EventBus, product_repo):
        self.event_bus = event_bus
        self.product_repo = product_repo  # repository wrapping the MongoDB collection
    
    async def update_stock(
        self,
        tenant_id: str,
        product_id: str,
        quantity_change: int
    ):
        # Update MongoDB
        product = await self.product_repo.get(product_id)
        product.stock += quantity_change
        await self.product_repo.save(product)
        
        # Publish event (subscribers process it asynchronously)
        event = InventoryStockChanged(
            tenant_id=tenant_id,
            product_id=product_id,
            new_stock=product.stock,
            timestamp=datetime.now(timezone.utc)
        )
        await self.event_bus.publish(event)
        
        # Inventory Service is updated ✅
        # POS Core might not know yet ⏳
        # Eventually consistent
```

POS Core subscribes to events:

```python
# pos_core/event_handlers.py
from events import EventBus, InventoryStockChanged

class InventoryEventHandler:
    def __init__(self, cache_service):
        self.cache = cache_service
    
    async def handle_stock_changed(self, event: InventoryStockChanged):
        # Eventually update our cache
        cache_key = f"tenant:{event.tenant_id}:stock:{event.product_id}"
        await self.cache.set(cache_key, event.new_stock, ttl=300)
        
        # If stock is low, create alert
        if event.new_stock < 5:
            await self.create_low_stock_alert(event)

# Register handler
event_bus = EventBus()
handler = InventoryEventHandler(cache_service)
event_bus.subscribe("inventory.stock_changed", handler.handle_stock_changed)
```

**Eventual consistency window**: 50-500ms between Inventory update and POS Core knowing about it. For our business, this is acceptable.
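
The `EventBus` imported in the snippets above is not shown in this article. To make the publish/subscribe flow concrete, here is a minimal in-memory sketch. It is an illustration only: `InMemoryEventBus` is a hypothetical class, events are plain dicts, and `publish` takes an explicit topic name, which may differ from the real bus's signature.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, DefaultDict, Dict, List

Handler = Callable[[Dict[str, Any]], Awaitable[None]]


class InMemoryEventBus:
    """Illustrative stand-in for a broker-backed event bus."""

    def __init__(self) -> None:
        self._handlers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, event: Dict[str, Any]) -> None:
        # Deliver to every subscriber; the publisher never reads their state.
        for handler in self._handlers[topic]:
            await handler(event)


stock_cache: Dict[str, int] = {}  # POS Core's local view of stock


async def on_stock_changed(event: Dict[str, Any]) -> None:
    stock_cache[event["product_id"]] = event["new_stock"]


async def main() -> None:
    bus = InMemoryEventBus()
    bus.subscribe("inventory.stock_changed", on_stock_changed)
    await bus.publish(
        "inventory.stock_changed",
        {"tenant_id": "restaurant-1", "product_id": "prod-123", "new_stock": 4},
    )


asyncio.run(main())
```

In this sketch delivery happens in-process, so the cache is updated as soon as `publish` returns. With a real message broker between the services, the delay between publishing and handling is what creates the consistency window described above.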

## The Saga Pattern: Coordinating Distributed Transactions

When an order is placed, we need to:

1. Reserve inventory
2. Process payment
3. Create order record

But we can't use database transactions across services. Enter the **Saga pattern**:

```python
# pos_core/sagas/order_saga.py
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List

import httpx

class SagaStep(Enum):
    STARTED = "started"
    INVENTORY_RESERVED = "inventory_reserved"
    PAYMENT_PROCESSED = "payment_processed"
    ORDER_CREATED = "order_created"
    FAILED = "failed"

class OrderSaga:
    """Orchestrates order creation across services"""
    
    def __init__(self):
        self.inventory_url = "http://inventory-service:4003"
        self.payment_url = "http://payment-service:4004"
    
    async def execute(
        self,
        tenant_id: str,
        customer_id: str,
        items: List[Dict[str, Any]],
        payment_method: str
    ) -> Dict[str, Any]:
        saga_state = {"step": SagaStep.STARTED}
        
        try:
            # Step 1: Reserve inventory
            reservation = await self._reserve_inventory(tenant_id, items)
            saga_state["step"] = SagaStep.INVENTORY_RESERVED
            saga_state["reservation_id"] = reservation["id"]
            
            # Step 2: Process payment
            total = sum(item["price"] * item["quantity"] for item in items)
            payment = await self._process_payment(
                tenant_id,
                customer_id,
                total,
                payment_method
            )
            saga_state["step"] = SagaStep.PAYMENT_PROCESSED
            saga_state["payment_id"] = payment["id"]
            
            # Step 3: Create order
            order = await self._create_order(
                tenant_id,
                customer_id,
                items,
                payment["id"],
                reservation["id"]
            )
            saga_state["step"] = SagaStep.ORDER_CREATED
            
            return {
                "success": True,
                "order_id": order["id"],
                "payment_id": payment["id"]
            }
            
        except Exception as e:
            # Compensating transactions (rollback). Run them before marking the
            # saga FAILED: _compensate checks the last completed step.
            saga_state["error"] = str(e)
            await self._compensate(saga_state)
            saga_state["step"] = SagaStep.FAILED
            raise
    
    async def _reserve_inventory(
        self,
        tenant_id: str,
        items: List[Dict]
    ) -> Dict:
        """Reserve stock in Inventory Service"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.inventory_url}/api/reservations",
                json={"items": items},
                headers={"x-tenant-id": tenant_id},
                timeout=10.0
            )
            response.raise_for_status()
            return response.json()
    
    async def _process_payment(
        self,
        tenant_id: str,
        customer_id: str,
        amount: float,
        method: str
    ) -> Dict:
        """Process payment in Payment Service"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.payment_url}/api/payments",
                json={
                    "customer_id": customer_id,
                    "amount": amount,
                    "method": method
                },
                headers={"x-tenant-id": tenant_id},
                timeout=15.0
            )
            response.raise_for_status()
            return response.json()
    
    async def _create_order(
        self,
        tenant_id: str,
        customer_id: str,
        items: List[Dict],
        payment_id: str,
        reservation_id: str
    ) -> Dict:
        """Create order record in POS Core database"""
        order = {
            "id": generate_id(),
            "tenant_id": tenant_id,
            "customer_id": customer_id,
            "items": items,
            "payment_id": payment_id,
            "reservation_id": reservation_id,
            "status": "completed",
            "created_at": datetime.utcnow()
        }
        await db.orders.insert_one(order)
        return order
    
    async def _compensate(self, saga_state: Dict):
        """Rollback on failure"""
        print(f"Saga failed at step: {saga_state['step']}")
        
        # If payment was processed, refund it
        if saga_state["step"] in [SagaStep.PAYMENT_PROCESSED, SagaStep.ORDER_CREATED]:
            payment_id = saga_state.get("payment_id")
            if payment_id:
                await self._refund_payment(payment_id)
        
        # If inventory was reserved, release it
        if "reservation_id" in saga_state:
            await self._release_inventory(saga_state["reservation_id"])
    
    async def _refund_payment(self, payment_id: str):
        """Compensating transaction: refund"""
        async with httpx.AsyncClient() as client:
            await client.post(
                f"{self.payment_url}/api/payments/{payment_id}/refund",
                timeout=10.0
            )
    
    async def _release_inventory(self, reservation_id: str):
        """Compensating transaction: release reserved stock"""
        async with httpx.AsyncClient() as client:
            await client.delete(
                f"{self.inventory_url}/api/reservations/{reservation_id}",
                timeout=10.0
            )
```

**Saga execution flow:**

{% @mermaid/diagram content="sequenceDiagram
participant Client
participant POS Core
participant Inventory
participant Payment

Client->>POS Core: Create Order
POS Core->>Inventory: Reserve Stock
alt Success
    Inventory-->>POS Core: Reservation OK
    POS Core->>Payment: Process Payment
    alt Success
        Payment-->>POS Core: Payment OK
        POS Core->>POS Core: Create Order Record
        POS Core-->>Client: Order Created ✅
    else Payment Failed
        Payment-->>POS Core: Payment Failed ❌
        POS Core->>Inventory: Release Reservation
        POS Core-->>Client: Order Failed
    end
else Inventory Failed
    Inventory-->>POS Core: Out of Stock ❌
    POS Core-->>Client: Order Failed
end" %}
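
The core rule of the flow above, undo completed steps in reverse order, can be isolated from the HTTP calls. The following is a generic sketch rather than the `OrderSaga` implementation: `run_saga` and `record` are hypothetical helpers, and the steps only write to a list so the ordering is easy to inspect.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Action = Callable[[], Awaitable[None]]


async def run_saga(steps: List[Tuple[Action, Action]]) -> None:
    """Run (action, compensation) pairs; undo completed steps on failure."""
    completed: List[Action] = []
    try:
        for action, compensation in steps:
            await action()
            completed.append(compensation)
    except Exception:
        # Compensate in reverse order: the newest side effect is undone first.
        for compensation in reversed(completed):
            await compensation()
        raise


log: List[str] = []


def record(message: str, fail: bool = False) -> Action:
    """Build a fake step that logs its name and optionally fails."""
    async def step() -> None:
        if fail:
            raise RuntimeError(message)
        log.append(message)
    return step


async def main() -> None:
    steps = [
        (record("reserve inventory"), record("release inventory")),
        (record("process payment"), record("refund payment")),
        (record("create order", fail=True), record("delete order")),
    ]
    try:
        await run_saga(steps)
    except RuntimeError:
        pass  # the saga re-raises after compensating


asyncio.run(main())
print(log)
```

Because a compensation is registered only after its action succeeds, the failing third step is never "undone", while the payment is refunded before the inventory is released.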

## When NOT to Use Database-Per-Service

This pattern isn't always right:

**Don't use if:**

* ❌ Single service application (over-engineering)
* ❌ Need true ACID transactions across data that different services would own (banking systems)
* ❌ Complex joins across all data (reporting/analytics)
* ❌ Small team without ops capacity

**Do use if:**

* ✅ Multiple services with clear boundaries
* ✅ Independent scaling requirements
* ✅ Different data access patterns
* ✅ Team autonomy is important

## Production Lessons

### Lesson 1: Data Migration is Hard

When we split the monolith database:

```python
# Migration script - ran at 3 AM on Sunday
async def migrate_shared_db_to_service_dbs():
    # Copy users to Auth Service DB
    users = await monolith_db.users.find({})
    for user in users:
        await auth_db.users.insert_one(user)
    
    # Copy products to Inventory Service DB  
    products = await monolith_db.products.find({})
    for product in products:
        await inventory_db.products.insert_one(product)
    
    # Copy orders to POS Core DB (with denormalized product data!)
    orders = await monolith_db.orders.find({})
    for order in orders:
        # Enrich with product data
        for item in order["items"]:
            product = await monolith_db.products.find_one({"_id": item["product_id"]})
            item["product_snapshot"] = {
                "name": product["name"],
                # .get() so a missing field falls back instead of raising KeyError
                "price": product.get("price_at_order_time") or product["current_price"]
            }
        await pos_core_db.orders.insert_one(order)
```

**Challenges:**

* Data inconsistencies discovered during migration
* Downtime required (we chose 3 AM Sunday)
* Rollback plan needed (we kept monolith DB for 2 weeks)
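
Some of those inconsistencies can be caught before the migration window with a dry-run check. As a rough sketch, assuming orders and product IDs have already been loaded into plain Python structures (the `find_orphaned_items` helper and the sample rows are hypothetical):

```python
from typing import Any, Dict, Iterable, List, Set, Tuple


def find_orphaned_items(
    orders: Iterable[Dict[str, Any]],
    product_ids: Set[str],
) -> List[Tuple[str, str]]:
    """Return (order_id, product_id) pairs that reference a missing product."""
    orphans = []
    for order in orders:
        for item in order["items"]:
            if item["product_id"] not in product_ids:
                orphans.append((order["id"], item["product_id"]))
    return orphans


# Hypothetical sample data standing in for rows read from the monolith.
products = {"prod-123", "prod-456"}
orders = [
    {"id": "order-1", "items": [{"product_id": "prod-123"}]},
    {"id": "order-2", "items": [{"product_id": "prod-999"}]},  # product deleted
]

orphans = find_orphaned_items(orders, products)
print(orphans)
```

Running a check like this against a copy of production data shows which orders would fail the snapshot enrichment step, so they can be fixed or special-cased ahead of the downtime window.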

### Lesson 2: Accept Eventual Consistency

One customer complained: "I see 5 items in stock but checkout says out of stock!"

**What happened:**

1. Customer loaded product page (cached: 5 in stock)
2. Another customer bought all 5 (Inventory Service updated)
3. First customer tried to checkout (POS Core cache not updated yet)

**Solution**: Better UX messaging:

```python
# Instead of "Out of stock" error at checkout:
"Stock levels changed. Please refresh and try again."
```
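
Behind that message, the cached number is treated as a hint and the authoritative check happens when stock is reserved. Here is a minimal sketch of the idea, with two dicts standing in for the Inventory Service and the POS Core cache; `reserve`, `checkout`, and `StockChanged` are hypothetical names used only for illustration.

```python
from typing import Dict

authoritative_stock: Dict[str, int] = {"prod-123": 5}  # Inventory Service's view
cached_stock: Dict[str, int] = {"prod-123": 5}         # what the product page shows


class StockChanged(Exception):
    """Raised when the cached stock level no longer matches reality."""


def reserve(product_id: str, quantity: int) -> None:
    """Decrement authoritative stock, or fail if not enough is left."""
    available = authoritative_stock.get(product_id, 0)
    if quantity > available:
        # Refresh the stale cache so the next page load is accurate.
        cached_stock[product_id] = available
        raise StockChanged("Stock levels changed. Please refresh and try again.")
    authoritative_stock[product_id] = available - quantity


def checkout(product_id: str, quantity: int) -> str:
    """Attempt the reservation and translate failure into a user message."""
    try:
        reserve(product_id, quantity)
        return "Order placed."
    except StockChanged as exc:
        return str(exc)


first = checkout("prod-123", 5)   # another customer buys all 5
second = checkout("prod-123", 1)  # our customer still sees 5 in the cache
```

The second checkout fails with the friendlier message, and the cache is corrected as a side effect, so a refresh shows the real stock level.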

### Lesson 3: Query Complexity Increases

Reporting queries that were simple joins became complex aggregations:

```python
# Old monolith: simple join (SQL)
#   SELECT o.id, o.created_at, p.name, p.price
#   FROM orders o
#   JOIN products p ON o.product_id = p.id
#   WHERE o.tenant_id = 'restaurant-1';

# New distributed: multiple service calls
async def get_order_report(tenant_id: str):
    # Get orders from POS Core
    orders = await pos_core_client.get(f"/orders?tenant_id={tenant_id}")
    
    # Get product details from Inventory (if needed)
    # Luckily we store product_snapshot in orders!
    
    # One row per order item, not just the first item of each order
    return [{
        "order_id": o["id"],
        "created_at": o["created_at"],
        "product_name": item["product_snapshot"]["name"],
        "price": item["product_snapshot"]["price"]
    } for o in orders for item in o["items"]]
```
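
Because every order item carries its own snapshot, some reports need no cross-service calls at all. A small sketch of a revenue-per-product aggregation, assuming orders shaped like the ones above; `revenue_by_product` and the sample orders are illustrative, not taken from our reporting code.

```python
from collections import defaultdict
from typing import Any, Dict, List


def revenue_by_product(orders: List[Dict[str, Any]]) -> Dict[str, float]:
    """Sum price * quantity per product name using order-time snapshots."""
    totals: Dict[str, float] = defaultdict(float)
    for order in orders:
        for item in order["items"]:
            snapshot = item["product_snapshot"]
            totals[snapshot["name"]] += snapshot["price"] * item["quantity"]
    return dict(totals)


# Illustrative orders: the same dish sold at two different prices over time.
orders = [
    {"id": "order-1", "items": [
        {"quantity": 2, "product_snapshot": {"name": "Mohinga", "price": 3500}},
    ]},
    {"id": "order-2", "items": [
        {"quantity": 1, "product_snapshot": {"name": "Mohinga", "price": 4000}},
    ]},
]

report = revenue_by_product(orders)
print(report)
```

The totals use the price each customer actually paid, which a join against the live product table could not reproduce after a price change.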

## Key Learnings

### ✅ Data Ownership

* Each service owns its data completely
* Other services access via API, never direct database queries
* Clear ownership prevents conflicts

### ✅ Polyglot Persistence

* Use the right database for each service's needs
* PostgreSQL for transactions
* MongoDB for flexible schemas

### ✅ Data Duplication is OK

* Duplicate data for service independence
* Snapshots preserve historical accuracy
* Trade consistency for availability

### ✅ Eventual Consistency

* Accept temporary inconsistency
* Use events to propagate changes
* Business requirements determine acceptable delay

### ✅ Sagas for Distributed Transactions

* Orchestrate multi-service operations
* Implement compensating transactions
* Handle partial failures gracefully

## Common Mistakes

**❌ Sharing databases "just this once"**

* Slippery slope back to tight coupling

**❌ No compensating transactions**

* Saga failures leave inconsistent state

**❌ Over-normalizing data**

* Joins across services are expensive; duplicate data when needed

**❌ Ignoring migration complexity**

* Splitting databases is harder than splitting code

## Next Steps

We've designed the data architecture, but services still need to communicate about changes in real time. Polling databases isn't the answer.

Next, we'll explore **Event-Driven Architecture**—how Inventory Service tells POS Core about stock changes without tight coupling.

**Continue to**: [Event-Driven Architecture Basics](https://blog.htunnthuthu.com/architecture-and-design/architecture-and-patterns/software-architecture-101/08-event-driven-architecture-basics)
