MLOps Journey: A Data Engineer's Perspective with Databricks and GitLab
Published: June 30, 2025
As a data engineering practitioner, I've witnessed the evolution of machine learning operations from experimental notebooks to production-ready systems. My journey with MLOps has been filled with challenges, learnings, and transformative experiences. In this post, I'll share my personal experiences implementing MLOps practices using Databricks Community Edition, Python, and GitLab as my technology stack of choice.
The Data Engineer's Dilemma in ML Projects
When I first transitioned from traditional data engineering to machine learning projects, I quickly realized that my existing toolkit and processes were insufficient. Traditional data pipelines were deterministic and relatively straightforward to test and deploy. Machine learning pipelines, however, introduced new complexities:
Models that behaved differently with varying data distributions
Experiments that needed careful tracking and reproducibility
Model drift that required continuous monitoring
Increased collaboration needs between data scientists and engineers
I found myself asking: How do I bring the same level of rigor and automation to ML workflows that I've established for data processing pipelines?
My MLOps Architecture Journey
After numerous iterations, I developed an MLOps architecture that balanced flexibility with governance. The end-to-end workflow I established moves from experimentation in Databricks, through version control and automated testing in GitLab CI/CD, to model registration and deployment back into Databricks.
This workflow helped establish clear handoffs between roles while maintaining the flexibility data scientists needed for experimentation.
Setting Up the Infrastructure
Databricks Community Edition: The Experimentation Platform
Databricks Community Edition became the foundation of my MLOps practice for several reasons:
It provided a collaborative notebook environment that data scientists loved
It included built-in MLflow for experiment tracking
It offered a clear path to scale workloads on the full Databricks platform later
It was accessible without enterprise-level budgets
Setting up Databricks for MLOps wasn't trivial. I needed to:
Configure workspace permissions
Create cluster configurations that balanced cost with performance
Set up MLflow experiment tracking
Establish connections with GitLab
The most critical part was establishing the MLflow tracking server:
# In a Databricks notebook
import mlflow
import mlflow.sklearn

# Set the experiment location under the current user's workspace folder
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
mlflow.set_experiment(f"/Users/{username}/mlops_experiments")

# Start tracking an experiment
with mlflow.start_run(run_name="baseline_model") as run:
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("max_depth", 5)

    # Log metrics during training
    for epoch in range(10):
        train_accuracy = 0.80 + epoch * 0.01
        mlflow.log_metric("train_accuracy", train_accuracy, step=epoch)
        mlflow.log_metric("val_accuracy", train_accuracy - 0.05, step=epoch)

    # Log the model itself (`trained_model` is a fitted estimator from earlier in the notebook)
    mlflow.sklearn.log_model(trained_model, "random_forest_model")

    # Record the run ID for later reference
    run_id = run.info.run_id

print(f"Experiment run ID: {run_id}")
GitLab: Version Control and CI/CD Pipeline
While Databricks handled experimentation, I needed a robust system for version control, collaboration, and automated testing. GitLab became my platform of choice because:
It provided comprehensive CI/CD capabilities
It had excellent support for merge requests and code reviews
It integrated well with Python ecosystems
It facilitated collaboration between data scientists and engineers
I structured my GitLab repository to accommodate both the code and ML artifacts:
mlops-project/
├── .gitlab-ci.yml              # CI/CD pipeline configuration
├── README.md                   # Project documentation
├── config/                     # Configuration files
│   ├── model_params.yaml       # Model hyperparameters
│   └── pipeline_config.yaml    # Pipeline configuration
├── data/                       # Data versioning with DVC
│   └── .gitignore              # Ignore data files, track DVC pointers
├── models/                     # Model registry local copy
│   └── .gitignore              # Ignore model binaries
├── notebooks/                  # Databricks notebooks export
│   ├── exploration/            # EDA notebooks
│   └── training/               # Training notebooks
├── src/                        # Python source code
│   ├── data_processing/        # Data processing scripts
│   │   ├── __init__.py
│   │   └── preprocessing.py    # Data preprocessing
│   ├── evaluation/             # Model evaluation
│   │   ├── __init__.py
│   │   └── metrics.py          # Evaluation metrics
│   ├── model/                  # Model definition
│   │   ├── __init__.py
│   │   └── train.py            # Model training code
│   └── utils/                  # Utility functions
│       ├── __init__.py
│       └── databricks_utils.py # Utils for Databricks
├── tests/                      # Unit and integration tests
│   ├── __init__.py
│   ├── test_preprocessing.py   # Test preprocessing functions
│   └── test_model.py           # Test model functions
└── deployment/                 # Deployment scripts
    ├── databricks_deploy.py    # Deploy to Databricks
    └── environment.yml         # Environment definition
The CI/CD pipeline was configured to:
Run tests on code changes
Validate data quality
Train and validate models
Register models if they met performance criteria
Deploy models to production
Here's the GitLab CI/CD pipeline configuration that tied it all together:
# .gitlab-ci.yml
stages:
  - test
  - data_validation
  - train
  - evaluate
  - register
  - deploy

variables:
  DATABRICKS_HOST: ${DATABRICKS_HOST}
  DATABRICKS_TOKEN: ${DATABRICKS_TOKEN}
  MLFLOW_TRACKING_URI: "databricks"

# Run unit tests
unit_tests:
  stage: test
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - pytest tests/
  except:
    - tags

# Validate the input data
data_validation:
  stage: data_validation
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/data_processing/validate_data.py
  artifacts:
    paths:
      - data/processed/
    expire_in: 1 week
  except:
    - tags

# Train the model
model_training:
  stage: train
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/model/train.py
  artifacts:
    paths:
      - models/trained_model.pkl
      - training_metadata.json
    expire_in: 1 week
  except:
    - tags
  needs:
    - data_validation

# Evaluate the model
model_evaluation:
  stage: evaluate
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/evaluation/evaluate_model.py
  artifacts:
    paths:
      - evaluation_results.json
    expire_in: 1 week
  needs:
    - model_training
  except:
    - tags

# Register the model if it meets quality criteria
model_registration:
  stage: register
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/model/register_model.py
  only:
    - main
  needs:
    - model_evaluation

# Deploy the model to production
model_deployment:
  stage: deploy
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python deployment/databricks_deploy.py
  only:
    - main
  when: manual
  needs:
    - model_registration
The Data Processing and Model Training Workflow
One of my biggest challenges was establishing a repeatable process for data processing and model training. The workflow I implemented runs configuration loading, data validation, feature engineering, and train/test splitting as discrete steps, with every step logged to MLflow.
The code to implement this workflow was designed to be both robust and maintainable:
# src/data_processing/preprocessing.py
import os

import pandas as pd
from sklearn.model_selection import train_test_split
import mlflow
import yaml


def load_config():
    """Load the preprocessing configuration."""
    with open("config/pipeline_config.yaml", "r") as f:
        return yaml.safe_load(f)


def load_raw_data(config):
    """Load data from the configured source."""
    data_path = config["data"]["source_path"]
    return pd.read_csv(data_path)


def validate_data(df, config):
    """Perform data validation checks."""
    # Check for missing values
    missing_pct = df.isnull().mean() * 100
    missing_threshold = config["data_validation"]["missing_threshold"]

    # Log validation results
    with mlflow.start_run(run_name="data_validation") as run:
        mlflow.log_params({
            "dataset_rows": len(df),
            "dataset_columns": len(df.columns),
            "missing_threshold": missing_threshold
        })

        # Log missing value percentages
        for col, pct in missing_pct.items():
            mlflow.log_metric(f"missing_pct_{col}", pct)

        # Check against thresholds
        columns_exceeding_threshold = missing_pct[missing_pct > missing_threshold].index.tolist()
        if columns_exceeding_threshold:
            mlflow.log_param("validation_status", "failed")
            mlflow.log_param("failed_columns", str(columns_exceeding_threshold))
            raise ValueError(f"Columns exceeding missing threshold: {columns_exceeding_threshold}")

        mlflow.log_param("validation_status", "passed")

    return df


def preprocess_features(df, config):
    """Apply feature engineering steps."""
    # Create new features
    if config["feature_engineering"]["create_date_features"]:
        df['date'] = pd.to_datetime(df[config["data"]["date_column"]])
        df['dayofweek'] = df['date'].dt.dayofweek
        df['month'] = df['date'].dt.month
        df['year'] = df['date'].dt.year

    # Handle categorical features
    categorical_cols = config["data"]["categorical_columns"]
    if config["feature_engineering"]["one_hot_encode"]:
        df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)

    # Drop unnecessary columns
    if "columns_to_drop" in config["data"]:
        df = df.drop(columns=config["data"]["columns_to_drop"])

    return df


def split_dataset(df, config):
    """Split data into training and testing sets."""
    target_col = config["data"]["target_column"]
    test_size = config["data_split"]["test_size"]
    random_state = config["data_split"]["random_state"]

    X = df.drop(columns=[target_col])
    y = df[target_col]

    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state,
        stratify=y if config["data_split"]["stratify"] else None
    )

    # Log the split details
    with mlflow.start_run(run_name="data_splitting"):
        mlflow.log_params({
            "test_size": test_size,
            "random_state": random_state,
            "stratify": config["data_split"]["stratify"],
            "train_samples": len(X_train),
            "test_samples": len(X_test)
        })

    # Save the splits
    output_path = config["data"]["processed_path"]
    os.makedirs(output_path, exist_ok=True)
    X_train.to_csv(f"{output_path}/X_train.csv", index=False)
    X_test.to_csv(f"{output_path}/X_test.csv", index=False)
    y_train.to_csv(f"{output_path}/y_train.csv", index=False)
    y_test.to_csv(f"{output_path}/y_test.csv", index=False)

    return X_train, X_test, y_train, y_test


def run_preprocessing_pipeline():
    """Run the full preprocessing pipeline."""
    config = load_config()
    df = load_raw_data(config)
    df = validate_data(df, config)
    df = preprocess_features(df, config)
    X_train, X_test, y_train, y_test = split_dataset(df, config)
    print("Preprocessing pipeline completed successfully.")
    return X_train, X_test, y_train, y_test


if __name__ == "__main__":
    run_preprocessing_pipeline()
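The preprocessing module is entirely driven by config/pipeline_config.yaml, which isn't reproduced here. The keys it expects follow from the code above; the sketch below shows one plausible config, expressed as an inline YAML string so the structure is explicit. The paths and column names are purely illustrative, not from a real project:

import yaml

# Illustrative pipeline_config.yaml contents; the keys mirror what preprocessing.py reads
EXAMPLE_PIPELINE_CONFIG = yaml.safe_load("""
data:
  source_path: data/raw/transactions.csv        # illustrative path
  processed_path: data/processed
  target_column: churned                        # illustrative column names
  date_column: signup_date
  categorical_columns: [plan_type, region]
  columns_to_drop: [customer_id, signup_date, date]
data_validation:
  missing_threshold: 20                         # max % of missing values allowed per column
feature_engineering:
  create_date_features: true
  one_hot_encode: true
data_split:
  test_size: 0.2
  random_state: 42
  stratify: true
""")

print(EXAMPLE_PIPELINE_CONFIG["data_split"]["test_size"])  # 0.2

Keeping these choices in one file meant a data scientist could change the split ratio or the list of categorical columns without touching the pipeline code.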
The model training code followed a similar pattern of configuration-driven, trackable processes:
# src/model/train.py
import os
import json

import joblib
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
import yaml
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV


def load_config():
    """Load the model training configuration."""
    with open("config/model_params.yaml", "r") as f:
        return yaml.safe_load(f)


def load_training_data(config):
    """Load the preprocessed training data."""
    data_path = config["data"]["processed_path"]
    X_train = pd.read_csv(f"{data_path}/X_train.csv")
    y_train = pd.read_csv(f"{data_path}/y_train.csv").values.ravel()
    return X_train, y_train


def train_model(X_train, y_train, config):
    """Train the model with the specified configuration."""
    # Set up the MLflow experiment
    mlflow.set_experiment(config["mlflow"]["experiment_name"])

    with mlflow.start_run(run_name=config["mlflow"]["run_name"]) as run:
        # Log parameters
        mlflow.log_params(config["model"]["params"])

        if config["model"]["hyperparameter_tuning"]["enabled"]:
            # Perform hyperparameter tuning
            param_grid = config["model"]["hyperparameter_tuning"]["param_grid"]
            cv = config["model"]["hyperparameter_tuning"]["cv"]

            base_model = RandomForestClassifier()
            grid_search = GridSearchCV(
                estimator=base_model,
                param_grid=param_grid,
                cv=cv,
                scoring='f1',
                n_jobs=-1
            )
            grid_search.fit(X_train, y_train)

            # Log hyperparameter tuning results
            best_params = grid_search.best_params_
            mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
            mlflow.log_metric("best_cv_score", grid_search.best_score_)

            # Use the best model
            model = grid_search.best_estimator_
        else:
            # Train with the specified parameters
            model = RandomForestClassifier(
                n_estimators=config["model"]["params"]["n_estimators"],
                max_depth=config["model"]["params"]["max_depth"],
                min_samples_split=config["model"]["params"]["min_samples_split"],
                random_state=config["model"]["params"]["random_state"]
            )
            model.fit(X_train, y_train)

        # Log feature importance
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
            feature_df = pd.DataFrame({
                'feature': X_train.columns,
                'importance': importances
            }).sort_values('importance', ascending=False)

            # Log as a CSV artifact
            feature_importance_path = "feature_importance.csv"
            feature_df.to_csv(feature_importance_path, index=False)
            mlflow.log_artifact(feature_importance_path)

            # Also log the top features as a parameter
            top_features = feature_df.head(5)['feature'].tolist()
            mlflow.log_param("top_features", str(top_features))

        # Log the model
        mlflow.sklearn.log_model(model, "model")

        # Save the run ID for later reference
        run_id = run.info.run_id

        # Save model metadata
        metadata = {
            "run_id": run_id,
            "model_type": type(model).__name__,
            "feature_count": X_train.shape[1],
            "training_samples": X_train.shape[0],
            "timestamp": pd.Timestamp.now().isoformat()
        }
        with open("training_metadata.json", "w") as f:
            json.dump(metadata, f)

        # Log the metadata as an artifact
        mlflow.log_artifact("training_metadata.json")

    return model, run_id


def save_model(model, run_id, config):
    """Save the trained model to disk."""
    os.makedirs("models", exist_ok=True)
    model_path = "models/trained_model.pkl"
    joblib.dump(model, model_path)
    print(f"Model saved to {model_path}")
    print(f"MLflow run ID: {run_id}")
    return model_path


def run_training_pipeline():
    """Run the full model training pipeline."""
    config = load_config()
    X_train, y_train = load_training_data(config)
    model, run_id = train_model(X_train, y_train, config)
    model_path = save_model(model, run_id, config)
    print("Model training pipeline completed successfully.")
    return model, run_id, model_path


if __name__ == "__main__":
    run_training_pipeline()
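Between training and deployment, the evaluate stage in the CI pipeline calls src/evaluation/evaluate_model.py and expects it to produce the evaluation_results.json that deployment reads. That script isn't reproduced in this post; the following is a minimal sketch of the shape it might take, assuming a binary classification target and assuming the metric names ("accuracy", "f1_score") match whatever thresholds live in deployment_config.yaml. The file paths match the training artifacts above:

# src/evaluation/evaluate_model.py (minimal sketch)
import json

import joblib
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score


def evaluate():
    # Load the model trained in the previous CI stage
    model = joblib.load("models/trained_model.pkl")

    # Load the held-out test split written by the preprocessing step
    X_test = pd.read_csv("data/processed/X_test.csv")
    y_test = pd.read_csv("data/processed/y_test.csv").values.ravel()

    predictions = model.predict(X_test)
    results = {
        "accuracy": accuracy_score(y_test, predictions),
        "f1_score": f1_score(y_test, predictions),
    }

    # evaluation_results.json is kept as a CI artifact and read again at deploy time
    with open("evaluation_results.json", "w") as f:
        json.dump(results, f, indent=2)
    print(f"Evaluation results: {results}")


if __name__ == "__main__":
    evaluate()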
The Model Deployment and Monitoring Workflow
The final piece of my MLOps puzzle was model deployment and monitoring. This was perhaps the most challenging part, requiring careful orchestration between GitLab CI/CD, Databricks, and production systems. The deployment process checks the evaluation results against agreed thresholds, registers passing models in the MLflow Model Registry, and launches a validation job in Databricks before promotion.
The code for model deployment looked like this:
# deployment/databricks_deploy.py
import os
import sys
import json

import requests
import mlflow
import yaml
from mlflow.tracking import MlflowClient


def load_config():
    """Load deployment configuration."""
    with open("config/deployment_config.yaml", "r") as f:
        return yaml.safe_load(f)


def load_evaluation_results():
    """Load the model evaluation results."""
    with open("evaluation_results.json", "r") as f:
        return json.load(f)


def check_deployment_criteria(eval_results, config):
    """Check if the model meets deployment criteria."""
    criteria_met = True
    # Check each metric against its threshold
    for metric, threshold in config["deployment"]["criteria"].items():
        if eval_results[metric] < threshold:
            print(f"Deployment criteria not met: {metric} = {eval_results[metric]}, threshold = {threshold}")
            criteria_met = False
    return criteria_met


def get_production_model_version(client, model_name):
    """Get the current production model version if it exists."""
    try:
        production_model = client.get_latest_versions(model_name, stages=["Production"])
        if production_model:
            return production_model[0].version
        return None
    except Exception:
        return None


def deploy_model_to_databricks(config, eval_results):
    """Deploy the model to Databricks."""
    # Set the MLflow tracking URI
    mlflow.set_tracking_uri("databricks")
    client = MlflowClient()

    # Load metadata to get the training run ID
    with open("training_metadata.json", "r") as f:
        metadata = json.load(f)
    run_id = metadata["run_id"]
    model_name = config["deployment"]["model_name"]

    # Register the model only if deployment criteria are met
    if not check_deployment_criteria(eval_results, config):
        print("Model did not meet deployment criteria. Deployment aborted.")
        return False

    print("Deployment criteria met. Registering model...")

    # Register the model in the MLflow Model Registry
    model_uri = f"runs:/{run_id}/model"
    model_details = mlflow.register_model(model_uri, model_name)
    model_version = model_details.version
    print(f"Model registered as: {model_name} version {model_version}")

    # Transition to Staging
    client.transition_model_version_stage(
        name=model_name,
        version=model_version,
        stage="Staging"
    )
    print(f"Model {model_name} version {model_version} transitioned to Staging")

    # Set up the Databricks REST API request
    databricks_host = os.environ.get("DATABRICKS_HOST")
    databricks_token = os.environ.get("DATABRICKS_TOKEN")
    headers = {
        "Authorization": f"Bearer {databricks_token}",
        "Content-Type": "application/json"
    }

    # Create a job to validate the model in Staging
    validation_job_config = {
        "name": f"Validate {model_name} v{model_version}",
        "tasks": [
            {
                "task_key": "validate_model",
                "notebook_task": {
                    "notebook_path": "/Shared/validation/validate_model",
                    "base_parameters": {
                        "model_name": model_name,
                        "model_version": str(model_version)
                    }
                },
                "new_cluster": {
                    "spark_version": "10.4.x-scala2.12",
                    "node_type_id": "Standard_DS3_v2",
                    "num_workers": 1
                }
            }
        ]
    }

    # Create and run the validation job
    print("Creating validation job...")
    response = requests.post(
        f"{databricks_host}/api/2.1/jobs/create",
        headers=headers,
        json=validation_job_config
    )
    if response.status_code != 200:
        print(f"Failed to create validation job: {response.text}")
        return False

    job_id = response.json()["job_id"]
    print(f"Created validation job with ID: {job_id}")

    # Run the job
    run_response = requests.post(
        f"{databricks_host}/api/2.1/jobs/run-now",
        headers=headers,
        json={"job_id": job_id}
    )
    if run_response.status_code != 200:
        print(f"Failed to start validation job: {run_response.text}")
        return False

    job_run_id = run_response.json()["run_id"]
    print(f"Started validation job run with ID: {job_run_id}")

    # Waiting for job completion would be implemented here.
    # For CI/CD purposes, we'd typically set up a separate job to check results.
    print("Model deployment initiated successfully.")
    return True


def main():
    """Main deployment function."""
    config = load_config()
    eval_results = load_evaluation_results()
    success = deploy_model_to_databricks(config, eval_results)
    if success:
        print("Model deployment pipeline completed successfully.")
        sys.exit(0)
    else:
        print("Model deployment failed.")
        sys.exit(1)


if __name__ == "__main__":
    main()
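The "wait for job completion" step I glossed over above can be handled by polling the Databricks Jobs API. A rough sketch of how that polling might look, reusing the databricks_host, headers, and job_run_id from the script above (the poll interval and timeout are arbitrary choices):

import time
import requests


def wait_for_run(databricks_host, headers, job_run_id, poll_seconds=60, timeout_seconds=3600):
    """Poll a Databricks job run until it finishes or the timeout is hit."""
    deadline = time.time() + timeout_seconds
    while time.time() < deadline:
        response = requests.get(
            f"{databricks_host}/api/2.1/jobs/runs/get",
            headers=headers,
            params={"run_id": job_run_id},
        )
        response.raise_for_status()
        state = response.json()["state"]
        if state["life_cycle_state"] in ("TERMINATED", "SKIPPED", "INTERNAL_ERROR"):
            # result_state is only present once the run has finished
            return state.get("result_state") == "SUCCESS"
        time.sleep(poll_seconds)
    raise TimeoutError(f"Validation run {job_run_id} did not finish within {timeout_seconds}s")

In practice I kept this kind of long wait out of the GitLab runner and checked the result from a separate, scheduled job, which is why the deployment script only initiates the run.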
Lessons Learned as a Data Engineering Practitioner
Throughout my MLOps journey, I've learned several critical lessons that have shaped my practice as a data engineer:
1. Start with Strong Data Foundations
As a data engineer, I found that ML projects amplify data quality issues. My biggest success factor was investing heavily in data validation and quality controls. Prior to implementing MLOps, nearly 40% of model failures could be traced to data issues. After implementing robust data validation in the pipeline, this dropped to less than 10%.
2. Embrace Modularity
Making ML pipelines modular helps isolate issues and enables incremental improvements. I separate my pipelines into discrete steps:
Data ingestion
Validation
Preprocessing
Feature engineering
Model training
Evaluation
Deployment
This approach has reduced debugging time by 60% and made it easier to identify bottlenecks.
3. Automate Thoughtfully
Not everything should be automated immediately. I've found a phased approach works best:
Start with automating the most error-prone manual tasks
Add monitoring and alerting next
Finally, implement automated retraining and deployment
4. Version Everything
In ML systems, versioning goes beyond code. I track:
Data versions (using DVC)
Model versions (using MLflow)
Environment configurations
Experiment parameters
This comprehensive versioning has been crucial for reproducing results and debugging production issues.
5. Monitor Not Just Performance, But Data Too
My most valuable lesson was learning to monitor input data distributions in production. Several times, we caught data drift issues before they impacted model performance by setting up monitoring for the following (a sketch of the distribution check appears after the list):
Feature distributions
Input data schema changes
Data quality metrics
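For the feature-distribution piece, a simple two-sample test per feature already goes a long way. Here's a minimal sketch of that kind of check using a Kolmogorov-Smirnov test; the p-value threshold and the idea of comparing a training snapshot against a recent production batch are assumptions you would tune to your own data:

import pandas as pd
from scipy.stats import ks_2samp


def detect_feature_drift(train_df: pd.DataFrame, prod_df: pd.DataFrame, p_threshold: float = 0.01):
    """Flag numeric features whose production distribution differs from the training snapshot."""
    drifted = {}
    for col in train_df.select_dtypes(include="number").columns:
        if col not in prod_df.columns:
            drifted[col] = "missing in production data"  # schema change
            continue
        statistic, p_value = ks_2samp(train_df[col].dropna(), prod_df[col].dropna())
        if p_value < p_threshold:
            drifted[col] = {"ks_statistic": round(statistic, 4), "p_value": p_value}
    return drifted

# Example: compare the training snapshot against the latest scored batch
# drift_report = detect_feature_drift(pd.read_csv("data/processed/X_train.csv"), latest_batch_df)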
6. Collaboration Is Key
The most successful ML projects I've worked on involved close collaboration between data scientists and engineers, supported by practices and tools like:
Shared GitLab repositories
Clear documentation
Standardized notebooks in Databricks
Regular sync meetings
These practices have dramatically improved the transition from experiment to production.
Challenges I Faced and How I Overcame Them
Challenge 1: Environment Inconsistency
Problem: Models would work in development but fail in production due to environment differences.
Solution: I implemented Docker containers for consistent environments and created detailed environment.yml files for Databricks clusters. This reduced environment-related failures by 90%.
Challenge 2: Long-Running Training Jobs Breaking CI/CD
Problem: Model training could take hours, making CI/CD pipelines impractical.
Solution: I separated the CI/CD pipeline into stages and used Databricks Jobs API to handle long-running training processes asynchronously. This kept most CI jobs under 10 minutes while still ensuring quality.
Challenge 3: Model Drift in Production
Problem: Models would silently degrade over time as data patterns shifted.
Solution: I implemented:
Statistical monitoring of input feature distributions
Performance monitoring with sliding windows
Automated retraining triggers when metrics dropped below thresholds
This approach has caught drift issues weeks before they would have impacted business metrics.
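One simple way to wire such a retraining trigger back into GitLab is to call the pipeline trigger API from the monitoring job once a threshold is breached. A minimal sketch, assuming a pipeline trigger token stored as GITLAB_TRIGGER_TOKEN and a project ID of your own (both hypothetical placeholders):

import os
import requests


def trigger_retraining(project_id: str, ref: str = "main"):
    """Kick off the GitLab CI pipeline (and hence retraining) via a pipeline trigger token."""
    gitlab_url = os.environ.get("GITLAB_URL", "https://gitlab.com")  # placeholder default
    token = os.environ["GITLAB_TRIGGER_TOKEN"]  # pipeline trigger token created in GitLab project settings
    response = requests.post(
        f"{gitlab_url}/api/v4/projects/{project_id}/trigger/pipeline",
        data={"token": token, "ref": ref, "variables[RETRAIN_REASON]": "data_drift"},
    )
    response.raise_for_status()
    print(f"Triggered retraining pipeline: {response.json().get('web_url')}")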
Conclusion: The Continuous MLOps Journey
My MLOps journey as a data engineer has transformed how I approach machine learning projects. The integration of Databricks for experimentation, GitLab for CI/CD, and MLflow for experiment tracking has created a robust, reproducible ML pipeline that balances flexibility with governance.
The key takeaway from my experience is that MLOps is not a destination but a continuous journey of improvement. Start small, focus on the highest-value problems, and gradually expand your MLOps capabilities.
For data engineers taking their first steps into MLOps, I recommend:
Start with experiment tracking and model versioning
Focus on data quality and validation
Build modular pipelines that can evolve
Implement monitoring early
Collaborate closely with data scientists
I hope sharing my personal MLOps journey helps you on yours. What challenges are you facing in implementing MLOps in your organization? What tools and practices have you found most valuable? I'd love to hear about your experiences in the comments.
About the Author: A passionate data engineering practitioner with years of experience implementing MLOps solutions across various industries.