This final article brings everything together in a complete real-world project. I'll walk through building an end-to-end data pipeline for e-commerce analytics, something I've built variations of multiple times in production.
This isn't a toy example. It's a realistic implementation using Python 3.12 that processes order, customer, and product data to power analytics dashboards and ML models. We'll cover extraction from multiple sources, transformation with data quality checks, loading to a warehouse, and orchestration with Airflow.
Project Overview
Business Requirements
Our e-commerce company needs:
Daily sales reports: Revenue, orders, popular products
Customer segmentation: High/medium/low-value tiers for targeted marketing
Churn prediction: Clean, reliable data to feed an ML model
Architecture
Data Sources:
├── PostgreSQL (transactional database)
│   ├── orders table
│   ├── customers table
│   └── products table
├── Stripe API (payment data)
└── Google Analytics API (web events)

Data Lake (S3):
├── raw/ (landed data)
├── staging/ (cleaned data)
└── processed/ (aggregated data)

Data Warehouse (Snowflake):
├── fact_orders
├── dim_customers
├── dim_products
└── agg_daily_sales

Orchestration:
└── Apache Airflow (scheduled DAGs)

Analytics:
├── Dashboards (Tableau/Metabase)
└── ML Models (customer churn prediction)
Project Setup
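The imports throughout the article imply a small, modular repository. Here's a sketch of the layout I'd use, reconstructed from the module paths referenced later; only the files shown in the code below are confirmed, the rest is convention:

ecommerce-pipeline/
├── config/
│   └── settings.py              # environment-driven settings (sketched below)
├── models/
│   └── schemas.py               # Pydantic data models
├── extractors/
│   └── postgres_extractor.py
├── transformers/
│   └── order_transformer.py
├── quality/
│   └── validators.py
├── loaders/
│   └── snowflake_loader.py
└── airflow/
    └── dags/
        └── ecommerce_daily_etl.py

Core dependencies, judging by the imports: pandas, SQLAlchemy, pydantic (plus email-validator for EmailStr), great_expectations, snowflake-connector-python, apache-airflow, and s3fs with pyarrow for reading and writing Parquet on S3.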
Configuration
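The Airflow DAG later imports a settings object from config.settings. Here's a minimal sketch of what that module could look like, assuming pydantic-settings and a local .env file; the field names match the attributes the DAG references, the defaults are illustrative:

# config/settings.py (sketch; assumes the pydantic-settings package and a .env file)
from pydantic_settings import BaseSettings, SettingsConfigDict

class Settings(BaseSettings):
    """Pipeline configuration, read from environment variables or .env"""
    model_config = SettingsConfigDict(env_file='.env')

    # PostgreSQL source
    POSTGRES_HOST: str
    POSTGRES_PORT: int = 5432
    POSTGRES_DB: str
    POSTGRES_USER: str
    POSTGRES_PASSWORD: str

    # S3 data lake
    AWS_S3_BUCKET: str

    # Snowflake warehouse
    SNOWFLAKE_ACCOUNT: str
    SNOWFLAKE_USER: str
    SNOWFLAKE_PASSWORD: str
    SNOWFLAKE_WAREHOUSE: str
    SNOWFLAKE_DATABASE: str
    SNOWFLAKE_SCHEMA: str

    # How far back each daily run re-extracts orders
    LOOKBACK_DAYS: int = 7

settings = Settings()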
Data Models
# models/schemas.py
from pydantic import BaseModel, EmailStr, Field
from datetime import datetime, date
from decimal import Decimal
from typing import Optional
class Order(BaseModel):
"""Order data model"""
order_id: int
customer_id: int
order_date: datetime
status: str # pending, processing, shipped, delivered, cancelled
total_amount: Decimal
currency: str = 'USD'
payment_method: str
shipping_address: str
created_at: datetime
updated_at: datetime
class Customer(BaseModel):
"""Customer data model"""
customer_id: int
email: EmailStr
first_name: str
last_name: str
registration_date: date
country: str
segment: Optional[str] = None # high_value, medium_value, low_value
total_lifetime_value: Decimal = Decimal('0')
created_at: datetime
updated_at: datetime
class Product(BaseModel):
"""Product data model"""
product_id: int
product_name: str
category: str
price: Decimal
cost: Decimal
stock_quantity: int
reorder_level: int = 10
created_at: datetime
updated_at: datetime
class OrderItem(BaseModel):
"""Order line item model"""
order_item_id: int
order_id: int
product_id: int
quantity: int
unit_price: Decimal
total_price: Decimal
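These models double as a validation gate: any record an extractor hands off can be checked before it lands in the lake. A quick illustration with made-up values:

# Quick check that a raw record conforms to the Order schema
# (assumes: from models.schemas import Order; the values are made up)
from datetime import datetime
from pydantic import ValidationError

raw = {
    "order_id": 1001,
    "customer_id": 42,
    "order_date": "2024-06-01T10:15:00",
    "status": "shipped",
    "total_amount": "59.90",
    "payment_method": "card",
    "shipping_address": "123 Main St, Springfield",
    "created_at": datetime.now(),
    "updated_at": datetime.now(),
}

try:
    order = Order.model_validate(raw)          # pydantic v2 API
    print(order.total_amount, order.currency)  # 59.90 USD
except ValidationError as exc:
    print(exc.errors())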
Data Extraction
# extractors/postgres_extractor.py
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime, timedelta
from typing import Optional
import logging
logger = logging.getLogger(__name__)
class PostgresExtractor:
"""Extract data from PostgreSQL database"""
def __init__(self, connection_string: str):
"""
Initialize PostgreSQL extractor.
Args:
connection_string: SQLAlchemy connection string
"""
self.engine = create_engine(connection_string)
def extract_orders(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
) -> pd.DataFrame:
"""
Extract orders within date range.
Args:
start_date: Start date (inclusive)
end_date: End date (inclusive)
Returns:
DataFrame with orders
"""
query = """
SELECT
o.order_id,
o.customer_id,
o.order_date,
o.status,
o.total_amount,
o.currency,
o.payment_method,
o.shipping_address,
o.created_at,
o.updated_at
FROM orders o
WHERE 1=1
"""
params = {}
if start_date:
query += " AND o.order_date >= :start_date"
params['start_date'] = start_date
if end_date:
query += " AND o.order_date <= :end_date"
params['end_date'] = end_date
query += " ORDER BY o.order_date DESC"
logger.info(f"Extracting orders from {start_date} to {end_date}")
with self.engine.connect() as conn:
df = pd.read_sql(text(query), conn, params=params)
logger.info(f"Extracted {len(df)} orders")
return df
def extract_order_items(self, order_ids: list[int]) -> pd.DataFrame:
"""
Extract order line items for given order IDs.
Args:
order_ids: List of order IDs
Returns:
DataFrame with order items
"""
if not order_ids:
return pd.DataFrame()
query = """
SELECT
oi.order_item_id,
oi.order_id,
oi.product_id,
oi.quantity,
oi.unit_price,
oi.total_price,
p.product_name,
                p.category,
                p.cost
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
WHERE oi.order_id = ANY(:order_ids)
"""
with self.engine.connect() as conn:
df = pd.read_sql(
text(query),
conn,
params={'order_ids': order_ids}
)
logger.info(f"Extracted {len(df)} order items")
return df
def extract_customers(self) -> pd.DataFrame:
"""Extract all customers"""
query = """
SELECT
customer_id,
email,
first_name,
last_name,
registration_date,
country,
created_at,
updated_at
FROM customers
"""
with self.engine.connect() as conn:
df = pd.read_sql(query, conn)
logger.info(f"Extracted {len(df)} customers")
return df
def extract_products(self) -> pd.DataFrame:
"""Extract all products"""
query = """
SELECT
product_id,
product_name,
category,
price,
cost,
stock_quantity,
reorder_level,
created_at,
updated_at
FROM products
"""
with self.engine.connect() as conn:
df = pd.read_sql(query, conn)
logger.info(f"Extracted {len(df)} products")
return df
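For ad-hoc testing outside Airflow, the extractor works standalone; a short sketch (the connection string is a placeholder):

# Ad-hoc run outside Airflow (connection string is a placeholder)
from datetime import datetime, timedelta

extractor = PostgresExtractor("postgresql://etl_user:secret@localhost:5432/shop")

end = datetime.now()
start = end - timedelta(days=7)

orders = extractor.extract_orders(start_date=start, end_date=end)
items = extractor.extract_order_items(orders["order_id"].tolist())
print(orders.shape, items.shape)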
Data Transformation
# transformers/order_transformer.py
import pandas as pd
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
class OrderTransformer:
"""Transform order data"""
def transform(self, orders_df: pd.DataFrame, order_items_df: pd.DataFrame) -> pd.DataFrame:
"""
Transform orders and order items into fact table.
Args:
orders_df: Orders DataFrame
order_items_df: Order items DataFrame
Returns:
Transformed fact_orders DataFrame
"""
logger.info("Starting order transformation")
# Merge orders with order items
fact_orders = orders_df.merge(
order_items_df,
on='order_id',
how='inner'
)
        # Calculate per-line profit (unit cost comes from the products join in extract_order_items)
        fact_orders['profit'] = (
            fact_orders['total_price'] -
            (fact_orders['quantity'] * fact_orders['cost'])
        )
        # Extract date dimensions (parse the timestamp once)
        order_ts = pd.to_datetime(fact_orders['order_date'])
        fact_orders['order_year'] = order_ts.dt.year
        fact_orders['order_month'] = order_ts.dt.month
        fact_orders['order_day'] = order_ts.dt.day
        fact_orders['order_dayofweek'] = order_ts.dt.dayofweek
        # Surrogate date key in YYYYMMDD form
        fact_orders['order_date_key'] = order_ts.dt.strftime('%Y%m%d').astype(int)
# Select final columns
final_columns = [
'order_id',
'order_item_id',
'customer_id',
'product_id',
'order_date',
'order_date_key',
'order_year',
'order_month',
'order_day',
'order_dayofweek',
'quantity',
'unit_price',
'total_price',
'profit',
'status',
'payment_method',
'category',
'product_name'
]
fact_orders = fact_orders[final_columns]
logger.info(f"Transformed {len(fact_orders)} order records")
return fact_orders
class CustomerTransformer:
"""Transform customer data with segmentation"""
def transform(
self,
customers_df: pd.DataFrame,
orders_df: pd.DataFrame
) -> pd.DataFrame:
"""
Transform customers with RFM segmentation.
Args:
customers_df: Customers DataFrame
orders_df: Orders DataFrame
Returns:
Transformed dim_customers DataFrame
"""
logger.info("Starting customer transformation")
# Calculate customer metrics
customer_metrics = orders_df.groupby('customer_id').agg({
'order_id': 'count',
'total_amount': 'sum',
'order_date': 'max'
}).reset_index()
customer_metrics.columns = [
'customer_id',
'total_orders',
'total_lifetime_value',
'last_order_date'
]
# Calculate recency (days since last order)
customer_metrics['recency_days'] = (
datetime.now() - pd.to_datetime(customer_metrics['last_order_date'])
).dt.days
# Merge with customer data
dim_customers = customers_df.merge(
customer_metrics,
on='customer_id',
how='left'
)
# Fill nulls for customers with no orders
dim_customers['total_orders'] = dim_customers['total_orders'].fillna(0)
dim_customers['total_lifetime_value'] = dim_customers['total_lifetime_value'].fillna(0)
# Segment customers based on RFM
dim_customers['segment'] = dim_customers.apply(
self._calculate_segment,
axis=1
)
logger.info(f"Transformed {len(dim_customers)} customers")
return dim_customers
def _calculate_segment(self, row) -> str:
"""Calculate customer segment based on RFM"""
if row['total_orders'] == 0:
return 'inactive'
elif row['recency_days'] > 180:
return 'at_risk'
elif row['total_lifetime_value'] > 1000 and row['total_orders'] > 5:
return 'high_value'
elif row['total_lifetime_value'] > 500:
return 'medium_value'
else:
return 'low_value'
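Row-wise apply reads nicely but gets slow once the customer table reaches millions of rows. If that bites, a vectorized drop-in with the same thresholds is possible; a sketch using numpy.select (not what the class above uses):

# Vectorized equivalent of _calculate_segment using numpy.select;
# conditions are checked in order, mirroring the if/elif chain above
import numpy as np
import pandas as pd

def segment_customers(dim_customers: pd.DataFrame) -> pd.Series:
    conditions = [
        dim_customers['total_orders'] == 0,
        dim_customers['recency_days'] > 180,
        (dim_customers['total_lifetime_value'] > 1000) & (dim_customers['total_orders'] > 5),
        dim_customers['total_lifetime_value'] > 500,
    ]
    choices = ['inactive', 'at_risk', 'high_value', 'medium_value']
    return pd.Series(
        np.select(conditions, choices, default='low_value'),
        index=dim_customers.index,
    )

# usage: dim_customers['segment'] = segment_customers(dim_customers)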
Data Quality Checks
# quality/validators.py
import pandas as pd
from typing import List, Dict, Any
import great_expectations as gx
import logging
logger = logging.getLogger(__name__)
class DataQualityValidator:
"""Validate data quality using Great Expectations"""
def __init__(self):
"""Initialize validator"""
self.context = gx.get_context()
def validate_orders(self, df: pd.DataFrame) -> bool:
"""
Validate orders data quality.
Args:
df: Orders DataFrame
Returns:
True if validation passes, False otherwise
"""
logger.info("Validating orders data quality")
# Create in-memory datasource
datasource = self.context.sources.add_pandas("orders_source")
data_asset = datasource.add_dataframe_asset(name="orders")
batch_request = data_asset.build_batch_request(dataframe=df)
# Create expectation suite
suite = self.context.add_expectation_suite("orders_suite")
validator = self.context.get_validator(
batch_request=batch_request,
expectation_suite_name="orders_suite"
)
# Define expectations
validator.expect_table_row_count_to_be_between(min_value=1)
validator.expect_column_values_to_not_be_null(column="order_id")
validator.expect_column_values_to_be_unique(column="order_id")
validator.expect_column_values_to_not_be_null(column="customer_id")
validator.expect_column_values_to_not_be_null(column="order_date")
validator.expect_column_values_to_be_between(
column="total_amount",
min_value=0,
mostly=0.99
)
validator.expect_column_values_to_be_in_set(
column="status",
value_set=['pending', 'processing', 'shipped', 'delivered', 'cancelled']
)
# Run validation
results = validator.validate()
if results['success']:
logger.info("β Orders data quality validation passed")
return True
else:
logger.error("β Orders data quality validation failed")
for result in results['results']:
if not result['success']:
logger.error(f" - {result['expectation_config']['expectation_type']}")
return False
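In the DAG this check runs as its own task right before the load; standalone it's just:

# Standalone usage: abort before loading if the staged orders fail validation
import pandas as pd

fact_orders = pd.read_parquet("fact_orders.parquet")  # local path, for illustration

validator = DataQualityValidator()
if not validator.validate_orders(fact_orders):
    raise ValueError("Orders failed data quality checks")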
Data Loading
# loaders/snowflake_loader.py
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import logging
logger = logging.getLogger(__name__)
class SnowflakeLoader:
"""Load data to Snowflake warehouse"""
def __init__(
self,
account: str,
user: str,
password: str,
warehouse: str,
database: str,
schema: str
):
"""Initialize Snowflake loader"""
self.connection_params = {
'account': account,
'user': user,
'password': password,
'warehouse': warehouse,
'database': database,
'schema': schema
}
def load_fact_orders(self, df: pd.DataFrame) -> None:
"""
Load fact_orders table with upsert logic.
Args:
df: Orders DataFrame
"""
logger.info(f"Loading {len(df)} records to fact_orders")
with snowflake.connector.connect(**self.connection_params) as conn:
            # Stage this batch in a staging table (recreated on every run)
            staging_table = 'FACT_ORDERS_STAGING'
            # Write to staging; auto-create the table and leave identifiers
            # unquoted so the MERGE below can reference columns case-insensitively
            success, _, nrows, _ = write_pandas(
                conn=conn,
                df=df,
                table_name=staging_table,
                database=self.connection_params['database'],
                schema=self.connection_params['schema'],
                auto_create_table=True,
                overwrite=True,
                quote_identifiers=False
            )
if not success:
raise Exception("Failed to write to staging table")
logger.info(f"Wrote {nrows} rows to staging table")
# Merge into target table
merge_query = f"""
MERGE INTO fact_orders AS target
USING {staging_table} AS source
ON target.order_id = source.order_id
AND target.order_item_id = source.order_item_id
WHEN MATCHED THEN
UPDATE SET
status = source.status,
total_price = source.total_price,
profit = source.profit
WHEN NOT MATCHED THEN
INSERT (
order_id, order_item_id, customer_id, product_id,
order_date, order_date_key, order_year, order_month,
order_day, order_dayofweek, quantity, unit_price,
total_price, profit, status, payment_method,
category, product_name
)
VALUES (
source.order_id, source.order_item_id, source.customer_id,
source.product_id, source.order_date, source.order_date_key,
source.order_year, source.order_month, source.order_day,
source.order_dayofweek, source.quantity, source.unit_price,
source.total_price, source.profit, source.status,
source.payment_method, source.category, source.product_name
)
"""
            cursor = conn.cursor()
            cursor.execute(merge_query)
            # Snowflake's MERGE returns a single row: (rows inserted, rows updated)
            rows_inserted, rows_updated = cursor.fetchone()
            logger.info(f"Inserted {rows_inserted} rows, updated {rows_updated} rows")
def load_dim_customers(self, df: pd.DataFrame) -> None:
"""Load dim_customers dimension table"""
logger.info(f"Loading {len(df)} customers to dim_customers")
with snowflake.connector.connect(**self.connection_params) as conn:
# SCD Type 2 implementation
# ... similar merge logic ...
pass
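One thing the MERGE assumes is that fact_orders already exists in Snowflake. If you'd rather have the loader bootstrap it, a CREATE TABLE IF NOT EXISTS guard works; the column list matches the MERGE above, the Snowflake types are my assumptions:

# Optional bootstrap: create the target table if it doesn't exist yet
# (column list matches the MERGE above; the types are assumptions)
CREATE_FACT_ORDERS = """
CREATE TABLE IF NOT EXISTS fact_orders (
    order_id        NUMBER,
    order_item_id   NUMBER,
    customer_id     NUMBER,
    product_id      NUMBER,
    order_date      TIMESTAMP_NTZ,
    order_date_key  NUMBER,
    order_year      NUMBER,
    order_month     NUMBER,
    order_day       NUMBER,
    order_dayofweek NUMBER,
    quantity        NUMBER,
    unit_price      NUMBER(12, 2),
    total_price     NUMBER(12, 2),
    profit          NUMBER(12, 2),
    status          VARCHAR,
    payment_method  VARCHAR,
    category        VARCHAR,
    product_name    VARCHAR
)
"""

def ensure_fact_orders(conn) -> None:
    """Create fact_orders if missing; safe to call on every run."""
    conn.cursor().execute(CREATE_FACT_ORDERS)

Calling ensure_fact_orders(conn) at the top of load_fact_orders keeps a fresh environment from failing on its first run.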
Airflow DAG
# airflow/dags/ecommerce_daily_etl.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import timedelta, datetime
import sys
sys.path.insert(0, '/opt/airflow/dags/repo')
from config.settings import settings
from extractors.postgres_extractor import PostgresExtractor
from transformers.order_transformer import OrderTransformer, CustomerTransformer
from quality.validators import DataQualityValidator
from loaders.snowflake_loader import SnowflakeLoader
import logging
logger = logging.getLogger(__name__)
# Default arguments
default_args = {
'owner': 'data-engineering',
'depends_on_past': False,
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5),
}
# Create DAG
dag = DAG(
'ecommerce_daily_etl',
default_args=default_args,
description='Daily E-Commerce ETL Pipeline',
schedule_interval='0 2 * * *', # Run at 2 AM daily
    start_date=datetime(2024, 1, 1),  # any fixed date in the past; days_ago() is deprecated
catchup=False,
tags=['ecommerce', 'etl'],
)
def extract_orders(**context):
"""Extract orders from PostgreSQL"""
logger.info("Extracting orders")
extractor = PostgresExtractor(
connection_string=f"postgresql://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}"
)
    # Extract a rolling lookback window of orders
    end_date = datetime.now()
    start_date = end_date - timedelta(days=settings.LOOKBACK_DAYS)
    orders_df = extractor.extract_orders(start_date, end_date)
    # Also land the matching order items, which the transform step reads
    order_items_df = extractor.extract_order_items(orders_df['order_id'].tolist())
    # Save row count to XCom for monitoring
    context['task_instance'].xcom_push(key='orders_count', value=len(orders_df))
    # Save to the S3 raw zone, partitioned by the run's logical date so the
    # downstream tasks (which read dt=<logical date>) find the same files
    ds = context['ds']
    orders_df.to_parquet(
        f's3://{settings.AWS_S3_BUCKET}/raw/orders/dt={ds}/orders.parquet',
        index=False
    )
    order_items_df.to_parquet(
        f's3://{settings.AWS_S3_BUCKET}/raw/order_items/dt={ds}/order_items.parquet',
        index=False
    )
logger.info(f"Extracted {len(orders_df)} orders")
def transform_orders(**context):
"""Transform orders data"""
logger.info("Transforming orders")
# Read from S3
import pandas as pd
execution_date = context['execution_date'].date()
orders_df = pd.read_parquet(
f's3://{settings.AWS_S3_BUCKET}/raw/orders/dt={execution_date}/orders.parquet'
)
order_items_df = pd.read_parquet(
f's3://{settings.AWS_S3_BUCKET}/raw/order_items/dt={execution_date}/order_items.parquet'
)
# Transform
transformer = OrderTransformer()
fact_orders = transformer.transform(orders_df, order_items_df)
# Save to S3 staging zone
fact_orders.to_parquet(
f's3://{settings.AWS_S3_BUCKET}/staging/fact_orders/dt={execution_date}/fact_orders.parquet',
index=False
)
logger.info(f"Transformed {len(fact_orders)} order records")
def validate_data_quality(**context):
"""Validate data quality"""
logger.info("Validating data quality")
import pandas as pd
execution_date = context['execution_date'].date()
fact_orders = pd.read_parquet(
f's3://{settings.AWS_S3_BUCKET}/staging/fact_orders/dt={execution_date}/fact_orders.parquet'
)
validator = DataQualityValidator()
if not validator.validate_orders(fact_orders):
raise ValueError("Data quality validation failed!")
logger.info("Data quality validation passed")
def load_to_snowflake(**context):
"""Load data to Snowflake"""
logger.info("Loading to Snowflake")
import pandas as pd
execution_date = context['execution_date'].date()
fact_orders = pd.read_parquet(
f's3://{settings.AWS_S3_BUCKET}/staging/fact_orders/dt={execution_date}/fact_orders.parquet'
)
loader = SnowflakeLoader(
account=settings.SNOWFLAKE_ACCOUNT,
user=settings.SNOWFLAKE_USER,
password=settings.SNOWFLAKE_PASSWORD,
warehouse=settings.SNOWFLAKE_WAREHOUSE,
database=settings.SNOWFLAKE_DATABASE,
schema=settings.SNOWFLAKE_SCHEMA
)
loader.load_fact_orders(fact_orders)
logger.info("Loaded data to Snowflake")
# Define tasks
extract_task = PythonOperator(
task_id='extract_orders',
python_callable=extract_orders,
dag=dag,
)
transform_task = PythonOperator(
task_id='transform_orders',
python_callable=transform_orders,
dag=dag,
)
validate_task = PythonOperator(
task_id='validate_data_quality',
python_callable=validate_data_quality,
dag=dag,
)
load_task = PythonOperator(
task_id='load_to_snowflake',
python_callable=load_to_snowflake,
dag=dag,
)
# Define dependencies
extract_task >> transform_task >> validate_task >> load_task
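Before deploying, I like a parse-and-wiring test that can run in CI; a minimal sketch (the dag_folder path mirrors the sys.path hack above and depends on your deployment):

# test_dag_wiring.py: check the DAG parses and tasks are wired as intended
from airflow.models import DagBag

def test_ecommerce_daily_etl_wiring():
    dag_bag = DagBag(dag_folder='/opt/airflow/dags/repo', include_examples=False)
    assert dag_bag.import_errors == {}, dag_bag.import_errors

    dag = dag_bag.get_dag('ecommerce_daily_etl')
    assert dag is not None
    assert dag.get_task('extract_orders').downstream_task_ids == {'transform_orders'}
    assert dag.get_task('transform_orders').downstream_task_ids == {'validate_data_quality'}
    assert dag.get_task('validate_data_quality').downstream_task_ids == {'load_to_snowflake'}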
Analytics Queries
Once data is in Snowflake, analysts can run queries:
-- Daily revenue trend
SELECT
    TO_DATE(order_date) as order_day,
COUNT(DISTINCT order_id) as orders,
SUM(total_price) as revenue,
SUM(profit) as profit,
AVG(total_price) as avg_order_value
FROM fact_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY order_day
ORDER BY order_day DESC;
-- Top products by revenue
SELECT
product_name,
category,
COUNT(DISTINCT order_id) as orders,
SUM(quantity) as units_sold,
SUM(total_price) as revenue,
SUM(profit) as profit
FROM fact_orders
WHERE order_date >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY product_name, category
ORDER BY revenue DESC
LIMIT 20;
-- Customer segmentation analysis
SELECT
segment,
COUNT(DISTINCT customer_id) as customers,
AVG(total_lifetime_value) as avg_ltv,
AVG(total_orders) as avg_orders
FROM dim_customers
GROUP BY segment
ORDER BY avg_ltv DESC;
Key Takeaways
This real-world project demonstrates:
End-to-end pipeline: Extract, transform, load with proper orchestration
Data quality: Validation checks at every stage
Scalable architecture: S3 data lake + Snowflake warehouse
Production practices: Logging, error handling, monitoring
Real business value: Powers dashboards and analytics
You now have a template for building production data pipelines. The patterns here (modular code, data quality checks, proper orchestration) apply to any data engineering project.