# Authentication & Authorization Architecture

## The Cashier Who Became Admin (Accidentally)

Two months into production, during a routine audit, I discovered something terrifying: a cashier account had full admin privileges. They could:

* Delete other users
* Access financial reports
* Modify system settings
* See data from other restaurant tenants

How did this happen? One line of code:

```python
# The bug - in token verification
def get_current_user(token: str):
    payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    user_id = payload["user_id"]
    # BUG: Returning user without checking token claims!
    user = db.query(User).filter(User.id == user_id).first()
    return user  # User object from DB, not token claims

# User had been demoted to cashier, but old JWT still had "admin" role
# I was using DB role, not token role!
```

Months earlier, during testing, this user had logged in as an admin and received a JWT with `role: admin`. They were later demoted to cashier in the database, but the old JWT was still valid for 24 hours, and I was checking the wrong role.

That day I learned: **Authentication (who you are) and Authorization (what you can do) are different problems requiring different solutions.**
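
To make the failure mode concrete, here is a minimal sketch of a guard that compares the role claim carried by a token with the role currently stored for the user. `TokenClaims` and `needs_reauth` are hypothetical names, not part of the POS codebase; the only point is that a mismatch should trigger a token refresh or a fresh login instead of silently trusting either side.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TokenClaims:
    """Hypothetical container for the claims carried by a verified JWT."""
    user_id: int
    role: str


def needs_reauth(claims: TokenClaims, current_db_role: str) -> bool:
    """Return True when the token's role no longer matches the stored role."""
    return claims.role != current_db_role


# A token issued while the user was an admin, checked after a demotion.
stale = TokenClaims(user_id=7, role="admin")
fresh = TokenClaims(user_id=7, role="cashier")

print(needs_reauth(stale, "cashier"))  # True: force a refresh or re-login
print(needs_reauth(fresh, "cashier"))  # False: claims are current
```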

## JWT Structure with Multi-Tenant Claims

Here's the correct JWT structure for the POS system:

```python
# auth/security/jwt_manager.py
import jwt
from datetime import datetime, timedelta
from typing import Dict, Optional
from config.settings import settings
from shared.exceptions import AuthenticationError  # assumed module path

class JWTManager:
    """Manages JWT token creation and validation"""
    
    @staticmethod
    def create_access_token(
        user_id: int,
        tenant_id: str,
        role: str,
        permissions: list[str]
    ) -> str:
        """Create JWT access token with claims"""
        
        payload = {
            # Standard claims
            "sub": str(user_id),  # Subject (user ID)
            "iat": datetime.utcnow(),  # Issued at
            "exp": datetime.utcnow() + timedelta(hours=settings.ACCESS_TOKEN_EXPIRE_HOURS),
            "iss": "pos-system",  # Issuer
            
            # Custom claims
            "tenant_id": tenant_id,
            "role": role,
            "permissions": permissions,
            "token_type": "access"
        }
        
        return jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm="HS256")
    
    @staticmethod
    def create_refresh_token(user_id: int, tenant_id: str) -> str:
        """Create refresh token (longer-lived, no permissions)"""
        
        payload = {
            "sub": str(user_id),
            "iat": datetime.utcnow(),
            "exp": datetime.utcnow() + timedelta(days=30),
            "iss": "pos-system",
            "tenant_id": tenant_id,
            "token_type": "refresh"
        }
        
        return jwt.encode(payload, settings.JWT_REFRESH_SECRET, algorithm="HS256")
    
    @staticmethod
    def verify_access_token(token: str) -> Dict:
        """Verify and decode access token"""
        try:
            payload = jwt.decode(
                token,
                settings.JWT_SECRET_KEY,
                algorithms=["HS256"],
                issuer="pos-system"
            )
            
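            # Verify token type (raise AuthenticationError so callers
            # handle it the same way as any other invalid token)
            if payload.get("token_type") != "access":
                raise AuthenticationError("Invalid token type")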
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Token expired")
        except jwt.InvalidIssuerError:
            raise AuthenticationError("Invalid token issuer")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid token")
    
    @staticmethod
    def verify_refresh_token(token: str) -> Dict:
        """Verify and decode refresh token"""
        try:
            payload = jwt.decode(
                token,
                settings.JWT_REFRESH_SECRET,
                algorithms=["HS256"],
                issuer="pos-system"
            )
            
            if payload.get("token_type") != "refresh":
                raise AuthenticationError("Invalid token type")
            
            return payload
            
        except jwt.ExpiredSignatureError:
            raise AuthenticationError("Refresh token expired")
        except jwt.InvalidTokenError:
            raise AuthenticationError("Invalid refresh token")
```
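
For readers who have not looked inside a token before, the sketch below builds and checks an HS256 token by hand using only the standard library. It is an illustration of the `header.payload.signature` layout, not a replacement for PyJWT: it does not check `exp` or `iss`, and the helper names (`sign_hs256`, `verify_hs256`) and the demo secret are made up for this example.

```python
import base64
import hashlib
import hmac
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Build header.payload.signature by hand (illustration only)."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{b64url(signature)}"


def verify_hs256(token: str, secret: bytes) -> dict:
    """Check the signature, then return the decoded payload."""
    signing_input, _, signature = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode(), hashlib.sha256).digest()
    if not hmac.compare_digest(b64url(expected), signature):
        raise ValueError("bad signature")
    return json.loads(b64url_decode(signing_input.split(".")[1]))


claims = {"sub": "42", "tenant_id": "tenant-a", "role": "cashier", "token_type": "access"}
token = sign_hs256(claims, b"demo-secret")
print(verify_hs256(token, b"demo-secret")["role"])  # cashier
```

Note that the payload is only encoded, not encrypted: anyone holding the token can read `tenant_id`, `role`, and `permissions`, so nothing secret belongs in the claims.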

## Auth Service Implementation

Complete authentication flow:

```python
# auth/service.py
from datetime import datetime, timedelta
from sqlalchemy.orm import Session
from typing import Tuple
from auth.domain.models import User
from auth.security.jwt_manager import JWTManager
from auth.security.password_hasher import PasswordHasher
from auth.infrastructure.repository import UserRepository
from auth.infrastructure.session_store import SessionStore
from shared.exceptions import AuthenticationError, AuthorizationError  # assumed module path

class AuthService:
    """Handles authentication and token management"""
    
    def __init__(self,
                 user_repository: UserRepository,
                 session_store: SessionStore):
        self.user_repository = user_repository
        self.session_store = session_store
        self.jwt_manager = JWTManager()
        self.password_hasher = PasswordHasher()
    
    def login(self, username: str, password: str, tenant_id: str) -> Tuple[str, str, User]:
        """
        Authenticate user and return access token, refresh token, and user
        
        Returns:
            (access_token, refresh_token, user)
        """
        
        # 1. Find user
        user = self.user_repository.find_by_username(username, tenant_id)
        if not user:
            raise AuthenticationError("Invalid credentials")
        
        # 2. Verify password
        if not self.password_hasher.verify(password, user.hashed_password):
            raise AuthenticationError("Invalid credentials")
        
        # 3. Check if user is active
        if not user.is_active:
            raise AuthorizationError("Account is deactivated")
        
        # 4. Get user permissions based on role
        permissions = self._get_role_permissions(user.role)
        
        # 5. Create tokens
        access_token = self.jwt_manager.create_access_token(
            user_id=user.id,
            tenant_id=tenant_id,
            role=user.role.value,
            permissions=permissions
        )
        
        refresh_token = self.jwt_manager.create_refresh_token(
            user_id=user.id,
            tenant_id=tenant_id
        )
        
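        # 6. Store session in Redis
        session_id = self._generate_session_id()
        self.session_store.create_session(
            session_id=session_id,
            user_id=user.id,
            tenant_id=tenant_id,
            # Caution: the first 16 characters of an HS256 JWT belong to the
            # base64url header and are identical for every token this service
            # issues; derive this key from a `jti` claim or a digest of the
            # full token instead.
            access_token=access_token[:16],
            refresh_token_hash=self._hash_token(refresh_token),
            expires_at=datetime.utcnow() + timedelta(hours=24)
        )

        return access_token, refresh_token, user

    @staticmethod
    def _hash_token(token: str) -> str:
        """Deterministic digest used as a lookup key for refresh tokens.

        A salted password hash yields a different value on every call, so it
        cannot be used to find a session again later.
        """
        import hashlib
        return hashlib.sha256(token.encode()).hexdigest()

    def logout(self, access_token: str):
        """Invalidate user session"""

        # Verify token (raises AuthenticationError if invalid or expired)
        self.jwt_manager.verify_access_token(access_token)

        # Delete session from Redis
        token_prefix = access_token[:16]
        self.session_store.delete_session_by_token(token_prefix)

    def refresh_access_token(self, refresh_token: str) -> str:
        """Generate new access token using refresh token"""

        # 1. Verify refresh token
        payload = self.jwt_manager.verify_refresh_token(refresh_token)
        user_id = int(payload["sub"])
        tenant_id = payload["tenant_id"]

        # 2. Verify session exists (same deterministic hash as at login)
        refresh_token_hash = self._hash_token(refresh_token)
        session = self.session_store.find_by_refresh_token(refresh_token_hash)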
        if not session:
            raise AuthenticationError("Invalid refresh token")
        
        # 3. Get fresh user data (role might have changed)
        user = self.user_repository.find_by_id(user_id, tenant_id)
        if not user or not user.is_active:
            raise AuthorizationError("User not found or deactivated")
        
        # 4. Create new access token with current permissions
        permissions = self._get_role_permissions(user.role)
        new_access_token = self.jwt_manager.create_access_token(
            user_id=user.id,
            tenant_id=tenant_id,
            role=user.role.value,
            permissions=permissions
        )
        
        # 5. Update session (the Session class exposes `session_id`, not `id`)
        self.session_store.update_access_token(session.session_id, new_access_token[:16])
        
        return new_access_token
    
    def _get_role_permissions(self, role) -> list[str]:
        """Get permissions for a role"""
        
        permissions_map = {
            "admin": [
                "users.create", "users.read", "users.update", "users.delete",
                "orders.create", "orders.read", "orders.update", "orders.delete",
                "inventory.create", "inventory.read", "inventory.update", "inventory.delete",
                "payments.read", "payments.refund",
                "reports.read", "reports.export",
                "settings.read", "settings.update"
            ],
            "manager": [
                "users.read",
                "orders.create", "orders.read", "orders.update",
                "inventory.create", "inventory.read", "inventory.update",
                "payments.read", "payments.refund",
                "reports.read"
            ],
            "cashier": [
                "orders.create", "orders.read",
                "inventory.read",
                "payments.read"
            ]
        }
        
        return permissions_map.get(role.value, [])
    
    def _generate_session_id(self) -> str:
        import uuid
        return str(uuid.uuid4())
```
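
The refresh flow finds a session by hashing the presented refresh token and looking that hash up, which only works when hashing the same token twice gives the same value. The sketch below contrasts the two behaviours with standard-library stand-ins. `salted_hash` and `lookup_hash` are hypothetical names, and since the internals of `PasswordHasher` are not shown in this article, whether it salts its output is an assumption (bcrypt and Argon2 style hashers do).

```python
import hashlib
import os


def salted_hash(value: str) -> str:
    """Stand-in for a password hasher: a fresh random salt on every call."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", value.encode(), salt, 1_000)
    return salt.hex() + "$" + digest.hex()


def lookup_hash(value: str) -> str:
    """Deterministic digest, usable as a lookup key for high-entropy tokens."""
    return hashlib.sha256(value.encode()).hexdigest()


refresh_token = "example.refresh.token"

print(salted_hash(refresh_token) == salted_hash(refresh_token))  # False
print(lookup_hash(refresh_token) == lookup_hash(refresh_token))  # True
```

Salting is what you want for low-entropy passwords; for long random tokens a plain digest is enough and keeps lookups possible.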

## Token Validation Middleware

Middleware for protecting routes:

```python
# shared/middleware/auth_middleware.py
from fastapi import Request, HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import Optional
from auth.security.jwt_manager import JWTManager
from auth.infrastructure.session_store import SessionStore
from auth.dependencies import get_session_store  # assumed module path
from shared.exceptions import AuthenticationError  # assumed module path

security = HTTPBearer()

class AuthenticatedUser:
    """Represents the authenticated user from JWT"""
    
    def __init__(self, user_id: int, tenant_id: str, role: str, permissions: list[str]):
        self.user_id = user_id
        self.tenant_id = tenant_id
        self.role = role
        self.permissions = permissions
    
    def has_permission(self, permission: str) -> bool:
        """Check if user has specific permission"""
        return permission in self.permissions
    
    def has_any_permission(self, *permissions: str) -> bool:
        """Check if user has any of the specified permissions"""
        return any(p in self.permissions for p in permissions)
    
    def has_all_permissions(self, *permissions: str) -> bool:
        """Check if user has all of the specified permissions"""
        return all(p in self.permissions for p in permissions)

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    session_store: SessionStore = Depends(get_session_store)
) -> AuthenticatedUser:
    """
    Dependency to extract and validate user from JWT token
    
    Usage:
        @app.get("/orders")
        async def get_orders(user: AuthenticatedUser = Depends(get_current_user)):
            # user.user_id, user.tenant_id, user.role available
    """
    
    token = credentials.credentials
    
    # Verify JWT
    try:
        payload = JWTManager.verify_access_token(token)
    except AuthenticationError as e:
        raise HTTPException(401, str(e))
    
    # Verify session still exists (not logged out)
    token_prefix = token[:16]
    if not session_store.session_exists(token_prefix):
        raise HTTPException(401, "Session expired or invalidated")
    
    # Extract claims
    user_id = int(payload["sub"])
    tenant_id = payload["tenant_id"]
    role = payload["role"]
    permissions = payload.get("permissions", [])
    
    return AuthenticatedUser(
        user_id=user_id,
        tenant_id=tenant_id,
        role=role,
        permissions=permissions
    )

def require_permission(*required_permissions: str):
    """
    Dependency to require specific permissions
    
    Usage:
        @app.delete("/users/{user_id}")
        async def delete_user(
            user_id: int,
            user: AuthenticatedUser = Depends(require_permission("users.delete"))
        ):
            # Only users with "users.delete" permission can access
    """
    
    async def permission_checker(
        user: AuthenticatedUser = Depends(get_current_user)
    ) -> AuthenticatedUser:
        if not user.has_all_permissions(*required_permissions):
            raise HTTPException(
                403,
                f"Missing required permissions: {', '.join(required_permissions)}"
            )
        return user
    
    return permission_checker

def require_role(*allowed_roles: str):
    """
    Dependency to require specific roles
    
    Usage:
        @app.get("/admin/settings")
        async def get_settings(
            user: AuthenticatedUser = Depends(require_role("admin"))
        ):
            # Only admins can access
    """
    
    async def role_checker(
        user: AuthenticatedUser = Depends(get_current_user)
    ) -> AuthenticatedUser:
        if user.role not in allowed_roles:
            raise HTTPException(
                403,
                f"Requires one of roles: {', '.join(allowed_roles)}"
            )
        return user
    
    return role_checker
```
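
One detail in the session check is worth verifying before relying on it: the lookup key is `token[:16]`, but the first segment of every HS256 JWT is the same base64url-encoded header, so a 16-character prefix does not distinguish one token from another. The sketch below demonstrates this with unsigned, JWT-shaped strings and shows one possible alternative, a digest of the whole token. `fake_jwt` and `token_fingerprint` are hypothetical helpers for this demo; a `jti` claim would be another reasonable choice.

```python
import base64
import hashlib
import json


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def fake_jwt(payload: dict) -> str:
    """Unsigned JWT-shaped string; only the layout matters for this demo."""
    header = {"alg": "HS256", "typ": "JWT"}
    parts = [
        b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    ]
    return ".".join(parts + ["signature-placeholder"])


def token_fingerprint(token: str) -> str:
    """Hypothetical helper: a short key derived from the whole token."""
    return hashlib.sha256(token.encode()).hexdigest()[:32]


admin_token = fake_jwt({"sub": "1", "role": "admin"})
cashier_token = fake_jwt({"sub": "2", "role": "cashier"})

print(admin_token[:16] == cashier_token[:16])  # True: both are header bytes
print(token_fingerprint(admin_token) == token_fingerprint(cashier_token))  # False
```

If the fingerprint approach is adopted, the same function has to be used everywhere the session key is computed: login, logout, refresh, and the request dependency.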

## RBAC (Role-Based Access Control)

Implementing roles and permissions:

```python
# Using the middleware in routes

from fastapi import APIRouter, Depends, HTTPException
from shared.middleware.auth_middleware import (
    get_current_user,
    require_permission,
    require_role,
    AuthenticatedUser
)

router = APIRouter(prefix="/api/v1")

# Public endpoint (no auth)
@router.get("/health")
async def health_check():
    return {"status": "healthy"}

# Authenticated endpoint (any logged-in user)
@router.get("/orders")
async def list_orders(user: AuthenticatedUser = Depends(get_current_user)):
    # Scope the query to the caller's tenant, taken from the verified token
    return service.get_orders(user.tenant_id, user.user_id)

# Permission-based access
@router.post("/orders/{order_id}/refund")
async def refund_order(
    order_id: int,
    user: AuthenticatedUser = Depends(require_permission("payments.refund"))
):
    # Only users with "payments.refund" permission can access
    # (admins and managers)
    return service.refund_order(order_id, user.tenant_id)

# Role-based access
@router.delete("/users/{user_id}")
async def delete_user(
    user_id: int,
    current_user: AuthenticatedUser = Depends(require_role("admin"))
):
    # Only admins can delete users
    return service.delete_user(user_id, current_user.tenant_id)

# Multiple permissions required
@router.post("/reports/financial")
async def generate_financial_report(
    user: AuthenticatedUser = Depends(
        require_permission("reports.read", "reports.export")
    )
):
    # Requires both permissions
    return service.generate_report(user.tenant_id)

# Custom permission check
@router.patch("/orders/{order_id}")
async def update_order(
    order_id: int,
    updates: OrderUpdate,
    user: AuthenticatedUser = Depends(get_current_user)
):
    # Custom logic
    order = service.get_order(order_id, user.tenant_id)
    
    # Cashiers can only update their own orders
    if user.role == "cashier" and order.created_by_user_id != user.user_id:
        raise HTTPException(403, "Can only update your own orders")
    
    # Managers and admins can update any order
    return service.update_order(order_id, updates, user.tenant_id)
```
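
The ownership rule in `update_order` is easier to test when it lives in a plain function instead of inside the route. A minimal sketch follows; `can_update_order` is a hypothetical helper, and unlike the route above it denies roles it does not recognize instead of letting them through, which is a deliberate (assumed) tightening.

```python
def can_update_order(role: str, user_id: int, order_owner_id: int) -> bool:
    """Cashiers may edit only their own orders; managers and admins may edit any."""
    if role == "cashier":
        return order_owner_id == user_id
    return role in {"manager", "admin"}


print(can_update_order("cashier", user_id=5, order_owner_id=5))  # True
print(can_update_order("cashier", user_id=5, order_owner_id=9))  # False
print(can_update_order("manager", user_id=5, order_owner_id=9))  # True
print(can_update_order("guest", user_id=5, order_owner_id=5))    # False
```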

## Refresh Token Flow

Implementing token refresh for better security:

```python
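# auth/router.py
from fastapi import Body, Depends, HTTPException

@router.post("/auth/refresh", response_model=TokenResponse)
async def refresh_token(
    # Read from the JSON body ({"refresh_token": "..."}); a bare `str`
    # parameter would be treated by FastAPI as a query parameter
    refresh_token: str = Body(..., embed=True),
    service: AuthService = Depends(get_auth_service)
):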
    """
    Get new access token using refresh token
    
    Flow:
    1. Client's access token expires (24 hours)
    2. Client sends refresh token (valid 30 days)
    3. Server issues new access token with current permissions
    4. Client continues without re-login
    """
    try:
        new_access_token = service.refresh_access_token(refresh_token)
        
        # Optionally rotate refresh token for better security
        # new_refresh_token = service.rotate_refresh_token(refresh_token)
        
        return TokenResponse(
            access_token=new_access_token,
            token_type="bearer",
            expires_in=settings.ACCESS_TOKEN_EXPIRE_HOURS * 3600
        )
        
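    except AuthenticationError as e:
        raise HTTPException(401, str(e))
    except AuthorizationError as e:
        # Raised when the user no longer exists or has been deactivated
        raise HTTPException(403, str(e))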

# Client usage:
"""
// JavaScript client
async function apiCall(endpoint) {
    try {
        const response = await fetch(endpoint, {
            headers: {
                'Authorization': `Bearer ${accessToken}`
            }
        });
        
        if (response.status === 401) {
            // Access token expired, refresh it
            const newAccessToken = await refreshAccessToken();
            
            // Retry with new token
            return fetch(endpoint, {
                headers: {
                    'Authorization': `Bearer ${newAccessToken}`
                }
            });
        }
        
        return response;
    } catch (error) {
        console.error('API call failed:', error);
    }
}

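async function refreshAccessToken() {
    const response = await fetch('/auth/refresh', {
        method: 'POST',
        headers: {'Content-Type': 'application/json'},
        body: JSON.stringify({refresh_token: refreshToken})
    });
    
    if (!response.ok) {
        throw new Error('Refresh failed; user must log in again');
    }
    
    const data = await response.json();
    accessToken = data.access_token;
    return accessToken;
}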
"""
```
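
The commented-out rotation step can be sketched as follows. This is an in-memory illustration under stated assumptions, not the article's implementation: `RefreshTokenStore` is a hypothetical class, it uses opaque random tokens instead of JWTs, and a real deployment would keep the digests in Redis with a TTL. The idea it shows is that each refresh token works exactly once, so a replayed token is rejected.

```python
import hashlib
import secrets


class RefreshTokenStore:
    """In-memory stand-in for a session store (hypothetical, for illustration)."""

    def __init__(self) -> None:
        self._active: dict[str, int] = {}  # token digest -> user_id

    @staticmethod
    def _digest(token: str) -> str:
        """Store only a digest so a leaked store does not leak usable tokens."""
        return hashlib.sha256(token.encode()).hexdigest()

    def issue(self, user_id: int) -> str:
        """Create a new random refresh token for the user."""
        token = secrets.token_urlsafe(32)
        self._active[self._digest(token)] = user_id
        return token

    def rotate(self, old_token: str) -> str:
        """Invalidate old_token and return a replacement; reject unknown tokens."""
        user_id = self._active.pop(self._digest(old_token), None)
        if user_id is None:
            raise PermissionError("refresh token unknown or already used")
        return self.issue(user_id)


store = RefreshTokenStore()
first = store.issue(user_id=42)
second = store.rotate(first)
print(first != second)  # True: the client must switch to the new token
```

Calling `store.rotate(first)` a second time raises `PermissionError`, which is the signal a server could use to suspect token theft and end the session.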

## Session Management with Redis

```python
# auth/infrastructure/session_store.py
import redis
import json
from datetime import datetime, timedelta
from typing import Optional, Dict

class Session:
    def __init__(self, session_id: str, user_id: int, tenant_id: str,
                 access_token_prefix: str, refresh_token_hash: str,
                 expires_at: datetime):
        self.session_id = session_id
        self.user_id = user_id
        self.tenant_id = tenant_id
        self.access_token_prefix = access_token_prefix
        self.refresh_token_hash = refresh_token_hash
        self.expires_at = expires_at

class SessionStore:
    """Manages user sessions in Redis"""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def create_session(self,
                      session_id: str,
                      user_id: int,
                      tenant_id: str,
                      access_token: str,
                      refresh_token_hash: str,
                      expires_at: datetime):
        """Create a new session"""
        
        session_data = {
            "session_id": session_id,
            "user_id": user_id,
            "tenant_id": tenant_id,
            "access_token_prefix": access_token,
            "refresh_token_hash": refresh_token_hash,
            "expires_at": expires_at.isoformat(),
            "created_at": datetime.utcnow().isoformat()
        }
        
        # Store session by ID
        key = f"session:{session_id}"
        ttl = int((expires_at - datetime.utcnow()).total_seconds())
        self.redis.setex(key, ttl, json.dumps(session_data))
        
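        # Store session by token prefix (for quick validation)
        token_key = f"session:token:{access_token}"
        self.redis.setex(token_key, ttl, session_id)

        # Store reverse mapping: refresh token hash -> session ID
        refresh_key = f"session:refresh:{refresh_token_hash}"
        self.redis.setex(refresh_key, ttl, session_id)

        # Store session by user (for listing user sessions)
        user_key = f"sessions:user:{user_id}:{tenant_id}"
        self.redis.sadd(user_key, session_id)
        self.redis.expire(user_key, ttl)

    def session_exists(self, access_token_prefix: str) -> bool:
        """Check if session with this token exists"""
        token_key = f"session:token:{access_token_prefix}"
        # redis-py returns the number of matching keys, so coerce to bool
        return bool(self.redis.exists(token_key))

    def find_by_refresh_token(self, refresh_token_hash: str) -> Optional[Session]:
        """Find session by refresh token hash via the reverse mapping"""
        session_id = self.redis.get(f"session:refresh:{refresh_token_hash}")
        if not session_id:
            return None

        raw = self.redis.get(f"session:{session_id.decode()}")
        if not raw:
            return None

        data = json.loads(raw)
        return Session(
            session_id=data["session_id"],
            user_id=data["user_id"],
            tenant_id=data["tenant_id"],
            access_token_prefix=data["access_token_prefix"],
            refresh_token_hash=data["refresh_token_hash"],
            expires_at=datetime.fromisoformat(data["expires_at"])
        )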
    
    def delete_session_by_token(self, access_token_prefix: str):
        """Delete session by access token"""
        token_key = f"session:token:{access_token_prefix}"
        session_id = self.redis.get(token_key)
        
        if session_id:
            session_key = f"session:{session_id.decode()}"
            self.redis.delete(session_key)
            self.redis.delete(token_key)
    
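    def delete_all_user_sessions(self, user_id: int, tenant_id: str):
        """Delete all sessions for a user (force logout everywhere)"""
        user_key = f"sessions:user:{user_id}:{tenant_id}"
        session_ids = self.redis.smembers(user_key)

        for session_id in session_ids:
            session_key = f"session:{session_id.decode()}"
            raw = self.redis.get(session_key)
            if raw:
                # Also drop the token lookup key; otherwise session_exists()
                # keeps returning True for the revoked session
                data = json.loads(raw)
                self.redis.delete(f"session:token:{data['access_token_prefix']}")
            self.redis.delete(session_key)

        self.redis.delete(user_key)

    def update_access_token(self, session_id: str, access_token_prefix: str):
        """Point an existing session at a newly issued access token
        (called by AuthService.refresh_access_token)"""
        session_key = f"session:{session_id}"
        raw = self.redis.get(session_key)
        ttl = self.redis.ttl(session_key)
        if not raw or ttl <= 0:
            return

        data = json.loads(raw)
        self.redis.delete(f"session:token:{data['access_token_prefix']}")
        data["access_token_prefix"] = access_token_prefix
        self.redis.setex(session_key, ttl, json.dumps(data))
        self.redis.setex(f"session:token:{access_token_prefix}", ttl, session_id)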
```
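
The TTL passed to `setex` is derived from `expires_at`, and Redis rejects a `SETEX` whose expiry is zero or negative, so it can help to compute it in one guarded place. A small sketch, assuming timezone-aware datetimes (`datetime.utcnow()` is deprecated as of Python 3.12); `session_ttl_seconds` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def session_ttl_seconds(expires_at: datetime, now: datetime) -> int:
    """Whole seconds until expiry; raises if the session has already expired."""
    remaining = int((expires_at - now).total_seconds())
    if remaining <= 0:
        raise ValueError("session already expired")
    return remaining


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
print(session_ttl_seconds(now + timedelta(hours=24), now))  # 86400
```

Passing `now` in explicitly keeps the function deterministic in tests; production code would call it with `datetime.now(timezone.utc)`.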

## Key Learnings

1. **Separate authentication from authorization**
   * Authentication: Who are you? (JWT verification)
   * Authorization: What can you do? (Permissions check)
2. **Use claims from token, not database**
   * Token represents user at time of login
   * If role changes, force re-login or refresh token
3. **Refresh tokens improve security**
   * Short-lived access tokens (hours)
   * Long-lived refresh tokens (days/weeks)
   * Can revoke refresh tokens
4. **Session management makes tokens revocable**
   * Store active sessions in Redis
   * Logout invalidates session
   * Check session on every request
5. **Permission-based access is more flexible than roles**
   * Roles map to permissions
   * Can change what a role is allowed to do without touching route code
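
As a sketch of the last point, the role-to-permission mapping can be loaded from configuration instead of being hard-coded. Everything here is an assumption for illustration: the JSON content is a trimmed-down example, and `load_role_permissions` and `has_permission` are hypothetical helpers, not functions from the POS codebase.

```python
import json

# Example configuration; in practice this might come from a file or a database.
ROLE_PERMISSIONS_JSON = """
{
  "admin": ["orders.read", "orders.create", "payments.refund", "reports.read"],
  "manager": ["orders.read", "orders.create", "payments.refund"],
  "cashier": ["orders.read", "orders.create"]
}
"""


def load_role_permissions(raw: str) -> dict[str, frozenset[str]]:
    """Parse a role -> permissions mapping from JSON text."""
    return {role: frozenset(perms) for role, perms in json.loads(raw).items()}


def has_permission(
    role_permissions: dict[str, frozenset[str]], role: str, permission: str
) -> bool:
    """Unknown roles get no permissions."""
    return permission in role_permissions.get(role, frozenset())


role_permissions = load_role_permissions(ROLE_PERMISSIONS_JSON)
print(has_permission(role_permissions, "manager", "payments.refund"))  # True
print(has_permission(role_permissions, "cashier", "payments.refund"))  # False
```

Granting cashiers refunds then becomes a configuration change, while routes keep asking for `payments.refund` as before.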

## Common Mistakes

1. **Trusting client-sent role**
   * Always get role from verified JWT
   * Never accept role from request body
2. **Not invalidating sessions on logout**
   * A JWT stays valid until it expires, so logout needs server-side state
   * Track sessions and invalidate them on logout
3. **Using same secret for access and refresh tokens**
   * Different secrets for different token types
   * Limits blast radius if secret leaks
4. **Not refreshing permissions**
   * User promoted to admin but token still says cashier
   * Force token refresh on role change
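
Mistake 3 can be seen in a few lines: a signature produced with one secret does not verify under the other, so a refresh token cannot be passed off as an access token even before the `token_type` check runs. This is a standard-library sketch with placeholder secrets and hypothetical helper names (`sign`, `verify`), not the PyJWT code path.

```python
import hashlib
import hmac

ACCESS_SECRET = b"access-secret-demo"    # placeholder values, not real keys
REFRESH_SECRET = b"refresh-secret-demo"


def sign(message: bytes, secret: bytes) -> str:
    """HMAC-SHA256 signature as a hex string."""
    return hmac.new(secret, message, hashlib.sha256).hexdigest()


def verify(message: bytes, signature: str, secret: bytes) -> bool:
    """Constant-time comparison against a freshly computed signature."""
    return hmac.compare_digest(sign(message, secret), signature)


message = b'{"sub":"42","token_type":"refresh"}'
refresh_signature = sign(message, REFRESH_SECRET)

print(verify(message, refresh_signature, REFRESH_SECRET))  # True
print(verify(message, refresh_signature, ACCESS_SECRET))   # False
```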

## When to Use

**JWT + Sessions (My approach):**

* Revocable tokens, at the cost of a Redis lookup on every request
* Supports logout
* Can track active sessions

**Pure JWT (Stateless):**

* Simpler implementation
* Better horizontal scaling
* Can't revoke tokens before they expire

**Session-only (No JWT):**

* Traditional approach
* Requires sticky sessions or a shared session store
* Limited in microservices

## Next Steps

**Next Article:** [07-data-architecture-patterns.md](https://blog.htunnthuthu.com/architecture-and-design/architecture-and-patterns/software-architecture-101/07-data-architecture-patterns) - Learn database patterns for microservices.

***

*Remember: Security is not a feature. Get authentication and authorization right from the start, because fixing security holes in production is exponentially harder than building the system correctly.*
