Let me walk you through designing a production-grade URL shortener: a system I built that handles millions of redirects daily. This case study applies all the concepts from previous articles: scalability, caching, databases, APIs, and more.
-- PostgreSQL schema
CREATE TABLE urls (
    id BIGSERIAL PRIMARY KEY,
    short_code VARCHAR(10) UNIQUE NOT NULL,
    original_url TEXT NOT NULL,
    user_id UUID,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    expires_at TIMESTAMP,
    is_active BOOLEAN DEFAULT true,
    CONSTRAINT valid_url CHECK (original_url ~ '^https?://')
);

-- PostgreSQL has no inline INDEX clause; indexes are created separately.
-- (The UNIQUE constraint on short_code already creates its own index.)
CREATE INDEX idx_user_id ON urls (user_id);
CREATE INDEX idx_created_at ON urls (created_at);
CREATE TABLE analytics (
    id BIGSERIAL PRIMARY KEY,
    short_code VARCHAR(10) NOT NULL,
    clicked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    ip_address INET,
    user_agent TEXT,
    referrer TEXT,
    country VARCHAR(2)
);

CREATE INDEX idx_short_code_time ON analytics (short_code, clicked_at);
import string
import random
from hashlib import md5

class ShortCodeGenerator:
    """
    Generate unique short codes for URLs.
    Uses base62 encoding for compact representation.
    """

    def __init__(self):
        self.alphabet = string.ascii_letters + string.digits  # a-zA-Z0-9
        self.base = len(self.alphabet)  # 62

    def encode(self, num: int) -> str:
        """Convert number to base62 string."""
        if num == 0:
            return self.alphabet[0]
        result = []
        while num > 0:
            result.append(self.alphabet[num % self.base])
            num //= self.base
        return ''.join(reversed(result))

    def decode(self, short_code: str) -> int:
        """Convert base62 string back to number."""
        num = 0
        for char in short_code:
            num = num * self.base + self.alphabet.index(char)
        return num

    def generate_from_id(self, url_id: int) -> str:
        """Generate short code from database ID, padded to 6 characters."""
        # Pad with the alphabet's zero character so decode() still
        # round-trips; zfill() pads with '0', which is not the zero
        # digit of this alphabet and would corrupt decoding.
        return self.encode(url_id).rjust(6, self.alphabet[0])

    def generate_random(self, length: int = 6) -> str:
        """Generate random short code (check for collisions before storing)."""
        return ''.join(random.choices(self.alphabet, k=length))

    def generate_from_url(self, url: str) -> str:
        """Generate short code from URL hash (for deterministic codes).

        Truncating the hash to 6 characters can collide, so verify
        uniqueness against the database before use.
        """
        hash_value = md5(url.encode()).hexdigest()
        num = int(hash_value, 16)
        return self.encode(num)[:6]
generator = ShortCodeGenerator()
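A quick round-trip check against the instance above. The expected values follow directly from the base62 alphabet (a-zA-Z0-9, so 'a' is the zero digit):

assert generator.encode(0) == 'a'                   # alphabet[0] is the zero digit
assert generator.encode(125) == 'cb'                # 2*62 + 1 = 125
assert generator.decode('cb') == 125                # decode inverts encode
assert generator.generate_from_id(125) == 'aaaacb'  # padded to 6 characters
assert generator.decode('aaaacb') == 125            # padding is decode-safe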
from fastapi import FastAPI, HTTPException, Request, Depends
from fastapi.responses import RedirectResponse
from pydantic import BaseModel, HttpUrl
from datetime import datetime, timedelta
from typing import Optional
import redis
import asyncpg

app = FastAPI(title="URL Shortener")

# Database connections. This Redis client is synchronous; swap in
# redis.asyncio if cache calls become a bottleneck under load.
# The asyncpg pool is created at application startup (see below).
redis_client = redis.Redis(host='redis', port=6379, decode_responses=True)
postgres_pool = None

class ShortenRequest(BaseModel):
    url: HttpUrl
    custom_code: Optional[str] = None
    expires_in_days: Optional[int] = None
class URLShortener:
    """URL shortening service with caching."""

    def __init__(self, db_pool, cache, code_generator):
        self.db = db_pool
        self.cache = cache
        self.generator = code_generator
        self.cache_ttl = 3600 * 24  # 24 hours

    async def shorten_url(
        self,
        url: str,
        user_id: str = None,
        custom_code: str = None,
        expires_in_days: int = None
    ) -> dict:
        """Create shortened URL."""
        # Check if custom code already exists
        if custom_code:
            existing = await self.get_url(custom_code)
            if existing:
                raise ValueError(f"Custom code '{custom_code}' already in use")

        # Calculate expiration
        expires_at = None
        if expires_in_days:
            expires_at = datetime.utcnow() + timedelta(days=expires_in_days)

        async with self.db.acquire() as conn:
            if custom_code:
                short_code = custom_code
                await conn.execute("""
                    INSERT INTO urls (original_url, short_code, user_id, expires_at)
                    VALUES ($1, $2, $3, $4)
                """, url, short_code, user_id, expires_at)
            else:
                # Reserve the ID from the sequence first, then derive the
                # short code from it. (Inserting a shared placeholder like
                # "temp" would violate the UNIQUE constraint as soon as two
                # requests overlap.)
                url_id = await conn.fetchval("SELECT nextval('urls_id_seq')")
                short_code = self.generator.generate_from_id(url_id)
                await conn.execute("""
                    INSERT INTO urls (id, original_url, short_code, user_id, expires_at)
                    VALUES ($1, $2, $3, $4, $5)
                """, url_id, url, short_code, user_id, expires_at)

        # Cache the mapping
        await self._cache_url(short_code, url, expires_at)

        return {
            "short_code": short_code,
            "short_url": f"https://short.ly/{short_code}",
            "original_url": url,
            "created_at": datetime.utcnow().isoformat(),
            "expires_at": expires_at.isoformat() if expires_at else None
        }
    async def get_url(self, short_code: str) -> dict:
        """Get original URL from short code."""
        # Try cache first
        cached = self.cache.get(f"url:{short_code}")
        if cached:
            return {
                "original_url": cached,
                "from_cache": True
            }

        # Query database
        async with self.db.acquire() as conn:
            row = await conn.fetchrow("""
                SELECT original_url, expires_at, is_active
                FROM urls
                WHERE short_code = $1
            """, short_code)

        if not row:
            return None

        # Check if expired
        if row['expires_at'] and row['expires_at'] < datetime.utcnow():
            return None

        if not row['is_active']:
            return None

        url = row['original_url']

        # Cache for future requests
        await self._cache_url(short_code, url, row['expires_at'])

        return {
            "original_url": url,
            "from_cache": False
        }
    async def _cache_url(self, short_code: str, url: str, expires_at: datetime = None):
        """Cache URL mapping."""
        ttl = self.cache_ttl
        # Use shorter TTL if URL expires soon
        if expires_at:
            seconds_until_expiry = (expires_at - datetime.utcnow()).total_seconds()
            if seconds_until_expiry < ttl:
                ttl = int(seconds_until_expiry)
        if ttl > 0:  # SETEX rejects non-positive TTLs
            self.cache.setex(f"url:{short_code}", ttl, url)
    async def track_click(
        self,
        short_code: str,
        ip_address: str,
        user_agent: str,
        referrer: str = None
    ):
        """Track URL click asynchronously."""
        # Publish to Kafka for async processing. kafka_producer is assumed
        # to be an aiokafka AIOKafkaProducer (with a JSON value serializer)
        # created at application startup; it is not shown here.
        event = {
            "short_code": short_code,
            "clicked_at": datetime.utcnow().isoformat(),
            "ip_address": ip_address,
            "user_agent": user_agent,
            "referrer": referrer
        }
        await kafka_producer.send("url_clicks", event)
# Initialize service once the database pool exists; constructing it at
# import time would hand it a None pool. The DSN below is a placeholder.
url_shortener = None

@app.on_event("startup")
async def startup():
    global postgres_pool, url_shortener
    postgres_pool = await asyncpg.create_pool("postgresql://app:secret@postgres/shortener")
    url_shortener = URLShortener(postgres_pool, redis_client, generator)
# get_current_user_id: authentication dependency (implementation not shown)
@app.post("/api/shorten", status_code=201)
async def create_short_url(request: ShortenRequest, user_id: str = Depends(get_current_user_id)):
    """Create shortened URL."""
    try:
        result = await url_shortener.shorten_url(
            url=str(request.url),
            user_id=user_id,
            custom_code=request.custom_code,
            expires_in_days=request.expires_in_days
        )
        return result
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/{short_code}")
async def redirect_url(short_code: str, request: Request):
    """Redirect to original URL."""
    # Get URL
    result = await url_shortener.get_url(short_code)
    if not result:
        raise HTTPException(status_code=404, detail="URL not found or expired")

    # Track click (the Kafka send is fast; a background task would keep
    # it off the redirect path entirely)
    await url_shortener.track_click(
        short_code=short_code,
        ip_address=request.client.host,
        user_agent=request.headers.get("user-agent", ""),
        referrer=request.headers.get("referer")
    )

    # Redirect. 302 rather than 301: browsers cache 301s aggressively,
    # so repeat visits would never reach the server and clicks would
    # go untracked. Use 301 only if you value cacheability over analytics.
    return RedirectResponse(
        url=result["original_url"],
        status_code=302
    )
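To see the API end to end, here's a minimal client sketch using httpx (my choice here, not part of the service). It assumes the service runs on localhost:8000 and that the auth dependency is satisfied or stubbed out:

import httpx

# Create a short URL, then inspect the redirect it produces.
resp = httpx.post(
    "http://localhost:8000/api/shorten",
    json={"url": "https://example.com/very/long/path"},
)
short_code = resp.json()["short_code"]

# follow_redirects=False lets us check the 302 and Location header directly.
redirect = httpx.get(f"http://localhost:8000/{short_code}", follow_redirects=False)
assert redirect.status_code == 302
assert redirect.headers["location"] == "https://example.com/very/long/path"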
from datetime import datetime
from kafka import KafkaConsumer
import clickhouse_connect

class AnalyticsProcessor:
    """
    Process click events and store in ClickHouse.
    ClickHouse is optimized for analytical queries.
    """

    def __init__(self, clickhouse_client, kafka_consumer):
        self.ch = clickhouse_client
        self.consumer = kafka_consumer
        self._create_tables()

    def _create_tables(self):
        """Create ClickHouse tables."""
        self.ch.command("""
            CREATE TABLE IF NOT EXISTS url_clicks (
                short_code String,
                clicked_at DateTime,
                ip_address String,
                user_agent String,
                referrer String,
                country String,
                city String
            ) ENGINE = MergeTree()
            ORDER BY (short_code, clicked_at)
        """)
    async def process_events(self):
        """Consume and process click events."""
        batch = []
        batch_size = 1000

        for message in self.consumer:
            event = message.value

            # Enrich with geolocation. get_geolocation wraps whatever
            # GeoIP lookup you use (e.g. MaxMind); not shown here.
            geo = await self.get_geolocation(event['ip_address'])
            event['country'] = geo.get('country', '')
            event['city'] = geo.get('city', '')

            batch.append(event)

            # Insert batch. (A periodic flush is also needed in production
            # so a slow trickle of events doesn't sit in memory forever.)
            if len(batch) >= batch_size:
                self._insert_batch(batch)
                batch = []

    def _insert_batch(self, events: list):
        """Insert batch of events into ClickHouse."""
        # clickhouse_connect expects rows plus column names, not dicts,
        # and a real datetime for the DateTime column.
        columns = ['short_code', 'clicked_at', 'ip_address',
                   'user_agent', 'referrer', 'country', 'city']
        rows = []
        for event in events:
            event['clicked_at'] = datetime.fromisoformat(event['clicked_at'])
            rows.append([event.get(col) if event.get(col) is not None else ''
                         for col in columns])
        self.ch.insert('url_clicks', rows, column_names=columns)
    async def get_analytics(self, short_code: str, days: int = 30):
        """Get analytics for URL."""
        # Server-side parameter binding keeps user input out of the SQL
        # string; the original f-string interpolation was an injection risk.
        result = self.ch.query("""
            SELECT
                toDate(clicked_at) as date,
                count() as clicks,
                uniq(ip_address) as unique_visitors,
                topK(5)(country) as top_countries,
                topK(5)(referrer) as top_referrers
            FROM url_clicks
            WHERE short_code = {code:String}
              AND clicked_at >= now() - INTERVAL {days:UInt32} DAY
            GROUP BY date
            ORDER BY date DESC
        """, parameters={'code': short_code, 'days': days})

        return result.result_rows
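To run the pipeline, the processor needs a deserializing consumer and a ClickHouse client. A minimal wiring sketch; the broker address, group id, and ClickHouse host are placeholders for your own infrastructure:

import asyncio
import json

# Deserialize Kafka messages back into the event dicts produced above.
consumer = KafkaConsumer(
    "url_clicks",
    bootstrap_servers="kafka:9092",
    group_id="analytics",
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)
ch_client = clickhouse_connect.get_client(host="clickhouse")

processor = AnalyticsProcessor(ch_client, consumer)
asyncio.run(processor.process_events())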
# Layer 1: CDN (CloudFlare)
# - Cache redirects for popular URLs
# - 99% of requests served from edge
# Layer 2: Redis (Application cache)
# - Cache URL mappings
# - 95% hit rate
# Layer 3: Database read replicas
# - Distribute read load
# - Eventual consistency acceptable for reads
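One way to make Layer 1 concrete: let the CDN cache the redirect response itself by attaching a Cache-Control header to it. A sketch under my own assumptions; the helper name and header values are illustrative, not tuned:

from fastapi.responses import RedirectResponse

def cacheable_redirect(original_url: str) -> RedirectResponse:
    """Build a redirect that shared caches (e.g. Cloudflare) may serve.

    s-maxage applies only to shared caches, so popular URLs stay at the
    edge briefly; max-age=0 keeps browsers revalidating, which preserves
    click tracking for repeat visitors.
    """
    response = RedirectResponse(url=original_url, status_code=302)
    response.headers["Cache-Control"] = "public, s-maxage=60, max-age=0"
    return response

A short edge TTL is the trade-off knob here: raising s-maxage offloads more traffic from the origin but widens the window where expired or deactivated links keep redirecting.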
-- Partitioning by creation date. This requires the urls table to be
-- declared with PARTITION BY RANGE (created_at), and the UNIQUE
-- constraint on short_code must then include the partition key.
CREATE TABLE urls_2026_01 PARTITION OF urls
    FOR VALUES FROM ('2026-01-01') TO ('2026-02-01');

-- Hash index for fast equality lookups
CREATE INDEX CONCURRENTLY idx_short_code_hash
    ON urls USING hash (short_code);

-- Materialized view for analytics. Refresh it on a schedule with
-- REFRESH MATERIALIZED VIEW CONCURRENTLY url_stats (which requires
-- a unique index on the view's short_code column).
CREATE MATERIALIZED VIEW url_stats AS
SELECT
    short_code,
    COUNT(*) as total_clicks,
    COUNT(DISTINCT ip_address) as unique_visitors,
    MAX(clicked_at) as last_clicked
FROM analytics
GROUP BY short_code;
Congratulations! You've completed System Design 101. You now have the knowledge to design scalable, reliable, and maintainable distributed systems. Keep learning, keep building, and remember: good system design is about understanding trade-offs.