In my data engineering journey, I learned the hard way that garbage in equals garbage out. No matter how sophisticated your pipelines are, if your data quality is poor, downstream analytics and ML models will produce unreliable results. I've seen production issues caused by unexpected null values, schema changes, and data drift, all of which could have been caught with proper data quality testing.
This article covers data quality validation, testing frameworks, and strategies I've implemented in production environments to ensure data reliability. The complete code listings referenced throughout appear at the end of the article.
Understanding Data Quality
The Six Dimensions of Data Quality
From my experience building data pipelines, I focus on these six dimensions; a quick pandas check for each follows the list:
Accuracy: Does the data correctly represent reality?
Completeness: Are all expected records present?
Consistency: Does the data agree across systems?
Timeliness: Is the data available when needed?
Validity: Does the data conform to business rules?
Uniqueness: Are there unwanted duplicates?
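To make these concrete, here is a minimal sketch that turns each dimension into a single pandas check. The DataFrame and its columns (user_id, email, age, updated_at) are hypothetical; real checks would be table-specific.

# dimension_checks.py
# Minimal sketch: one illustrative check per quality dimension.
# The column names below are hypothetical.
import pandas as pd
from datetime import datetime, timedelta

def quick_dimension_checks(users: pd.DataFrame) -> dict:
    """Return a pass/fail flag per data quality dimension."""
    now = datetime.now()
    return {
        # Accuracy: values should be physically plausible
        "accuracy": bool(users["age"].between(0, 120).all()),
        # Completeness: required fields are populated
        "completeness": bool(users["email"].notnull().all()),
        # Consistency: one canonical representation (here: lowercase emails)
        "consistency": bool((users["email"].str.lower() == users["email"]).all()),
        # Timeliness: data refreshed within the last 24 hours
        "timeliness": (now - pd.to_datetime(users["updated_at"]).max()) < timedelta(hours=24),
        # Validity: values conform to a business rule (very rough email shape)
        "validity": bool(users["email"].str.contains("@", na=False).all()),
        # Uniqueness: no duplicate primary keys
        "uniqueness": bool(users["user_id"].is_unique),
    }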
Common Data Quality Issues
Here are real issues I've encountered in production: missing values, duplicate rows, numeric data stored as strings, and outliers. The common_data_quality_issues.py listing at the end of the article shows how I detect each of them in pandas.
Great Expectations Framework
Great Expectations is the industry-standard framework I use for data validation. It allows you to define expectations (assertions) about your data and validate them automatically.
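For a first taste of the API, here is a minimal sketch, assuming the legacy pandas dataset interface (available in Great Expectations releases before 0.18); the production-style setup I actually use follows in the next sections.

# ge_quickstart.py
# Minimal sketch, assuming the legacy pandas dataset API (GX < 0.18).
import pandas as pd
import great_expectations as ge

# Wrap a DataFrame so expectation methods are available directly on it
gdf = ge.from_pandas(pd.DataFrame({"user_id": [1, 2, None]}))

# An expectation is an assertion about the data; validating it returns a result object
result = gdf.expect_column_values_to_not_be_null("user_id")
print(result.success)  # False - one user_id is null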
Setting Up Great Expectations
Setup boils down to three steps: initialize a data context, register a datasource, and create an expectation suite. The setup_great_expectations.py listing at the end of the article walks through each step against PostgreSQL.
Defining Expectations
Here's how I define expectations for a users table; the full suite is in define_expectations.py at the end of the article. It checks row counts, column order, uniqueness and nullability of keys, an email regex, a sensible age range, and an allowed value set for is_active.
Running Validations with Checkpoints
A checkpoint bundles a batch request with an expectation suite so a validation runs as a single unit, whether ad hoc or from an orchestrator like Airflow. See run_validation_checkpoint.py at the end of the article.
Data Profiling
Data profiling helps you understand your data's characteristics. I use pandas-profiling (now called ydata-profiling) for quick exploratory analysis; data_profiling.py at the end of the article generates a full HTML report and extracts key insights (high-missing columns, highly correlated pairs) from it.
Schema Validation
Schema validation ensures data conforms to the expected structure. I use Pydantic for this (the examples use the Pydantic v1 validator API); see schema_validation.py at the end of the article.
Data Contracts
Data contracts define agreements between data producers and consumers. I implement them as versioned schemas with validation; data_contracts.py at the end of the article shows a contract for a user events stream, including field constraints and a schema hash for change detection.
Testing Strategies for Data Pipelines
Unit Tests for Data Transformations
Keeping transformations as pure functions over DataFrames makes them straightforward to unit test with pytest fixtures; see test_transformations.py at the end of the article.
Integration Tests for Data Pipelines
Integration tests exercise the whole extract-transform-load path against a disposable database, an in-memory SQLite instance in test_pipeline_integration.py at the end of the article.
Monitoring Data Quality in Production
Implementing Data Quality Metrics
On every run I compute a small set of per-table metrics (row count, null counts, duplicates, schema match, freshness) and push them to a monitoring system; see data_quality_metrics.py at the end of the article.
Best Practices
From my production experience:
Fail Fast: Validate data early in the pipeline. Don't process bad data.
Automated Testing: Run data quality checks automatically in CI/CD.
Data Contracts: Use contracts between teams to define expectations.
Monitoring: Track quality metrics over time to detect degradation.
Alerts: Set up alerts for critical quality issues (schema changes, missing data, etc.).
Documentation: Document all expectations and business rules.
Version Control: Version your expectation suites and contracts.
Quarantine: Move invalid data to quarantine tables for investigation (a minimal sketch follows this list).
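For the quarantine pattern, here is a minimal sketch; the validity mask, table names, and SQLAlchemy engine are placeholders for whatever your pipeline already produces.

# quarantine_sketch.py
# Minimal sketch of the quarantine pattern; names are assumptions.
import pandas as pd
from datetime import datetime
from sqlalchemy.engine import Engine

def load_with_quarantine(
    df: pd.DataFrame,
    valid_mask: pd.Series,  # boolean mask produced by your validation step
    table: str,
    engine: Engine
) -> tuple[int, int]:
    """Write valid rows to the target table and invalid rows to a quarantine table."""
    valid = df[valid_mask]
    invalid = df[~valid_mask].copy()
    if not invalid.empty:
        # Timestamp quarantined rows so they can be investigated and aged out
        invalid["quarantined_at"] = datetime.now()
        invalid.to_sql(f"{table}_quarantine", engine, if_exists="append", index=False)
    valid.to_sql(table, engine, if_exists="append", index=False)
    return len(valid), len(invalid)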
Key Takeaways
Data quality is critical: Invest time in validation and testing
Great Expectations: Industry-standard framework for data validation
Schema validation: Use Pydantic or similar for type safety
Data contracts: Define clear agreements between producers and consumers
Test pipelines: Unit test transformations, integration test end-to-end
Monitor in production: Track quality metrics and set up alerts
Fail fast: Catch issues early before they propagate
# common_data_quality_issues.py
import pandas as pd
from typing import List
from dataclasses import dataclass
from datetime import datetime
@dataclass
class DataQualityIssue:
"""Represents a data quality issue"""
issue_type: str
column: str
description: str
count: int
severity: str # 'critical', 'high', 'medium', 'low'
timestamp: datetime
def detect_quality_issues(df: pd.DataFrame) -> List[DataQualityIssue]:
"""
Detect common data quality issues in a DataFrame.
Args:
df: Input DataFrame to check
Returns:
List of detected data quality issues
"""
issues: List[DataQualityIssue] = []
# Check for missing values
for column in df.columns:
null_count = df[column].isnull().sum()
if null_count > 0:
issues.append(DataQualityIssue(
issue_type='missing_values',
column=column,
description=f'Found {null_count} null values',
count=null_count,
severity='high' if null_count > len(df) * 0.1 else 'medium',
timestamp=datetime.now()
))
# Check for duplicates
duplicate_count = df.duplicated().sum()
if duplicate_count > 0:
issues.append(DataQualityIssue(
issue_type='duplicates',
column='all',
description=f'Found {duplicate_count} duplicate rows',
count=duplicate_count,
severity='high',
timestamp=datetime.now()
))
# Check for data type inconsistencies
    for column in df.select_dtypes(include=['object']).columns:
        # Skip all-null columns; to_numeric would trivially succeed on them
        if df[column].notna().sum() == 0:
            continue
        # Flag numeric data stored as strings
        try:
            pd.to_numeric(df[column], errors='raise')
            issues.append(DataQualityIssue(
                issue_type='type_inconsistency',
                column=column,
                description=f'Column {column} contains numeric data stored as string',
                count=int(df[column].notna().sum()),
                severity='medium',
                timestamp=datetime.now()
            ))
        except (ValueError, TypeError):
            pass
# Check for outliers (for numeric columns)
for column in df.select_dtypes(include=['int64', 'float64']).columns:
Q1 = df[column].quantile(0.25)
Q3 = df[column].quantile(0.75)
IQR = Q3 - Q1
outliers = df[
(df[column] < Q1 - 1.5 * IQR) |
(df[column] > Q3 + 1.5 * IQR)
]
if len(outliers) > 0:
issues.append(DataQualityIssue(
issue_type='outliers',
column=column,
description=f'Found {len(outliers)} outliers using IQR method',
count=len(outliers),
severity='low',
timestamp=datetime.now()
))
return issues
# Example usage
if __name__ == "__main__":
# Sample data with quality issues
df = pd.DataFrame({
'user_id': [1, 2, 3, 4, 5, 5], # Duplicate
'age': [25, None, 30, 150, 35, 35], # Missing value and outlier
'email': ['[email protected]', '[email protected]', None, '[email protected]', '[email protected]', '[email protected]'],
'revenue': ['100.50', '200.75', '150.00', '300.25', '250.00', '250.00'] # Stored as string
})
issues = detect_quality_issues(df)
print(f"Detected {len(issues)} data quality issues:\n")
for issue in issues:
print(f"[{issue.severity.upper()}] {issue.issue_type} in '{issue.column}': {issue.description}")
# setup_great_expectations.py
import great_expectations as gx
def initialize_data_context() -> gx.DataContext:
"""
Initialize Great Expectations data context.
Returns:
Configured DataContext instance
"""
# Initialize data context
context = gx.get_context()
return context
def create_datasource(
context: gx.DataContext,
datasource_name: str,
connection_string: str
) -> None:
"""
Create a PostgreSQL datasource.
Args:
context: Great Expectations data context
datasource_name: Name for the datasource
connection_string: PostgreSQL connection string
"""
datasource_config = {
"name": datasource_name,
"class_name": "Datasource",
"execution_engine": {
"class_name": "SqlAlchemyExecutionEngine",
"connection_string": connection_string,
},
"data_connectors": {
"default_runtime_data_connector": {
"class_name": "RuntimeDataConnector",
"batch_identifiers": ["default_identifier_name"],
},
"default_inferred_data_connector": {
"class_name": "InferredAssetSqlDataConnector",
"include_schema_name": True,
},
},
}
context.add_datasource(**datasource_config)
def create_expectation_suite(
context: gx.DataContext,
suite_name: str
) -> gx.core.ExpectationSuite:
"""
Create an expectation suite.
Args:
context: Great Expectations data context
suite_name: Name for the expectation suite
Returns:
Created ExpectationSuite
"""
suite = context.add_expectation_suite(expectation_suite_name=suite_name)
return suite
# define_expectations.py
from datetime import datetime

import great_expectations as gx
from great_expectations.core.batch import BatchRequest
from great_expectations.validator.validator import Validator
def define_user_table_expectations(context: gx.DataContext) -> None:
"""
Define expectations for a users table.
Args:
context: Great Expectations data context
"""
# Create expectation suite
suite_name = "users_suite"
    context.add_or_update_expectation_suite(expectation_suite_name=suite_name)
# Get validator
batch_request = BatchRequest(
datasource_name="postgres_datasource",
data_connector_name="default_inferred_data_connector",
data_asset_name="public.users",
)
validator = context.get_validator(
batch_request=batch_request,
expectation_suite_name=suite_name,
)
# Define expectations
# 1. Table should exist and have rows
validator.expect_table_row_count_to_be_between(min_value=1)
# 2. Required columns should exist
validator.expect_table_columns_to_match_ordered_list(
column_list=["user_id", "email", "name", "age", "created_at", "is_active"]
)
# 3. user_id should be unique and not null
validator.expect_column_values_to_be_unique(column="user_id")
validator.expect_column_values_to_not_be_null(column="user_id")
# 4. email should be unique, not null, and match email format
validator.expect_column_values_to_not_be_null(column="email")
validator.expect_column_values_to_be_unique(column="email")
validator.expect_column_values_to_match_regex(
column="email",
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
)
# 5. name should not be null
validator.expect_column_values_to_not_be_null(column="name")
# 6. age should be between 0 and 120
validator.expect_column_values_to_be_between(
column="age",
min_value=0,
max_value=120,
mostly=0.99 # Allow 1% exceptions for data quality issues
)
# 7. created_at should not be null and be a valid timestamp
validator.expect_column_values_to_not_be_null(column="created_at")
# 8. is_active should be boolean
validator.expect_column_values_to_be_in_set(
column="is_active",
value_set=[True, False]
)
# Save expectation suite
validator.save_expectation_suite(discard_failed_expectations=False)
print(f"Created expectation suite '{suite_name}' with {len(validator.get_expectation_suite().expectations)} expectations")
# Example: Custom expectation for business logic
def define_custom_business_expectations(validator: Validator) -> None:
"""
Define custom business logic expectations.
Args:
validator: Great Expectations validator
"""
# Ensure revenue is positive
validator.expect_column_values_to_be_between(
column="revenue",
min_value=0,
mostly=0.999 # Allow very few exceptions
)
    # Ensure order_date is not in the future; GE needs an explicit bound,
    # it won't resolve a string like "today"
    validator.expect_column_values_to_be_between(
        column="order_date",
        max_value=datetime.now().strftime("%Y-%m-%d"),
        parse_strings_as_datetimes=True
    )
# Ensure status values are valid
validator.expect_column_values_to_be_in_set(
column="status",
value_set=["pending", "processing", "completed", "cancelled"]
)
# Ensure phone numbers match expected format
validator.expect_column_values_to_match_regex(
column="phone",
regex=r"^\+?1?\d{9,15}$" # International phone format
)
# run_validation_checkpoint.py
import great_expectations as gx
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
def create_and_run_checkpoint(
context: gx.DataContext,
checkpoint_name: str,
datasource_name: str,
data_asset_name: str,
expectation_suite_name: str
) -> CheckpointResult:
"""
Create and run a validation checkpoint.
Args:
context: Great Expectations data context
checkpoint_name: Name for the checkpoint
datasource_name: Name of the datasource
data_asset_name: Name of the data asset (e.g., table name)
expectation_suite_name: Name of the expectation suite to use
Returns:
        CheckpointResult with validation results
"""
checkpoint_config = {
"name": checkpoint_name,
"config_version": 1.0,
"class_name": "Checkpoint",
"validations": [
{
"batch_request": {
"datasource_name": datasource_name,
"data_connector_name": "default_inferred_data_connector",
"data_asset_name": data_asset_name,
},
"expectation_suite_name": expectation_suite_name,
}
],
}
# Add checkpoint
context.add_or_update_checkpoint(**checkpoint_config)
# Run checkpoint
results = context.run_checkpoint(checkpoint_name=checkpoint_name)
return results
def evaluate_validation_results(results: CheckpointResult) -> bool:
"""
Evaluate validation results and determine if pipeline should continue.
Args:
results: Validation results from checkpoint
Returns:
True if validation passed, False otherwise
"""
validation_result = results.list_validation_results()[0]
# Check if validation was successful
if validation_result["success"]:
print("β All expectations passed!")
return True
else:
print("β Some expectations failed:")
# Print failed expectations
for result in validation_result["results"]:
if not result["success"]:
expectation = result["expectation_config"]
print(f" - {expectation['expectation_type']}: {result.get('exception_info', {}).get('raised_exception', 'Failed')}")
return False
# Example: Integration with Airflow
def validate_data_quality_task(**context) -> None:
"""
Airflow task to validate data quality using Great Expectations.
Fails the task if validation doesn't pass.
"""
import great_expectations as gx
# Initialize context
ge_context = gx.get_context()
# Run checkpoint
results = create_and_run_checkpoint(
context=ge_context,
checkpoint_name="daily_users_checkpoint",
datasource_name="postgres_datasource",
data_asset_name="public.users",
expectation_suite_name="users_suite"
)
# Evaluate results
if not evaluate_validation_results(results):
raise ValueError("Data quality validation failed!")
print("Data quality validation passed. Proceeding with pipeline.")
# data_profiling.py
import pandas as pd
from ydata_profiling import ProfileReport
def generate_data_profile(
df: pd.DataFrame,
output_file: str = "data_profile.html",
title: str = "Data Profile Report"
) -> ProfileReport:
"""
Generate comprehensive data profile report.
Args:
df: DataFrame to profile
output_file: Path to save HTML report
title: Title for the report
Returns:
ProfileReport object
"""
# Create profile
profile = ProfileReport(
df,
title=title,
minimal=False, # Full report
explorative=True, # Show all statistics
correlations={
"pearson": {"calculate": True},
"spearman": {"calculate": True},
"kendall": {"calculate": False}, # Can be slow
"phi_k": {"calculate": True},
"cramers": {"calculate": True},
},
)
# Save report
profile.to_file(output_file)
print(f"Profile report saved to {output_file}")
return profile
def extract_profile_insights(profile: ProfileReport) -> dict:
"""
Extract key insights from profile report.
Args:
profile: ProfileReport object
Returns:
Dictionary of insights
"""
    description = profile.get_description()
    # ydata-profiling 4.x returns a BaseDescription object; its table/variables/
    # correlations attributes hold the underlying statistics
    table_stats = description.table
    insights = {
        "total_rows": table_stats["n"],
        "total_columns": table_stats["n_var"],
        "missing_cells": table_stats["n_cells_missing"],
        "duplicate_rows": table_stats["n_duplicates"],
        "high_correlation_pairs": [],
        "columns_with_high_missing": [],
    }
    # Find columns with high missing percentages
    for var_name, var_stats in description.variables.items():
        if var_stats.get("p_missing", 0) > 0.5:  # More than 50% missing
            insights["columns_with_high_missing"].append({
                "column": var_name,
                "missing_pct": var_stats["p_missing"] * 100
            })
    # Find highly correlated features (note: `if pearson:` on a DataFrame raises
    # "truth value is ambiguous", so test explicitly for a non-empty DataFrame)
    correlations = description.correlations or {}
    pearson = correlations.get("pearson")
    if isinstance(pearson, pd.DataFrame) and not pearson.empty:
        for i, col1 in enumerate(pearson.columns):
            for j, col2 in enumerate(pearson.columns):
                if i < j:  # Consider each unordered pair once
                    corr_value = pearson.iloc[i, j]
                    if 0.8 < abs(corr_value) < 1.0:
                        insights["high_correlation_pairs"].append({
                            "column1": col1,
                            "column2": col2,
                            "correlation": corr_value
                        })
    return insights
# Example usage
if __name__ == "__main__":
# Load sample data
df = pd.read_csv("sales_data.csv")
# Generate profile
profile = generate_data_profile(
df,
output_file="sales_profile.html",
title="Sales Data Profile"
)
# Extract insights
insights = extract_profile_insights(profile)
print("\n=== Key Insights ===")
print(f"Total rows: {insights['total_rows']:,}")
print(f"Total columns: {insights['total_columns']}")
print(f"Duplicate rows: {insights['duplicate_rows']}")
if insights['columns_with_high_missing']:
print("\nColumns with >50% missing values:")
for col_info in insights['columns_with_high_missing']:
print(f" - {col_info['column']}: {col_info['missing_pct']:.1f}% missing")
if insights['high_correlation_pairs']:
print("\nHighly correlated column pairs:")
for pair in insights['high_correlation_pairs']:
print(f" - {pair['column1']} <-> {pair['column2']}: {pair['correlation']:.3f}")
# schema_validation.py
from pydantic import BaseModel, EmailStr, Field, validator, root_validator
from typing import Optional, List
from datetime import datetime, date
from enum import Enum
import pandas as pd
class OrderStatus(str, Enum):
"""Valid order statuses"""
PENDING = "pending"
PROCESSING = "processing"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class UserSchema(BaseModel):
"""Schema for user data validation"""
user_id: int = Field(..., gt=0, description="Unique user identifier")
email: EmailStr
name: str = Field(..., min_length=1, max_length=100)
age: Optional[int] = Field(None, ge=0, le=120)
created_at: datetime
is_active: bool = True
@validator('name')
def name_must_not_be_empty(cls, v):
"""Ensure name is not just whitespace"""
if not v or not v.strip():
raise ValueError('Name cannot be empty or whitespace')
return v.strip()
class Config:
# Allow field aliases
allow_population_by_field_name = True
# Example data for documentation
schema_extra = {
"example": {
"user_id": 12345,
"email": "[email protected]",
"name": "John Doe",
"age": 30,
"created_at": "2024-01-01T00:00:00",
"is_active": True
}
}
class OrderItemSchema(BaseModel):
"""Schema for order line items"""
product_id: int = Field(..., gt=0)
product_name: str
quantity: int = Field(..., gt=0)
unit_price: float = Field(..., gt=0)
total_price: float = Field(..., gt=0)
    @root_validator(skip_on_failure=True)
def validate_total_price(cls, values):
"""Ensure total_price = quantity * unit_price"""
quantity = values.get('quantity')
unit_price = values.get('unit_price')
total_price = values.get('total_price')
if quantity and unit_price and total_price:
expected_total = round(quantity * unit_price, 2)
if abs(expected_total - total_price) > 0.01: # Allow small floating point difference
raise ValueError(
f'total_price ({total_price}) does not match quantity ({quantity}) * unit_price ({unit_price})'
)
return values
class OrderSchema(BaseModel):
"""Schema for order data validation"""
order_id: int = Field(..., gt=0)
user_id: int = Field(..., gt=0)
order_date: date
status: OrderStatus
items: List[OrderItemSchema] = Field(..., min_items=1)
total_amount: float = Field(..., gt=0)
shipping_address: str = Field(..., min_length=10)
@validator('order_date')
def order_date_not_future(cls, v):
"""Ensure order date is not in the future"""
if v > date.today():
raise ValueError('Order date cannot be in the future')
return v
    @root_validator(skip_on_failure=True)
def validate_total_amount(cls, values):
"""Ensure total_amount matches sum of item totals"""
items = values.get('items')
total_amount = values.get('total_amount')
if items and total_amount:
expected_total = sum(item.total_price for item in items)
if abs(expected_total - total_amount) > 0.01:
raise ValueError(
f'total_amount ({total_amount}) does not match sum of items ({expected_total})'
)
return values
def validate_dataframe_schema(
df: pd.DataFrame,
schema: type[BaseModel]
) -> tuple[List[BaseModel], List[dict]]:
"""
Validate DataFrame against Pydantic schema.
Args:
df: DataFrame to validate
schema: Pydantic model class
Returns:
Tuple of (valid_records, errors)
"""
valid_records: List[BaseModel] = []
errors: List[dict] = []
    for idx, row in df.iterrows():
        # Convert the row up front so it's also available for error reporting
        row_dict = row.to_dict()
        try:
            # Validate using Pydantic
            validated = schema(**row_dict)
            valid_records.append(validated)
        except Exception as e:
            errors.append({
                "row_index": idx,
                "error": str(e),
                "data": row_dict
            })
return valid_records, errors
# Example usage
if __name__ == "__main__":
# Sample data
users_data = [
{"user_id": 1, "email": "[email protected]", "name": "Alice", "age": 30, "created_at": "2024-01-01T10:00:00", "is_active": True},
{"user_id": 2, "email": "invalid-email", "name": "Bob", "age": 25, "created_at": "2024-01-02T10:00:00", "is_active": True}, # Invalid email
{"user_id": 3, "email": "[email protected]", "name": "", "age": 150, "created_at": "2024-01-03T10:00:00", "is_active": False}, # Empty name, invalid age
]
df = pd.DataFrame(users_data)
# Validate
valid, errors = validate_dataframe_schema(df, UserSchema)
print(f"Valid records: {len(valid)}")
print(f"Errors: {len(errors)}\n")
if errors:
print("Validation errors:")
for error in errors:
print(f" Row {error['row_index']}: {error['error']}")
# data_contracts.py
from pydantic import BaseModel
from typing import List, Dict, Any, Optional
from datetime import datetime
from enum import Enum
import hashlib
import json
import re
class DataContractVersion(str, Enum):
"""Contract version"""
V1 = "v1"
V2 = "v2"
class FieldDefinition(BaseModel):
"""Definition of a data field"""
name: str
type: str # 'string', 'integer', 'float', 'boolean', 'datetime', 'date'
required: bool = True
description: str = ""
constraints: Optional[Dict[str, Any]] = None
# Constraints examples:
# {"min": 0, "max": 100} for numbers
# {"pattern": "^[A-Z]{2}$"} for strings
# {"enum": ["A", "B", "C"]} for categorical
class DataContract(BaseModel):
"""Data contract definition"""
contract_id: str
version: str
name: str
description: str
owner: str
fields: List[FieldDefinition]
created_at: datetime
updated_at: datetime
def get_schema_hash(self) -> str:
"""Generate hash of schema for change detection"""
schema_str = json.dumps(
[f.dict() for f in self.fields],
sort_keys=True
)
return hashlib.sha256(schema_str.encode()).hexdigest()
def validate_data(self, data: Dict[str, Any]) -> List[str]:
"""
Validate data against contract.
Args:
data: Data dictionary to validate
Returns:
List of validation errors (empty if valid)
"""
errors: List[str] = []
for field in self.fields:
# Check required fields
if field.required and field.name not in data:
errors.append(f"Missing required field: {field.name}")
continue
if field.name not in data:
continue
value = data[field.name]
            # Type checking (simplified; bool is a subclass of int in Python,
            # so guard the integer and float checks against booleans)
            if field.type == "integer" and (isinstance(value, bool) or not isinstance(value, int)):
                errors.append(f"Field {field.name} must be integer, got {type(value).__name__}")
            elif field.type == "string" and not isinstance(value, str):
                errors.append(f"Field {field.name} must be string, got {type(value).__name__}")
            elif field.type == "float" and (isinstance(value, bool) or not isinstance(value, (int, float))):
                errors.append(f"Field {field.name} must be float, got {type(value).__name__}")
            elif field.type == "boolean" and not isinstance(value, bool):
                errors.append(f"Field {field.name} must be boolean, got {type(value).__name__}")
# Constraint validation
if field.constraints:
constraint_errors = self._validate_constraints(field.name, value, field.constraints)
errors.extend(constraint_errors)
return errors
def _validate_constraints(
self,
field_name: str,
value: Any,
constraints: Dict[str, Any]
) -> List[str]:
"""Validate field constraints"""
errors: List[str] = []
if "min" in constraints and value < constraints["min"]:
errors.append(f"Field {field_name} value {value} is below minimum {constraints['min']}")
if "max" in constraints and value > constraints["max"]:
errors.append(f"Field {field_name} value {value} exceeds maximum {constraints['max']}")
if "enum" in constraints and value not in constraints["enum"]:
errors.append(f"Field {field_name} value {value} not in allowed values {constraints['enum']}")
if "pattern" in constraints and isinstance(value, str):
import re
if not re.match(constraints["pattern"], value):
errors.append(f"Field {field_name} value {value} does not match pattern {constraints['pattern']}")
return errors
# Example: Define a data contract for user events
def create_user_events_contract() -> DataContract:
"""Create data contract for user events stream"""
contract = DataContract(
contract_id="user_events_v1",
version="1.0.0",
name="User Events Contract",
description="Contract for user interaction events from web application",
owner="data-engineering-team",
fields=[
FieldDefinition(
name="event_id",
type="string",
required=True,
description="Unique event identifier (UUID)",
constraints={"pattern": r"^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$"}
),
FieldDefinition(
name="user_id",
type="integer",
required=True,
description="User identifier",
constraints={"min": 1}
),
FieldDefinition(
name="event_type",
type="string",
required=True,
description="Type of event",
constraints={"enum": ["page_view", "click", "purchase", "logout"]}
),
FieldDefinition(
name="timestamp",
type="datetime",
required=True,
description="Event timestamp in ISO 8601 format"
),
FieldDefinition(
name="properties",
type="string", # JSON string
required=False,
description="Additional event properties as JSON"
),
],
created_at=datetime.now(),
updated_at=datetime.now()
)
return contract
# Example usage
if __name__ == "__main__":
contract = create_user_events_contract()
# Valid event
valid_event = {
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"user_id": 12345,
"event_type": "purchase",
"timestamp": "2024-01-15T10:30:00Z",
"properties": '{"product_id": 789, "amount": 99.99}'
}
errors = contract.validate_data(valid_event)
print(f"Valid event errors: {errors}") # Should be empty
# Invalid event
invalid_event = {
"event_id": "invalid-uuid", # Wrong format
"user_id": -1, # Below minimum
"event_type": "unknown_event", # Not in enum
# Missing required 'timestamp'
}
errors = contract.validate_data(invalid_event)
print(f"\nInvalid event errors:")
for error in errors:
print(f" - {error}")
# test_transformations.py
import pytest
import pandas as pd
def clean_user_data(df: pd.DataFrame) -> pd.DataFrame:
"""Clean user data - function to test"""
df = df.copy()
# Remove duplicates
df = df.drop_duplicates(subset=['user_id'])
    # Fill missing ages with the median (skip if the column is absent)
    if 'age' in df.columns:
        df['age'] = df['age'].fillna(df['age'].median())
# Standardize email to lowercase
df['email'] = df['email'].str.lower().str.strip()
# Remove invalid emails
df = df[df['email'].str.contains('@', na=False)]
return df
class TestUserDataCleaning:
"""Test suite for user data cleaning function"""
@pytest.fixture
def sample_user_data(self) -> pd.DataFrame:
"""Fixture providing sample user data"""
return pd.DataFrame({
'user_id': [1, 2, 3, 3, 4], # 3 is duplicate
'email': ['[email protected]', '[email protected]', 'invalid', '[email protected]', '[email protected]'],
'age': [25, None, 30, 30, 35]
})
def test_removes_duplicates(self, sample_user_data):
"""Test that duplicate user_ids are removed"""
result = clean_user_data(sample_user_data)
assert len(result) < len(sample_user_data)
assert result['user_id'].is_unique
def test_fills_missing_ages(self, sample_user_data):
"""Test that missing ages are filled with median"""
result = clean_user_data(sample_user_data)
assert result['age'].isnull().sum() == 0
def test_standardizes_emails(self, sample_user_data):
"""Test that emails are lowercase and trimmed"""
result = clean_user_data(sample_user_data)
for email in result['email']:
assert email == email.lower()
assert email == email.strip()
def test_removes_invalid_emails(self, sample_user_data):
"""Test that invalid emails are removed"""
result = clean_user_data(sample_user_data)
assert all('@' in email for email in result['email'])
def test_handles_empty_dataframe(self):
"""Test handling of empty input"""
empty_df = pd.DataFrame(columns=['user_id', 'email', 'age'])
result = clean_user_data(empty_df)
assert len(result) == 0
assert list(result.columns) == ['user_id', 'email', 'age']
# test_pipeline_integration.py
import pytest
import pandas as pd
from datetime import datetime
from typing import Generator
from sqlalchemy import create_engine, text

# Reuse the transformation under test from test_transformations.py
from test_transformations import clean_user_data
@pytest.fixture(scope="session")
def test_database() -> Generator:
"""Create test database for integration tests"""
# Use in-memory SQLite for testing
engine = create_engine("sqlite:///:memory:")
    # Create tables (engine.begin() opens a transaction and commits the DDL;
    # SQLAlchemy 2.0 requires raw SQL to be wrapped in text())
    with engine.begin() as conn:
        conn.execute(text("""
            CREATE TABLE source_users (
                user_id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                created_at TIMESTAMP
            )
        """))
        conn.execute(text("""
            CREATE TABLE target_users (
                user_id INTEGER PRIMARY KEY,
                name TEXT,
                email TEXT,
                created_at TIMESTAMP,
                loaded_at TIMESTAMP
            )
        """))
yield engine
# Cleanup
engine.dispose()
def extract_users(engine) -> pd.DataFrame:
"""Extract users from source"""
query = "SELECT * FROM source_users"
return pd.read_sql(query, engine)
def load_users(df: pd.DataFrame, engine) -> None:
"""Load users to target"""
df['loaded_at'] = datetime.now()
df.to_sql('target_users', engine, if_exists='append', index=False)
class TestUserPipeline:
"""Integration tests for user data pipeline"""
def test_end_to_end_pipeline(self, test_database):
"""Test complete pipeline from source to target"""
# Insert test data into source
test_users = pd.DataFrame({
'user_id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'email': ['[email protected]', '[email protected]', '[email protected]'],
'created_at': [datetime.now()] * 3
})
test_users.to_sql('source_users', test_database, if_exists='append', index=False)
# Run pipeline
extracted = extract_users(test_database)
cleaned = clean_user_data(extracted)
load_users(cleaned, test_database)
# Verify results
result = pd.read_sql("SELECT * FROM target_users", test_database)
assert len(result) == 3
assert set(result['user_id']) == {1, 2, 3}
assert all(result['loaded_at'].notnull())
# data_quality_metrics.py
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Dict, List
import pandas as pd
@dataclass
class DataQualityMetrics:
"""Data quality metrics for monitoring"""
table_name: str
timestamp: datetime
row_count: int
null_count: Dict[str, int]
duplicate_count: int
schema_match: bool
freshness_hours: float
def to_dict(self) -> Dict[str, Any]:
"""Convert metrics to dictionary for logging"""
return {
"table_name": self.table_name,
"timestamp": self.timestamp.isoformat(),
"row_count": self.row_count,
"null_count": self.null_count,
"duplicate_count": self.duplicate_count,
"schema_match": self.schema_match,
"freshness_hours": self.freshness_hours
}
def calculate_data_quality_metrics(
df: pd.DataFrame,
table_name: str,
expected_schema: List[str],
timestamp_column: str = "updated_at"
) -> DataQualityMetrics:
"""
Calculate comprehensive data quality metrics.
Args:
df: DataFrame to analyze
table_name: Name of the table/dataset
expected_schema: List of expected column names
timestamp_column: Column containing update timestamps
Returns:
DataQualityMetrics object
"""
    # Calculate null counts per column (cast to plain int so the dict serializes cleanly)
    null_counts = {column: int(count) for column, count in df.isnull().sum().items()}
# Check schema match
schema_match = set(df.columns) == set(expected_schema)
# Calculate data freshness
if timestamp_column in df.columns:
latest_timestamp = pd.to_datetime(df[timestamp_column]).max()
freshness_hours = (datetime.now() - latest_timestamp).total_seconds() / 3600
else:
freshness_hours = -1 # Unknown
return DataQualityMetrics(
table_name=table_name,
timestamp=datetime.now(),
row_count=len(df),
null_count=null_counts,
        duplicate_count=int(df.duplicated().sum()),
schema_match=schema_match,
freshness_hours=freshness_hours
)
def send_quality_metrics_to_monitoring(metrics: DataQualityMetrics) -> None:
"""
Send metrics to monitoring system (CloudWatch, Datadog, etc.)
Args:
metrics: Data quality metrics to send
"""
# Example: Send to CloudWatch
import boto3
cloudwatch = boto3.client('cloudwatch')
cloudwatch.put_metric_data(
Namespace='DataQuality',
MetricData=[
{
'MetricName': 'RowCount',
'Value': metrics.row_count,
'Unit': 'Count',
'Timestamp': metrics.timestamp,
'Dimensions': [
{'Name': 'TableName', 'Value': metrics.table_name}
]
},
{
'MetricName': 'DuplicateCount',
'Value': metrics.duplicate_count,
'Unit': 'Count',
'Timestamp': metrics.timestamp,
'Dimensions': [
{'Name': 'TableName', 'Value': metrics.table_name}
]
},
{
'MetricName': 'DataFreshness',
'Value': metrics.freshness_hours,
'Unit': 'None',
'Timestamp': metrics.timestamp,
'Dimensions': [
{'Name': 'TableName', 'Value': metrics.table_name}
]
}
]
)
print(f"Metrics sent to CloudWatch for {metrics.table_name}")