Six months into the POS project, I needed to add a new feature: order discounts. Simple, right? Just calculate the discount and subtract from the total.
Except the order creation logic was a 300-line function that:
Validated inventory
Calculated totals
Called payment APIs
Updated database
Sent email notifications
Logged to external service
Every test required:
Running PostgreSQL
Running Redis
Mocking payment gateway
Mocking email service
Mocking logging service
A "simple" discount feature took 3 days because I couldn't test my changes without setting up the entire infrastructure. That's when I learned about layered architecture.
The Three-Layer Architecture
Key principle: Dependencies flow inward. Domain layer has no dependencies. Application layer depends on domain. Infrastructure depends on application.
Auth Service: Complete Layered Implementation
Let me show you the Auth Service with proper layers:
Domain Layer: Pure Business Logic
Application Layer: Business Workflows
Infrastructure Layer: External Concerns
Presentation Layer: HTTP Handling
Dependency Injection with FastAPI
The key to testability is dependency injection:
Why Layers Matter for Testing
Key Learnings
Separate domain logic from infrastructure
Domain models are pure business logic
No database, HTTP, or framework dependencies
Define interfaces, depend on abstractions
Services depend on interfaces, not implementations
Easy to swap implementations (SQL → MongoDB)
Dependency injection enables testing
Inject mocks instead of real dependencies
Test business logic in isolation
Layer boundaries prevent coupling
Domain never calls infrastructure directly
Clear flow of dependencies
Value objects enforce validation
Email, Password validate at creation
Impossible to have invalid state
Common Mistakes
Mixing layers
Fat services
Anemic domain model
When to Use Layered Architecture
Use When:
Application will grow complex
Need to test business logic
Multiple developers/teams
Expect technology changes
Skip When:
Simple CRUD application
Prototyping/MVP
Single developer project
No complex business rules
Next Steps
Now that you understand service layers, the next article covers API design and contracts—how to design stable, versioned APIs that clients depend on.
# auth/domain/models.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from enum import Enum
class UserRole(Enum):
    """Closed set of roles a user can hold; each value is the role's lowercase name."""

    ADMIN = "admin"
    MANAGER = "manager"
    CASHIER = "cashier"
@dataclass
class User:
    """Domain entity for a user account: permission rules and state changes only.

    Nothing here touches a database or a framework; persistence is handled
    elsewhere.
    """

    id: Optional[int]
    tenant_id: str
    username: str
    email: str
    hashed_password: str
    role: UserRole
    is_active: bool
    created_at: datetime
    updated_at: Optional[datetime] = None

    def can_manage_users(self) -> bool:
        """Return True only when the user is an admin."""
        return self.role == UserRole.ADMIN

    def can_process_refunds(self) -> bool:
        """Return True for admins and managers."""
        return self.role in (UserRole.ADMIN, UserRole.MANAGER)

    def can_create_orders(self) -> bool:
        """Return True for any active user, whatever the role."""
        return self.is_active

    def deactivate(self):
        """Switch the account off.

        Raises:
            ValueError: If the account is already inactive.
        """
        if not self.is_active:
            raise ValueError("User already deactivated")
        self.is_active = False
        self._touch()

    def change_role(self, new_role: UserRole):
        """Assign a different role.

        Raises:
            ValueError: If ``new_role`` is the role the user already has.
        """
        if self.role == new_role:
            raise ValueError("User already has this role")
        self.role = new_role
        self._touch()

    def _touch(self):
        """Record the current UTC time as the moment of the last change."""
        self.updated_at = datetime.utcnow()
# auth/domain/value_objects.py
from dataclasses import dataclass
import re
@dataclass(frozen=True)
class Email:
    """Immutable value object for an email address, validated on creation.

    Attributes:
        value: The address exactly as supplied.

    Raises:
        ValueError: If ``value`` does not have the shape ``local@domain.tld``.
    """

    value: str

    def __post_init__(self):
        pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
        # fullmatch rather than match: with match, "$" also succeeds just
        # before a trailing newline, so "user@example.com\n" was accepted.
        if not re.fullmatch(pattern, self.value):
            raise ValueError(f"Invalid email format: {self.value}")
@dataclass(frozen=True)
class Password:
    """Immutable value object for a plaintext password that met the strength rules.

    Raises:
        ValueError: On the first rule that fails, checked in this order:
            minimum length, an uppercase letter, a digit.
    """

    value: str

    def __post_init__(self):
        raw = self.value
        # Each entry pairs "is this rule satisfied?" with the message to raise
        # when it is not; order decides which failure gets reported.
        rules = (
            (len(raw) >= 8, "Password must be at least 8 characters"),
            (any(ch.isupper() for ch in raw), "Password must contain uppercase letter"),
            (any(ch.isdigit() for ch in raw), "Password must contain digit"),
        )
        for satisfied, message in rules:
            if not satisfied:
                raise ValueError(message)
# auth/domain/exceptions.py
class AuthenticationError(Exception):
    """Signals that authentication did not succeed."""
class AuthorizationError(Exception):
    """Signals that the user is not permitted to do what was requested."""
class UserNotFoundError(Exception):
    """Signals that no matching user exists."""
class DuplicateUserError(Exception):
    """Signals that the requested username is already taken."""
# auth/application/services.py
from typing import Optional
from datetime import datetime, timedelta
import jwt
from passlib.context import CryptContext
from auth.domain.models import User, UserRole
from auth.domain.value_objects import Email, Password
from auth.domain.exceptions import (
AuthenticationError,
AuthorizationError,
UserNotFoundError,
DuplicateUserError
)
from auth.application.interfaces import UserRepository, CacheService
from config.settings import settings
# Module-level passlib context configured for bcrypt; AuthService uses it to
# hash new passwords and to verify login attempts.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class AuthService:
    """Application service that orchestrates the authentication workflows.

    It works only against the ``UserRepository`` and ``CacheService``
    abstractions it is constructed with.
    """

    def __init__(self,
                 user_repository: UserRepository,
                 cache_service: CacheService):
        self.user_repository = user_repository
        self.cache_service = cache_service

    def register_user(self,
                      tenant_id: str,
                      username: str,
                      email: str,
                      password: str,
                      role: UserRole) -> User:
        """Register a new user and return the persisted user.

        Raises:
            ValueError: If the email or password fails value-object validation.
            DuplicateUserError: If the username already exists in the tenant.
        """
        # Validate using value objects
        email_vo = Email(email)
        password_vo = Password(password)

        # Check if username already exists
        existing_user = self.user_repository.find_by_username(username, tenant_id)
        if existing_user:
            raise DuplicateUserError(f"Username {username} already exists")

        # Create domain model
        user = User(
            id=None,
            tenant_id=tenant_id,
            username=username,
            email=email_vo.value,
            hashed_password=self._hash_password(password_vo.value),
            role=role,
            is_active=True,
            created_at=datetime.utcnow()
        )

        # Persist through repository
        return self.user_repository.save(user)

    def authenticate(self,
                     username: str,
                     password: str,
                     tenant_id: str) -> tuple[User, str]:
        """Authenticate a user and return ``(user, access_token)``.

        Raises:
            AuthenticationError: If the user is unknown or the password is wrong.
                Both cases use the same message.
            AuthorizationError: If the credentials are right but the account
                is deactivated.
        """
        # Find user
        user = self.user_repository.find_by_username(username, tenant_id)
        if not user:
            raise AuthenticationError("Invalid username or password")

        # Verify password
        if not self._verify_password(password, user.hashed_password):
            raise AuthenticationError("Invalid username or password")

        # Check if user is active
        if not user.is_active:
            raise AuthorizationError("User account is deactivated")

        # Generate token
        token = self._create_access_token(user)

        # Cache user session for as long as the token itself is valid
        self.cache_service.set(
            self._session_key(token),
            user.id,
            ttl=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        )
        return user, token

    def verify_token(self, token: str) -> User:
        """Verify a JWT and return the user it identifies.

        Raises:
            AuthenticationError: If the token is expired, malformed, lacks the
                expected claims, or belongs to a deactivated account.
            UserNotFoundError: If the token is valid but the user no longer exists.
        """
        # Only the decode call can raise the jwt errors handled here.
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM]
            )
        except jwt.ExpiredSignatureError as exc:
            raise AuthenticationError("Token expired") from exc
        except jwt.InvalidTokenError as exc:
            raise AuthenticationError("Invalid token") from exc

        user_id = payload.get("user_id")
        tenant_id = payload.get("tenant_id")
        if not user_id or not tenant_id:
            raise AuthenticationError("Invalid token payload")

        # Get user from repository
        user = self.user_repository.find_by_id(user_id, tenant_id)
        if not user:
            raise UserNotFoundError("User not found")

        # A token issued before deactivation must stop working afterwards;
        # login applies the same rule to fresh credentials.
        if not user.is_active:
            raise AuthenticationError("User account is deactivated")
        return user

    def change_user_role(self,
                         user_id: int,
                         new_role: UserRole,
                         tenant_id: str,
                         requesting_user: User) -> User:
        """Change a user's role on behalf of ``requesting_user``.

        Raises:
            AuthorizationError: If ``requesting_user`` may not manage users.
            UserNotFoundError: If the target user does not exist in the tenant.
            ValueError: If the target already has ``new_role``.
        """
        # Check permissions
        if not requesting_user.can_manage_users():
            raise AuthorizationError("Only admins can change user roles")

        # Get user
        user = self.user_repository.find_by_id(user_id, tenant_id)
        if not user:
            raise UserNotFoundError("User not found")

        # Domain logic handles validation
        user.change_role(new_role)

        # Persist changes
        updated_user = self.user_repository.update(user)

        # Invalidate user's sessions
        self._invalidate_user_sessions(user_id)
        return updated_user

    def _hash_password(self, password: str) -> str:
        """Return the hash of ``password`` produced by the module's passlib context."""
        return pwd_context.hash(password)

    def _verify_password(self, plain: str, hashed: str) -> bool:
        """Return True when ``plain`` matches the stored ``hashed`` value."""
        return pwd_context.verify(plain, hashed)

    def _create_access_token(self, user: User) -> str:
        """Build a signed JWT carrying the user's id, tenant, role and expiry."""
        payload = {
            "user_id": user.id,
            "tenant_id": user.tenant_id,
            "role": user.role.value,
            "exp": datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        }
        return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)

    @staticmethod
    def _session_key(token: str) -> str:
        """Return the cache key under which a token's session is stored."""
        import hashlib  # local import: not among this module's top-level imports

        # A JWT starts with its encoded header, which is the same for every
        # token signed with one algorithm, so a prefix such as token[:16]
        # makes all sessions share a single key. Hash the whole token instead.
        digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
        return f"session:{digest}"

    def _invalidate_user_sessions(self, user_id: int):
        """Placeholder: session invalidation is not implemented yet.

        TODO: clear this user's cached sessions. Session keys are derived from
        the token, so this needs a way to find a user's keys first.
        """
        pass
# auth/application/interfaces.py
from abc import ABC, abstractmethod
from typing import Optional, List
from auth.domain.models import User
class UserRepository(ABC):
    """Abstract port for user data access, scoped by tenant where a lookup is involved."""

    @abstractmethod
    def save(self, user: User) -> User:
        """Store a user and return the stored user."""

    @abstractmethod
    def update(self, user: User) -> User:
        """Write changes to an existing user and return the updated user."""

    @abstractmethod
    def find_by_id(self, user_id: int, tenant_id: str) -> Optional[User]:
        """Return the user with this id in the tenant, or None."""

    @abstractmethod
    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        """Return the user with this username in the tenant, or None."""

    @abstractmethod
    def find_all_by_tenant(self, tenant_id: str) -> List[User]:
        """Return every user belonging to the tenant."""
class CacheService(ABC):
    """Abstract port for a key/value cache with per-entry expiry.

    Values are annotated as ``object``: the previous annotation used ``any``,
    which is the builtin function, not a type.
    """

    @abstractmethod
    def get(self, key: str) -> Optional[object]:
        """Return the value stored under ``key``, or None when there is none."""

    @abstractmethod
    def set(self, key: str, value: object, ttl: int):
        """Store ``value`` under ``key`` with time-to-live ``ttl``.

        NOTE(review): callers in this file pass ``ttl`` in seconds — confirm
        for other implementations.
        """

    @abstractmethod
    def delete(self, key: str):
        """Remove ``key`` from the cache."""
# auth/infrastructure/repository.py
from sqlalchemy.orm import Session
from typing import Optional, List
from auth.domain.models import User, UserRole
from auth.application.interfaces import UserRepository
# SQLAlchemy ORM model (separate from domain model)
from sqlalchemy import Boolean, Column, DateTime, Integer, String, UniqueConstraint
from sqlalchemy.sql import func
from config.database import Base
class UserORM(Base):
    """SQLAlchemy mapping for the ``users`` table, kept separate from the domain ``User``."""

    __tablename__ = "users"
    # Usernames are looked up and checked for duplicates per tenant, so
    # uniqueness is enforced on (tenant_id, username). A unique constraint on
    # username alone made a second tenant's insert fail at the database even
    # though the per-tenant duplicate check had passed.
    # NOTE(review): an existing database needs a migration to replace the old
    # unique index on username.
    __table_args__ = (
        UniqueConstraint("tenant_id", "username", name="uq_users_tenant_username"),
    )

    id = Column(Integer, primary_key=True, index=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    username = Column(String(100), nullable=False)
    email = Column(String(255), nullable=False)
    hashed_password = Column(String(255), nullable=False)
    # Stored as the role's string value; converted back to UserRole on load.
    role = Column(String(20), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())
class SQLAlchemyUserRepository(UserRepository):
    """SQLAlchemy implementation of UserRepository.

    Converts between the domain ``User`` and the ``UserORM`` row in both
    directions, so callers only ever receive domain objects.
    """

    def __init__(self, db: Session):
        self.db = db

    def save(self, user: User) -> User:
        """Insert ``user`` as a new row and return it with database-assigned fields.

        ``user.id`` is not written; the returned user carries the generated id.
        """
        orm_user = UserORM(
            tenant_id=user.tenant_id,
            username=user.username,
            email=user.email,
            hashed_password=user.hashed_password,
            role=user.role.value,
            is_active=user.is_active,
            created_at=user.created_at
        )
        self.db.add(orm_user)
        self._commit_and_refresh(orm_user)

        # Convert back to domain model
        return self._to_domain(orm_user)

    def update(self, user: User) -> Optional[User]:
        """Write email, role, active flag and updated_at for an existing user.

        Returns:
            The updated user, or None when no row matches the user's id and
            tenant. Username and password hash are not changed here.
        """
        orm_user = self.db.query(UserORM).filter(
            UserORM.id == user.id,
            UserORM.tenant_id == user.tenant_id
        ).first()
        if not orm_user:
            return None

        orm_user.email = user.email
        orm_user.role = user.role.value
        orm_user.is_active = user.is_active
        orm_user.updated_at = user.updated_at
        self._commit_and_refresh(orm_user)
        return self._to_domain(orm_user)

    def find_by_id(self, user_id: int, tenant_id: str) -> Optional[User]:
        """Return the user with this id inside the tenant, or None."""
        orm_user = self.db.query(UserORM).filter(
            UserORM.id == user_id,
            UserORM.tenant_id == tenant_id
        ).first()
        return self._to_domain(orm_user) if orm_user else None

    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        """Return the user with this username inside the tenant, or None."""
        orm_user = self.db.query(UserORM).filter(
            UserORM.username == username,
            UserORM.tenant_id == tenant_id
        ).first()
        return self._to_domain(orm_user) if orm_user else None

    def find_all_by_tenant(self, tenant_id: str) -> List[User]:
        """Return all users of the tenant as domain objects."""
        orm_users = self.db.query(UserORM).filter(
            UserORM.tenant_id == tenant_id
        ).all()
        return [self._to_domain(u) for u in orm_users]

    def _commit_and_refresh(self, orm_user: UserORM) -> None:
        """Commit the session and reload ``orm_user``; roll back if the commit fails."""
        try:
            self.db.commit()
        except Exception:
            # A failed commit leaves the session unusable until rolled back;
            # reset it, then let the original error propagate unchanged.
            self.db.rollback()
            raise
        self.db.refresh(orm_user)

    def _to_domain(self, orm_user: UserORM) -> User:
        """Convert ORM model to domain model"""
        return User(
            id=orm_user.id,
            tenant_id=orm_user.tenant_id,
            username=orm_user.username,
            email=orm_user.email,
            hashed_password=orm_user.hashed_password,
            role=UserRole(orm_user.role),
            is_active=orm_user.is_active,
            created_at=orm_user.created_at,
            updated_at=orm_user.updated_at
        )
# auth/infrastructure/cache.py
import redis
import json
from typing import Optional
from auth.application.interfaces import CacheService
class RedisCacheService(CacheService):
    """Redis implementation of CacheService; values are stored as JSON text."""

    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client

    def get(self, key: str) -> Optional[object]:
        """Return the JSON-decoded value for ``key``, or None when nothing usable is stored."""
        raw = self.redis.get(key)
        # Falsy covers both a missing key and an empty payload; neither is decoded.
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key: str, value: object, ttl: int):
        """Store ``value`` as JSON under ``key``, expiring after ``ttl`` seconds."""
        self.redis.setex(key, ttl, json.dumps(value))

    def delete(self, key: str):
        """Remove ``key`` from Redis."""
        self.redis.delete(key)
# auth/presentation/schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime
class RegisterRequest(BaseModel):
    """Request body for registering a user."""

    username: str
    email: EmailStr
    password: str
    # Plain string here; the router converts it to a UserRole.
    role: str
class LoginRequest(BaseModel):
    """Request body for logging in with a username and password."""

    username: str
    password: str
class UserResponse(BaseModel):
    """User fields exposed through the API; the password hash is not among them."""

    id: int
    username: str
    email: str
    role: str
    is_active: bool
    created_at: datetime
class TokenResponse(BaseModel):
    """Login response: the access token, its type, and the authenticated user."""

    access_token: str
    token_type: str
    user: UserResponse
# auth/presentation/router.py
from fastapi import APIRouter, Depends, HTTPException, Header
from sqlalchemy.orm import Session
from auth.domain.models import UserRole
from auth.domain.exceptions import (
AuthenticationError,
AuthorizationError,
DuplicateUserError
)
from auth.application.services import AuthService
from auth.infrastructure.repository import SQLAlchemyUserRepository
from auth.infrastructure.cache import RedisCacheService
from auth.presentation.schemas import (
RegisterRequest,
LoginRequest,
UserResponse,
TokenResponse
)
from config.database import get_db
from config.redis_client import get_redis
# Router on which this module's auth endpoints are registered.
router = APIRouter()
def get_auth_service(
    db: Session = Depends(get_db),
    redis = Depends(get_redis)
) -> AuthService:
    """Build an AuthService wired to the SQLAlchemy repository and the Redis cache.

    FastAPI supplies ``db`` and ``redis`` through their own dependencies.
    """
    return AuthService(
        SQLAlchemyUserRepository(db),
        RedisCacheService(redis),
    )
@router.post("/register", response_model=UserResponse)
async def register(
    request: RegisterRequest,
    tenant_id: str = Header(..., alias="x-tenant-id"),
    service: AuthService = Depends(get_auth_service)
):
    """Register a new user.

    Responds with 409 when the username is taken and 400 when the role, email
    or password is rejected as invalid.
    """
    try:
        user = service.register_user(
            tenant_id=tenant_id,
            username=request.username,
            email=request.email,
            password=request.password,
            role=UserRole(request.role)
        )
    except DuplicateUserError as e:
        raise HTTPException(409, str(e)) from e
    except ValueError as e:
        raise HTTPException(400, str(e)) from e

    # Map explicitly: the domain user's role is a UserRole enum while the
    # response schema declares a string, so returning the domain object
    # as-is fails response validation.
    return UserResponse(
        id=user.id,
        username=user.username,
        email=user.email,
        role=user.role.value,
        is_active=user.is_active,
        created_at=user.created_at,
    )
@router.post("/login", response_model=TokenResponse)
async def login(
    request: LoginRequest,
    tenant_id: str = Header(..., alias="x-tenant-id"),
    service: AuthService = Depends(get_auth_service)
):
    """Authenticate a user and return a bearer token with the user's public fields.

    Responds with 401 for bad credentials and 403 for a deactivated account.
    """
    try:
        user, token = service.authenticate(
            username=request.username,
            password=request.password,
            tenant_id=tenant_id
        )
    except AuthenticationError as e:
        raise HTTPException(401, str(e)) from e
    except AuthorizationError as e:
        raise HTTPException(403, str(e)) from e

    # Build the response field by field: unpacking the domain object's
    # __dict__ passes a UserRole enum where the schema expects a string, and
    # also hands over fields such as the password hash.
    return TokenResponse(
        access_token=token,
        token_type="bearer",
        user=UserResponse(
            id=user.id,
            username=user.username,
            email=user.email,
            role=user.role.value,
            is_active=user.is_active,
            created_at=user.created_at,
        )
    )
# Dependency for routes that need authentication
async def get_current_user(
    authorization: str = Header(...),
    service: AuthService = Depends(get_auth_service)
):
    """Extract the bearer token from the Authorization header and return its user.

    Responds with 401 when the header is not a bearer header, the token is
    rejected, or the token's user no longer exists.
    """
    # Imported here because this module's top-level imports do not include it.
    from auth.domain.exceptions import UserNotFoundError

    scheme = "Bearer "
    if not authorization.startswith(scheme):
        raise HTTPException(401, "Invalid authorization header")

    # Strip only the leading scheme; replace() would also remove any later
    # occurrence of the same text.
    token = authorization[len(scheme):]
    try:
        return service.verify_token(token)
    except (AuthenticationError, UserNotFoundError) as e:
        # A token for a user that has since been removed is an auth failure,
        # not a server error.
        raise HTTPException(401, str(e)) from e
# Without DI (hard to test)
class OrderService:
    """Counter-example: creates its own collaborators, so tests cannot replace them."""

    def __init__(self):
        # Nothing is passed in; both collaborators are constructed right here.
        self.db = SessionLocal()  # Hardcoded dependency
        self.payment_gateway = StripeGateway()  # Hardcoded dependency
# With DI (easy to test)
class OrderService:
    """Receives its repository and payment gateway from the caller."""

    def __init__(self, repository: OrderRepository, payment_gateway: PaymentGateway):
        # Both collaborators are injected, so a test can hand in mocks.
        self.repository, self.payment_gateway = repository, payment_gateway
# Testing with mocks
def test_create_order():
    """Build an OrderService from mocks only; no database or payment gateway is involved."""
    service = OrderService(
        Mock(spec=OrderRepository),
        Mock(spec=PaymentGateway),
    )
    # Test without real database or payment gateway!
# tests/auth/test_domain.py
def test_user_can_change_role():
    """Test domain logic without any infrastructure"""
    user = User(
        id=1,
        tenant_id="test",
        username="john",
        # Real-looking address; the previous literal was an email-obfuscation
        # placeholder rather than an address.
        email="john@example.com",
        hashed_password="hash",
        role=UserRole.CASHIER,
        is_active=True,
        created_at=datetime.utcnow()
    )

    # Pure domain logic test
    user.change_role(UserRole.MANAGER)

    assert user.role == UserRole.MANAGER
    assert user.updated_at is not None
# tests/auth/test_service.py
def test_register_user():
    """Test application logic with mocked dependencies"""
    mock_repository = Mock(spec=UserRepository)
    mock_repository.find_by_username.return_value = None  # User doesn't exist
    # Hand back whatever the service asked to save. The earlier `User(...)`
    # placeholder could not be constructed and raised TypeError.
    mock_repository.save.side_effect = lambda saved: saved
    mock_cache = Mock(spec=CacheService)
    service = AuthService(mock_repository, mock_cache)

    # Test without database
    user = service.register_user(
        tenant_id="test",
        username="john",
        # Must be a well-formed address: register_user validates it.
        email="john@example.com",
        password="SecurePass123",
        role=UserRole.CASHIER
    )

    assert user.username == "john"
    # The stored value must be a hash, never the plaintext password.
    assert user.hashed_password != "SecurePass123"
    mock_repository.find_by_username.assert_called_once_with("john", "test")
    mock_repository.save.assert_called_once()
# tests/auth/test_repository.py
def test_repository_save(test_db):
    """Integration test with real database"""
    repository = SQLAlchemyUserRepository(test_db)
    user = User(
        id=None,
        tenant_id="test",
        username="john",
        # Real-looking address; the previous literal was an email-obfuscation
        # placeholder rather than an address.
        email="john@example.com",
        hashed_password="hash",
        role=UserRole.CASHIER,
        is_active=True,
        created_at=datetime.utcnow()
    )

    saved_user = repository.save(user)
    assert saved_user.id is not None

    # Verify in database
    found_user = repository.find_by_username("john", "test")
    # Fail with a clear assertion instead of an AttributeError on None.
    assert found_user is not None
    assert found_user.id == saved_user.id
# Bad: Domain model knows about database
class User(Base):  # SQLAlchemy Base in domain
    """Counter-example: a domain model that persists itself."""

    def save(self):
        # The model reaches for the database session on its own.
        db.session.add(self)
# Good: Domain model is pure
@dataclass
class User:
    """Plain domain model: data only, with no persistence base class."""

    username: str
# Bad: All logic in service
class OrderService:
    """Counter-example: a service that holds all of the order logic itself."""

    def create_order(self, items):
        """Stand-in for an oversized method; the body is deliberately omitted."""
        # 500 lines of logic here
        ...  # placeholder statement: a body holding only a comment is a syntax error
# Good: Logic in domain, service orchestrates
class Order:
    """Domain model that owns the order's business logic."""

    def __init__(self, items=None):
        # Accepts the items at construction so that Order(items) works;
        # with no argument the order starts empty.
        self.items = [] if items is None else items

    def calculate_total(self, items):
        """Stand-in for the total calculation; the body is deliberately omitted."""
        # Business logic here
        ...  # placeholder statement: a body holding only a comment is a syntax error
class OrderService:
    """Thin orchestrator: the order computes, the service only coordinates and saves."""

    def create_order(self, items):
        """Create an order from ``items``, have it compute its total, and save it."""
        new_order = Order(items)
        # The calculation lives in the domain object; the service just triggers it.
        total = new_order.calculate_total(items)
        saved = self.repository.save(new_order)
        return saved
# Bad: Domain model is just data
@dataclass
class Order:
    """Counter-example: an order that carries data but has no behaviour."""

    id: int
    total: float
# All logic in service
class OrderService:
    """Counter-example: discount logic placed in the service instead of the order."""

    def calculate_discount(self, order):
        """Stand-in for misplaced business logic; the body is deliberately omitted."""
        # Should be in domain!
        ...  # placeholder statement: a body holding only a comment is a syntax error
# Good: Domain model has behavior
@dataclass
class Order:
    """Order that applies discounts to its own total.

    Attributes:
        total: Current order total. Defaults to 0.0 so the class can still be
            constructed without arguments.
    """

    # Declared as a field: apply_discount reads and writes it, and without
    # the declaration the attribute never existed.
    total: float = 0.0

    def apply_discount(self, percent):
        """Reduce ``total`` by ``percent`` percent.

        Raises:
            ValueError: If ``percent`` is outside 0..100. A value outside that
                range would raise the total or make it negative.
        """
        if not 0 <= percent <= 100:
            raise ValueError("Discount percent must be between 0 and 100")
        self.total *= (1 - percent / 100)