Let me tell you about an expensive lesson. When I started building the POS system, I was fresh off reading about microservices at tech giants. Netflix! Uber! Amazon! They all used microservices, so obviously, my POS system for small restaurants needed them too, right?
Wrong. So incredibly wrong.
For the first three months, my two-person team spent more time:
Debugging network calls between services
Managing six different deployment pipelines
Troubleshooting distributed tracing
Coordinating database schemas across services
...than actually building features customers wanted.
One particularly painful day, I spent 6 hours debugging why orders weren't being created. The issue? The Inventory Service was returning a 200 OK status with an error message in the response body, and the POS Core Service wasn't checking the body. In a monolith, this would have been a function call with a clear exception. In microservices, it was a day of my life I'll never get back.
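The failure mode above can be sketched in a few lines. This is a hypothetical reconstruction, not the original services' code: the `{"error": ...}` payload shape and the names `InventoryError`, `parse_inventory_response`, and `reduce_stock` are assumptions made for illustration.

```python
import json


class InventoryError(Exception):
    """Raised when the inventory operation did not actually succeed."""


def parse_inventory_response(status_code: int, body: str) -> dict:
    """Treat a response as successful only if status AND body agree."""
    if status_code >= 400:
        raise InventoryError(f"HTTP {status_code}")
    payload = json.loads(body)
    # The bug described above: a 200 OK whose body carries an error.
    if isinstance(payload, dict) and "error" in payload:
        raise InventoryError(payload["error"])
    return payload


def reduce_stock(stock: dict, product_id: str, quantity: int) -> None:
    """Monolith equivalent: a plain function call that raises on failure."""
    if stock.get(product_id, 0) < quantity:
        raise InventoryError(f"Insufficient stock for {product_id}")
    stock[product_id] -= quantity
```

In the HTTP version the caller has to remember to run the body check; in the in-process version the exception cannot be silently ignored.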
That's when I discovered the modular monolith—the architecture that could have saved me months of frustration.
What Is a Modular Monolith?
A modular monolith is a single deployable application with well-defined internal module boundaries. Think of it as microservices discipline without microservices complexity.
Key difference from microservices: Everything runs in the same process, uses the same database (or carefully separated schemas), and deploys as one unit. But modules communicate through well-defined interfaces, just like they would in microservices.
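As a rough illustration of "well-defined interfaces" inside one process, the sketch below uses `typing.Protocol` to describe what the orders side is allowed to call. The names `InventoryAPI`, `InMemoryInventory`, `Product`, and `order_total_cents` are hypothetical and do not come from the POS codebase.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass(frozen=True)
class Product:
    product_id: int
    name: str
    price_cents: int


class InventoryAPI(Protocol):
    """The only surface other modules are allowed to depend on."""

    def get_product(self, product_id: int, tenant_id: str) -> Optional[Product]: ...


class InMemoryInventory:
    """One possible implementation; callers never see its storage."""

    def __init__(self) -> None:
        self._products: dict[tuple[str, int], Product] = {}

    def add_product(self, tenant_id: str, product: Product) -> None:
        self._products[(tenant_id, product.product_id)] = product

    def get_product(self, product_id: int, tenant_id: str) -> Optional[Product]:
        return self._products.get((tenant_id, product_id))


def order_total_cents(inventory: InventoryAPI, tenant_id: str, items: dict[int, int]) -> int:
    """Orders module: depends on the interface, not on inventory tables."""
    total = 0
    for product_id, quantity in items.items():
        product = inventory.get_product(product_id, tenant_id)
        if product is None:
            raise LookupError(f"Product {product_id} not found")
        total += product.price_cents * quantity
    return total
```

Because `order_total_cents` only knows about `InventoryAPI`, the implementation behind it could later become an HTTP client without changing the caller.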
The POS System as a Modular Monolith
Here's how I refactored the POS system into a modular monolith:
Project Structure
Core Application Setup
Shared Configuration
Module Example: Auth Module
Module Example: POS Core Module
Internal Event Bus
Tenant Isolation Middleware
Benefits of Modular Monolith
1. Simpler Deployment
2. Easier Debugging
3. Lower Operational Cost
Modular Monolith:
1 server (can scale vertically)
1 database
1 deployment pipeline
1 set of logs to monitor
Microservices:
6+ servers (need orchestration)
6 databases (or complex shared DB)
6 deployment pipelines
6 sets of logs (need aggregation)
Service mesh for communication
Distributed tracing infrastructure
4. ACID Transactions
When to Split Into Microservices
Here are the signals that told me it was time to split:
Signal 1: Team Scaling
Signal 2: Different Scaling Requirements
Signal 3: Different Technology Needs
Migration Pattern: Strangler Fig
When we eventually split into microservices, we used the Strangler Fig pattern:
Key Learnings
Start with modular monolith, not microservices
Easier to refactor modules than distributed services
Can split later when you have real data about bottlenecks
Well-defined module boundaries are critical
Each module should have clear responsibilities
Communication through interfaces (service layer)
No direct database access across modules
Internal API discipline prepares for microservices
Design modules like they're separate services
Makes actual split much easier later
Events enable loose coupling within monolith
Modules react to events instead of direct calls
Easier to extract into message-based microservices
Operational simplicity has real value
Less infrastructure = less to break
Faster development = more features shipped
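Rules like "no direct database access across modules" tend to erode unless something checks them. One possible approach, sketched here under assumptions, is a small test that parses each file's imports and flags any that reach into another module's internals. The module list and the set of "public" submodules are hypothetical choices, not part of the original project.

```python
import ast

# Hypothetical rule set: which submodules outsiders may import.
PUBLIC_SUBMODULES = {"service", "schemas"}
MODULES = {"auth", "pos_core", "inventory", "payments"}


def boundary_violations(source: str, owner: str) -> list[str]:
    """Return imports in `source` (owned by module `owner`) that reach
    into another module's internals, e.g. `inventory.models`."""
    violations = []
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.ImportFrom) and node.module and node.level == 0:
            names = [node.module]
        elif isinstance(node, ast.Import):
            names = [alias.name for alias in node.names]
        else:
            continue  # relative imports stay inside the owning module
        for name in names:
            parts = name.split(".")
            crosses_boundary = parts[0] in MODULES and parts[0] != owner
            if crosses_boundary and len(parts) > 1 and parts[1] not in PUBLIC_SUBMODULES:
                violations.append(name)
    return violations
```

This sketch does not catch `from inventory import models`; a real check would need to handle that form as well.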
Common Mistakes
Tight coupling between modules
Shared database without boundaries
Skipping the modular monolith phase
Jumping straight to microservices is rarely justified
Build modular monolith first, prove the need for split
Not planning for eventual extraction
Design like each module could become a service
Avoid shared state that would be hard to split
When to Use Modular Monolith
Use Modular Monolith When:
Team is smaller than roughly 10-15 developers
Traffic can be handled by vertical scaling
Rapid iteration is priority
Operational simplicity matters
You're not sure about service boundaries yet
Consider Microservices When:
Multiple independent teams (15+ developers)
Clear scaling bottlenecks in specific modules
Need technology diversity (different DBs, languages)
Independent deployment is critical
Can handle operational complexity
Next Steps
Now that you understand how to build a well-structured modular monolith, the next article explores multi-tenant architecture patterns—how to securely isolate data for different customers within the same system.
We'll cover:
Tenant isolation strategies (separate DB vs shared DB)
How the x-tenant-id flows through all modules
PostgreSQL row-level security
Real bug: the cross-tenant data leak I caused
Next Article: 03-multi-tenant-architecture-patterns.md - Learn how to build secure multi-tenant systems that prevent data leaks and maintain performance at scale.
Remember: The modular monolith is not a compromise or a stepping stone. For many systems, it's the optimal architecture. Don't let FOMO drive you to microservices before you've exhausted simpler options.
# config/database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker  # declarative_base lives here since SQLAlchemy 1.4
from .settings import settings

engine = create_engine(
    settings.DATABASE_URL,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # Verify connections before using
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db():
    """Dependency for FastAPI routes: yields a session and always closes it"""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# auth/models.py
from sqlalchemy import Column, Integer, String, DateTime, Boolean
from sqlalchemy.sql import func
from config.database import Base


class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    username = Column(String(100), nullable=False, unique=True, index=True)
    email = Column(String(255), nullable=False)
    hashed_password = Column(String(255), nullable=False)
    role = Column(String(50), nullable=False)  # admin, manager, cashier
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# auth/schemas.py
from pydantic import BaseModel, EmailStr
from typing import Optional
from datetime import datetime


class UserCreate(BaseModel):
    username: str
    email: EmailStr
    password: str
    role: str
    tenant_id: str


class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    role: str
    tenant_id: str
    is_active: bool
    created_at: datetime

    class Config:
        from_attributes = True


class TokenResponse(BaseModel):
    access_token: str
    token_type: str
    expires_in: int
    user: UserResponse

# auth/repository.py
from sqlalchemy.orm import Session
from typing import Optional
from .models import User


class UserRepository:
    """Data access layer for users"""

    def __init__(self, db: Session):
        self.db = db

    def get_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        return self.db.query(User).filter(
            User.username == username,
            User.tenant_id == tenant_id
        ).first()

    def get_by_id(self, user_id: int, tenant_id: str) -> Optional[User]:
        return self.db.query(User).filter(
            User.id == user_id,
            User.tenant_id == tenant_id
        ).first()

    def create(self, user: User) -> User:
        self.db.add(user)
        self.db.commit()
        self.db.refresh(user)
        return user

    def update(self, user: User) -> User:
        self.db.commit()
        self.db.refresh(user)
        return user

    def delete(self, user: User) -> None:
        self.db.delete(user)
        self.db.commit()

    def list_by_tenant(self, tenant_id: str, skip: int = 0, limit: int = 100):
        return self.db.query(User).filter(
            User.tenant_id == tenant_id
        ).offset(skip).limit(limit).all()

# auth/service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from passlib.context import CryptContext
import jwt
from datetime import datetime, timedelta, timezone
from typing import Optional
from .repository import UserRepository
from .models import User
from .schemas import UserCreate, TokenResponse, UserResponse
from config.settings import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class AuthService:
    """Business logic for authentication"""

    def __init__(self, db: Session):
        self.repository = UserRepository(db)

    def _hash_password(self, password: str) -> str:
        return pwd_context.hash(password)

    def _verify_password(self, plain_password: str, hashed_password: str) -> bool:
        return pwd_context.verify(plain_password, hashed_password)

    def _create_access_token(self, user: User) -> str:
        # Timezone-aware expiry; datetime.utcnow() is deprecated as of Python 3.12
        expires_at = datetime.now(timezone.utc) + timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
        payload = {
            "user_id": user.id,
            "tenant_id": user.tenant_id,
            "role": user.role,
            "exp": expires_at
        }
        return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)

    def register_user(self, user_data: UserCreate) -> User:
        # Check if user already exists
        existing_user = self.repository.get_by_username(
            user_data.username,
            user_data.tenant_id
        )
        if existing_user:
            raise HTTPException(400, "Username already registered")

        # Create new user
        user = User(
            tenant_id=user_data.tenant_id,
            username=user_data.username,
            email=user_data.email,
            hashed_password=self._hash_password(user_data.password),
            role=user_data.role
        )
        return self.repository.create(user)

    def authenticate(self, username: str, password: str, tenant_id: str) -> TokenResponse:
        # Get user
        user = self.repository.get_by_username(username, tenant_id)
        if not user:
            raise HTTPException(401, "Invalid credentials")

        # Verify password
        if not self._verify_password(password, user.hashed_password):
            raise HTTPException(401, "Invalid credentials")

        # Check if user is active
        if not user.is_active:
            raise HTTPException(403, "User account is disabled")

        # Create access token
        access_token = self._create_access_token(user)
        return TokenResponse(
            access_token=access_token,
            token_type="bearer",
            expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            # Pydantic v2 (implied by from_attributes) replaces from_orm with model_validate
            user=UserResponse.model_validate(user)
        )

    def verify_token(self, token: str) -> dict:
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM]
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(401, "Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(401, "Invalid token")

    def get_current_user(self, user_id: int, tenant_id: str) -> User:
        user = self.repository.get_by_id(user_id, tenant_id)
        if not user:
            raise HTTPException(404, "User not found")
        return user

# auth/router.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from config.database import get_db
from .service import AuthService
from .schemas import UserCreate, TokenResponse, UserResponse

router = APIRouter()


@router.post("/register", response_model=UserResponse)
async def register(user_data: UserCreate, db: Session = Depends(get_db)):
    """Register a new user"""
    service = AuthService(db)
    user = service.register_user(user_data)
    return user


@router.post("/login", response_model=TokenResponse)
async def login(username: str, password: str, tenant_id: str, db: Session = Depends(get_db)):
    """Authenticate user and return JWT token"""
    service = AuthService(db)
    return service.authenticate(username, password, tenant_id)


@router.get("/me", response_model=UserResponse)
async def get_current_user(
    user_id: int,
    tenant_id: str,
    db: Session = Depends(get_db)
):
    """Get current user profile"""
    service = AuthService(db)
    return service.get_current_user(user_id, tenant_id)

# pos_core/models.py
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey, JSON
from sqlalchemy.sql import func
from sqlalchemy.orm import relationship
from config.database import Base


class Order(Base):
    __tablename__ = "orders"

    id = Column(Integer, primary_key=True, index=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    order_number = Column(String(50), nullable=False, unique=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    status = Column(String(20), nullable=False)  # pending, completed, cancelled
    total_amount = Column(Float, nullable=False)
    items = Column(JSON, nullable=False)  # Store order items as JSON
    payment_status = Column(String(20), nullable=False)  # pending, paid, failed
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

# pos_core/service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException
from typing import List, Optional
from datetime import datetime
import uuid
from .repository import OrderRepository
from .models import Order
from .schemas import OrderCreate, OrderResponse, OrderItem
from shared.events import EventBus, OrderCreatedEvent, OrderCompletedEvent
# Import other module services (internal API)
from inventory.service import InventoryService
from payments.service import PaymentService


class POSCoreService:
    """Business logic for order management"""

    def __init__(self, db: Session):
        self.repository = OrderRepository(db)
        self.inventory_service = InventoryService(db)
        self.payment_service = PaymentService(db)
        self.event_bus = EventBus()

    def _generate_order_number(self, tenant_id: str) -> str:
        """Generate unique order number"""
        timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
        unique_id = str(uuid.uuid4())[:8]
        return f"{tenant_id}-{timestamp}-{unique_id}"

    def _calculate_total(self, items: List[OrderItem], tenant_id: str) -> float:
        """Calculate order total from items"""
        total = 0.0
        for item in items:
            # Get product price from inventory
            product = self.inventory_service.get_product(item.product_id, tenant_id)
            if not product:
                raise HTTPException(404, f"Product {item.product_id} not found")
            total += product.price * item.quantity
        return total

    async def create_order(
        self,
        order_data: OrderCreate,
        user_id: int,
        tenant_id: str
    ) -> Order:
        """Create a new order with inventory and payment validation"""
        # Step 1: Validate inventory availability
        for item in order_data.items:
            available = self.inventory_service.check_stock(
                item.product_id,
                item.quantity,
                tenant_id
            )
            if not available:
                raise HTTPException(
                    400,
                    f"Insufficient stock for product {item.product_id}"
                )

        # Step 2: Calculate total
        total_amount = self._calculate_total(order_data.items, tenant_id)

        # Step 3: Reserve inventory (reduce stock), tracking what was reserved
        reserved = []
        try:
            for item in order_data.items:
                self.inventory_service.reduce_stock(
                    item.product_id,
                    item.quantity,
                    tenant_id
                )
                reserved.append(item)
        except Exception as e:
            # Rollback: restore only the items that were already reserved
            # In a real system, use database transactions
            for done in reserved:
                self.inventory_service.restore_stock(
                    done.product_id,
                    done.quantity,
                    tenant_id
                )
            raise HTTPException(500, f"Failed to reserve inventory: {str(e)}")

        # Step 4: Process payment
        try:
            payment_result = self.payment_service.process_payment(
                total_amount,
                order_data.payment_method,
                tenant_id
            )
        except Exception as e:
            # Rollback: restore inventory
            for item in order_data.items:
                self.inventory_service.restore_stock(
                    item.product_id,
                    item.quantity,
                    tenant_id
                )
            raise HTTPException(503, f"Payment failed: {str(e)}")

        # Step 5: Create order
        order = Order(
            tenant_id=tenant_id,
            order_number=self._generate_order_number(tenant_id),
            user_id=user_id,
            status="completed",
            total_amount=total_amount,
            # model_dump() is the Pydantic v2 replacement for dict()
            items=[item.model_dump() for item in order_data.items],
            payment_status="paid"
        )
        created_order = self.repository.create(order)

        # Step 6: Publish event (for other modules to react)
        self.event_bus.publish(OrderCreatedEvent(
            order_id=created_order.id,
            tenant_id=tenant_id,
            total_amount=total_amount
        ))
        return created_order

    def get_order(self, order_id: int, tenant_id: str) -> Optional[Order]:
        """Get order by ID"""
        return self.repository.get_by_id(order_id, tenant_id)

    def list_orders(
        self,
        tenant_id: str,
        status: Optional[str] = None,
        skip: int = 0,
        limit: int = 100
    ) -> List[Order]:
        """List orders for tenant"""
        return self.repository.list_by_tenant(tenant_id, status, skip, limit)

    def cancel_order(self, order_id: int, tenant_id: str) -> Order:
        """Cancel an order and restore inventory"""
        order = self.get_order(order_id, tenant_id)
        if not order:
            raise HTTPException(404, "Order not found")
        if order.status == "cancelled":
            raise HTTPException(400, "Order already cancelled")

        # Restore inventory
        for item in order.items:
            self.inventory_service.restore_stock(
                item["product_id"],
                item["quantity"],
                tenant_id
            )

        # Refund payment; only mark as refunded if money was actually taken
        if order.payment_status == "paid":
            self.payment_service.refund_payment(order.id, tenant_id)
            order.payment_status = "refunded"

        # Update order status
        order.status = "cancelled"
        return self.repository.update(order)

# shared/events.py
import logging
from typing import Callable, Dict, List
from dataclasses import dataclass, field
from datetime import datetime, timezone

logger = logging.getLogger(__name__)


@dataclass
class Event:
    """Base event class"""
    # init=False keeps timestamp out of __init__, so subclasses can declare
    # required fields without "non-default argument follows default argument".
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc), init=False
    )


@dataclass
class OrderCreatedEvent(Event):
    order_id: int
    tenant_id: str
    total_amount: float


@dataclass
class OrderCompletedEvent(Event):
    order_id: int
    tenant_id: str


@dataclass
class InventoryUpdatedEvent(Event):
    product_id: int
    tenant_id: str
    new_quantity: int


class EventBus:
    """Simple in-process event bus for module communication"""
    # Class-level registry: handlers are shared by every EventBus instance
    _handlers: Dict[type, List[Callable]] = {}

    @classmethod
    def subscribe(cls, event_type: type, handler: Callable):
        """Subscribe to an event type"""
        if event_type not in cls._handlers:
            cls._handlers[event_type] = []
        cls._handlers[event_type].append(handler)

    @classmethod
    def publish(cls, event: Event):
        """Publish an event to all subscribers"""
        event_type = type(event)
        if event_type in cls._handlers:
            for handler in cls._handlers[event_type]:
                try:
                    handler(event)
                except Exception:
                    # Log error but don't fail the publisher
                    logger.exception("Error handling event %s", event_type.__name__)


# Example usage in inventory module
def handle_order_created(event: OrderCreatedEvent):
    """React to order creation"""
    print(f"Order {event.order_id} created for tenant {event.tenant_id}")
    # Could trigger analytics, notifications, etc.


# Subscribe to events
EventBus.subscribe(OrderCreatedEvent, handle_order_created)

# shared/middleware.py
from fastapi import Request
from fastapi.responses import JSONResponse
from starlette.middleware.base import BaseHTTPMiddleware
import jwt
from config.settings import settings


def _error(status_code: int, detail: str) -> JSONResponse:
    # An HTTPException raised inside BaseHTTPMiddleware is not caught by
    # FastAPI's exception handlers (the client sees a 500), so error
    # responses are returned directly instead of raised.
    return JSONResponse(status_code=status_code, content={"detail": detail})


class TenantMiddleware(BaseHTTPMiddleware):
    """Middleware to extract and validate tenant ID from requests"""

    async def dispatch(self, request: Request, call_next):
        # Skip middleware for public endpoints
        if request.url.path in ["/health", "/docs", "/openapi.json"]:
            return await call_next(request)

        # Extract tenant ID from header
        tenant_id = request.headers.get("x-tenant-id")
        if not tenant_id:
            return _error(400, "Missing x-tenant-id header")

        # Store tenant ID in request state for access in routes
        request.state.tenant_id = tenant_id
        return await call_next(request)


class AuthMiddleware(BaseHTTPMiddleware):
    """Middleware to validate JWT tokens.

    Relies on TenantMiddleware having run first. With app.add_middleware,
    the middleware added last runs first, so register AuthMiddleware
    before TenantMiddleware.
    """

    async def dispatch(self, request: Request, call_next):
        # Skip middleware for auth endpoints and health checks
        public_paths = ["/health", "/docs", "/openapi.json", "/api/v1/auth/login", "/api/v1/auth/register"]
        if request.url.path in public_paths:
            return await call_next(request)

        # Extract token from Authorization header
        auth_header = request.headers.get("authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return _error(401, "Missing or invalid authorization header")
        token = auth_header[len("Bearer "):]

        # Verify token
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM]
            )
        except jwt.ExpiredSignatureError:
            return _error(401, "Token expired")
        except jwt.InvalidTokenError:
            return _error(401, "Invalid token")

        # Verify tenant ID matches token
        if payload.get("tenant_id") != request.state.tenant_id:
            return _error(403, "Tenant ID mismatch")

        # Store user info in request state
        request.state.user_id = payload.get("user_id")
        request.state.user_role = payload.get("role")
        return await call_next(request)

# Monolith: One deployment
docker build -t pos-system:latest .
docker run -p 8000:8000 pos-system:latest
# vs Microservices: Six deployments
docker-compose up -d auth-service
docker-compose up -d pos-core-service
docker-compose up -d inventory-service
docker-compose up -d payment-service
docker-compose up -d restaurant-service
docker-compose up -d chatbot-service
# Monolith: Set breakpoint, trace entire flow
@app.post("/orders")
async def create_order(order_data: OrderCreate):
    # Breakpoint here
    order = await pos_service.create_order(order_data)  # Step into (async, so it must be awaited)
    inventory_service.reduce_stock(...)  # Step into
    payment_service.process_payment(...)  # Step into
    return order

# vs Microservices: Need distributed tracing, log aggregation
# Trace request across 3 different services, 3 different logs
# Modular Monolith: Database transactions work
from sqlalchemy.orm import Session


def create_order_transactional(db: Session, order_data: OrderCreate):
    try:
        # All operations in one transaction
        # (assumes these helpers write through db without committing themselves)
        order = create_order(db, order_data)
        reduce_inventory(db, order.items)
        process_payment(db, order.total)
        db.commit()  # All or nothing
        return order
    except Exception:
        db.rollback()  # Undo every write made in this transaction
        raise

# Microservices: Need Saga pattern, eventual consistency
# Complex compensating transactions if one step fails
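
To make the contrast concrete, here is a minimal in-memory sketch of what a saga has to do by hand once the steps live in different services: run each step, and if one fails, call the compensation for every step that already succeeded, in reverse order. The `run_saga` helper and the step names are hypothetical; a real saga also has to cope with retries, timeouts, and compensations that can fail.

```python
from typing import Callable

# (name, action, compensation)
Step = tuple[str, Callable[[], None], Callable[[], None]]


def run_saga(steps: list[Step]) -> list[str]:
    """Run steps in order; on failure, compensate completed steps in reverse."""
    log: list[str] = []
    completed: list[Step] = []
    for step in steps:
        name, action, _ = step
        try:
            action()
        except Exception:
            log.append(f"{name}: failed")
            for done_name, _, compensate in reversed(completed):
                compensate()
                log.append(f"{done_name}: compensated")
            return log
        completed.append(step)
        log.append(f"{name}: ok")
    return log
```

With a single database transaction, the entire compensation loop collapses into one `rollback()` call.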
Before (Modular Monolith):
- Team of 5 developers
- Everyone works on shared codebase
- Easy coordination
After (Growing to 15 developers):
- 3 teams:
* Team A: Auth + Frontend
* Team B: Orders + Payments
* Team C: Inventory + Restaurant
- Merge conflicts increasing
- Deployment coordination needed
# Observability showed different loads
# (collected over 1 month of production)
Module | Requests/min | CPU Usage | Memory
----------------|--------------|-----------|--------
Auth | 100 | 10% | 200MB
POS Core | 500 | 40% | 800MB
Inventory | 50 | 5% | 300MB
Payments | 400 | 35% | 600MB
Restaurant | 80 | 8% | 250MB
Chatbot | 200 | 25% | 1GB
# POS Core and Payments need more resources
# But we're scaling everything together
# Waste of money!
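
As a quick sanity check on the table above, the snippet below computes what share of total request volume the two busiest modules carry. The numbers are copied from the table; `traffic_share` is just an illustrative helper.

```python
# Requests per minute, copied from the table above.
requests_per_min = {
    "Auth": 100,
    "POS Core": 500,
    "Inventory": 50,
    "Payments": 400,
    "Restaurant": 80,
    "Chatbot": 200,
}


def traffic_share(modules: list[str]) -> float:
    """Fraction of all requests handled by the given modules."""
    total = sum(requests_per_min.values())
    return sum(requests_per_min[m] for m in modules) / total


hot = traffic_share(["POS Core", "Payments"])
print(f"POS Core + Payments: {hot:.1%} of requests")  # 900 of 1330, about 67.7%
```

Roughly two thirds of the traffic hits two of the six modules, which is the kind of skew that makes independent scaling worth considering.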
# Inventory needs flexible schema (product attributes vary)
# Better suited for MongoDB
# But Auth, Orders, Payments need ACID transactions
# Better suited for PostgreSQL
# In modular monolith: Must choose one database
# In microservices: Each service picks optimal tech
# Step 1: Keep payment logic in monolith but route to external service
import httpx
from config.settings import settings


class PaymentService:
    def process_payment(self, amount: float, tenant_id: str):
        # Check feature flag
        if settings.USE_PAYMENT_MICROSERVICE:
            # Call external payment service
            return self._call_payment_microservice(amount, tenant_id)
        else:
            # Use local payment logic
            return self._process_locally(amount, tenant_id)

    def _call_payment_microservice(self, amount: float, tenant_id: str):
        response = httpx.post(
            "http://payment-service:4004/payments/process",
            headers={"x-tenant-id": tenant_id},
            json={"amount": amount}
        )
        # Fail loudly on non-2xx responses instead of parsing an error body as success
        response.raise_for_status()
        return response.json()

    def _process_locally(self, amount: float, tenant_id: str):
        # Original monolith logic
        pass

# Step 2: Gradually move tenants to microservice
# Step 3: Once all tenants migrated, remove local logic
# Step 4: Repeat for next service
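
Step 2 needs some way to decide, per tenant, which code path to take. One possible approach (an assumption here, not necessarily how the original migration worked) is a deterministic hash bucket plus an explicit allowlist, so a tenant's routing stays stable as the rollout percentage grows. `tenant_bucket` and `use_payment_microservice` are hypothetical names.

```python
import hashlib


def tenant_bucket(tenant_id: str) -> int:
    """Map a tenant to a stable bucket in [0, 100)."""
    digest = hashlib.sha256(tenant_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % 100


def use_payment_microservice(
    tenant_id: str,
    rollout_percent: int,
    allowlist: frozenset = frozenset(),
) -> bool:
    """Decide per tenant whether to route to the extracted service."""
    if tenant_id in allowlist:
        return True  # hand-picked pilot tenants always go to the new service
    return tenant_bucket(tenant_id) < rollout_percent
```

Because the bucket depends only on the tenant ID, raising `rollout_percent` from 30 to 60 adds tenants without moving any already-migrated tenant back to the local path.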
# Bad: Direct model access across modules
from inventory.models import Product

def create_order():
    product = db.query(Product).get(product_id)  # Don't do this!

# Good: Use service layer
def create_order():
    product = inventory_service.get_product(product_id, tenant_id)

# Bad: All modules share same tables freely
# Good: Each module owns its tables, others use APIs