Workflow Orchestration
Introduction
Airflow Fundamentals
DAG (Directed Acyclic Graph)
# Python 3.12 - Basic DAG
import logging
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'data-eng',
    'depends_on_past': False,
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'user_metrics_pipeline',
    default_args=default_args,
    description='Calculate daily user metrics',
    schedule='0 2 * * *',  # Run at 2 AM daily
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['metrics', 'users'],
) as dag:

    def extract_data(**context):
        """Extract user data."""
        execution_date = context['ds']
        logging.info(f"Extracting data for {execution_date}")
        # Extraction logic
        return {'records': 1000}

    def transform_data(**context):
        """Transform extracted data."""
        ti = context['ti']
        extract_result = ti.xcom_pull(task_ids='extract')
        logging.info(f"Transforming {extract_result['records']} records")
        # Transformation logic
        return {'records': 950}

    def load_data(**context):
        """Load to warehouse."""
        ti = context['ti']
        transform_result = ti.xcom_pull(task_ids='transform')
        logging.info(f"Loading {transform_result['records']} records")
        # Load logic

    # Define tasks
    extract = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )
    transform = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id='load',
        python_callable=load_data,
    )

    # Define dependencies
    extract >> transform >> load

Production DAG Patterns
Dynamic Task Generation
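Rather than hard-coding one task per input, Airflow 2.3+ can map a task over a runtime list with dynamic task mapping. A minimal sketch, assuming an Airflow environment; the DAG id, table names, and helper functions are illustrative:

```python
# Hypothetical sketch: dynamic task mapping with .expand() (Airflow 2.3+).
from datetime import datetime

from airflow.decorators import dag, task


@dag(schedule=None, start_date=datetime(2026, 1, 1), catchup=False)
def dynamic_table_pipeline():
    @task
    def list_tables():
        # In practice this might query an information schema or a config store.
        return ['users', 'orders', 'events']

    @task
    def process_table(table_name: str):
        print(f"Processing {table_name}")

    # The scheduler creates one mapped task instance per returned table.
    process_table.expand(table_name=list_tables())


dynamic_table_pipeline()
```

Because the fan-out happens at runtime, adding a table changes the number of task instances without redeploying DAG code.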
Task Groups
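Task groups collapse related tasks into a single node in the Graph view without changing scheduling behavior. A sketch, assuming an Airflow environment; the DAG id and task callables are placeholders:

```python
# Hypothetical sketch: grouping related tasks with TaskGroup.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

with DAG(
    'grouped_pipeline',
    schedule=None,
    start_date=datetime(2026, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(task_id='extract', python_callable=lambda: None)

    # Tasks inside the group get prefixed ids like 'transforms.clean'.
    with TaskGroup('transforms') as transforms:
        clean = PythonOperator(task_id='clean', python_callable=lambda: None)
        enrich = PythonOperator(task_id='enrich', python_callable=lambda: None)
        clean >> enrich

    load = PythonOperator(task_id='load', python_callable=lambda: None)

    # The whole group can be wired as a single dependency node.
    extract >> transforms >> load
```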
Sensors for External Dependencies
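Sensors poll for an external condition before letting downstream tasks run. A sketch using the built-in FileSensor, assuming an Airflow environment; the file path and interval values are illustrative:

```python
# Hypothetical sketch: waiting for an upstream export with FileSensor.
from airflow.sensors.filesystem import FileSensor

wait_for_export = FileSensor(
    task_id='wait_for_export',
    filepath='/data/exports/users.csv',  # illustrative path
    poke_interval=60,        # check every minute
    timeout=60 * 60,         # give up (fail) after an hour
    mode='reschedule',       # free the worker slot between checks
)
```

`mode='reschedule'` matters in production: in the default `poke` mode the sensor occupies a worker slot for its entire wait.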
XCom for Task Communication
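The basic DAG above already pulls each callable's return value (stored automatically under the `return_value` key). For multiple values per task, `xcom_push` with explicit keys is clearer. A DAG-file fragment, with illustrative task and key names:

```python
# Hypothetical sketch: explicit XCom push/pull with named keys.
def extract(**context):
    # Return values are pushed automatically under 'return_value';
    # xcom_push adds extra named entries.
    context['ti'].xcom_push(key='row_count', value=1000)
    context['ti'].xcom_push(key='source_file', value='/data/users.csv')


def report(**context):
    ti = context['ti']
    row_count = ti.xcom_pull(task_ids='extract', key='row_count')
    source = ti.xcom_pull(task_ids='extract', key='source_file')
    print(f"extracted {row_count} rows from {source}")
```

Keep XCom payloads small (counts, paths, flags); they live in the metadata database, so large datasets should be passed via storage, not XCom.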
Branching Logic
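A branch task returns the `task_id` (or list of ids) to follow; the other paths are skipped. A sketch, assuming an Airflow environment; the branch rule and task ids are illustrative:

```python
# Hypothetical sketch: routing runs with BranchPythonOperator.
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator


def choose_path(**context):
    # Full reload on the first of the month, incremental otherwise.
    return 'full_load' if context['ds'].endswith('-01') else 'incremental_load'


branch = BranchPythonOperator(task_id='branch', python_callable=choose_path)
full_load = EmptyOperator(task_id='full_load')
incremental_load = EmptyOperator(task_id='incremental_load')

# Skipped branches propagate 'skipped' state, so a join task needs a
# trigger rule that tolerates skips.
join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')

branch >> [full_load, incremental_load] >> join
```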
Error Handling and Retries
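Retries are configured per task (or inherited from `default_args`, as in the DAG above), and a failure callback fires only after retries are exhausted. A sketch of a per-task policy; `fetch_from_api` and the notification body are assumptions:

```python
# Hypothetical sketch: retry policy with exponential backoff and a
# failure callback.
from datetime import timedelta

from airflow.operators.python import PythonOperator


def notify_failure(context):
    ti = context['task_instance']
    # Replace with Slack, PagerDuty, etc. in a real deployment.
    print(f"Task {ti.task_id} failed on try {ti.try_number}")


flaky = PythonOperator(
    task_id='call_external_api',
    python_callable=fetch_from_api,           # assumed to exist in the DAG file
    retries=5,
    retry_delay=timedelta(seconds=30),
    retry_exponential_backoff=True,           # 30s, 60s, 120s, ...
    max_retry_delay=timedelta(minutes=10),    # cap the backoff
    on_failure_callback=notify_failure,       # runs after the final failure
)
```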
Database Operators
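SQL steps usually belong in a SQL operator rather than a PythonOperator, so the query is templated and the connection is managed by Airflow. A sketch using `SQLExecuteQueryOperator`, which assumes the `apache-airflow-providers-common-sql` package and a connection named `warehouse_db` (an assumption):

```python
# Hypothetical sketch: templated SQL via SQLExecuteQueryOperator.
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

create_summary = SQLExecuteQueryOperator(
    task_id='create_daily_summary',
    conn_id='warehouse_db',          # assumed connection id
    sql="""
        INSERT INTO daily_summary (ds, active_users)
        SELECT '{{ ds }}', COUNT(DISTINCT user_id)
        FROM events
        WHERE event_date = '{{ ds }}';
    """,
)
```

`{{ ds }}` is rendered per run from the logical date, which keeps the query tied to the run's data interval rather than "now".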
Monitoring and Alerting
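Beyond the email settings in `default_args` above, tasks can carry an SLA and the DAG an SLA-miss callback. A fragment sketching the hooks, with the callback body as an assumption:

```python
# Hypothetical sketch: SLA monitoring hooks.
from datetime import timedelta


def sla_missed(dag, task_list, blocking_task_list, slas, blocking_tis):
    # Invoked by the scheduler when tasks in this DAG miss their SLA;
    # replace the print with a real alerting call.
    print(f"SLA missed in {dag.dag_id}: {task_list}")


# On the DAG:   sla_miss_callback=sla_missed
# On a task:    sla=timedelta(hours=1)
#   flags the task if it has not finished within an hour of the
#   scheduled run time.
```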
Best Practices
1. Idempotent Tasks
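An idempotent task produces the same end state no matter how many times it runs for the same logical date, which makes retries and backfills safe. A common pattern is delete-then-insert on the run's partition, sketched here with SQLite for illustration (the table and column names are assumptions):

```python
# Hypothetical sketch: idempotent load via delete-then-insert per partition.
import sqlite3


def load_daily_metrics(conn, ds, rows):
    # Remove the partition first so a rerun never duplicates data.
    conn.execute("DELETE FROM daily_metrics WHERE ds = ?", (ds,))
    conn.executemany(
        "INSERT INTO daily_metrics (ds, user_id, events) VALUES (?, ?, ?)",
        [(ds, user_id, events) for user_id, events in rows],
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_metrics (ds TEXT, user_id INTEGER, events INTEGER)")
rows = [(1, 10), (2, 5)]
load_daily_metrics(conn, "2026-01-01", rows)
load_daily_metrics(conn, "2026-01-01", rows)  # rerun: same end state
count = conn.execute("SELECT COUNT(*) FROM daily_metrics").fetchone()[0]
print(count)  # → 2, not 4
```

The same shape works in a warehouse: key the delete on the templated `{{ ds }}` so each run owns exactly one partition.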
2. Resource Management
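Pools cap how many tasks may hit a shared resource at once, and DAG-level settings bound overall parallelism. A fragment, assuming a pool named `warehouse_pool` has been created under Admin → Pools and that `run_heavy_query` exists in the DAG file:

```python
# Hypothetical sketch: limiting concurrency with pools and timeouts.
from datetime import timedelta

heavy_query = PythonOperator(
    task_id='heavy_query',
    python_callable=run_heavy_query,       # assumed to exist in the DAG file
    pool='warehouse_pool',                 # only as many run as the pool has slots
    execution_timeout=timedelta(minutes=30),  # kill runaway tasks
)

# DAG-level limits (passed to the DAG constructor):
#   max_active_runs=1    -> one DAG run at a time
#   max_active_tasks=8   -> at most 8 of this DAG's tasks run concurrently
```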
3. Testing DAGs
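At minimum, CI should fail if any DAG file no longer imports. A pytest sketch using `DagBag`, assuming an Airflow environment and the `user_metrics_pipeline` DAG defined above:

```python
# test_dags.py - hypothetical pytest smoke tests for DAG integrity.
from airflow.models import DagBag


def test_no_import_errors():
    dag_bag = DagBag(include_examples=False)
    # Any syntax error or bad import in a DAG file shows up here.
    assert not dag_bag.import_errors


def test_user_metrics_dag_structure():
    dag_bag = DagBag(include_examples=False)
    dag = dag_bag.get_dag('user_metrics_pipeline')
    assert dag is not None
    assert len(dag.tasks) == 3  # extract, transform, load
```

Unit tests for the callables themselves (e.g. `transform_data`) need no Airflow at all: pass a faked `context` dict and assert on the return value.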
Conclusion