Part 11: ModelOps — Governance, Drift Detection, and Production Lifecycle
The Problem with "Ship It and Forget It"
Drift Detection with Evidently AI
# jobs/drift_detector/detector.py
import json
import os
from datetime import datetime, timedelta
import pandas as pd
from evidently.metrics import (
DatasetDriftMetric,
DatasetMissingValuesMetric,
)
from evidently.report import Report
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway
# Required runtime configuration, read from the environment at import time.
# The variables are read in the order listed; the first one that is missing
# raises KeyError, so the job fails before doing any work.
PUSHGATEWAY_URL, MLFLOW_TRACKING_URI, REFERENCE_DATASET_PATH = (
    os.environ[var_name]
    for var_name in (
        "PUSHGATEWAY_URL",
        "MLFLOW_TRACKING_URI",
        "REFERENCE_DATASET_PATH",
    )
)
def fetch_production_data(hours: int = 24) -> pd.DataFrame:
    """Fetch the last N hours of production inference inputs.

    Args:
        hours: Length of the look-back window in hours. The window ends at
            the database server's ``NOW()``. The value is coerced with
            ``int()`` and sent as a bound query parameter, never spliced
            into the SQL text.

    Returns:
        A DataFrame with the ``feature_vector`` and ``predicted_at`` columns
        of every row in ``ml.recommendation_inference_log`` whose
        ``predicted_at`` is at or after the start of the window.

    Raises:
        ValueError / TypeError: If ``hours`` cannot be converted to ``int``.
    """
    # In practice, I log inference inputs to a PostgreSQL table
    # This is the same DB as the application, in a separate schema
    import psycopg2

    # Validate before opening a connection. The interval is computed
    # server-side from a bound integer, so no caller-supplied text ever
    # reaches the SQL string.
    window_hours = int(hours)
    query = """
        SELECT feature_vector, predicted_at
        FROM ml.recommendation_inference_log
        WHERE predicted_at >= NOW() - %(hours)s * INTERVAL '1 hour'
    """
    # NOTE(review): this returns the packed `feature_vector` column plus the
    # `predicted_at` timestamp. Confirm this layout matches the columns of the
    # reference parquet that Evidently compares it against.
    conn = psycopg2.connect(os.environ["DATABASE_URL"])
    try:
        return pd.read_sql(query, conn, params={"hours": window_hours})
    finally:
        # Close the connection even when the query fails.
        conn.close()
def run_drift_detection(hours: int = 24, min_samples: int = 100):
    """Compare recent production inputs with the reference dataset and
    push the drift verdict to Prometheus via the Pushgateway.

    Args:
        hours: Look-back window forwarded to ``fetch_production_data``.
        min_samples: Minimum number of production rows needed to run the
            comparison. If fewer rows are available, the run is skipped.

    Returns:
        The ``dataset_drift`` flag reported by Evidently's
        ``DatasetDriftMetric``. Returns ``None`` when the run was skipped for
        lack of samples; in that case nothing is pushed to the Pushgateway.
    """
    current_df = fetch_production_data(hours=hours)
    if len(current_df) < min_samples:
        print("Not enough production samples for drift detection, skipping")
        return None

    # Load the reference data only once we know the comparison will run.
    reference_df = pd.read_parquet(REFERENCE_DATASET_PATH)

    report = Report(metrics=[
        DatasetDriftMetric(),
        DatasetMissingValuesMetric(),
    ])
    report.run(reference_data=reference_df, current_data=current_df)

    # Index 0 is the DatasetDriftMetric entry, the first metric passed to
    # Report above. The DatasetMissingValuesMetric result (index 1) is
    # computed but not currently published.
    drift_result = report.as_dict()["metrics"][0]["result"]
    drift_detected = drift_result["dataset_drift"]
    drift_share = drift_result["share_of_drifted_columns"]

    # Push metrics to Prometheus via Pushgateway
    registry = CollectorRegistry()
    drift_gauge = Gauge(
        "goreliable_model_drift_detected",
        "1 if dataset drift detected, 0 otherwise",
        ["model"],
        registry=registry,
    )
    drift_share_gauge = Gauge(
        "goreliable_model_drift_share",
        "Share of features that have drifted",
        ["model"],
        registry=registry,
    )
    drift_gauge.labels(model="recommendation").set(1 if drift_detected else 0)
    drift_share_gauge.labels(model="recommendation").set(drift_share)
    push_to_gateway(PUSHGATEWAY_URL, job="drift-detector", registry=registry)

    print(f"Drift detected: {drift_detected}, share: {drift_share:.2%}")
    return drift_detected
if __name__ == "__main__":
run_drift_detection()Drift Alert to Retraining Trigger
The Go Model Metadata Service
Model Rollback
Governance Checklist
Previous: Part 10: LLMOps — Operating Large Language Models Reliably
Next: Part 12: GitOps at Scale — ArgoCD Orchestrating the Full Platform
Last updated