# Caching Strategies & Session Management

## Table of Contents

* [Introduction](#introduction)
* [The Performance Problem](#the-performance-problem)
* [Redis in the Auth Service](#redis-in-the-auth-service)
* [Redis in the Chatbot Service](#redis-in-the-chatbot-service)
* [Cache-Aside Pattern Implementation](#cache-aside-pattern-implementation)
* [TTL Strategies](#ttl-strategies)
* [Multi-Tenant Cache Isolation](#multi-tenant-cache-isolation)
* [Cache Invalidation Patterns](#cache-invalidation-patterns)
* [Production Lessons Learned](#production-lessons-learned)
* [Best Practices](#best-practices)
* [Next Steps](#next-steps)

## Introduction

One of the hardest lessons I learned running my multi-tenant POS system was this: **databases are slow, and users are impatient**. When the Chatbot Service needed to aggregate data from 5 different services to answer "What are my top-selling products today?", response times hit 800ms. Customers complained about the lag.

The solution wasn't faster databases or more powerful servers—it was **strategic caching**. By introducing Redis as a caching layer, I reduced chatbot response times to 150ms (an 80% improvement) and cut database load by 70%.

In this article, I'll share how I implemented Redis caching across my POS architecture, from simple session management in the Auth Service to complex aggregation caching in the Chatbot. We'll cover cache-aside patterns, TTL strategies, invalidation techniques, and the critical multi-tenant isolation concerns.

## The Performance Problem

Before caching, here's what happened when a user asked the chatbot "Show me today's sales":

1. Chatbot → POS Core: Get all orders (200ms, query 500+ orders)
2. Chatbot → Payment: Get payment details (150ms, join payment records)
3. Chatbot → Inventory: Get product names (180ms, MongoDB query)
4. Chatbot → Restaurant: Get table assignments (120ms)
5. Chatbot: Aggregate and calculate (100ms, in-memory processing)

**Total: \~750ms** for a query that users ran dozens of times per day—with the **exact same results** for subsequent requests within the same hour.

This was wasteful. The data didn't change every second, yet we hit the database every time. Classic caching opportunity.

## Redis in the Auth Service

The Auth Service (port 4001) uses Redis for two purposes: **session storage** and **JWT token blacklisting**.

### Session Management

After a user logs in with email/password, I store their session in Redis instead of a database:

```python
# services/auth_service.py
import redis.asyncio as redis
from datetime import datetime, timedelta
import json
from typing import Optional

class SessionManager:
    """Manages user sessions in Redis."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
        self.session_ttl = 3600  # 1 hour
    
    def _session_key(self, tenant_id: str, session_id: str) -> str:
        """Generate tenant-isolated session key."""
        return f"session:{tenant_id}:{session_id}"
    
    async def create_session(
        self,
        tenant_id: str,
        session_id: str,
        user_id: str,
        user_email: str,
        roles: list[str]
    ) -> bool:
        """
        Create a new session in Redis.
        Returns True if successful.
        """
        session_data = {
            "user_id": user_id,
            "user_email": user_email,
            "roles": roles,
            "tenant_id": tenant_id,
            "created_at": datetime.utcnow().isoformat(),
            "last_accessed": datetime.utcnow().isoformat()
        }
        
        key = self._session_key(tenant_id, session_id)
        
        # Store with TTL
        await self.redis.setex(
            key,
            self.session_ttl,
            json.dumps(session_data)
        )
        
        return True
    
    async def get_session(
        self,
        tenant_id: str,
        session_id: str
    ) -> Optional[dict]:
        """
        Retrieve session data and refresh TTL.
        Returns None if session expired or doesn't exist.
        """
        key = self._session_key(tenant_id, session_id)
        
        data = await self.redis.get(key)
        if not data:
            return None
        
        session = json.loads(data)
        
        # Update last accessed time and refresh TTL
        session["last_accessed"] = datetime.utcnow().isoformat()
        await self.redis.setex(
            key,
            self.session_ttl,
            json.dumps(session)
        )
        
        return session
    
    async def delete_session(self, tenant_id: str, session_id: str) -> bool:
        """Delete session (used for logout)."""
        key = self._session_key(tenant_id, session_id)
        result = await self.redis.delete(key)
        return result > 0
    
    async def extend_session(
        self,
        tenant_id: str,
        session_id: str,
        additional_seconds: int = 3600
    ) -> bool:
        """Extend session TTL (e.g., on user activity)."""
        key = self._session_key(tenant_id, session_id)
        
        # Check if exists
        if not await self.redis.exists(key):
            return False
        
        # Extend TTL
        await self.redis.expire(key, additional_seconds)
        return True
```

### JWT Blacklist

When users log out, their JWT is still valid until expiration. To handle this, I blacklist tokens in Redis:

```python
class TokenBlacklist:
    """Manages blacklisted JWT tokens."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def _blacklist_key(self, jti: str) -> str:
        """Key for blacklisted token (jti = JWT ID)."""
        return f"blacklist:token:{jti}"
    
    async def blacklist_token(self, jti: str, expires_at: datetime) -> bool:
        """
        Add token to blacklist.
        TTL matches token expiration (no need to store expired tokens).
        """
        key = self._blacklist_key(jti)
        ttl = int((expires_at - datetime.utcnow()).total_seconds())
        
        if ttl > 0:
            await self.redis.setex(key, ttl, "1")
            return True
        return False
    
    async def is_blacklisted(self, jti: str) -> bool:
        """Check if token is blacklisted."""
        key = self._blacklist_key(jti)
        return await self.redis.exists(key) > 0
```

FastAPI middleware checks the blacklist on every request:

```python
# middleware/auth_middleware.py
from fastapi import Request, HTTPException
from jose import jwt

async def verify_token(request: Request):
    """Verify JWT and check blacklist."""
    token = request.headers.get("Authorization", "").replace("Bearer ", "")
    
    if not token:
        raise HTTPException(status_code=401, detail="Missing token")
    
    try:
        # Decode token
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
        jti = payload.get("jti")
        
        # Check blacklist (tokens minted without a jti can't be revoked)
        if jti and await token_blacklist.is_blacklisted(jti):
            raise HTTPException(status_code=401, detail="Token revoked")
        
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token expired")
    except jwt.JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
```

This pattern gives me:

* **Instant logout**: Tokens are blacklisted immediately
* **Automatic cleanup**: Redis TTL removes expired tokens
* **Tenant isolation**: Session keys include tenant\_id
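Wiring the two pieces together, the logout path simply deletes the session and blacklists the token. Here's a minimal sketch against an in-memory stand-in for Redis (`FakeRedis` and `logout` are illustrative names, not the actual service code):

```python
import asyncio

class FakeRedis:
    """Tiny in-memory stand-in for redis.asyncio.Redis (illustration only)."""
    def __init__(self):
        self.store = {}
    async def setex(self, key, ttl, value):
        self.store[key] = value  # TTL ignored in this toy version
    async def delete(self, *keys):
        return sum(1 for k in keys if self.store.pop(k, None) is not None)
    async def exists(self, key):
        return int(key in self.store)

async def logout(r, tenant_id: str, session_id: str, jti: str, remaining_ttl: int):
    """Invalidate both the session and the still-valid JWT."""
    await r.delete(f"session:{tenant_id}:{session_id}")          # kill the session
    await r.setex(f"blacklist:token:{jti}", remaining_ttl, "1")  # revoke the JWT

async def demo():
    r = FakeRedis()
    await r.setex("session:acme:s1", 3600, "{}")
    await logout(r, "acme", "s1", "jti-123", 900)
    return await r.exists("session:acme:s1"), await r.exists("blacklist:token:jti-123")

print(asyncio.run(demo()))  # (0, 1): session gone, token blacklisted
```

The TTL on the blacklist entry is the token's remaining lifetime, so revoked tokens clean themselves up exactly when they would have expired anyway.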

## Redis in the Chatbot Service

The Chatbot Service (port 4006) aggregates data from 5 services. Without caching, it was the slowest service in my architecture. Here's how I fixed it:

### Aggregation Cache

```python
# services/chatbot_cache.py
import redis.asyncio as redis
import json
from typing import Optional, Any
from datetime import timedelta

class ChatbotCache:
    """Caching layer for chatbot aggregations."""
    
    def __init__(self, redis_client: redis.Redis):
        self.redis = redis_client
    
    def _cache_key(self, tenant_id: str, query_type: str, *args) -> str:
        """Generate cache key for query."""
        key_parts = [f"chatbot:{tenant_id}:{query_type}"]
        key_parts.extend(str(arg) for arg in args)
        return ":".join(key_parts)
    
    async def get_cached_response(
        self,
        tenant_id: str,
        query_type: str,
        *args
    ) -> Optional[dict]:
        """Get cached chatbot response."""
        key = self._cache_key(tenant_id, query_type, *args)
        data = await self.redis.get(key)
        
        if data:
            return json.loads(data)
        return None
    
    async def cache_response(
        self,
        tenant_id: str,
        query_type: str,
        response: dict,
        ttl: int,
        *args
    ) -> bool:
        """Cache chatbot response with TTL."""
        key = self._cache_key(tenant_id, query_type, *args)
        await self.redis.setex(
            key,
            ttl,
            json.dumps(response)
        )
        return True
    
    async def invalidate_query(
        self,
        tenant_id: str,
        query_type: str,
        *args
    ) -> bool:
        """Invalidate specific cached query."""
        key = self._cache_key(tenant_id, query_type, *args)
        result = await self.redis.delete(key)
        return result > 0
    
    async def invalidate_tenant_cache(self, tenant_id: str) -> int:
        """Invalidate all cache for a tenant."""
        pattern = f"chatbot:{tenant_id}:*"
        keys = []
        
        # Scan for matching keys
        async for key in self.redis.scan_iter(match=pattern):
            keys.append(key)
        
        if keys:
            return await self.redis.delete(*keys)
        return 0
```

### Using the Cache

Here's how the chatbot uses this cache for "top selling products" queries:

```python
# services/chatbot_orchestrator.py
from datetime import datetime, date
import httpx

class ChatbotOrchestrator:
    """Orchestrates data from multiple services for chatbot queries."""
    
    def __init__(self, cache: ChatbotCache):
        self.cache = cache
    
    async def get_top_selling_products(
        self,
        tenant_id: str,
        date: date,
        limit: int = 10
    ) -> dict:
        """
        Get top selling products for a specific date.
        Uses cache with 1-hour TTL (data doesn't change minute-to-minute).
        """
        query_type = "top_products"
        cache_args = (date.isoformat(), limit)
        
        # Try cache first
        cached = await self.cache.get_cached_response(
            tenant_id,
            query_type,
            *cache_args
        )
        if cached:
            cached["from_cache"] = True
            return cached
        
        # Cache miss - aggregate from services
        # This is the slow path (~750ms total)
        async with httpx.AsyncClient() as client:
            # Get orders for the date
            orders_resp = await client.get(
                "http://localhost:4002/orders",
                params={"date": date.isoformat()},
                headers={"x-tenant-id": tenant_id}
            )
            orders = orders_resp.json()
            
            # Get product details from inventory
            product_ids = self._extract_product_ids(orders)
            products_resp = await client.get(
                "http://localhost:4003/products",
                params={"ids": ",".join(product_ids)},
                headers={"x-tenant-id": tenant_id}
            )
            products = products_resp.json()
            
            # Get payment totals
            payments_resp = await client.get(
                "http://localhost:4004/payments/totals",
                params={"date": date.isoformat()},
                headers={"x-tenant-id": tenant_id}
            )
            payments = payments_resp.json()
        
        # Aggregate data
        result = self._calculate_top_products(orders, products, payments, limit)
        result["from_cache"] = False
        result["generated_at"] = datetime.utcnow().isoformat()
        
        # Cache for 1 hour (3600 seconds). Note: ttl is passed positionally;
        # a ttl=3600 keyword here would clash with *cache_args spilling into
        # the ttl parameter slot
        await self.cache.cache_response(
            tenant_id,
            query_type,
            result,
            3600,
            *cache_args
        )
        
        return result
    
    def _extract_product_ids(self, orders: list) -> list[str]:
        """Extract unique product IDs from orders."""
        product_ids = set()
        for order in orders:
            for item in order.get("items", []):
                product_ids.add(item["product_id"])
        return list(product_ids)
    
    def _calculate_top_products(
        self,
        orders: list,
        products: dict,
        payments: dict,
        limit: int
    ) -> dict:
        """Calculate top products by revenue and quantity."""
        product_stats = {}
        
        for order in orders:
            for item in order.get("items", []):
                pid = item["product_id"]
                if pid not in product_stats:
                    product_stats[pid] = {
                        "product_id": pid,
                        "name": products.get(pid, {}).get("name", "Unknown"),
                        "quantity_sold": 0,
                        "revenue": 0.0
                    }
                
                product_stats[pid]["quantity_sold"] += item["quantity"]
                product_stats[pid]["revenue"] += item["subtotal"]
        
        # Sort by revenue
        top_products = sorted(
            product_stats.values(),
            key=lambda x: x["revenue"],
            reverse=True
        )[:limit]
        
        return {
            "products": top_products,
            "total_products": len(product_stats)
        }
```

**Performance impact**:

* **First request**: 750ms (cache miss, aggregates from 5 services)
* **Subsequent requests**: 12ms (cache hit, just Redis lookup)
* **Cache hit rate**: 85% in production (same queries repeated throughout the day)

## Cache-Aside Pattern Implementation

The cache-aside (lazy loading) pattern I use follows this flow:

```python
async def get_data(key: str) -> dict:
    """Cache-aside pattern implementation."""
    # 1. Check cache
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    
    # 2. Cache miss - load from database
    data = await database.query(...)
    
    # 3. Store in cache (redis-py setex takes key, ttl, value positionally)
    await redis.setex(key, 300, json.dumps(data))
    
    # 4. Return data
    return data
```

Here's a reusable decorator I built:

```python
# infrastructure/cache_decorator.py
from functools import wraps
import json
from typing import Callable, Optional

# Assumes a module-level async Redis client, e.g.
# redis = redis.asyncio.from_url("redis://localhost:6379")

def cached(
    key_prefix: str,
    ttl: int,
    key_builder: Optional[Callable] = None
):
    """
    Decorator for cache-aside pattern.
    
    Usage:
        @cached(key_prefix="product", ttl=300)
        async def get_product(tenant_id: str, product_id: str):
            return await db.get_product(tenant_id, product_id)
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build cache key
            if key_builder:
                cache_key = key_builder(*args, **kwargs)
            else:
                # Default: use function args
                key_parts = [key_prefix]
                key_parts.extend(str(arg) for arg in args)
                key_parts.extend(f"{k}:{v}" for k, v in kwargs.items())
                cache_key = ":".join(key_parts)
            
            # Try cache
            cached = await redis.get(cache_key)
            if cached:
                return json.loads(cached)
            
            # Execute function
            result = await func(*args, **kwargs)
            
            # Store in cache
            await redis.setex(cache_key, ttl, json.dumps(result))
            
            return result
        return wrapper
    return decorator


# Usage example
@cached(key_prefix="products", ttl=300)
async def get_product(tenant_id: str, product_id: str) -> dict:
    """Get product with automatic caching."""
    return await db.query(
        "SELECT * FROM products WHERE tenant_id = $1 AND id = $2",
        tenant_id,
        product_id
    )
```

## TTL Strategies

Different data types need different Time-To-Live (TTL) values. Here's what I learned works best:

```python
class CacheTTL:
    """Centralized TTL configuration."""
    
    # Session data - 1 hour, refreshed on activity
    SESSION = 3600
    
    # Product catalog - 5 minutes (frequent updates during business hours)
    PRODUCTS = 300
    
    # Daily sales reports - 1 hour (stable throughout the day)
    DAILY_REPORTS = 3600
    
    # User permissions - 15 minutes (security-sensitive)
    PERMISSIONS = 900
    
    # Top products aggregation - 1 hour (expensive query)
    TOP_PRODUCTS = 3600
    
    # Inventory count - 30 seconds (real-time critical)
    INVENTORY_COUNT = 30
    
    # Menu items - 1 day (rarely change)
    MENU = 86400
    
    # Tenant configuration - 1 hour (infrequent updates)
    TENANT_CONFIG = 3600


# Dynamic TTL based on time of day
from datetime import datetime

def get_dynamic_ttl() -> int:
    """Adjust product TTL based on business hours."""
    # Assumes server clocks align with the store's business hours
    hour = datetime.utcnow().hour
    
    # During business hours (9 AM - 10 PM), use shorter TTL
    if 9 <= hour <= 22:
        return CacheTTL.PRODUCTS  # 5 minutes
    else:
        # Off-hours, data changes slowly
        return CacheTTL.PRODUCTS * 6  # 30 minutes

## Multi-Tenant Cache Isolation

**Critical lesson**: Always include `tenant_id` in cache keys to prevent data leakage between tenants.

```python
def build_cache_key(tenant_id: str, resource_type: str, resource_id: str) -> str:
    """
    Build cache key with tenant isolation.
    
    Format: {resource_type}:{tenant_id}:{resource_id}
    Example: product:acme_corp:prod_123
    """
    return f"{resource_type}:{tenant_id}:{resource_id}"


# Bad (vulnerable to cross-tenant access):
bad_key = f"product:{product_id}"  # No tenant isolation!

# Good (tenant-isolated):
good_key = f"product:{tenant_id}:{product_id}"
```
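To make the leak concrete, here's a toy dictionary cache showing exactly what the unscoped key does: tenant B reads tenant A's cached product. (The tenant names and values are made up for illustration.)

```python
cache: dict[str, dict] = {}  # toy stand-in for Redis

# Tenant A caches a product under the UNSCOPED key
cache["product:prod_123"] = {"name": "Acme house blend", "tenant_id": "acme_corp"}

# Tenant B asks for the same product id and gets tenant A's data back
leaked = cache.get("product:prod_123")
assert leaked is not None and leaked["tenant_id"] == "acme_corp"  # cross-tenant leak

# With tenant-scoped keys, tenant B's lookup simply misses
assert cache.get("product:beta_inc:prod_123") is None
```

The same collision happens in Redis; the dict just makes it visible in four lines.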

I also namespace by service:

```python
def build_service_cache_key(
    service: str,
    tenant_id: str,
    resource: str,
    resource_id: str
) -> str:
    """
    Service-namespaced cache key.
    Example: chatbot:acme_corp:top_products:2024-01-15
    """
    return f"{service}:{tenant_id}:{resource}:{resource_id}"
```

## Cache Invalidation Patterns

> "There are only two hard things in Computer Science: cache invalidation and naming things." — Phil Karlton

### Pattern 1: Event-Based Invalidation

Using the event bus from the previous article:

```python
# Event handler for cache invalidation
async def invalidate_product_cache(event: InventoryStockChanged):
    """Invalidate product cache when inventory changes."""
    cache_key = f"product:{event.tenant_id}:{event.product_id}"
    await redis.delete(cache_key)
    
    # Also invalidate aggregations. Top-product cache keys include extra
    # args (e.g. limit), so delete by pattern rather than by exact key
    today = datetime.utcnow().date().isoformat()
    async for agg_key in redis.scan_iter(
        match=f"chatbot:{event.tenant_id}:top_products:{today}*"
    ):
        await redis.delete(agg_key)

# Subscribe to events
event_bus.subscribe(InventoryStockChanged, invalidate_product_cache)
```

### Pattern 2: Write-Through Cache

Update the database first, then the cache. That ordering matters: if the cache write fails, the next read is just a miss instead of serving stale data:

```python
async def update_product(tenant_id: str, product_id: str, data: dict):
    """Update product with write-through caching."""
    # Update database
    await db.update_product(tenant_id, product_id, data)
    
    # Update cache
    cache_key = f"product:{tenant_id}:{product_id}"
    await redis.setex(cache_key, CacheTTL.PRODUCTS, json.dumps(data))
```

### Pattern 3: Time-Based Invalidation

Let TTL handle invalidation for read-heavy, write-light data:

```python
# Menu items change rarely - just let them expire
@cached(key_prefix="menu", ttl=CacheTTL.MENU)  # 24 hours
async def get_menu(tenant_id: str, menu_id: str):
    return await db.get_menu(tenant_id, menu_id)
```

## Production Lessons Learned

### Lesson 1: Cache Stampede

During a deployment, all caches expired simultaneously. Thousands of requests hit the database at once, causing a 30-second outage.

**Solution**: Stagger TTLs with jitter:

```python
import random

def get_ttl_with_jitter(base_ttl: int, jitter_percent: float = 0.1) -> int:
    """Add random jitter to TTL to prevent stampede."""
    jitter = int(base_ttl * jitter_percent)
    return base_ttl + random.randint(-jitter, jitter)

# Usage
ttl = get_ttl_with_jitter(CacheTTL.PRODUCTS)  # 300 ± 30 seconds
```

### Lesson 2: Stale Data During Outages

When the Inventory Service was down, the cache served stale data, causing customers to order out-of-stock items.

**Solution**: Add health check to cache decorator:

```python
async def cached_with_health_check(key, ttl, fetch_func):
    """Only use cache if source service is healthy."""
    cached = await redis.get(key)
    
    # Check if source is healthy
    service_healthy = await check_service_health("inventory_service")
    
    if cached and service_healthy:
        return json.loads(cached)
    
    # Fetch fresh data. If the source is down this raises - failing
    # loudly beats quietly serving stale stock counts
    data = await fetch_func()
    await redis.setex(key, ttl, json.dumps(data))
    return data
```

### Lesson 3: Memory Exhaustion

Redis memory filled up with cached aggregations, causing evictions of session data (critical).

**Solution**: Use different Redis instances or databases:

```python
# Redis DB 0: Critical data (sessions, auth)
redis_critical = redis.from_url("redis://localhost:6379/0")

# Redis DB 1: Cache data (can be evicted)
redis_cache = redis.from_url("redis://localhost:6379/1")

# Configure eviction policy for cache DB
# In redis.conf: maxmemory-policy allkeys-lru
```

## Best Practices

Based on production experience:

1. **Always include tenant\_id in keys** to prevent cross-tenant data leakage
2. **Use TTL jitter** to prevent cache stampede
3. **Separate critical from cacheable data** (different Redis instances/DBs)
4. **Monitor cache hit rates** - low hit rate means bad caching strategy
5. **Invalidate on events** for data that changes unpredictably
6. **Use write-through** for data that must be consistent
7. **Let TTL handle invalidation** for read-heavy, rarely changing data
8. **Add circuit breakers** - don't let cache failures take down your service
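For practice 4, Redis already tracks the counters you need: `INFO stats` exposes `keyspace_hits` and `keyspace_misses`, so a hit-rate gauge is a few lines. A small helper (`hit_rate` is my name for it; the INFO fields are standard Redis):

```python
def hit_rate(info: dict) -> float:
    """Compute cache hit rate from the dict returned by INFO stats."""
    hits = info.get("keyspace_hits", 0)
    misses = info.get("keyspace_misses", 0)
    total = hits + misses
    return hits / total if total else 0.0

# With a live client: rate = hit_rate(await redis_client.info("stats"))
print(hit_rate({"keyspace_hits": 850, "keyspace_misses": 150}))  # 0.85
```

Note these counters are server-wide and cumulative since startup; for per-tenant or per-query-type rates you'd instrument your own counters around the cache lookups.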

## Next Steps

Caching and session management are foundational to performant distributed systems. In my POS architecture, Redis reduced:

* Chatbot response time by 80% (800ms → 150ms)
* Database load by 70%
* Auth Service response time by 60%

In the next article, we'll explore **Integration & Orchestration Patterns**, where the Chatbot Service orchestrates calls to 5 downstream services—and how caching plays a crucial role in making that pattern fast and reliable.

***

*This is part of the Software Architecture 101 series, where I share lessons learned building a production multi-tenant POS system with 6 microservices.*
