# Service Layer Architecture

## When Testing Became Impossible

Six months into the POS project, I needed to add a new feature: order discounts. Simple, right? Just calculate the discount and subtract it from the total.

Except the order creation logic was a 300-line function that:

* Validated inventory
* Calculated totals
* Called payment APIs
* Updated database
* Sent email notifications
* Logged to external service

Every test required:

* Running PostgreSQL
* Running Redis
* Mocking payment gateway
* Mocking email service
* Mocking logging service

A "simple" discount feature took 3 days because I couldn't test my changes without setting up the entire infrastructure. That's when I learned about layered architecture.

## The Four-Layer Architecture

{% @mermaid/diagram content="graph TB
subgraph "Presentation Layer (FastAPI Routes)"
ROUTE["/orders endpoint<br/>HTTP handling, validation"]
end

subgraph "Application Layer (Services)"
SERVICE[OrderService<br/>Business logic, orchestration]
end

subgraph "Domain Layer (Models & Rules)"
DOMAIN[Order Model<br/>Business rules, calculations]
end

subgraph "Infrastructure Layer (Repositories & External)"
REPO[OrderRepository<br/>Database access]
PAYMENT[PaymentGateway<br/>External API]
CACHE[CacheService<br/>Redis access]
end

ROUTE --> SERVICE
SERVICE --> DOMAIN
SERVICE --> REPO
SERVICE --> PAYMENT
SERVICE --> CACHE

style ROUTE fill:#e6f3ff
style SERVICE fill:#ffe6e6
style DOMAIN fill:#f0e6ff
style REPO fill:#e6ffe6
style PAYMENT fill:#e6ffe6
style CACHE fill:#e6ffe6" %}

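**Key principle:** Dependencies point inward. The domain layer depends on nothing. The application layer depends on the domain and declares the interfaces it needs (a repository, a cache). The infrastructure layer implements those interfaces, so it depends on the application and domain layers, never the other way around.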
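Here is a minimal sketch of that dependency direction. The names (`Notifier`, `WelcomeService`, `InMemoryNotifier`) are hypothetical and not part of the POS codebase; they only illustrate that the service owns the interface and the infrastructure class plugs into it.

```python
from abc import ABC, abstractmethod
from typing import List, Tuple


class Notifier(ABC):
    """Application-layer interface (hypothetical): what the service needs."""

    @abstractmethod
    def send(self, recipient: str, message: str) -> None: ...


class WelcomeService:
    """Application layer: knows only the Notifier abstraction."""

    def __init__(self, notifier: Notifier):
        self.notifier = notifier

    def welcome(self, username: str) -> str:
        message = f"Welcome, {username}!"
        self.notifier.send(username, message)
        return message


class InMemoryNotifier(Notifier):
    """Infrastructure layer: implements the interface, so it imports 'inward'."""

    def __init__(self):
        self.sent: List[Tuple[str, str]] = []

    def send(self, recipient: str, message: str) -> None:
        self.sent.append((recipient, message))


notifier = InMemoryNotifier()
service = WelcomeService(notifier)
service.welcome("john")
```

Notice that `WelcomeService` never mentions `InMemoryNotifier`. Swapping in an email or SMS implementation would not touch the service at all.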

## Auth Service: Complete Layered Implementation

Let me show you the Auth Service with proper layers:

### Domain Layer: Pure Business Logic

```python
# auth/domain/models.py
from dataclasses import dataclass
from datetime import datetime
from typing import Optional
from enum import Enum

class UserRole(Enum):
    ADMIN = "admin"
    MANAGER = "manager"
    CASHIER = "cashier"

@dataclass
class User:
    """Domain model - pure business logic, no database"""
    id: Optional[int]
    tenant_id: str
    username: str
    email: str
    hashed_password: str
    role: UserRole
    is_active: bool
    created_at: datetime
    updated_at: Optional[datetime] = None
    
    def can_manage_users(self) -> bool:
        """Business rule: Only admins can manage users"""
        return self.role == UserRole.ADMIN
    
    def can_process_refunds(self) -> bool:
        """Business rule: Admins and managers can process refunds"""
        return self.role in [UserRole.ADMIN, UserRole.MANAGER]
    
    def can_create_orders(self) -> bool:
        """Business rule: All active users can create orders"""
        return self.is_active
    
    def deactivate(self):
        """Business logic: Deactivate user"""
        if not self.is_active:
            raise ValueError("User already deactivated")
        self.is_active = False
        self.updated_at = datetime.utcnow()
    
    def change_role(self, new_role: UserRole):
        """Business logic: Change user role"""
        if self.role == new_role:
            raise ValueError("User already has this role")
        self.role = new_role
        self.updated_at = datetime.utcnow()

# auth/domain/value_objects.py
from dataclasses import dataclass
import re

@dataclass(frozen=True)
class Email:
    """Value object - validates email format"""
    value: str
    
    def __post_init__(self):
        pattern = r'^[\w\.-]+@[\w\.-]+\.\w+$'
        if not re.match(pattern, self.value):
            raise ValueError(f"Invalid email format: {self.value}")

@dataclass(frozen=True)
class Password:
    """Value object - validates password strength"""
    value: str
    
    def __post_init__(self):
        if len(self.value) < 8:
            raise ValueError("Password must be at least 8 characters")
        if not any(c.isupper() for c in self.value):
            raise ValueError("Password must contain uppercase letter")
        if not any(c.isdigit() for c in self.value):
            raise ValueError("Password must contain digit")

# auth/domain/exceptions.py
class AuthenticationError(Exception):
    """Raised when authentication fails"""
    pass

class AuthorizationError(Exception):
    """Raised when user lacks permissions"""
    pass

class UserNotFoundError(Exception):
    """Raised when user doesn't exist"""
    pass

class DuplicateUserError(Exception):
    """Raised when username already exists"""
    pass
```
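To see what a value object buys you, here is a small standalone sketch. It repeats the `Email` class from above so it runs on its own; `try_email` is a hypothetical helper added only for this demonstration.

```python
import re
from dataclasses import dataclass, FrozenInstanceError


@dataclass(frozen=True)
class Email:
    """Same shape as the Email value object above, repeated to run standalone."""
    value: str

    def __post_init__(self):
        if not re.match(r'^[\w\.-]+@[\w\.-]+\.\w+$', self.value):
            raise ValueError(f"Invalid email format: {self.value}")


def try_email(raw: str) -> str:
    """Hypothetical helper: report whether construction succeeds."""
    try:
        Email(raw)
        return "ok"
    except ValueError:
        return "rejected"


results = {raw: try_email(raw) for raw in ["john@example.com", "not-an-email"]}

# Value objects compare by value, not identity
same = Email("john@example.com") == Email("john@example.com")

# frozen=True blocks mutation once validation has passed
try:
    Email("john@example.com").value = "broken"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Validation happens once, at construction, and the frozen dataclass keeps the value from drifting afterwards. None of this needs a database or a web framework to run.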

### Application Layer: Business Workflows

```python
# auth/application/services.py
from typing import Optional
from datetime import datetime, timedelta
import jwt
from passlib.context import CryptContext

from auth.domain.models import User, UserRole
from auth.domain.value_objects import Email, Password
from auth.domain.exceptions import (
    AuthenticationError, 
    AuthorizationError,
    UserNotFoundError,
    DuplicateUserError
)
from auth.application.interfaces import UserRepository, CacheService
from config.settings import settings

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

class AuthService:
    """Application service - orchestrates business workflows"""
    
    def __init__(self, 
                 user_repository: UserRepository,
                 cache_service: CacheService):
        self.user_repository = user_repository
        self.cache_service = cache_service
    
    def register_user(self,
                     tenant_id: str,
                     username: str,
                     email: str,
                     password: str,
                     role: UserRole) -> User:
        """Register a new user"""
        
        # Validate using value objects
        email_vo = Email(email)
        password_vo = Password(password)
        
        # Check if username already exists
        existing_user = self.user_repository.find_by_username(username, tenant_id)
        if existing_user:
            raise DuplicateUserError(f"Username {username} already exists")
        
        # Create domain model
        user = User(
            id=None,
            tenant_id=tenant_id,
            username=username,
            email=email_vo.value,
            hashed_password=self._hash_password(password_vo.value),
            role=role,
            is_active=True,
            created_at=datetime.utcnow()
        )
        
        # Persist through repository
        saved_user = self.user_repository.save(user)
        
        return saved_user
    
    def authenticate(self,
                    username: str,
                    password: str,
                    tenant_id: str) -> tuple[User, str]:
        """Authenticate user and return user + token"""
        
        # Find user
        user = self.user_repository.find_by_username(username, tenant_id)
        if not user:
            raise AuthenticationError("Invalid username or password")
        
        # Verify password
        if not self._verify_password(password, user.hashed_password):
            raise AuthenticationError("Invalid username or password")
        
        # Check if user is active
        if not user.is_active:
            raise AuthorizationError("User account is deactivated")
        
        # Generate token
        token = self._create_access_token(user)
        
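        # Cache user session. Key on the tail of the token (the signature):
        # the first characters of a JWT are the encoded header, which is
        # identical for every token signed with the same algorithm.
        self.cache_service.set(
            f"session:{token[-16:]}", 
            user.id,
            ttl=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
        )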
        
        return user, token
    
    def verify_token(self, token: str) -> User:
        """Verify JWT token and return user"""
        
        try:
            # Decode token
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=[settings.JWT_ALGORITHM]
            )
            
            user_id = payload.get("user_id")
            tenant_id = payload.get("tenant_id")
            
            if not user_id or not tenant_id:
                raise AuthenticationError("Invalid token payload")
            
            # Get user from repository
            user = self.user_repository.find_by_id(user_id, tenant_id)
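            if not user:
                # Raise an authentication failure so callers that only
                # handle AuthenticationError still respond with 401
                raise AuthenticationError("User not found")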
            
            return user
            
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token expired")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")
    
    def change_user_role(self,
                        user_id: int,
                        new_role: UserRole,
                        tenant_id: str,
                        requesting_user: User) -> User:
        """Change user role (requires admin)"""
        
        # Check permissions
        if not requesting_user.can_manage_users():
            raise AuthorizationError("Only admins can change user roles")
        
        # Get user
        user = self.user_repository.find_by_id(user_id, tenant_id)
        if not user:
            raise UserNotFoundError("User not found")
        
        # Domain logic handles validation
        user.change_role(new_role)
        
        # Persist changes
        updated_user = self.user_repository.update(user)
        
        # Invalidate user's sessions
        self._invalidate_user_sessions(user_id)
        
        return updated_user
    
    def _hash_password(self, password: str) -> str:
        return pwd_context.hash(password)
    
    def _verify_password(self, plain: str, hashed: str) -> bool:
        return pwd_context.verify(plain, hashed)
    
    def _create_access_token(self, user: User) -> str:
        payload = {
            "user_id": user.id,
            "tenant_id": user.tenant_id,
            "role": user.role.value,
            "exp": datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
        }
        return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
    
    def _invalidate_user_sessions(self, user_id: int):
        # Implementation to clear user sessions from cache
        pass

# auth/application/interfaces.py
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from auth.domain.models import User

class UserRepository(ABC):
    """Interface for user data access"""
    
    @abstractmethod
    def save(self, user: User) -> User:
        pass
    
    @abstractmethod
    def update(self, user: User) -> User:
        pass
    
    @abstractmethod
    def find_by_id(self, user_id: int, tenant_id: str) -> Optional[User]:
        pass
    
    @abstractmethod
    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        pass
    
    @abstractmethod
    def find_all_by_tenant(self, tenant_id: str) -> List[User]:
        pass

class CacheService(ABC):
    """Interface for caching"""
    
    @abstractmethod
    def get(self, key: str) -> Optional[Any]:
        pass
    
    @abstractmethod
    def set(self, key: str, value: Any, ttl: int):
        pass
        pass
    
    @abstractmethod
    def delete(self, key: str):
        pass
```
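Because the service only sees the `UserRepository` interface, any class with the same methods can stand in for the database. Below is a rough sketch of an in-memory version. `InMemoryUserRepository` is my own hypothetical test double, and the `User` here is a trimmed stand-in with only the fields the sketch needs.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass, replace
from typing import Dict, Optional


@dataclass
class User:
    """Trimmed stand-in for the domain User."""
    id: Optional[int]
    tenant_id: str
    username: str


class UserRepository(ABC):
    """Subset of the interface above."""

    @abstractmethod
    def save(self, user: User) -> User: ...

    @abstractmethod
    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]: ...


class InMemoryUserRepository(UserRepository):
    """Hypothetical test double: keeps users in a dict instead of a database."""

    def __init__(self):
        self._users: Dict[int, User] = {}
        self._next_id = 1

    def save(self, user: User) -> User:
        saved = replace(user, id=self._next_id)  # copy with an assigned id
        self._users[saved.id] = saved
        self._next_id += 1
        return saved

    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        for user in self._users.values():
            if user.username == username and user.tenant_id == tenant_id:
                return user
        return None


repo = InMemoryUserRepository()
saved = repo.save(User(id=None, tenant_id="shop-a", username="john"))
```

A fake like this behaves more like the real repository than a bare mock does, which helps when a test exercises several calls in a row (register, then look up).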

### Infrastructure Layer: External Concerns

```python
# auth/infrastructure/repository.py
from sqlalchemy.orm import Session
from typing import Optional, List

from auth.domain.exceptions import UserNotFoundError
from auth.domain.models import User, UserRole
from auth.application.interfaces import UserRepository

# SQLAlchemy ORM model (separate from domain model)
from sqlalchemy import Column, Integer, String, Boolean, DateTime, UniqueConstraint
from sqlalchemy.sql import func
from config.database import Base

class UserORM(Base):
    """ORM model for database persistence"""
    __tablename__ = "users"
    # Usernames are looked up per tenant, so they only need to be unique per tenant
    __table_args__ = (
        UniqueConstraint("tenant_id", "username", name="uq_users_tenant_username"),
    )
    
    id = Column(Integer, primary_key=True, index=True)
    tenant_id = Column(String(50), nullable=False, index=True)
    username = Column(String(100), nullable=False)
    email = Column(String(255), nullable=False)
    hashed_password = Column(String(255), nullable=False)
    role = Column(String(20), nullable=False)
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), onupdate=func.now())

class SQLAlchemyUserRepository(UserRepository):
    """SQLAlchemy implementation of UserRepository"""
    
    def __init__(self, db: Session):
        self.db = db
    
    def save(self, user: User) -> User:
        """Convert domain model to ORM model and save"""
        orm_user = UserORM(
            tenant_id=user.tenant_id,
            username=user.username,
            email=user.email,
            hashed_password=user.hashed_password,
            role=user.role.value,
            is_active=user.is_active,
            created_at=user.created_at
        )
        
        self.db.add(orm_user)
        self.db.commit()
        self.db.refresh(orm_user)
        
        # Convert back to domain model
        return self._to_domain(orm_user)
    
    def update(self, user: User) -> User:
        """Update existing user"""
        orm_user = self.db.query(UserORM).filter(
            UserORM.id == user.id,
            UserORM.tenant_id == user.tenant_id
        ).first()
        
        if not orm_user:
            # The interface promises a User, so signal a missing row explicitly
            raise UserNotFoundError(f"User {user.id} not found")
        
        orm_user.email = user.email
        orm_user.role = user.role.value
        orm_user.is_active = user.is_active
        orm_user.updated_at = user.updated_at
        
        self.db.commit()
        self.db.refresh(orm_user)
        
        return self._to_domain(orm_user)
    
    def find_by_id(self, user_id: int, tenant_id: str) -> Optional[User]:
        orm_user = self.db.query(UserORM).filter(
            UserORM.id == user_id,
            UserORM.tenant_id == tenant_id
        ).first()
        
        return self._to_domain(orm_user) if orm_user else None
    
    def find_by_username(self, username: str, tenant_id: str) -> Optional[User]:
        orm_user = self.db.query(UserORM).filter(
            UserORM.username == username,
            UserORM.tenant_id == tenant_id
        ).first()
        
        return self._to_domain(orm_user) if orm_user else None
    
    def find_all_by_tenant(self, tenant_id: str) -> List[User]:
        orm_users = self.db.query(UserORM).filter(
            UserORM.tenant_id == tenant_id
        ).all()
        
        return [self._to_domain(u) for u in orm_users]
    
    def _to_domain(self, orm_user: UserORM) -> User:
        """Convert ORM model to domain model"""
        return User(
            id=orm_user.id,
            tenant_id=orm_user.tenant_id,
            username=orm_user.username,
            email=orm_user.email,
            hashed_password=orm_user.hashed_password,
            role=UserRole(orm_user.role),
            is_active=orm_user.is_active,
            created_at=orm_user.created_at,
            updated_at=orm_user.updated_at
        )

# auth/infrastructure/cache.py
import redis
import json
from typing import Any, Optional
from auth.application.interfaces import CacheService

class RedisCacheService(CacheService):
    """Redis implementation of CacheService"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def get(self, key: str) -> Optional[Any]:
        value = self.redis.get(key)
        if value is not None:
            return json.loads(value)
        return None
    
    def set(self, key: str, value: Any, ttl: int):
        self.redis.setex(key, ttl, json.dumps(value))
    
    def delete(self, key: str):
        self.redis.delete(key)
```
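The same idea applies to the cache. Here is a sketch of an in-memory stand-in with the same `get`/`set`/`delete` shape, including TTL expiry. `InMemoryCacheService` and its injectable `clock` parameter are assumptions of mine, not part of the project, and the class skips the `CacheService` base class only so the snippet runs on its own.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class InMemoryCacheService:
    """Hypothetical stand-in with the same get/set/delete shape as CacheService."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock  # injectable so tests can control time
        self._entries: Dict[str, Tuple[Any, float]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._entries.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired entries are dropped on read
            return None
        return value

    def set(self, key: str, value: Any, ttl: int) -> None:
        self._entries[key] = (value, self._clock() + ttl)

    def delete(self, key: str) -> None:
        self._entries.pop(key, None)


# A fake clock lets the test "wait" 61 seconds instantly
now = [0.0]
cache = InMemoryCacheService(clock=lambda: now[0])
cache.set("session:abc", 42, ttl=60)
before_expiry = cache.get("session:abc")
now[0] = 61.0
after_expiry = cache.get("session:abc")
```

Injecting the clock is the same dependency injection trick applied to time itself: the test controls expiry without sleeping.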

### Presentation Layer: HTTP Handling

```python
# auth/presentation/schemas.py
from pydantic import BaseModel, EmailStr
from datetime import datetime

class RegisterRequest(BaseModel):
    username: str
    email: EmailStr
    password: str
    role: str

class LoginRequest(BaseModel):
    username: str
    password: str

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    role: str
    is_active: bool
    created_at: datetime

class TokenResponse(BaseModel):
    access_token: str
    token_type: str
    user: UserResponse

# auth/presentation/router.py
from fastapi import APIRouter, Depends, HTTPException, Header
from sqlalchemy.orm import Session

from auth.domain.models import UserRole
from auth.domain.exceptions import (
    AuthenticationError,
    AuthorizationError,
    DuplicateUserError
)
from auth.application.services import AuthService
from auth.infrastructure.repository import SQLAlchemyUserRepository
from auth.infrastructure.cache import RedisCacheService
from auth.presentation.schemas import (
    RegisterRequest,
    LoginRequest,
    UserResponse,
    TokenResponse
)
from config.database import get_db
from config.redis_client import get_redis

router = APIRouter()

def get_auth_service(
    db: Session = Depends(get_db),
    redis = Depends(get_redis)
) -> AuthService:
    """Dependency injection for AuthService"""
    repository = SQLAlchemyUserRepository(db)
    cache = RedisCacheService(redis)
    return AuthService(repository, cache)

@router.post("/register", response_model=UserResponse)
async def register(
    request: RegisterRequest,
    tenant_id: str = Header(..., alias="x-tenant-id"),
    service: AuthService = Depends(get_auth_service)
):
    """Register a new user"""
    try:
        user = service.register_user(
            tenant_id=tenant_id,
            username=request.username,
            email=request.email,
            password=request.password,
            role=UserRole(request.role)
        )
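        # Map fields explicitly: UserResponse.role is a str, not a UserRole
        return UserResponse(
            id=user.id,
            username=user.username,
            email=user.email,
            role=user.role.value,
            is_active=user.is_active,
            created_at=user.created_at
        )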
    except DuplicateUserError as e:
        raise HTTPException(409, str(e))
    except ValueError as e:
        raise HTTPException(400, str(e))

@router.post("/login", response_model=TokenResponse)
async def login(
    request: LoginRequest,
    tenant_id: str = Header(..., alias="x-tenant-id"),
    service: AuthService = Depends(get_auth_service)
):
    """Authenticate user"""
    try:
        user, token = service.authenticate(
            username=request.username,
            password=request.password,
            tenant_id=tenant_id
        )
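        return TokenResponse(
            access_token=token,
            token_type="bearer",
            # Map fields explicitly: role must be a plain string, and
            # hashed_password should never reach the response
            user=UserResponse(
                id=user.id,
                username=user.username,
                email=user.email,
                role=user.role.value,
                is_active=user.is_active,
                created_at=user.created_at
            )
        )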
    except AuthenticationError as e:
        raise HTTPException(401, str(e))
    except AuthorizationError as e:
        raise HTTPException(403, str(e))

# Dependency for routes that need authentication
async def get_current_user(
    authorization: str = Header(...),
    service: AuthService = Depends(get_auth_service)
):
    """Extract and verify user from JWT token"""
    if not authorization.startswith("Bearer "):
        raise HTTPException(401, "Invalid authorization header")
    
    token = authorization.removeprefix("Bearer ")  # strip only the leading scheme
    
    try:
        user = service.verify_token(token)
        return user
    except AuthenticationError as e:
        raise HTTPException(401, str(e))
```
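The routes above repeat the same translation from domain exceptions to HTTP status codes. One way to keep that mapping in a single place is a lookup table, sketched below in plain Python. `STATUS_BY_EXCEPTION` and `status_for` are hypothetical names; the status codes are the ones the routes already use. In a real FastAPI app this table would more likely be registered through exception handlers, which I have left out to keep the sketch framework-free.

```python
class AuthenticationError(Exception):
    """Stand-in for auth.domain.exceptions.AuthenticationError."""


class AuthorizationError(Exception):
    """Stand-in for auth.domain.exceptions.AuthorizationError."""


class DuplicateUserError(Exception):
    """Stand-in for auth.domain.exceptions.DuplicateUserError."""


# Checked in order; ValueError covers value-object validation failures
STATUS_BY_EXCEPTION = [
    (DuplicateUserError, 409),
    (AuthenticationError, 401),
    (AuthorizationError, 403),
    (ValueError, 400),
]


def status_for(exc: Exception) -> int:
    """Return the HTTP status for a domain exception, or 500 if unmapped."""
    for exc_type, status in STATUS_BY_EXCEPTION:
        if isinstance(exc, exc_type):
            return status
    return 500
```

The point is that only the presentation layer knows HTTP exists. The domain and application layers raise plain exceptions and stay reusable from a CLI or a background job.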

## Dependency Injection with FastAPI

The key to testability is dependency injection:

```python
# Without DI (hard to test)
class OrderService:
    def __init__(self):
        self.db = SessionLocal()  # Hardcoded dependency
        self.payment_gateway = StripeGateway()  # Hardcoded dependency

# With DI (easy to test)
class OrderService:
    def __init__(self, repository: OrderRepository, payment_gateway: PaymentGateway):
        self.repository = repository  # Injected
        self.payment_gateway = payment_gateway  # Injected

# Testing with mocks
from unittest.mock import Mock

def test_create_order():
    mock_repository = Mock(spec=OrderRepository)
    mock_payment = Mock(spec=PaymentGateway)
    
    service = OrderService(mock_repository, mock_payment)
    # Test without real database or payment gateway!
```
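Here is a fuller, runnable take on that test, as a sketch. The method names `save` and `charge`, the dict-shaped order, and the `create_order` logic are all assumptions for illustration; the real `OrderService` is not shown in this article.

```python
from abc import ABC, abstractmethod
from unittest.mock import Mock


class OrderRepository(ABC):
    @abstractmethod
    def save(self, order: dict) -> dict: ...


class PaymentGateway(ABC):
    @abstractmethod
    def charge(self, amount_cents: int) -> bool: ...


class OrderService:
    def __init__(self, repository: OrderRepository, payment_gateway: PaymentGateway):
        self.repository = repository
        self.payment_gateway = payment_gateway

    def create_order(self, amount_cents: int) -> dict:
        # Charge first; only persist orders that were actually paid
        if not self.payment_gateway.charge(amount_cents):
            raise RuntimeError("Payment declined")
        return self.repository.save({"amount_cents": amount_cents, "status": "paid"})


def test_create_order_charges_then_saves():
    repo = Mock(spec=OrderRepository)
    repo.save.side_effect = lambda order: {**order, "id": 1}  # echo back with an id
    payment = Mock(spec=PaymentGateway)
    payment.charge.return_value = True

    order = OrderService(repo, payment).create_order(1500)

    payment.charge.assert_called_once_with(1500)
    assert order == {"amount_cents": 1500, "status": "paid", "id": 1}


def test_declined_payment_is_not_saved():
    repo = Mock(spec=OrderRepository)
    payment = Mock(spec=PaymentGateway)
    payment.charge.return_value = False

    try:
        OrderService(repo, payment).create_order(1500)
        raised = False
    except RuntimeError:
        raised = True

    assert raised
    repo.save.assert_not_called()
```

Passing `spec=` makes the mock reject attribute names the interface does not define, so a typo such as `payment.chrage` fails the test instead of silently passing.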

## Why Layers Matter for Testing

```python
# tests/auth/test_domain.py
from datetime import datetime

from auth.domain.models import User, UserRole

def test_user_can_change_role():
    """Test domain logic without any infrastructure"""
    user = User(
        id=1,
        tenant_id="test",
        username="john",
        email="john@example.com",
        hashed_password="hash",
        role=UserRole.CASHIER,
        is_active=True,
        created_at=datetime.utcnow()
    )
    
    # Pure domain logic test
    user.change_role(UserRole.MANAGER)
    
    assert user.role == UserRole.MANAGER
    assert user.updated_at is not None

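# tests/auth/test_service.py
from unittest.mock import Mock

from auth.application.interfaces import CacheService, UserRepository
from auth.application.services import AuthService
from auth.domain.models import UserRole

def test_register_user():
    """Test application logic with mocked dependencies"""
    mock_repository = Mock(spec=UserRepository)
    mock_repository.find_by_username.return_value = None  # User doesn't exist
    mock_repository.save.side_effect = lambda user: user  # Echo back the saved user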
    
    mock_cache = Mock(spec=CacheService)
    
    service = AuthService(mock_repository, mock_cache)
    
    # Test without database
    user = service.register_user(
        tenant_id="test",
        username="john",
        email="john@example.com",
        password="SecurePass123",
        role=UserRole.CASHIER
    )
    
    assert user.username == "john"
    mock_repository.save.assert_called_once()

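# tests/auth/test_repository.py
from datetime import datetime

from auth.domain.models import User, UserRole
from auth.infrastructure.repository import SQLAlchemyUserRepository
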
def test_repository_save(test_db):
    """Integration test with real database"""
    repository = SQLAlchemyUserRepository(test_db)
    
    user = User(
        id=None,
        tenant_id="test",
        username="john",
        email="john@example.com",
        hashed_password="hash",
        role=UserRole.CASHIER,
        is_active=True,
        created_at=datetime.utcnow()
    )
    
    saved_user = repository.save(user)
    
    assert saved_user.id is not None
    
    # Verify in database
    found_user = repository.find_by_username("john", "test")
    assert found_user.id == saved_user.id
```

## Key Learnings

1. **Separate domain logic from infrastructure**
   * Domain models are pure business logic
   * No database, HTTP, or framework dependencies
2. **Define interfaces, depend on abstractions**
   * Services depend on interfaces, not implementations
   * Easy to swap implementations (SQL → MongoDB)
3. **Dependency injection enables testing**
   * Inject mocks instead of real dependencies
   * Test business logic in isolation
4. **Layer boundaries prevent coupling**
   * The domain never imports from application or infrastructure code
   * Clear flow of dependencies
5. **Value objects enforce validation**
   * `Email` and `Password` validate at creation
   * An instance can't exist in an invalid state

## Common Mistakes

1. **Mixing layers**

   ```python
   # Bad: Domain model knows about database
   class User(Base):  # SQLAlchemy Base in domain
       def save(self):
           db.session.add(self)

   # Good: Domain model is pure
   @dataclass
   class User:
       username: str
   ```
2. **Fat services**

   ```python
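   # Bad: All logic in service
   class OrderService:
       def create_order(self, items):
           ...  # 500 lines of validation, pricing, and persistence

   # Good: Logic in domain, service orchestrates
   class Order:
       def __init__(self, items):
           self.items = items

       def calculate_total(self):
           # Business logic here (assumes items expose price and quantity)
           return sum(item.price * item.quantity for item in self.items)

   class OrderService:
       def __init__(self, repository):
           self.repository = repository

       def create_order(self, items):
           order = Order(items)
           order.total = order.calculate_total()
           return self.repository.save(order)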
   ```
3. **Anemic domain model**

   ```python
   # Bad: Domain model is just data
   @dataclass
   class Order:
       id: int
       total: float

   # All logic in service
   class OrderService:
       def calculate_discount(self, order, percent):
           # Should be in domain!
           return order.total * percent / 100

   # Good: Domain model has behavior
   @dataclass
   class Order:
       id: int
       total: float

       def apply_discount(self, percent):
           self.total *= (1 - percent / 100)
   ```
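To tie this back to the discount feature that started the whole story, here is a sketch of how the discount rule could live in the domain. `OrderLine`, the field names, and the 0 to 100 percent rule are my assumptions for illustration, not the actual POS model. I use `Decimal` instead of `float` so money does not pick up rounding noise.

```python
from dataclasses import dataclass, field
from decimal import Decimal
from typing import List


@dataclass
class OrderLine:
    name: str
    unit_price: Decimal
    quantity: int

    def subtotal(self) -> Decimal:
        return self.unit_price * self.quantity


@dataclass
class Order:
    lines: List[OrderLine] = field(default_factory=list)
    discount_percent: Decimal = Decimal("0")

    def apply_discount(self, percent: Decimal) -> None:
        """Business rule (assumed): discounts range from 0 to 100 percent."""
        if not Decimal("0") <= percent <= Decimal("100"):
            raise ValueError("Discount must be between 0 and 100 percent")
        self.discount_percent = percent

    def subtotal(self) -> Decimal:
        return sum((line.subtotal() for line in self.lines), Decimal("0"))

    def total(self) -> Decimal:
        discount = self.subtotal() * self.discount_percent / Decimal("100")
        return (self.subtotal() - discount).quantize(Decimal("0.01"))


order = Order(lines=[
    OrderLine("Coffee", Decimal("4.50"), 2),
    OrderLine("Bagel", Decimal("3.00"), 1),
])
order.apply_discount(Decimal("10"))
# Subtotal 12.00, 10% off, so order.total() is Decimal("10.80")
```

Testing this takes no PostgreSQL, no Redis, and no mocks, which is exactly what the original 300-line function made impossible.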

## When to Use Layered Architecture

**Use When:**

* Application will grow complex
* Need to test business logic
* Multiple developers/teams
* Expect technology changes

**Skip When:**

* Simple CRUD application
* Prototyping/MVP
* Single developer project
* No complex business rules

## Next Steps

Now that you understand service layers, the next article covers API design and contracts—how to design stable, versioned APIs that clients depend on.

**Next Article:** [05-api-design-contracts.md](https://blog.htunnthuthu.com/architecture-and-design/architecture-and-patterns/software-architecture-101/05-api-design-contracts)

***

*Remember: Layers aren't bureaucracy. They're boundaries that let you change one part of the system without breaking everything else.*
