It was 11 PM on a Friday when I realized we had a serious problem. A customer's order showed "Mohinga - 3500 MMK" but our Inventory Service said the current price was 4000 MMK. The order was placed two weeks ago, but now we couldn't tell what the customer actually paid because we'd joined live data from the Inventory Service.
That's when I learned that data architecture isn't just about choosing databases. It's about data ownership, consistency models, and accepting trade-offs between perfect accuracy and system independence.
The Problem: Shared Database Chaos
Initially, all services shared one PostgreSQL database:
# All services accessing the same database - BAD
from shared_db import SessionLocal, Product, Order


def create_order(product_id: str):
    db = SessionLocal()
    # Order Service reading Product table (owned by Inventory Service)
    product = db.query(Product).filter(Product.id == product_id).first()
    order = Order(
        product_id=product.id,
        product_name=product.name,  # Reading another service's data
        price=product.price,  # Will change when Inventory updates it
    )
    db.add(order)
    db.commit()
Problems we hit:
Tight coupling: Can't deploy Inventory Service without coordinating with Order Service
Schema conflicts: Inventory wants to change Product.price type, breaks Order queries
Performance: Inventory's slow product search query locks tables, slowing down order creation
No clear ownership: Who owns the products table?
Database-Per-Service Pattern
Each service owns its database and data:
Why Different Databases?
PostgreSQL for transactional services:
Auth Service: ACID for user data
POS Core: Strong consistency for financial records
Payment Service: Transactional integrity
Restaurant Service: Relational menu structure
MongoDB for Inventory Service:
Product schemas vary wildly across restaurants: some need nutritional info, some need halal certification, some need multiple photos. MongoDB's flexibility was essential.
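To make the flexible-schema point concrete, here is a minimal sketch that uses plain Python dicts in place of MongoDB documents. The field names (`nutrition`, `halal_certificate`, `photos`), the sample values, and the `validate_product` helper are illustrative assumptions, not the real Inventory Service schema.

```python
from typing import Any, Dict, List

# Fields every product document is assumed to carry (hypothetical core schema).
REQUIRED_FIELDS = {"id", "tenant_id", "name", "price"}


def validate_product(doc: Dict[str, Any]) -> List[str]:
    """Return the required fields missing from a product document.

    Extra, restaurant-specific fields are allowed and simply ignored here.
    """
    return sorted(REQUIRED_FIELDS - set(doc))


# Two restaurants, two shapes: same core fields, different optional ones.
# All values below are made up for illustration.
noodle_shop_product = {
    "id": "p-1",
    "tenant_id": "restaurant-1",
    "name": {"en": "Mohinga"},
    "price": 3500,
    "nutrition": {"calories": 450},  # only some tenants track this
}
halal_grill_product = {
    "id": "p-2",
    "tenant_id": "restaurant-2",
    "name": {"en": "Chicken Biryani"},
    "price": 5000,
    "halal_certificate": "cert-0001",  # hypothetical certificate reference
    "photos": ["front.jpg", "side.jpg"],
}

for product in (noodle_shop_product, halal_grill_product):
    print(product["name"]["en"], "missing:", validate_product(product))
```

Both documents pass the same core check even though their optional fields differ. In a relational table, each optional field would need its own nullable column or a side table.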
Data Duplication: The Necessary Evil
When POS Core creates an order, it duplicates product data:
Why duplicate?
Historical accuracy: Order shows what customer paid, not current price
Service independence: POS Core doesn't break if Inventory Service is down
Performance: No join across services at query time
Auditability: Legal requirement to preserve order details exactly as sold
Trade-off: Data might be "stale" (product renamed, but old orders show old name). This is acceptable for our business requirements.
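The Mohinga incident from the introduction can be replayed in a few lines. This is a minimal in-memory sketch, assuming simplified `Product` and `ProductSnapshot` dataclasses and a hypothetical `snapshot_of` helper. It only illustrates why the copied value survives later price changes.

```python
from dataclasses import dataclass


@dataclass
class Product:
    """Live product record, owned by the Inventory Service."""
    id: str
    name: str
    price: int  # MMK


@dataclass(frozen=True)
class ProductSnapshot:
    """Immutable copy of the product fields, taken when the order is placed."""
    product_id: str
    name: str
    price: int


def snapshot_of(product: Product) -> ProductSnapshot:
    """Copy the fields an order needs; later product edits do not touch the copy."""
    return ProductSnapshot(product_id=product.id, name=product.name, price=product.price)


mohinga = Product(id="p-1", name="Mohinga", price=3500)
order_snapshot = snapshot_of(mohinga)  # order placed at 3500 MMK

mohinga.price = 4000  # Inventory raises the price two weeks later

print(order_snapshot.price)  # what the customer actually paid
print(mohinga.price)         # the current catalogue price
```

Marking the snapshot `frozen=True` is a design choice: any code that tries to "fix" an old order by mutating its snapshot raises an error instead of silently rewriting history.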
Eventual Consistency
When product stock changes, systems don't update instantly:
POS Core subscribes to events:
Eventual consistency window: 50-500ms between Inventory update and POS Core knowing about it. For our business, this is acceptable.
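The consistency window can be simulated without any real message broker. In the sketch below, the two dicts stand in for the Inventory database and the POS Core cache, and `PROPAGATION_DELAY` is an assumed stand-in for the 50-500 ms window. None of these names come from the real system.

```python
import asyncio
from typing import Dict

inventory_stock: Dict[str, int] = {"p-1": 5}  # source of truth (Inventory Service)
pos_cache: Dict[str, int] = {"p-1": 5}        # POS Core's local copy

PROPAGATION_DELAY = 0.05  # seconds; stands in for the 50-500 ms window


async def deliver_event(product_id: str, new_stock: int) -> None:
    """Simulate the event bus delivering a stock change to POS Core."""
    await asyncio.sleep(PROPAGATION_DELAY)
    pos_cache[product_id] = new_stock


async def demo() -> Dict[str, int]:
    # Inventory sells all 5 units and publishes the change.
    inventory_stock["p-1"] = 0
    delivery = asyncio.create_task(deliver_event("p-1", 0))

    # Reading immediately: POS Core still sees the old value.
    stale_read = pos_cache["p-1"]

    # Once the event has arrived, both sides agree again.
    await delivery
    fresh_read = pos_cache["p-1"]
    return {"stale": stale_read, "fresh": fresh_read}


result = asyncio.run(demo())
print(result)
```

The stale read is not a bug in this model. It is the expected state of the system between the write and the delivery of the event.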
The Saga Pattern: Coordinating Distributed Transactions
When an order is placed, we need to:
Reserve inventory
Process payment
Create order record
But we can't use database transactions across services. Enter the Saga pattern:
Saga execution flow:
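Stripped of HTTP calls, the core of an orchestrated saga is a list of (action, compensation) pairs plus a rule for undoing completed steps in reverse order. The sketch below is a generic in-memory illustration under that assumption. The `step` and `run_saga` helpers are hypothetical and are not the production `OrderSaga` class.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

Action = Callable[[], Awaitable[None]]

log: List[str] = []


def step(name: str, fail: bool = False) -> Action:
    """Build a fake async step that records its name, or raises if fail=True."""
    async def run() -> None:
        if fail:
            raise RuntimeError(f"{name} failed")
        log.append(name)
    return run


async def run_saga(steps: List[Tuple[Action, Action]]) -> bool:
    """Run (action, compensation) pairs in order; undo completed ones on failure."""
    completed: List[Action] = []
    for action, compensation in steps:
        try:
            await action()
        except Exception:
            # Undo in reverse order, newest first.
            for undo in reversed(completed):
                await undo()
            return False
        completed.append(compensation)
    return True


saga = [
    (step("reserve_inventory"), step("release_inventory")),
    (step("process_payment"), step("refund_payment")),
    (step("create_order", fail=True), step("delete_order")),
]
succeeded = asyncio.run(run_saga(saga))
print(succeeded, log)
```

Here the third step fails, so the payment is refunded first and the inventory released second. The failed step's own compensation never runs because that step never completed.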
When NOT to Use Database-Per-Service
This pattern isn't always right:
Don't use if:
Single service application (over-engineering)
Need true ACID transactions (banking systems)
Complex joins across all data (reporting/analytics)
Small team without ops capacity
Do use if:
Multiple services with clear boundaries
Independent scaling requirements
Different data access patterns
Team autonomy is important
Production Lessons
Lesson 1: Data Migration is Hard
When we split the monolith database:
Challenges:
Data inconsistencies discovered during migration
Downtime required (we chose 3 AM Sunday)
Rollback plan needed (we kept monolith DB for 2 weeks)
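One way to surface inconsistencies before switching traffic is to compare record fingerprints between the old and new databases. The sketch below works on plain lists of dicts. The `fingerprint` and `find_mismatches` helpers and the sample users are assumptions for illustration, not part of the migration that actually ran.

```python
import hashlib
import json
from typing import Any, Dict, Iterable, List


def fingerprint(record: Dict[str, Any]) -> str:
    """Stable hash of a record, independent of key order."""
    payload = json.dumps(record, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def find_mismatches(
    source: Iterable[Dict[str, Any]],
    target: Iterable[Dict[str, Any]],
    key: str = "id",
) -> List[str]:
    """Return ids that are missing from the target or differ from the source."""
    target_by_id = {row[key]: fingerprint(row) for row in target}
    return [
        row[key]
        for row in source
        if target_by_id.get(row[key]) != fingerprint(row)
    ]


monolith_users = [
    {"id": "u-1", "email": "a@example.com"},
    {"id": "u-2", "email": "b@example.com"},
    {"id": "u-3", "email": "c@example.com"},
]
auth_users = [
    {"email": "a@example.com", "id": "u-1"},  # same data, different key order
    {"id": "u-2", "email": "B@example.com"},  # value changed during the copy
]                                             # u-3 never arrived

print(find_mismatches(monolith_users, auth_users))
```

Running a check like this while the monolith database is still kept around gives the rollback window a concrete exit criterion: no mismatches left.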
Lesson 2: Accept Eventual Consistency
One customer complained: "I see 5 items in stock but checkout says out of stock!"
What happened:
Customer loaded product page (cached: 5 in stock)
Another customer bought all 5 (Inventory Service updated)
First customer tried to checkout (POS Core cache not updated yet)
Solution: Better UX messaging:
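The messaging decision can be sketched as a small function that compares what the customer saw (the cached value) with what Inventory reports at checkout. `CheckoutResult` and `check_stock_at_checkout` are hypothetical names, and the other two messages are placeholders. Only the "Stock levels changed" wording comes from our actual fix.

```python
from dataclasses import dataclass


@dataclass
class CheckoutResult:
    ok: bool
    message: str


def check_stock_at_checkout(requested: int, cached_stock: int, actual_stock: int) -> CheckoutResult:
    """Pick a customer-facing message when cached and actual stock may differ."""
    if actual_stock >= requested:
        return CheckoutResult(ok=True, message="Order confirmed.")
    if cached_stock >= requested:
        # The page showed enough stock, but it sold out in the meantime.
        return CheckoutResult(
            ok=False,
            message="Stock levels changed. Please refresh and try again.",
        )
    return CheckoutResult(ok=False, message="Out of stock.")


# The complaint scenario: page showed 5, another customer bought all 5.
result = check_stock_at_checkout(requested=1, cached_stock=5, actual_stock=0)
print(result.message)
```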
Lesson 3: Query Complexity Increases
Reporting queries that were simple joins became complex aggregations:
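Because each order item carries its own product snapshot, some reports can be computed from POS Core data alone. The sketch below totals revenue per product name across all items. The order shape mirrors the `product_snapshot` structure used in this article, while `revenue_by_product` and the sample orders are illustrative assumptions.

```python
from collections import defaultdict
from typing import Any, Dict, List


def revenue_by_product(orders: List[Dict[str, Any]]) -> Dict[str, float]:
    """Sum snapshot price * quantity per product name, across every order item."""
    totals: Dict[str, float] = defaultdict(float)
    for order in orders:
        for item in order["items"]:
            snapshot = item["product_snapshot"]
            totals[snapshot["name"]] += snapshot["price"] * item["quantity"]
    return dict(totals)


# Made-up sample data: the same product sold at two different prices over time.
sample_orders = [
    {"id": "o-1", "items": [
        {"quantity": 2, "product_snapshot": {"name": "Mohinga", "price": 3500}},
    ]},
    {"id": "o-2", "items": [
        {"quantity": 1, "product_snapshot": {"name": "Mohinga", "price": 4000}},
        {"quantity": 3, "product_snapshot": {"name": "Tea", "price": 500}},
    ]},
]

report = revenue_by_product(sample_orders)
print(report)
```

The totals use the price paid at order time rather than the current catalogue price, which is usually what a revenue report should show.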
Key Learnings
Data Ownership
Each service owns its data completely
Other services access via API, never direct database queries
Clear ownership prevents conflicts
Polyglot Persistence
Use the right database for each service's needs
PostgreSQL for transactions
MongoDB for flexible schemas
Data Duplication is OK
Duplicate data for service independence
Snapshots preserve historical accuracy
Trade consistency for availability
Eventual Consistency
Accept temporary inconsistency
Use events to propagate changes
Business requirements determine acceptable delay
Sagas for Distributed Transactions
Orchestrate multi-service operations
Implement compensating transactions
Handle partial failures gracefully
Common Mistakes
Sharing databases "just this once"
Slippery slope back to tight coupling
No compensating transactions
Saga failures leave inconsistent state
Over-normalizing data
Joins across services are expensive; duplicate data when needed
Ignoring migration complexity
Splitting databases is harder than splitting code
Next Steps
We've designed data architecture, but services still need to communicate about changes in real-time. Polling databases isn't the answer.
Next, we'll explore Event-Driven Architecture: how Inventory Service tells POS Core about stock changes without tight coupling.
# pos_core/services/order_service.py
from pydantic import BaseModel
import httpx

# generate_id and save_order_item are assumed to be defined elsewhere in POS Core.


class ProductSnapshot(BaseModel):
    """Snapshot of product at order time"""
    product_id: str
    name: str
    price: float
    category: str


class OrderItem(BaseModel):
    id: str
    tenant_id: str
    order_id: str
    # Duplicated product data (not a foreign key!)
    product_snapshot: ProductSnapshot
    quantity: int
    subtotal: float


async def create_order_item(
    tenant_id: str,
    order_id: str,
    product_id: str,
    quantity: int
) -> OrderItem:
    # Fetch product data from Inventory Service
    async with httpx.AsyncClient() as client:
        response = await client.get(
            f"http://inventory-service:4003/api/products/{product_id}",
            headers={"x-tenant-id": tenant_id},
            timeout=5.0
        )
        # Fail fast on 4xx/5xx instead of snapshotting an error payload
        response.raise_for_status()
        product = response.json()

    # Create snapshot - data duplicated at this moment
    snapshot = ProductSnapshot(
        product_id=product["id"],
        name=product["name"]["en"],  # English name
        price=product["price"],
        category=product["category"]
    )

    order_item = OrderItem(
        id=generate_id(),
        tenant_id=tenant_id,
        order_id=order_id,
        product_snapshot=snapshot,  # Duplicated, not referenced
        quantity=quantity,
        subtotal=snapshot.price * quantity
    )
    await save_order_item(order_item)
    return order_item
# inventory/services/stock_service.py
from datetime import datetime, timezone

from events import EventBus, InventoryStockChanged


class StockService:
    def __init__(self, event_bus: EventBus, product_repo):
        self.event_bus = event_bus
        # Repository wrapping the MongoDB products collection
        # (assumed to be injected; the original snippet used it without setting it)
        self.product_repo = product_repo

    async def update_stock(
        self,
        tenant_id: str,
        product_id: str,
        quantity_change: int
    ):
        # Update MongoDB
        product = await self.product_repo.get(product_id)
        product.stock += quantity_change
        await self.product_repo.save(product)

        # Publish event (async - doesn't wait for subscribers to process it)
        event = InventoryStockChanged(
            tenant_id=tenant_id,
            product_id=product_id,
            new_stock=product.stock,
            timestamp=datetime.now(timezone.utc)
        )
        await self.event_bus.publish(event)
        # Inventory Service is updated at this point
        # POS Core might not know yet
        # Eventually consistent
# pos_core/event_handlers.py
from events import EventBus, InventoryStockChanged


class InventoryEventHandler:
    def __init__(self, cache_service):
        self.cache = cache_service

    async def handle_stock_changed(self, event: InventoryStockChanged):
        # Eventually update our cache
        cache_key = f"tenant:{event.tenant_id}:stock:{event.product_id}"
        await self.cache.set(cache_key, event.new_stock, ttl=300)

        # If stock is low, create alert
        # (create_low_stock_alert is defined elsewhere on this class; not shown)
        if event.new_stock < 5:
            await self.create_low_stock_alert(event)


# Register handler (cache_service is created during application startup; not shown)
event_bus = EventBus()
handler = InventoryEventHandler(cache_service)
event_bus.subscribe("inventory.stock_changed", handler.handle_stock_changed)
# pos_core/sagas/order_saga.py
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict, List

import httpx

# generate_id and db are assumed to be provided elsewhere in POS Core.


class SagaStep(Enum):
    STARTED = "started"
    INVENTORY_RESERVED = "inventory_reserved"
    PAYMENT_PROCESSED = "payment_processed"
    ORDER_CREATED = "order_created"
    FAILED = "failed"


class OrderSaga:
    """Orchestrates order creation across services"""

    def __init__(self):
        self.inventory_url = "http://inventory-service:4003"
        self.payment_url = "http://payment-service:4004"

    async def execute(
        self,
        tenant_id: str,
        customer_id: str,
        items: List[Dict[str, Any]],
        payment_method: str
    ) -> Dict[str, Any]:
        saga_state = {"step": SagaStep.STARTED}
        try:
            # Step 1: Reserve inventory
            reservation = await self._reserve_inventory(tenant_id, items)
            saga_state["step"] = SagaStep.INVENTORY_RESERVED
            saga_state["reservation_id"] = reservation["id"]

            # Step 2: Process payment
            total = sum(item["price"] * item["quantity"] for item in items)
            payment = await self._process_payment(
                tenant_id,
                customer_id,
                total,
                payment_method
            )
            saga_state["step"] = SagaStep.PAYMENT_PROCESSED
            saga_state["payment_id"] = payment["id"]

            # Step 3: Create order
            order = await self._create_order(
                tenant_id,
                customer_id,
                items,
                payment["id"],
                reservation["id"]
            )
            saga_state["step"] = SagaStep.ORDER_CREATED

            return {
                "success": True,
                "order_id": order["id"],
                "payment_id": payment["id"]
            }
        except Exception as e:
            # Compensating transactions (rollback)
            saga_state["step"] = SagaStep.FAILED
            saga_state["error"] = str(e)
            await self._compensate(saga_state)
            raise

    async def _reserve_inventory(
        self,
        tenant_id: str,
        items: List[Dict]
    ) -> Dict:
        """Reserve stock in Inventory Service"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.inventory_url}/api/reservations",
                json={"items": items},
                headers={"x-tenant-id": tenant_id},
                timeout=10.0
            )
            response.raise_for_status()
            return response.json()

    async def _process_payment(
        self,
        tenant_id: str,
        customer_id: str,
        amount: float,
        method: str
    ) -> Dict:
        """Process payment in Payment Service"""
        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{self.payment_url}/api/payments",
                json={
                    "customer_id": customer_id,
                    "amount": amount,
                    "method": method
                },
                headers={"x-tenant-id": tenant_id},
                timeout=15.0
            )
            response.raise_for_status()
            return response.json()

    async def _create_order(
        self,
        tenant_id: str,
        customer_id: str,
        items: List[Dict],
        payment_id: str,
        reservation_id: str
    ) -> Dict:
        """Create order record in POS Core database"""
        order = {
            "id": generate_id(),
            "tenant_id": tenant_id,
            "customer_id": customer_id,
            "items": items,
            "payment_id": payment_id,
            "reservation_id": reservation_id,
            "status": "completed",
            "created_at": datetime.now(timezone.utc)
        }
        await db.orders.insert_one(order)
        return order

    async def _compensate(self, saga_state: Dict[str, Any]):
        """Rollback on failure, undoing completed steps in reverse order"""
        print(f"Saga failed: {saga_state.get('error')}")

        # "step" is already FAILED by the time we get here, so decide what to
        # undo from the IDs recorded along the way rather than from the step.

        # If payment was processed, refund it
        payment_id = saga_state.get("payment_id")
        if payment_id:
            await self._refund_payment(payment_id)

        # If inventory was reserved, release it
        reservation_id = saga_state.get("reservation_id")
        if reservation_id:
            await self._release_inventory(reservation_id)

    async def _refund_payment(self, payment_id: str):
        """Compensating transaction: refund"""
        async with httpx.AsyncClient() as client:
            await client.post(
                f"{self.payment_url}/api/payments/{payment_id}/refund",
                timeout=10.0
            )

    async def _release_inventory(self, reservation_id: str):
        """Compensating transaction: release reserved stock"""
        async with httpx.AsyncClient() as client:
            await client.delete(
                f"{self.inventory_url}/api/reservations/{reservation_id}",
                timeout=10.0
            )
# Migration script - ran at 3 AM on Sunday
# Assumes Motor-style async collections, where find() returns an async cursor.
async def migrate_shared_db_to_service_dbs():
    # Copy users to Auth Service DB
    async for user in monolith_db.users.find({}):
        await auth_db.users.insert_one(user)

    # Copy products to Inventory Service DB
    async for product in monolith_db.products.find({}):
        await inventory_db.products.insert_one(product)

    # Copy orders to POS Core DB (with denormalized product data!)
    async for order in monolith_db.orders.find({}):
        # Enrich with product data
        for item in order["items"]:
            product = await monolith_db.products.find_one({"_id": item["product_id"]})
            item["product_snapshot"] = {
                "name": product["name"],
                # Prefer the price recorded at order time; fall back to the current price
                "price": product.get("price_at_order_time") or product["current_price"]
            }
        await pos_core_db.orders.insert_one(order)
# Instead of "Out of stock" error at checkout:
"Stock levels changed. Please refresh and try again."
-- Old monolith: simple join
SELECT o.id, o.created_at, p.name, p.price
FROM orders o
JOIN products p ON o.product_id = p.id
WHERE o.tenant_id = 'restaurant-1';

# New distributed: multiple service calls
async def get_order_report(tenant_id: str):
    # Get orders from POS Core
    orders = await pos_core_client.get(f"/orders?tenant_id={tenant_id}")

    # Get product details from Inventory (if needed)
    # Luckily we store product_snapshot in orders!
    return [{
        "order_id": o["id"],
        "created_at": o["created_at"],
        "product_name": o["items"][0]["product_snapshot"]["name"],
        "price": o["items"][0]["product_snapshot"]["price"]
    } for o in orders]