Understanding DataOps
When I started my personal project to analyze my smart home data, fitness metrics, and financial information, I quickly ran into a wall. My notebooks were disorganized, my data was inconsistent, and I spent more time fighting with my pipelines than actually gaining insights. I had automated dozens of data collection points, but the analysis was a mess. After six frustrating months of broken dashboards and unreliable numbers, I decided to apply professional DataOps principles to my personal project.
The results have been transformative. My morning dashboard now refreshes automatically with 99.9% reliability, my financial forecasts use consistent, clean data, and I've cut my maintenance time from hours each week to just minutes. What started as a frustrating side project has become a powerful personal analytics system that I actually trust to make decisions.
How did I get there? By embracing DataOps: a philosophy and practice that changed how I work with data. In this post, I'll share my personal journey implementing DataOps practices using AWS S3, Lambda, Python, Databricks, and PySpark, with real examples from the trenches.
What DataOps Means to Me: Beyond the Buzzword
When I first encountered the term "DataOps," I was skeptical. Another tech buzzword? But as I dug deeper, I realized it addressed the exact pain points I was experiencing daily. For me, DataOps isn't just about tools or processes; it's a fundamental shift in how we think about data workflows.
At its core, DataOps combines:
DevOps practices (automation, CI/CD, monitoring)
Agile methodologies (iterative development, feedback loops)
Statistical process control (data quality, observability)
But rather than give you abstract definitions, let me show you how I've implemented these principles in real life.
My DataOps Architecture: Medallion Pattern on AWS with Databricks
After experimenting with different approaches, I settled on a medallion architecture implemented across AWS and Databricks. Here's how it works:
1. Bronze Layer: Raw Data Ingestion with AWS S3 and Lambda
I treat raw data as immutable: we never modify it once ingested. This gives us a "source of truth" we can always return to. Here's an automated ingestion system I built using Python, AWS Lambda, and S3:
# This Lambda function is triggered whenever a file lands in our intake S3 bucket
import boto3
import json
import os
import time
from datetime import datetime


def lambda_handler(event, context):
    s3_client = boto3.client('s3')

    # Extract bucket and key information
    source_bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = event['Records'][0]['s3']['object']['key']
    file_name = os.path.basename(source_key)

    # Generate metadata
    now = datetime.now()
    timestamp = int(time.time())
    date_partition = now.strftime('%Y/%m/%d')

    # Copy to bronze layer with partitioning and metadata
    destination_key = f"bronze/source={source_bucket}/ingestion_date={date_partition}/{timestamp}_{file_name}"

    # Add metadata as a companion JSON file
    metadata = {
        'source_bucket': source_bucket,
        'source_key': source_key,
        'ingestion_timestamp': timestamp,
        'ingestion_date': now.isoformat(),
        'file_size_bytes': event['Records'][0]['s3']['object']['size'],
        'aws_region': os.environ['AWS_REGION']
    }

    # Copy the file to the data lake
    s3_client.copy_object(
        Bucket=os.environ['DATA_LAKE_BUCKET'],
        CopySource={'Bucket': source_bucket, 'Key': source_key},
        Key=destination_key
    )

    # Write metadata alongside the file
    s3_client.put_object(
        Body=json.dumps(metadata),
        Bucket=os.environ['DATA_LAKE_BUCKET'],
        Key=f"{destination_key}.metadata.json"
    )

    print(f"Successfully ingested {source_key} to {destination_key}")

    # Trigger Databricks job for processing if needed
    if os.environ.get('TRIGGER_DATABRICKS_JOB', 'false').lower() == 'true':
        databricks_job_id = os.environ['DATABRICKS_JOB_ID']
        trigger_databricks_job(databricks_job_id, destination_key)

    return {
        'statusCode': 200,
        'body': json.dumps(f'File {file_name} ingested successfully')
    }


def trigger_databricks_job(job_id, file_path):
    # Implementation to trigger Databricks job via API
    pass
This Lambda function does more than just copy files: it preserves metadata, implements partitioning for performance, and can trigger downstream processing. I've found that solid metadata at ingestion saves countless hours of troubleshooting later.
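The trigger_databricks_job stub above is where downstream processing kicks off. One way to fill it in is a call to the Databricks Jobs API 2.1 run-now endpoint; here's a sketch that assumes DATABRICKS_HOST and DATABRICKS_TOKEN are set as Lambda environment variables and that the target job accepts a source_path notebook parameter:

# Sketch of trigger_databricks_job using the Jobs API 2.1 "run-now" endpoint.
# Assumes DATABRICKS_HOST and DATABRICKS_TOKEN environment variables, and a job
# that reads a "source_path" notebook parameter.
import os
import json
import urllib.request


def trigger_databricks_job(job_id, file_path):
    payload = {
        "job_id": int(job_id),
        "notebook_params": {"source_path": file_path},
    }
    request = urllib.request.Request(
        url=f"{os.environ['DATABRICKS_HOST']}/api/2.1/jobs/run-now",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        run = json.loads(response.read())
        print(f"Triggered Databricks run {run.get('run_id')} for {file_path}")

I use urllib from the standard library here so the Lambda doesn't need an extra dependency layer just for one HTTP call.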
2. Silver Layer: Data Validation and Cleansing with Databricks and PySpark
For the silver layer, I use Databricks notebooks with automated quality checks. Here's a simplified example of how I implement data validation using PySpark:
# Databricks Notebook: Silver Layer Processing
from pyspark.sql.functions import col, when, lower, trim, to_date, year, month
from pyspark.sql.types import StringType

# The `spark` session and `dbutils` are provided by the Databricks runtime

# Read parameters (when running as job)
dbutils.widgets.text("source_path", "")
source_path = dbutils.widgets.get("source_path")
table_name = source_path.split('/')[-1].split('.')[0]

# Read the bronze data
bronze_df = spark.read.format("parquet").load(f"s3a://data-lake-bucket/{source_path}")


# Apply data quality validation
def validate_data_quality(df, table_name):
    """Validate data quality and log metrics"""
    # Get row count
    row_count = df.count()

    # Check for nulls in critical columns
    critical_columns = ["customer_id", "transaction_date", "amount"]
    null_counts = {}
    for column in critical_columns:
        if column in df.columns:
            null_count = df.filter(col(column).isNull()).count()
            null_counts[column] = null_count
            null_pct = (null_count / row_count) * 100 if row_count > 0 else 0

            # Log metrics to our monitoring system
            log_metric(table_name, f"{column}_null_pct", null_pct)

            # Fail the job if null percentage is too high
            if null_pct > 10 and row_count > 100:
                raise Exception(f"Data quality check failed: {column} has {null_pct}% nulls")

    # Check for duplicate records
    if "customer_id" in df.columns and "transaction_id" in df.columns:
        duplicate_count = df.groupBy("customer_id", "transaction_id").count().filter(col("count") > 1).count()
        duplicate_pct = (duplicate_count / row_count) * 100 if row_count > 0 else 0
        log_metric(table_name, "duplicate_pct", duplicate_pct)

        if duplicate_pct > 5:
            raise Exception(f"Data quality check failed: {duplicate_pct}% duplicate records found")

    return True


# Perform data cleansing
def cleanse_data(df):
    """Apply standardized cleansing rules"""
    # Handle missing values in different columns appropriately
    if "email" in df.columns:
        df = df.withColumn("email",
                           when(col("email").isNull(), "unknown@example.com")
                           .otherwise(lower(col("email"))))

    # Convert date strings to proper dates and derive the partition columns
    if "transaction_date" in df.columns:
        df = df.withColumn("transaction_date",
                           to_date(col("transaction_date"), "yyyy-MM-dd"))
        df = df.withColumn("year", year(col("transaction_date"))) \
               .withColumn("month", month(col("transaction_date")))

    # Remove unnecessary whitespace
    string_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
    for column in string_columns:
        df = df.withColumn(column, trim(col(column)))

    return df


# Validate, cleanse, and save to silver layer
if validate_data_quality(bronze_df, table_name):
    silver_df = cleanse_data(bronze_df)

    # Write to Delta Lake format in silver layer
    silver_df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .option("overwriteSchema", "true") \
        .save(f"s3a://data-lake-bucket/silver/{table_name}/")

    # Create Delta table if it doesn't exist
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS silver.{table_name}
        USING DELTA
        LOCATION 's3a://data-lake-bucket/silver/{table_name}/'
    """)

    print(f"Successfully processed {silver_df.count()} rows to silver.{table_name}")
The silver layer is where I enforce standardized schemas, data types, and quality thresholds. Because validation happens consistently here, downstream consumers can trust the data without re-implementing their own checks.
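The validation code above calls a log_metric helper without showing it. Here's a minimal sketch of one way to implement it, assuming CloudWatch custom metrics as the backend and the same DataOps namespace that the monitoring Lambda later in this post reads from:

# Minimal sketch of a log_metric helper backed by CloudWatch custom metrics.
# Assumes the cluster's instance profile grants cloudwatch:PutMetricData and
# provides AWS credentials/region (an assumption about your setup).
import boto3

_cloudwatch = boto3.client('cloudwatch')


def log_metric(table_name, metric_name, value):
    """Publish a single data quality metric point, tagged with the table name."""
    _cloudwatch.put_metric_data(
        Namespace='DataOps',
        MetricData=[
            {
                'MetricName': metric_name,
                'Dimensions': [{'Name': 'Table', 'Value': table_name}],
                'Value': float(value),
                'Unit': 'None'
            }
        ]
    )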
3. Gold Layer: Analytics-Ready Data with PySpark
In the gold layer, I create purpose-built datasets for specific business domains or use cases:
# Databricks Notebook: Gold Layer - Customer 360 View
from pyspark.sql.functions import (
    col, when, count, current_date, date_sub,
    max as spark_max, sum as spark_sum
)

# Read relevant tables from silver layer
customers_df = spark.table("silver.customers")
transactions_df = spark.table("silver.transactions")
web_events_df = spark.table("silver.web_events")
support_tickets_df = spark.table("silver.support_tickets")

# Create a customer 360 view
customer_360_df = customers_df \
    .join(
        transactions_df.groupBy("customer_id").agg(
            count("transaction_id").alias("transaction_count"),
            spark_sum("amount").alias("total_spend"),
            spark_max("transaction_date").alias("last_transaction_date")
        ),
        on="customer_id",
        how="left"
    ) \
    .join(
        web_events_df.groupBy("customer_id").agg(
            count("event_id").alias("web_event_count"),
            spark_max("event_date").alias("last_web_activity")
        ),
        on="customer_id",
        how="left"
    ) \
    .join(
        support_tickets_df.groupBy("customer_id").agg(
            count("ticket_id").alias("support_ticket_count"),
            spark_sum(when(col("status") == "OPEN", 1).otherwise(0)).alias("open_tickets")
        ),
        on="customer_id",
        how="left"
    )

# Calculate customer health score (first matching rule wins)
customer_360_df = customer_360_df.withColumn(
    "customer_health_score",
    when(col("total_spend") > 1000, 100)
    .when(col("total_spend") > 500, 80)
    .when(col("transaction_count") > 5, 60)
    .when(col("last_transaction_date") > date_sub(current_date(), 30), 40)
    .when(col("open_tickets") > 3, 20)
    .otherwise(30)
)

# Write to gold layer
customer_360_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("signup_region") \
    .save("s3a://data-lake-bucket/gold/customer_360/")

# Create Delta table
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.customer_360
    USING DELTA
    LOCATION 's3a://data-lake-bucket/gold/customer_360/'
""")
Automating It All: My CI/CD Pipeline for Data
Where DataOps really shines is in automation. I've created a CI/CD pipeline that takes code from development to production and handles testing at each stage. Here's a simplified version of my GitLab CI configuration:
# .gitlab-ci.yml for DataOps
stages:
  - validate
  - test
  - build
  - deploy
  - monitor

variables:
  DATABRICKS_HOST: $DATABRICKS_HOST
  DATABRICKS_TOKEN: $DATABRICKS_TOKEN

# Validate Databricks notebooks and Python code
validate:
  stage: validate
  image: python:3.9
  script:
    - pip install dbx pytest black flake8
    - flake8 src/
    - black --check src/
    - python -m pytest tests/unit/ -v

# Test with sample data
integration_test:
  stage: test
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=integration-test --environment=test
    - dbx launch --job=integration-test --environment=test --as-run-submit --trace

# Build and publish job artifacts
build:
  stage: build
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=data-pipeline --environment=staging
  artifacts:
    paths:
      - deployment/
    expire_in: 1 week

# Deploy to production
deploy_prod:
  stage: deploy
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=data-pipeline --environment=production
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      when: manual

# Monitor data quality metrics
monitor:
  stage: monitor
  image: python:3.9
  script:
    - pip install boto3 pandas matplotlib
    - python scripts/data_quality_report.py --environment=production
  artifacts:
    paths:
      - data_quality_report.html
    expire_in: 1 week
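The monitor stage calls a small reporting script that isn't reproduced here. A minimal sketch of what scripts/data_quality_report.py could look like, assuming it reads the same DataOps CloudWatch namespace used throughout this post and that the table and metric names mirror the monitoring Lambda below:

# Sketch of scripts/data_quality_report.py: pull recent DataOps metrics from
# CloudWatch and render a simple HTML report. Table and metric names are
# assumptions matching the monitoring Lambda in this post.
import argparse
from datetime import datetime, timedelta

import boto3
import pandas as pd


def fetch_metric(cloudwatch, table, metric, days=7):
    """Return hourly averages for one table/metric pair over the last `days` days."""
    stats = cloudwatch.get_metric_statistics(
        Namespace='DataOps',
        MetricName=metric,
        Dimensions=[{'Name': 'Table', 'Value': table}],
        StartTime=datetime.utcnow() - timedelta(days=days),
        EndTime=datetime.utcnow(),
        Period=3600,
        Statistics=['Average'],
    )
    return [
        {'table': table, 'metric': metric,
         'timestamp': p['Timestamp'], 'value': p['Average']}
        for p in stats['Datapoints']
    ]


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--environment', default='production')
    args = parser.parse_args()

    cloudwatch = boto3.client('cloudwatch')
    rows = []
    for table in ['customers', 'transactions', 'products']:
        for metric in ['row_count', 'processing_time', 'error_count']:
            rows.extend(fetch_metric(cloudwatch, table, metric))

    report_df = pd.DataFrame(rows, columns=['table', 'metric', 'timestamp', 'value'])
    report_df = report_df.sort_values(['table', 'metric', 'timestamp'])
    with open('data_quality_report.html', 'w') as f:
        f.write(f"<h1>Data quality report ({args.environment})</h1>")
        f.write(report_df.to_html(index=False))


if __name__ == '__main__':
    main()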
Setting Up Observability: The Game-Changer
The biggest transformation in my DataOps journey was implementing comprehensive observability. Here's a Lambda function I use to track processing metrics and alert on anomalies:
import boto3
import json
import os
import numpy as np
from datetime import datetime, timedelta


def lambda_handler(event, context):
    """Monitor data quality metrics and alert on anomalies"""
    cloudwatch = boto3.client('cloudwatch')
    sns = boto3.client('sns')

    # Define metrics to monitor
    tables_to_monitor = ['customers', 'transactions', 'products']
    metrics_to_check = ['row_count', 'processing_time', 'error_count']

    alerts = []
    for table in tables_to_monitor:
        for metric in metrics_to_check:
            # Get recent metric values
            response = cloudwatch.get_metric_data(
                MetricDataQueries=[
                    {
                        'Id': 'data',
                        'MetricStat': {
                            'Metric': {
                                'Namespace': 'DataOps',
                                'MetricName': metric,
                                'Dimensions': [
                                    {'Name': 'Table', 'Value': table},
                                ]
                            },
                            'Period': 3600,
                            'Stat': 'Average',
                        },
                        'ReturnData': True,
                    },
                ],
                StartTime=datetime.now() - timedelta(days=7),
                EndTime=datetime.now(),
            )

            # Most recent value vs. the rest of the window
            values = response['MetricDataResults'][0]['Values']
            current_value = values[0] if values else 0
            historical_values = values[1:] if len(values) > 1 else [0]

            # Calculate mean and standard deviation
            mean = np.mean(historical_values)
            std_dev = np.std(historical_values) if len(historical_values) > 1 else 0

            # Check for anomalies (> 3 standard deviations)
            if std_dev > 0 and abs(current_value - mean) > 3 * std_dev:
                alert_message = (
                    f"ANOMALY DETECTED: {table}.{metric} current value {current_value} "
                    f"deviates significantly from historical mean {mean:.2f} (±{std_dev:.2f})"
                )
                alerts.append(alert_message)

                # Send alert
                sns.publish(
                    TopicArn=os.environ['ALERT_TOPIC_ARN'],
                    Subject=f"DataOps Anomaly: {table}.{metric}",
                    Message=alert_message
                )

    return {
        'statusCode': 200,
        'body': json.dumps({
            'alerts_triggered': len(alerts),
            'alerts': alerts
        })
    }
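This function still needs something to invoke it periodically. One simple option is an EventBridge schedule; here's a sketch of wiring that up with boto3, where the function name, rule name, and ARN are all placeholders:

# Sketch: invoke the monitoring Lambda hourly via an EventBridge schedule.
# Function name, rule name, and the Lambda ARN below are placeholders.
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

rule = events.put_rule(
    Name='dataops-quality-monitor-hourly',
    ScheduleExpression='rate(1 hour)',
    State='ENABLED',
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName='dataops-quality-monitor',
    StatementId='allow-eventbridge-hourly',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)

events.put_targets(
    Rule='dataops-quality-monitor-hourly',
    Targets=[{
        'Id': 'monitor-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:123456789012:function:dataops-quality-monitor',
    }],
)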
Lessons I've Learned the Hard Way
After implementing DataOps across multiple organizations, here are my most valuable lessons:
1. Start with Version Control for Everything
The foundation of DataOps is treating your data pipelines like software. That means version control for:
SQL queries and transformations
Pipeline definitions and configs
Schema definitions
Data quality rules
I've found that simply applying this practice eliminates about 70% of the "what changed?" troubleshooting sessions.
2. Make Data Quality Everyone's Responsibility
I used to think data quality was just for data engineers. Now I embed quality rules at every layer:
Ingestion validation in Lambda functions
Data profiling in the bronze-to-silver process
Business rule validation in silver-to-gold
Automated testing in CI/CD
3. Implement Self-Documenting Systems
One breakthrough was creating self-documenting pipelines. I built a metadata crawler that runs nightly:
# Databricks notebook that crawls our data lake and generates documentation
import json
import os

# The `spark` session is provided by the Databricks runtime

# Get all tables in silver and gold layers
tables_df = spark.sql("""
    SELECT
        table_catalog,
        table_schema,
        table_name,
        created,
        last_altered
    FROM system.information_schema.tables
    WHERE table_schema IN ('silver', 'gold')
""")

# For each table, get schema and statistics
metadata = []
for row in tables_df.collect():
    schema = row['table_schema']
    table = row['table_name']

    # Get column information
    columns_df = spark.sql(f"""
        SELECT
            column_name,
            data_type,
            comment
        FROM system.information_schema.columns
        WHERE table_schema = '{schema}'
        AND table_name = '{table}'
    """)

    # Get basic stats
    stats_df = spark.sql(f"""
        DESCRIBE EXTENDED {schema}.{table}
    """)

    # Get row count and size
    count_df = spark.sql(f"""
        SELECT COUNT(*) as row_count FROM {schema}.{table}
    """)
    row_count = count_df.collect()[0]['row_count']

    # Collect sample values for each column
    sample_df = spark.table(f"{schema}.{table}").limit(5)

    # Add to metadata collection
    metadata.append({
        'schema': schema,
        'table': table,
        'created': row['created'],
        'last_altered': row['last_altered'],
        'row_count': row_count,
        'columns': [c.asDict() for c in columns_df.collect()],
        'sample_data': sample_df.toJSON().collect()
    })

# Write metadata to JSON for documentation site (default=str handles timestamps)
os.makedirs('/dbfs/documentation', exist_ok=True)
with open('/dbfs/documentation/metadata.json', 'w') as f:
    json.dump(metadata, f, default=str)

# Also write as Delta table
metadata_df = spark.createDataFrame(metadata)
metadata_df.write.format('delta').mode('overwrite').save('/documentation/metadata')
This feeds into an internal documentation site that shows data lineage, schema information, and quality metrics in real time.
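The documentation site itself is beyond the scope of this post, but turning that metadata.json into pages is a small templating step. A rough sketch, with an arbitrary output path and layout:

# Sketch: render metadata.json into one Markdown page per table for a docs site.
# Paths and layout are arbitrary; adapt to whatever site generator you use.
import json
from pathlib import Path

docs_dir = Path('/dbfs/documentation/pages')
docs_dir.mkdir(parents=True, exist_ok=True)

with open('/dbfs/documentation/metadata.json') as f:
    tables = json.load(f)

for t in tables:
    lines = [
        f"# {t['schema']}.{t['table']}",
        f"Rows: {t['row_count']} | Last altered: {t['last_altered']}",
        "",
        "| Column | Type | Comment |",
        "| --- | --- | --- |",
    ]
    for c in t['columns']:
        lines.append(f"| {c['column_name']} | {c['data_type']} | {c.get('comment') or ''} |")
    (docs_dir / f"{t['schema']}_{t['table']}.md").write_text("\n".join(lines))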
4. Test Everything in Isolation First
Early on, I made the mistake of testing entire pipelines end-to-end. Now I:
Write unit tests for individual transformations
Create integration tests for component interfaces
Have end-to-end tests that validate key business metrics
This approach dramatically reduces debugging time when things go wrong.
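For the unit-test layer, I keep transformations such as cleanse_data in plain Python modules so they can run outside Databricks. Here's a minimal sketch of such a test, assuming the function lives in a hypothetical src/transforms.py and that pyspark is installed locally:

# tests/unit/test_cleanse_data.py -- minimal sketch of a transformation unit test.
# Assumes cleanse_data has been moved into an importable module (src/transforms.py).
import pytest
from pyspark.sql import SparkSession

from src.transforms import cleanse_data


@pytest.fixture(scope="session")
def spark():
    return SparkSession.builder.master("local[1]").appName("unit-tests").getOrCreate()


def test_cleanse_data_normalizes_emails_and_dates(spark):
    input_df = spark.createDataFrame(
        [("  Alice ", "ALICE@EXAMPLE.COM", "2024-01-15"),
         ("Bob", None, "2024-02-01")],
        ["name", "email", "transaction_date"],
    )

    result = {r["name"]: r for r in cleanse_data(input_df).collect()}

    assert result["Alice"]["email"] == "alice@example.com"            # lowercased
    assert result["Bob"]["email"] == "unknown@example.com"            # null replaced
    assert result["Alice"]["name"] == "Alice"                         # whitespace trimmed
    assert str(result["Alice"]["transaction_date"]) == "2024-01-15"   # parsed to a date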
My DataOps Toolbox
To implement a complete DataOps solution, here are the specific tools and services I use:
Development Environment:
Databricks notebooks for exploratory work
VS Code with PySpark extensions for local development
Git for version control
Storage and Processing:
AWS S3 for data lake storage
Databricks Delta Lake for ACID transactions
AWS Glue for metadata catalog
ETL/ELT:
PySpark on Databricks for large-scale processing
AWS Lambda for event-driven processing
Amazon EMR for cost-optimized batch jobs
Orchestration:
Databricks Jobs for workflow orchestration
AWS Step Functions for complex flow control
Apache Airflow for multi-system orchestration
Testing and Monitoring:
Great Expectations for data validation
Databricks dbx for CI/CD integration
CloudWatch for metric collection and alerting
Getting Started with DataOps
If you're looking to begin your own DataOps journey, here's how I recommend starting:
Map your current data workflows and identify pain points
Start small with a single pipeline or dataset
Implement version control for all code and configurations
Add basic monitoring to understand your current state
Gradually introduce automation to replace manual steps
The beauty of DataOps is that you can implement it incrementally. Each improvement builds on the last, creating a virtuous cycle of better data quality, faster delivery, and increased trust.
Next up in my data engineering series, I'll dive deeper into building real-time data pipelines using AWS Kinesis and Databricks Structured Streaming. Stay tuned!