ETL and ELT are the two core patterns of data engineering, and after building dozens of pipelines I've learned when each one fits. This article walks through the production patterns I rely on: ETL vs. ELT, idempotent and incremental loads, change data capture, data lineage, and pipeline error handling.
ETL vs. ELT
In ETL, data is transformed before it lands in the target; in ELT, raw data is loaded first and the transformations run inside the warehouse. I use ETL when the transformation logic is complex or the target is a legacy system, and ELT when a modern cloud warehouse can do the heavy lifting.
# Python 3.12 - ETL Pipeline
# Shared imports for the examples in this article
import logging
import time
from datetime import datetime

import pandas as pd
from sqlalchemy import bindparam, text
class ETLPipeline:
"""
Traditional ETL: transform before loading.
I use this when transformation logic is complex.
"""
    def __init__(self, source_db, target_db, last_processed: datetime):
        self.source = source_db
        self.target = target_db
        self.last_processed = last_processed  # lower bound for the extract query
def extract(self) -> pd.DataFrame:
"""Extract from source."""
query = """
SELECT * FROM transactions
WHERE created_at > :last_processed
"""
        return pd.read_sql(text(query), self.source, params={"last_processed": self.last_processed})
def transform(self, df: pd.DataFrame) -> pd.DataFrame:
"""Transform data."""
# Clean
df = df.dropna(subset=['user_id', 'amount'])
# Enrich
df['total'] = df['amount'] * df['quantity']
# Aggregate
df_agg = df.groupby('user_id').agg({
'amount': 'sum',
'quantity': 'sum',
'transaction_id': 'count'
}).reset_index()
return df_agg
def load(self, df: pd.DataFrame):
"""Load to target."""
df.to_sql('user_metrics', self.target, if_exists='append', index=False)
def run(self):
"""Execute ETL pipeline."""
df = self.extract()
df = self.transform(df)
self.load(df)
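For context, here's a minimal sketch of how I'd wire this up. It assumes source_db and target_db are SQLAlchemy engines; the connection URLs and the last_processed cutoff are placeholders, not real values.
# Hypothetical wiring for the ETL pipeline (connection URLs are placeholders)
from sqlalchemy import create_engine

source = create_engine("postgresql+psycopg2://user:pass@source-host/app")
target = create_engine("postgresql+psycopg2://user:pass@warehouse-host/analytics")

etl = ETLPipeline(source, target, last_processed=datetime(2024, 1, 1))
etl.run()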
# Python 3.12 - ELT Pipeline
class ELTPipeline:
"""
Modern ELT: leverage warehouse for transformations.
Faster and more scalable for big data.
"""
def __init__(self, source_db, warehouse_db):
self.source = source_db
self.warehouse = warehouse_db
def extract_and_load(self):
"""Extract and load raw data."""
# Read from source
df = pd.read_sql("SELECT * FROM transactions", self.source)
# Load raw to warehouse
df.to_sql('raw_transactions', self.warehouse, if_exists='append', index=False)
def transform_in_warehouse(self):
"""Run transformations in warehouse using SQL."""
with self.warehouse.connect() as conn:
conn.execute(text("""
INSERT INTO user_metrics
SELECT
user_id,
SUM(amount) as total_amount,
SUM(quantity) as total_quantity,
COUNT(*) as transaction_count
FROM raw_transactions
GROUP BY user_id
"""))
conn.commit()
def run(self):
"""Execute ELT pipeline."""
self.extract_and_load()
self.transform_in_warehouse()
Idempotent Pipeline Design
Idempotent pipelines can run multiple times without duplicating data, which is critical for production reliability: a failed run can simply be retried.
# Python 3.12 - Idempotent pipeline
class IdempotentPipeline:
"""
Pipeline that's safe to re-run.
Critical for production reliability.
"""
    def __init__(self, engine):
        self.engine = engine  # SQLAlchemy engine for the target database

    def load_with_upsert(self, df: pd.DataFrame, table: str, key_cols: list):
"""
Upsert instead of append.
Prevents duplicates on re-runs.
"""
from sqlalchemy import text
# Create temp table
temp_table = f"{table}_temp"
df.to_sql(temp_table, self.engine, if_exists='replace', index=False)
# Merge (upsert) from temp to target
        # Build the upsert clauses (PostgreSQL-style ON CONFLICT)
        update_cols = [c for c in df.columns if c not in key_cols]
        update_clause = ', '.join([f"{c} = EXCLUDED.{c}" for c in update_cols])
insert_cols = ', '.join(df.columns)
insert_vals = ', '.join([f"s.{c}" for c in df.columns])
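        # PostgreSQL-style upsert: ON CONFLICT requires a unique constraint on the key columns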
merge_query = f"""
INSERT INTO {table} ({insert_cols})
SELECT {insert_vals}
FROM {temp_table} s
ON CONFLICT ({', '.join(key_cols)})
DO UPDATE SET {update_clause}
"""
with self.engine.connect() as conn:
conn.execute(text(merge_query))
conn.execute(text(f"DROP TABLE {temp_table}"))
conn.commit()
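A quick usage sketch. The engine URL, the sample frame, and the assumption that user_metrics has a unique constraint on user_id are all mine, for illustration:
# Hypothetical re-runnable load (assumes a unique constraint on user_metrics.user_id)
from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://user:pass@warehouse-host/analytics")
df_agg = pd.DataFrame({"user_id": [1, 2], "amount": [120.0, 75.5], "quantity": [3, 1]})

idempotent = IdempotentPipeline(engine)
idempotent.load_with_upsert(df_agg, table="user_metrics", key_cols=["user_id"])  # safe to call twice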
Incremental Load Pattern
Rather than reloading whole tables, load only the records added since the last run and remember where you left off in a checkpoint table.
# Python 3.12 - Incremental loading
class IncrementalLoader:
"""
Load only new/changed data.
Essential for large datasets.
"""
def __init__(self, db_engine):
self.db = db_engine
self.metadata_table = 'pipeline_checkpoints'
def get_checkpoint(self, pipeline_name: str) -> datetime:
"""Get last processed timestamp."""
query = f"""
SELECT max_timestamp
FROM {self.metadata_table}
WHERE pipeline_name = :name
"""
result = pd.read_sql(
text(query),
self.db,
params={'name': pipeline_name}
)
return result.iloc[0, 0] if not result.empty else datetime(2020, 1, 1)
def update_checkpoint(self, pipeline_name: str, timestamp: datetime):
"""Update checkpoint."""
query = f"""
INSERT INTO {self.metadata_table} (pipeline_name, max_timestamp, updated_at)
VALUES (:name, :ts, :now)
ON CONFLICT (pipeline_name) DO UPDATE
SET max_timestamp = EXCLUDED.max_timestamp,
updated_at = EXCLUDED.updated_at
"""
with self.db.connect() as conn:
conn.execute(
text(query),
{'name': pipeline_name, 'ts': timestamp, 'now': datetime.utcnow()}
)
conn.commit()
def load_incremental(self, source_table: str, dest_table: str, timestamp_col: str):
"""Load only new records."""
# Get checkpoint
last_ts = self.get_checkpoint(f"{source_table}_to_{dest_table}")
# Extract new records
query = f"""
SELECT * FROM {source_table}
WHERE {timestamp_col} > :last_ts
ORDER BY {timestamp_col}
"""
df = pd.read_sql(text(query), self.db, params={'last_ts': last_ts})
if df.empty:
logging.info("No new records")
return
# Load
df.to_sql(dest_table, self.db, if_exists='append', index=False)
# Update checkpoint
max_ts = df[timestamp_col].max()
self.update_checkpoint(f"{source_table}_to_{dest_table}", max_ts)
logging.info(f"Loaded {len(df)} new records")
Change Data Capture (CDC)
CDC extracts only the rows that were inserted, updated, or soft-deleted since the last sync, which keeps syncs cheap for large tables that change slowly.
# Python 3.12 - CDC pattern
class CDCProcessor:
"""
Capture and process data changes.
Efficient for large tables with few changes.
"""
def __init__(self, source_db, target_db):
self.source = source_db
self.target = target_db
def extract_changes(self, table: str, last_sync: datetime) -> pd.DataFrame:
"""
Extract changed records.
Assumes source has updated_at column.
"""
query = f"""
SELECT *,
CASE
WHEN deleted_at IS NOT NULL THEN 'DELETE'
WHEN created_at > :last_sync THEN 'INSERT'
ELSE 'UPDATE'
END as change_type
FROM {table}
WHERE updated_at > :last_sync
"""
return pd.read_sql(text(query), self.source, params={'last_sync': last_sync})
def apply_changes(self, changes: pd.DataFrame, table: str):
"""Apply changes to target."""
for change_type in ['DELETE', 'UPDATE', 'INSERT']:
subset = changes[changes['change_type'] == change_type]
if subset.empty:
continue
if change_type == 'DELETE':
self._delete_records(subset, table)
elif change_type == 'UPDATE':
self._update_records(subset, table)
else: # INSERT
self._insert_records(subset, table)
def _delete_records(self, df: pd.DataFrame, table: str):
"""Delete records."""
ids = df['id'].tolist()
        # Expanding bindparam renders "IN :ids" as a proper value list
        stmt = text(f"DELETE FROM {table} WHERE id IN :ids").bindparams(
            bindparam('ids', expanding=True)
        )
        with self.target.connect() as conn:
            conn.execute(stmt, {'ids': ids})
            conn.commit()
    def _update_records(self, df: pd.DataFrame, table: str):
        """Update changed records row by row, keyed on id (an upsert as in IdempotentPipeline also works)."""
        cols = [c for c in df.columns if c not in ('id', 'change_type')]
        set_clause = ', '.join(f"{c} = :{c}" for c in cols)
        stmt = text(f"UPDATE {table} SET {set_clause} WHERE id = :id")
        with self.target.connect() as conn:
            for row in df.drop(columns=['change_type']).to_dict('records'):
                conn.execute(stmt, row)
            conn.commit()
def _insert_records(self, df: pd.DataFrame, table: str):
"""Insert new records."""
df.to_sql(table, self.target, if_exists='append', index=False)
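And a short sketch of a sync run, assuming source and target are SQLAlchemy engines and the source transactions table carries the timestamp columns above; the URLs and last_sync value are placeholders:
# Hypothetical CDC sync (engines and last_sync are placeholders)
from sqlalchemy import create_engine

source = create_engine("postgresql+psycopg2://user:pass@source-host/app")
target = create_engine("postgresql+psycopg2://user:pass@warehouse-host/analytics")

cdc = CDCProcessor(source, target)
changes = cdc.extract_changes("transactions", last_sync=datetime(2024, 1, 1))
cdc.apply_changes(changes, "transactions")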
Data Lineage
Knowing which datasets feed which transformations makes compliance audits and data quality debugging far easier.
# Python 3.12 - Track data lineage
from dataclasses import dataclass
from typing import List
@dataclass
class LineageNode:
"""Represents a dataset in lineage graph."""
name: str
type: str # 'source', 'transformation', 'target'
description: str
@dataclass
class LineageEdge:
"""Represents data flow."""
from_node: str
to_node: str
transformation: str
class DataLineageTracker:
"""
Track data lineage for compliance and debugging.
Helped me trace data quality issues many times.
"""
def __init__(self):
self.nodes: List[LineageNode] = []
self.edges: List[LineageEdge] = []
def add_source(self, name: str, description: str):
"""Register data source."""
self.nodes.append(LineageNode(name, 'source', description))
def add_transformation(self, name: str, description: str):
"""Register transformation."""
self.nodes.append(LineageNode(name, 'transformation', description))
def add_target(self, name: str, description: str):
"""Register target dataset."""
self.nodes.append(LineageNode(name, 'target', description))
def add_flow(self, from_node: str, to_node: str, transformation: str):
"""Register data flow."""
self.edges.append(LineageEdge(from_node, to_node, transformation))
def get_upstream(self, node_name: str) -> List[str]:
"""Get all upstream dependencies."""
upstream = []
for edge in self.edges:
if edge.to_node == node_name:
upstream.append(edge.from_node)
# Recursively get upstream of upstream
upstream.extend(self.get_upstream(edge.from_node))
return list(set(upstream))
def export_lineage(self) -> dict:
"""Export lineage for visualization."""
return {
'nodes': [
{'id': n.name, 'type': n.type, 'desc': n.description}
for n in self.nodes
],
'edges': [
{'from': e.from_node, 'to': e.to_node, 'transform': e.transformation}
for e in self.edges
]
}
# Usage
lineage = DataLineageTracker()
lineage.add_source('raw_transactions', 'Raw transaction data from API')
lineage.add_transformation('clean_transactions', 'Clean and validate')
lineage.add_target('user_metrics', 'Aggregated user metrics')
lineage.add_flow('raw_transactions', 'clean_transactions', 'Data cleaning')
lineage.add_flow('clean_transactions', 'user_metrics', 'Aggregation')
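With the flows registered, upstream lookups and exports work like this (output shown as comments; get_upstream deduplicates with a set, so order can vary):
# Trace everything that feeds user_metrics
print(lineage.get_upstream('user_metrics'))
# ['clean_transactions', 'raw_transactions']  (order may vary)
print(lineage.export_lineage()['edges'][0])
# {'from': 'raw_transactions', 'to': 'clean_transactions', 'transform': 'Data cleaning'}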
Pipeline Error Handling
Production pipelines fail in predictable ways: flaky sources, bad rows, partial loads. The pattern below combines retries with backoff, row-level error tracking, transactional loads, and alerting.
# Python 3.12 - Robust error handling
class ResilientPipeline:
"""
Pipeline with comprehensive error handling.
Learned these patterns from production failures.
"""
def __init__(self):
self.errors = []
self.metrics = {
'records_processed': 0,
'records_failed': 0,
'start_time': None,
'end_time': None
}
def run(self):
"""Run pipeline with error handling."""
self.metrics['start_time'] = datetime.utcnow()
try:
# Extract
data = self._extract_with_retry()
# Transform with error tracking
transformed = self._transform_with_error_tracking(data)
# Load with transaction
self._load_with_transaction(transformed)
self._log_success()
except Exception as e:
self._handle_failure(e)
raise
finally:
self.metrics['end_time'] = datetime.utcnow()
self._send_metrics()
def _extract_with_retry(self) -> pd.DataFrame:
"""Extract with exponential backoff."""
max_retries = 3
for attempt in range(max_retries):
try:
return self.extract()
except Exception as e:
if attempt == max_retries - 1:
raise
wait_time = 2 ** attempt
logging.warning(f"Extract failed, retry in {wait_time}s: {e}")
time.sleep(wait_time)
def _transform_with_error_tracking(self, df: pd.DataFrame) -> pd.DataFrame:
"""Transform and track row-level errors."""
results = []
for idx, row in df.iterrows():
try:
transformed = self.transform_row(row)
results.append(transformed)
self.metrics['records_processed'] += 1
except Exception as e:
self.errors.append({
'row_id': row.get('id'),
'error': str(e),
'timestamp': datetime.utcnow()
})
self.metrics['records_failed'] += 1
# Continue processing other rows
continue
return pd.DataFrame(results)
def _load_with_transaction(self, df: pd.DataFrame):
"""Load within database transaction."""
with self.db.connect() as conn:
trans = conn.begin()
try:
df.to_sql('target_table', conn, if_exists='append', index=False)
trans.commit()
except Exception as e:
trans.rollback()
raise
    def _log_success(self):
        """Record a successful run."""
        logging.info(
            f"Pipeline succeeded: {self.metrics['records_processed']} rows processed, "
            f"{self.metrics['records_failed']} failed"
        )

    def _send_metrics(self):
        """Ship run metrics to monitoring (stub; wire to your metrics backend in production)."""
        logging.info(f"Pipeline metrics: {self.metrics}")

    def _handle_failure(self, error: Exception):
"""Handle pipeline failure."""
logging.error(f"Pipeline failed: {error}")
# Save error state
self._save_error_state()
# Alert team
self._send_alert(f"Pipeline failed: {error}")
    def _save_error_state(self):
        """Persist row-level errors so failed records can be inspected and replayed."""
        if self.errors:
            # Sketch: in production, write to an errors table or object storage instead
            pd.DataFrame(self.errors).to_csv('pipeline_errors.csv', index=False)

    def _send_alert(self, message: str):
"""Send alert to team."""
# In production: PagerDuty, Slack, email
        logging.critical(f"ALERT: {message}")
Conclusion
ETL/ELT patterns form the backbone of data pipelines. Choose ETL for complex transformations and legacy targets, ELT for modern cloud warehouses. Always design for idempotency and incremental loads.
Key takeaways:
Use ELT for cloud warehouses, ETL for legacy systems
Make every pipeline idempotent so re-runs are safe
Load incrementally and checkpoint your progress