
Understanding DataOps

When I started my personal project to analyze my smart home data, fitness metrics, and financial information, I quickly ran into a wall. My notebooks were disorganized, my data was inconsistent, and I spent more time fighting with my pipelines than actually gaining insights. I had automated dozens of data collection points, but the analysis was a mess. After six frustrating months of broken dashboards and unreliable numbers, I decided to apply professional DataOps principles to my personal project.

The results have been transformative. My morning dashboard now refreshes automatically with 99.9% reliability, my financial forecasts use consistent, clean data, and I've cut my maintenance time from hours each week to just minutes. What started as a frustrating side project has become a powerful personal analytics system that I actually trust to make decisions.

How did I get there? By embracing DataOps: a philosophy and set of practices that transformed how I work with data. In this post, I'll share my personal journey implementing DataOps using AWS S3, Lambda, Python, Databricks, and PySpark, with real examples from the trenches.

What DataOps Means to Me: Beyond the Buzzword

When I first encountered the term "DataOps," I was skeptical. Another tech buzzword? But as I dug deeper, I realized it addressed the exact pain points I was experiencing daily. For me, DataOps isn't just about tools or processes; it's a fundamental shift in how we think about data workflows.

At its core, DataOps combines:

  • DevOps practices (automation, CI/CD, monitoring)

  • Agile methodologies (iterative development, feedback loops)

  • Statistical process control (data quality, observability)

But rather than give you abstract definitions, let me show you how I've implemented these principles in real life.

My DataOps Architecture: Medallion Pattern on AWS with Databricks

After experimenting with different approaches, I settled on a medallion architecture implemented across AWS and Databricks. Here's how it works:

1. Bronze Layer: Raw Data Ingestion with AWS S3 and Lambda

I treat raw data as immutable: it is never modified once ingested. This gives me a "source of truth" I can always return to. Here's the automated ingestion system I built using Python, AWS Lambda, and S3:

# This Lambda function is triggered whenever a file lands in our intake S3 bucket
import boto3
import json
import os
import time
from datetime import datetime

def lambda_handler(event, context):
    s3_client = boto3.client('s3')
    
    # Extract bucket and key information
    source_bucket = event['Records'][0]['s3']['bucket']['name']
    source_key = event['Records'][0]['s3']['object']['key']
    file_name = os.path.basename(source_key)
    
    # Generate metadata
    now = datetime.now()
    timestamp = int(time.time())
    date_partition = now.strftime('%Y/%m/%d')
    
    # Copy to bronze layer with partitioning and metadata
    destination_key = f"bronze/source={source_bucket}/ingestion_date={date_partition}/{timestamp}_{file_name}"
    
    # Add metadata as a companion JSON file
    metadata = {
        'source_bucket': source_bucket,
        'source_key': source_key,
        'ingestion_timestamp': timestamp,
        'ingestion_date': now.isoformat(),
        'file_size_bytes': event['Records'][0]['s3']['object']['size'],
        'aws_region': os.environ['AWS_REGION']
    }
    
    # Copy the file to the data lake
    s3_client.copy_object(
        Bucket=os.environ['DATA_LAKE_BUCKET'],
        CopySource={'Bucket': source_bucket, 'Key': source_key},
        Key=destination_key
    )
    
    # Write metadata alongside the file
    s3_client.put_object(
        Body=json.dumps(metadata),
        Bucket=os.environ['DATA_LAKE_BUCKET'],
        Key=f"{destination_key}.metadata.json"
    )
    
    print(f"Successfully ingested {source_key} to {destination_key}")
    
    # Trigger Databricks job for processing if needed
    if os.environ.get('TRIGGER_DATABRICKS_JOB', 'false').lower() == 'true':
        databricks_job_id = os.environ['DATABRICKS_JOB_ID']
        trigger_databricks_job(databricks_job_id, destination_key)
    
    return {
        'statusCode': 200,
        'body': json.dumps(f'File {file_name} ingested successfully')
    }

def trigger_databricks_job(job_id, file_path):
    # Implementation to trigger Databricks job via API
    pass

This Lambda function does more than just copy files: it preserves metadata, implements partitioning for performance, and can trigger downstream processing. I've found that solid metadata at ingestion saves countless hours of troubleshooting later.
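
The trigger_databricks_job helper is deliberately left as a stub above. One way to fill it in is a call to the Databricks Jobs 2.1 run-now REST endpoint, passing the landed file path through as a notebook parameter. Here's a minimal sketch, assuming DATABRICKS_HOST and DATABRICKS_TOKEN are set as Lambda environment variables:

# Hypothetical implementation of trigger_databricks_job using the
# Databricks Jobs 2.1 "run-now" endpoint (stdlib only, no extra packages)
import json
import os
import urllib.request

def trigger_databricks_job(job_id, file_path):
    payload = json.dumps({
        "job_id": int(job_id),
        "notebook_params": {"source_path": file_path}
    }).encode("utf-8")

    request = urllib.request.Request(
        url=f"{os.environ['DATABRICKS_HOST']}/api/2.1/jobs/run-now",
        data=payload,
        headers={
            "Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}",
            "Content-Type": "application/json",
        },
        method="POST",
    )

    with urllib.request.urlopen(request) as response:
        run = json.loads(response.read())
        print(f"Triggered Databricks run {run.get('run_id')} for {file_path}")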

2. Silver Layer: Data Validation and Cleansing with Databricks and PySpark

For the silver layer, I use Databricks notebooks with automated quality checks. Here's a simplified example of how I implement data validation using PySpark:

# Databricks Notebook: Silver Layer Processing
from pyspark.sql.functions import col, when, lower, to_date, trim, year, month
from pyspark.sql.types import StringType

# The spark session and dbutils are provided automatically by the Databricks runtime

# Read parameters (when running as job)
dbutils.widgets.text("source_path", "")
source_path = dbutils.widgets.get("source_path")
table_name = source_path.split('/')[-1].split('.')[0]

# Read the bronze data
bronze_df = spark.read.format("parquet").load(f"s3a://data-lake-bucket/{source_path}")

# Apply data quality validation
def validate_data_quality(df, table_name):
    """Validate data quality and log metrics"""
    
    # Get row count
    row_count = df.count()
    
    # Check for nulls in critical columns
    critical_columns = ["customer_id", "transaction_date", "amount"]
    null_counts = {}
    
    for column in critical_columns:
        if column in df.columns:
            null_count = df.filter(col(column).isNull()).count()
            null_counts[column] = null_count
            null_pct = (null_count / row_count) * 100 if row_count > 0 else 0
            
            # Log metrics to our monitoring system (log_metric helper sketched below)
            log_metric(table_name, f"{column}_null_pct", null_pct)
            
            # Fail the job if null percentage is too high
            if null_pct > 10 and row_count > 100:
                raise Exception(f"Data quality check failed: {column} has {null_pct}% nulls")
    
    # Check for duplicate records
    if "customer_id" in df.columns and "transaction_id" in df.columns:
        duplicate_count = df.groupBy("customer_id", "transaction_id").count().filter(col("count") > 1).count()
        duplicate_pct = (duplicate_count / row_count) * 100 if row_count > 0 else 0
        
        log_metric(table_name, "duplicate_pct", duplicate_pct)
        
        if duplicate_pct > 5:
            raise Exception(f"Data quality check failed: {duplicate_pct}% duplicate records found")
    
    return True

# Perform data cleansing
def cleanse_data(df):
    """Apply standardized cleansing rules"""
    
    # Handle missing values in different columns appropriately
    df = df.withColumn("email", 
                      when(col("email").isNull(), "unknown@example.com")
                      .otherwise(lower(col("email"))))
    
    # Convert date strings to proper date format
    if "transaction_date" in df.columns:
        df = df.withColumn("transaction_date", 
                          to_date(col("transaction_date"), "yyyy-MM-dd"))
    
    # Remove unnecessary whitespace
    string_columns = [f.name for f in df.schema.fields if f.dataType == StringType()]
    for column in string_columns:
        df = df.withColumn(column, trim(col(column)))
    
    return df

# Validate, cleanse, and save to silver layer
if validate_data_quality(bronze_df, table_name):
    silver_df = cleanse_data(bronze_df)
    
    # Write to Delta Lake format in silver layer
    silver_df.write \
        .format("delta") \
        .mode("overwrite") \
        .partitionBy("year", "month") \
        .option("overwriteSchema", "true") \
        .save(f"s3a://data-lake-bucket/silver/{table_name}/")
    
    # Create Delta table if it doesn't exist
    spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver.{table_name}
    USING DELTA
    LOCATION 's3a://data-lake-bucket/silver/{table_name}/'
    """)
    
    print(f"Successfully processed {row_count} rows to silver.{table_name}")

The silver layer is where I enforce standardized schemas, data types, and quality thresholds. By implementing consistent validation here, downstream consumers can trust the data without duplicating validation logic.
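
Both quality checks above report into my monitoring system through a small log_metric helper that the notebook calls but that I haven't shown. The exact implementation doesn't matter much; here's a minimal sketch, assuming the cluster is allowed to publish CloudWatch custom metrics into the same DataOps namespace that the anomaly-detection Lambda later in this post reads from:

# Minimal sketch of the log_metric helper used in the silver notebook.
# Assumes the cluster's instance profile permits cloudwatch:PutMetricData.
import boto3

def log_metric(table_name, metric_name, value):
    """Publish a single data quality measurement as a CloudWatch custom metric."""
    cloudwatch = boto3.client('cloudwatch')
    cloudwatch.put_metric_data(
        Namespace='DataOps',
        MetricData=[
            {
                'MetricName': metric_name,
                'Dimensions': [{'Name': 'Table', 'Value': table_name}],
                'Value': float(value),
                'Unit': 'None'
            }
        ]
    )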

3. Gold Layer: Analytics-Ready Data with PySpark

In the gold layer, I create purpose-built datasets for specific business domains or use cases:

# Databricks Notebook: Gold Layer - Customer 360 View
from pyspark.sql.functions import (
    col, when, count, current_date, date_sub,
    sum as spark_sum, max as spark_max
)

# Read relevant tables from silver layer
customers_df = spark.table("silver.customers")
transactions_df = spark.table("silver.transactions")
web_events_df = spark.table("silver.web_events")
support_tickets_df = spark.table("silver.support_tickets")

# Create a customer 360 view
customer_360_df = customers_df \
    .join(
        transactions_df.groupBy("customer_id").agg(
            count("transaction_id").alias("transaction_count"),
            spark_sum("amount").alias("total_spend"),
            max("transaction_date").alias("last_transaction_date")
        ),
        on="customer_id",
        how="left"
    ) \
    .join(
        web_events_df.groupBy("customer_id").agg(
            count("event_id").alias("web_event_count"),
            max("event_date").alias("last_web_activity")
        ),
        on="customer_id",
        how="left"
    ) \
    .join(
        support_tickets_df.groupBy("customer_id").agg(
            count("ticket_id").alias("support_ticket_count"),
            sum(when(col("status") == "OPEN", 1).otherwise(0)).alias("open_tickets")
        ),
        on="customer_id",
        how="left"
    )

# Calculate customer health score
customer_360_df = customer_360_df.withColumn(
    "customer_health_score",
    when(col("total_spend") > 1000, 100)
    .when(col("total_spend") > 500, 80)
    .when(col("transaction_count") > 5, 60)
    .when(col("last_transaction_date") > current_date() - 30, 40)
    .when(col("open_tickets") > 3, 20)
    .otherwise(30)
)

# Write to gold layer
customer_360_df.write \
    .format("delta") \
    .mode("overwrite") \
    .partitionBy("signup_region") \
    .save("s3a://data-lake-bucket/gold/customer_360/")

# Create Delta table
spark.sql("""
CREATE TABLE IF NOT EXISTS gold.customer_360
USING DELTA
LOCATION 's3a://data-lake-bucket/gold/customer_360/'
""")

Automating It All: My CI/CD Pipeline for Data

Where DataOps really shines is in automation. I've created a CI/CD pipeline that takes code from development to production and handles testing at each stage. Here's a simplified version of my GitLab CI configuration:

# .gitlab-ci.yml for DataOps
stages:
  - validate
  - test
  - build
  - deploy
  - monitor

variables:
  DATABRICKS_HOST: $DATABRICKS_HOST
  DATABRICKS_TOKEN: $DATABRICKS_TOKEN

# Validate Databricks notebooks and Python code
validate:
  stage: validate
  image: python:3.9
  script:
    - pip install dbx pytest black flake8
    - flake8 src/
    - black --check src/
    - python -m pytest tests/unit/ -v

# Test with sample data
integration_test:
  stage: test
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=integration-test --environment=test
    - dbx launch --job=integration-test --environment=test --as-run-submit --trace

# Build and publish job artifacts
build:
  stage: build
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=data-pipeline --environment=staging
  artifacts:
    paths:
      - deployment/
    expire_in: 1 week

# Deploy to production
deploy_prod:
  stage: deploy
  image: databrickstools/dbx:0.8.0
  script:
    - dbx deploy --jobs=data-pipeline --environment=production
  rules:
    - if: $CI_COMMIT_BRANCH == "main"
      when: manual

# Monitor data quality metrics
monitor:
  stage: monitor
  image: python:3.9
  script:
    - pip install boto3 pandas matplotlib
    - python scripts/data_quality_report.py --environment=production
  artifacts:
    paths:
      - data_quality_report.html
    expire_in: 1 week
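
The monitor stage refers to a scripts/data_quality_report.py helper. A stripped-down, hypothetical version of that script, reusing the same table names and DataOps CloudWatch namespace as the rest of this post, might look like this:

# scripts/data_quality_report.py (simplified sketch)
# Pulls a week of DataOps metrics from CloudWatch and renders an HTML report.
import argparse
from datetime import datetime, timedelta

import boto3
import pandas as pd


def fetch_metric(cloudwatch, table, metric):
    response = cloudwatch.get_metric_statistics(
        Namespace='DataOps',
        MetricName=metric,
        Dimensions=[{'Name': 'Table', 'Value': table}],
        StartTime=datetime.utcnow() - timedelta(days=7),
        EndTime=datetime.utcnow(),
        Period=86400,
        Statistics=['Average'],
    )
    return [
        {'table': table, 'metric': metric,
         'timestamp': point['Timestamp'], 'value': point['Average']}
        for point in response['Datapoints']
    ]


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--environment', default='production')
    args = parser.parse_args()

    cloudwatch = boto3.client('cloudwatch')
    rows = []
    for table in ['customers', 'transactions', 'products']:
        for metric in ['row_count', 'processing_time', 'error_count']:
            rows.extend(fetch_metric(cloudwatch, table, metric))

    report = pd.DataFrame(rows, columns=['table', 'metric', 'timestamp', 'value'])
    report = report.sort_values(['table', 'metric', 'timestamp'])
    report.to_html('data_quality_report.html', index=False)
    print(f"Wrote data_quality_report.html for {args.environment}")


if __name__ == '__main__':
    main()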

Setting Up Observability: The Game-Changer

The biggest transformation in my DataOps journey was implementing comprehensive observability. Here's a Lambda function I use to track processing metrics and alert on anomalies:

import boto3
import json
import os
import numpy as np
from datetime import datetime, timedelta

def lambda_handler(event, context):
    """Monitor data quality metrics and alert on anomalies"""
    cloudwatch = boto3.client('cloudwatch')
    sns = boto3.client('sns')
    
    # Define metrics to monitor
    tables_to_monitor = ['customers', 'transactions', 'products']
    metrics_to_check = ['row_count', 'processing_time', 'error_count']
    
    alerts = []
    
    for table in tables_to_monitor:
        for metric in metrics_to_check:
            # Get recent metric values
            response = cloudwatch.get_metric_data(
                MetricDataQueries=[
                    {
                        'Id': 'data',
                        'MetricStat': {
                            'Metric': {
                                'Namespace': 'DataOps',
                                'MetricName': metric,
                                'Dimensions': [
                                    {'Name': 'Table', 'Value': table},
                                ]
                            },
                            'Period': 3600,
                            'Stat': 'Average',
                        },
                        'ReturnData': True,
                    },
                ],
                StartTime=datetime.now() - timedelta(days=7),
                EndTime=datetime.now(),
            )
            
            # Get current value
            current_value = response['MetricDataResults'][0]['Values'][0] if response['MetricDataResults'][0]['Values'] else 0
            historical_values = response['MetricDataResults'][0]['Values'][1:] if len(response['MetricDataResults'][0]['Values']) > 1 else [0]
            
            # Calculate mean and standard deviation
            mean = np.mean(historical_values)
            std_dev = np.std(historical_values) if len(historical_values) > 1 else 0
            
            # Check for anomalies (> 3 standard deviations)
            if std_dev > 0 and abs(current_value - mean) > 3 * std_dev:
                alert_message = f"ANOMALY DETECTED: {table}.{metric} current value {current_value} deviates significantly from historical mean {mean:.2f} (ยฑ{std_dev:.2f})"
                alerts.append(alert_message)
                
                # Send alert
                sns.publish(
                    TopicArn=os.environ['ALERT_TOPIC_ARN'],
                    Subject=f"DataOps Anomaly: {table}.{metric}",
                    Message=alert_message
                )
    
    return {
        'statusCode': 200,
        'body': json.dumps({
            'alerts_triggered': len(alerts),
            'alerts': alerts
        })
    }
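
I run this check on an hourly schedule rather than invoking it by hand. The wiring is a one-time setup step; the sketch below uses an EventBridge schedule, with a placeholder rule name, function name, and ARN:

# One-off setup sketch: invoke the anomaly-check Lambda every hour via EventBridge.
# The rule name, function name, and ARN are placeholders for illustration.
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:dataops-anomaly-check'

rule = events.put_rule(
    Name='dataops-anomaly-check-hourly',
    ScheduleExpression='rate(1 hour)',
    State='ENABLED',
)

# Allow EventBridge to invoke the function, then attach it as a target
lambda_client.add_permission(
    FunctionName='dataops-anomaly-check',
    StatementId='allow-eventbridge-hourly',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn'],
)

events.put_targets(
    Rule='dataops-anomaly-check-hourly',
    Targets=[{'Id': 'anomaly-check', 'Arn': function_arn}],
)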

Lessons I've Learned the Hard Way

After implementing DataOps across multiple organizations, here are my most valuable lessons:

1. Start with Version Control for Everything

The foundation of DataOps is treating your data pipelines like software. That means version control for:

  • SQL queries and transformations

  • Pipeline definitions and configs

  • Schema definitions

  • Data quality rules

I've found that simply applying this practice eliminates about 70% of the "what changed?" troubleshooting sessions.

2. Make Data Quality Everyone's Responsibility

I used to think data quality was just for data engineers. Now I embed quality rules at every layer:

  • Ingestion validation in Lambda functions

  • Data profiling in the bronze-to-silver process

  • Business rule validation in silver-to-gold

  • Automated testing in CI/CD

3. Implement Self-Documenting Systems

One breakthrough was creating self-documenting pipelines. I built a metadata crawler that runs nightly:

# Databricks notebook that crawls our data lake and generates documentation
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Get all tables in silver and gold layers
tables_df = spark.sql("""
    SELECT 
        table_catalog,
        table_schema,
        table_name,
        created,
        last_altered
    FROM system.information_schema.tables
    WHERE table_schema IN ('silver', 'gold')
""")

# For each table, get schema and statistics
metadata = []

for row in tables_df.collect():
    schema = row['table_schema']
    table = row['table_name']
    
    # Get column information
    columns_df = spark.sql(f"""
        SELECT 
            column_name,
            data_type,
            comment
        FROM system.information_schema.columns
        WHERE table_schema = '{schema}'
        AND table_name = '{table}'
    """)
    
    # Get basic stats
    stats_df = spark.sql(f"""
        DESCRIBE EXTENDED {schema}.{table}
    """)
    
    # Get row count and size
    count_df = spark.sql(f"""
        SELECT COUNT(*) as row_count FROM {schema}.{table}
    """)
    
    row_count = count_df.collect()[0]['row_count']
    
    # Collect sample values for each column
    sample_df = spark.table(f"{schema}.{table}").limit(5)
    
    # Add to metadata collection
    metadata.append({
        'schema': schema,
        'table': table,
        'created': row['created'],
        'last_altered': row['last_altered'],
        'row_count': row_count,
        'columns': [c.asDict() for c in columns_df.collect()],
        'sample_data': sample_df.toJSON().collect()
    })

# Write metadata to JSON for the documentation site (default=str handles timestamps)
import json
with open('/dbfs/documentation/metadata.json', 'w') as f:
    json.dump(metadata, f, default=str)

# Also write as Delta table
metadata_df = spark.createDataFrame(metadata)
metadata_df.write.format('delta').mode('overwrite').save('/documentation/metadata')

This feeds into an internal documentation site that shows data lineage, schema information, and quality metrics in real-time.
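
The rendering step is small even though the site covers a lot. As an illustration, a hypothetical generator that turns metadata.json into one Markdown page per table could look like this:

# Hypothetical static-docs generator: one Markdown page per table from metadata.json
import json
from pathlib import Path

with open('/dbfs/documentation/metadata.json') as f:
    metadata = json.load(f)

output_dir = Path('/dbfs/documentation/pages')
output_dir.mkdir(parents=True, exist_ok=True)

for entry in metadata:
    lines = [
        f"# {entry['schema']}.{entry['table']}",
        "",
        f"- Rows: {entry['row_count']}",
        f"- Created: {entry['created']}",
        f"- Last altered: {entry['last_altered']}",
        "",
        "| Column | Type | Comment |",
        "| --- | --- | --- |",
    ]
    for column in entry['columns']:
        lines.append(
            f"| {column['column_name']} | {column['data_type']} | {column.get('comment') or ''} |"
        )
    page = output_dir / f"{entry['schema']}_{entry['table']}.md"
    page.write_text("\n".join(lines))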

4. Test Everything in Isolation First

Early on, I made the mistake of testing entire pipelines end-to-end. Now I:

  • Write unit tests for individual transformations

  • Create integration tests for component interfaces

  • Have end-to-end tests that validate key business metrics

This approach dramatically reduces debugging time when things go wrong.
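
To make the first point concrete, here's roughly what one of those unit tests looks like, assuming the cleanse_data function from the silver notebook has been factored out into an importable module (a hypothetical src/transformations.py) and that PySpark is available locally:

# tests/unit/test_cleanse_data.py (sketch; assumes cleanse_data lives in src/transformations.py)
import pytest
from pyspark.sql import SparkSession

from src.transformations import cleanse_data


@pytest.fixture(scope="session")
def spark():
    return (
        SparkSession.builder
        .master("local[2]")
        .appName("unit-tests")
        .getOrCreate()
    )


def test_cleanse_data_fills_missing_emails_and_parses_dates(spark):
    df = spark.createDataFrame(
        [("  Alice ", None, "2024-01-15"), ("Bob", "BOB@EXAMPLE.COM", "2024-02-01")],
        ["name", "email", "transaction_date"],
    )

    result = cleanse_data(df).collect()

    # Missing emails get a placeholder, existing ones are lower-cased
    assert result[0]["email"] == "unknown@example.com"
    assert result[1]["email"] == "bob@example.com"
    # Whitespace is trimmed and date strings are parsed
    assert result[0]["name"] == "Alice"
    assert str(result[0]["transaction_date"]) == "2024-01-15"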

My DataOps Toolbox

To implement a complete DataOps solution, here are the specific tools and services I use:

  1. Development Environment:

    • Databricks notebooks for exploratory work

    • VS Code with PySpark extensions for local development

    • Git for version control

  2. Storage and Processing:

    • AWS S3 for data lake storage

    • Databricks Delta Lake for ACID transactions

    • AWS Glue for metadata catalog

  3. ETL/ELT:

    • PySpark on Databricks for large-scale processing

    • AWS Lambda for event-driven processing

    • Amazon EMR for cost-optimized batch jobs

  4. Orchestration:

    • Databricks Jobs for workflow orchestration

    • AWS Step Functions for complex flow control

    • Apache Airflow for multi-system orchestration

  5. Testing and Monitoring:

    • Great Expectations for data validation

    • Databricks dbx for CI/CD integration

    • CloudWatch for metric collection and alerting

Getting Started with DataOps

If you're looking to begin your own DataOps journey, here's how I recommend starting:

  1. Map your current data workflows and identify pain points

  2. Start small with a single pipeline or dataset

  3. Implement version control for all code and configurations

  4. Add basic monitoring to understand your current state

  5. Gradually introduce automation to replace manual steps

The beauty of DataOps is that you can implement it incrementally. Each improvement builds on the last, creating a virtuous cycle of better data quality, faster delivery, and increased trust.

Next up in my data engineering series, I'll dive deeper into building real-time data pipelines using AWS Kinesis and Databricks Structured Streaming. Stay tuned!
