MLOps Journey: A Data Engineer's Perspective with Databricks and GitLab

Published: June 30, 2025

As a data engineering practitioner, I've witnessed the evolution of machine learning operations from experimental notebooks to production-ready systems. My journey with MLOps has been filled with challenges, learnings, and transformative experiences. In this post, I'll share my personal experiences implementing MLOps practices using Databricks Community Edition, Python, and GitLab as my technology stack of choice.

The Data Engineer's Dilemma in ML Projects

When I first transitioned from traditional data engineering to machine learning projects, I quickly realized that my existing toolkit and processes were insufficient. Traditional data pipelines were deterministic and relatively straightforward to test and deploy. Machine learning pipelines, however, introduced new complexities:

  • Models that behaved differently with varying data distributions

  • Experiments that needed careful tracking and reproducibility

  • Model drift that required continuous monitoring

  • Increased collaboration needs between data scientists and engineers

I found myself asking: How do I bring the same level of rigor and automation to ML workflows that I've established for data processing pipelines?

My MLOps Architecture Journey

After numerous iterations, I developed an MLOps architecture that balanced flexibility with governance. Here's a sequence diagram showing the end-to-end workflow I established:

[Sequence diagram: experimentation in Databricks notebooks → merge request and CI checks in GitLab → automated training and evaluation → model registration in MLflow → deployment to Databricks]

This workflow helped establish clear handoffs between roles while maintaining the flexibility data scientists needed for experimentation.

Setting Up the Infrastructure

Databricks Community Edition: The Experimentation Platform

Databricks Community Edition became the foundation of my MLOps practice for several reasons:

  1. It provided a collaborative notebook environment that data scientists loved

  2. It included built-in MLflow for experiment tracking

  3. It offered a straightforward upgrade path to the full platform when workloads grew

  4. It was accessible without enterprise-level budgets

Setting up Databricks for MLOps wasn't trivial. I needed to:

  1. Configure workspace permissions

  2. Create cluster configurations that balanced cost with performance

  3. Set up MLflow experiment tracking

  4. Establish connections with GitLab

The most critical part was wiring up MLflow experiment tracking (the tracking server itself is built into Databricks):

# In Databricks notebook
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.datasets import make_classification

# Set the experiment location
username = dbutils.notebook.entry_point.getDbutils().notebook().getContext().userName().get()
mlflow.set_experiment(f"/Users/{username}/mlops_experiments")

# Fit a small illustrative model so there is something to log
X, y = make_classification(n_samples=500, n_features=10, random_state=42)
trained_model = RandomForestClassifier(max_depth=5, random_state=42).fit(X, y)

# Start tracking an experiment
with mlflow.start_run(run_name="baseline_model") as run:
    # Log parameters
    mlflow.log_param("learning_rate", 0.01)
    mlflow.log_param("max_depth", 5)
    
    # Log metrics during training (illustrative values for this sketch)
    for epoch in range(10):
        train_accuracy = 0.80 + epoch * 0.01
        mlflow.log_metric("train_accuracy", train_accuracy, step=epoch)
        mlflow.log_metric("val_accuracy", train_accuracy - 0.05, step=epoch)
    
    # Log the model itself
    mlflow.sklearn.log_model(trained_model, "random_forest_model")
    
    # Record the run ID for later reference
    run_id = run.info.run_id
    print(f"Experiment run ID: {run_id}")

GitLab: Version Control and CI/CD Pipeline

While Databricks handled experimentation, I needed a robust system for version control, collaboration, and automated testing. GitLab became my platform of choice because:

  1. It provided comprehensive CI/CD capabilities

  2. It had excellent support for merge requests and code reviews

  3. It integrated well with Python ecosystems

  4. It facilitated collaboration between data scientists and engineers

I structured my GitLab repository to accommodate both the code and ML artifacts:

mlops-project/
├── .gitlab-ci.yml                # CI/CD pipeline configuration
├── README.md                     # Project documentation
├── requirements.txt              # Python dependencies for CI jobs
├── config/                       # Configuration files
│   ├── deployment_config.yaml    # Deployment criteria and model name
│   ├── model_params.yaml         # Model hyperparameters
│   └── pipeline_config.yaml      # Pipeline configuration
├── data/                         # Data versioning with DVC
│   └── .gitignore                # Ignore data files, track DVC pointers
├── models/                       # Model registry local copy
│   └── .gitignore                # Ignore model binaries
├── notebooks/                    # Databricks notebooks export
│   ├── exploration/              # EDA notebooks
│   └── training/                 # Training notebooks
├── src/                          # Python source code
│   ├── data_processing/          # Data processing scripts
│   │   ├── __init__.py
│   │   ├── preprocessing.py      # Data preprocessing
│   │   └── validate_data.py      # Data validation entry point for CI
│   ├── evaluation/               # Model evaluation
│   │   ├── __init__.py
│   │   ├── evaluate_model.py     # Evaluation entry point for CI
│   │   └── metrics.py            # Evaluation metrics
│   ├── model/                    # Model definition
│   │   ├── __init__.py
│   │   ├── register_model.py     # Model registration entry point for CI
│   │   └── train.py              # Model training code
│   └── utils/                    # Utility functions
│       ├── __init__.py
│       └── databricks_utils.py   # Utils for Databricks
├── tests/                        # Unit and integration tests
│   ├── __init__.py
│   ├── test_preprocessing.py     # Test preprocessing functions
│   └── test_model.py             # Test model functions
└── deployment/                   # Deployment scripts
    ├── databricks_deploy.py      # Deploy to Databricks
    └── environment.yml           # Environment definition
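
Keeping the notebooks/ directory in sync with the workspace was a manual pain point at first. The sync script isn't part of the tree above, but here's a rough sketch of the kind of export helper I'd use, assuming a personal access token in DATABRICKS_TOKEN and the standard Workspace API (the workspace path and output directory are illustrative):

# scripts/export_notebooks.py (illustrative helper)
import base64
import os
import requests

DATABRICKS_HOST = os.environ["DATABRICKS_HOST"]
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

def export_notebooks(workspace_dir, local_dir):
    """Export all notebooks under a workspace folder as source files."""
    listing = requests.get(
        f"{DATABRICKS_HOST}/api/2.0/workspace/list",
        headers=HEADERS,
        params={"path": workspace_dir},
    ).json()

    os.makedirs(local_dir, exist_ok=True)
    for obj in listing.get("objects", []):
        if obj["object_type"] != "NOTEBOOK":
            continue
        exported = requests.get(
            f"{DATABRICKS_HOST}/api/2.0/workspace/export",
            headers=HEADERS,
            params={"path": obj["path"], "format": "SOURCE"},
        ).json()
        # The export endpoint returns the notebook source base64-encoded
        source = base64.b64decode(exported["content"])
        filename = os.path.basename(obj["path"]) + ".py"
        with open(os.path.join(local_dir, filename), "wb") as f:
            f.write(source)

if __name__ == "__main__":
    export_notebooks("/Users/me@example.com/mlops_experiments", "notebooks/training")

Running something like this before committing keeps merge requests reviewing the current notebook source rather than a stale copy.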

The CI/CD pipeline was configured to:

  1. Run tests on code changes

  2. Validate data quality

  3. Train and validate models

  4. Register models if they met performance criteria

  5. Deploy models to production

Here's the GitLab CI/CD pipeline configuration that tied it all together:

# .gitlab-ci.yml

stages:
  - test
  - data_validation
  - train
  - evaluate
  - register
  - deploy

variables:
  # DATABRICKS_HOST and DATABRICKS_TOKEN are supplied as masked CI/CD
  # variables in the GitLab project settings
  DATABRICKS_HOST: ${DATABRICKS_HOST}
  DATABRICKS_TOKEN: ${DATABRICKS_TOKEN}
  MLFLOW_TRACKING_URI: "databricks"

# Run unit tests
unit_tests:
  stage: test
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - pytest tests/
  except:
    - tags

# Validate the input data
data_validation:
  stage: data_validation
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/data_processing/validate_data.py
  artifacts:
    paths:
      - data/processed/
    expire_in: 1 week
  except:
    - tags

# Train the model
model_training:
  stage: train
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/model/train.py
  artifacts:
    paths:
      - models/trained_model.pkl
      - training_metadata.json
    expire_in: 1 week
  except:
    - tags
  needs:
    - data_validation

# Evaluate the model
model_evaluation:
  stage: evaluate
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/evaluation/evaluate_model.py
  artifacts:
    paths:
      - evaluation_results.json
    expire_in: 1 week
  needs:
    - model_training
  except:
    - tags

# Register the model if it meets quality criteria
model_registration:
  stage: register
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python src/model/register_model.py
  only:
    - main
  needs:
    - model_evaluation

# Deploy the model to production
model_deployment:
  stage: deploy
  image: python:3.9
  script:
    - pip install -r requirements.txt
    - python deployment/databricks_deploy.py
  only:
    - main
  when: manual
  needs:
    - model_registration

The Data Processing and Model Training Workflow

One of my biggest challenges was establishing a repeatable process for data processing and model training. Here's a sequence diagram of the workflow I implemented:

[Sequence diagram: raw data load → validation checks → feature engineering → train/test split → model training → evaluation, with each step logged to MLflow]

The code to implement this workflow was designed to be both robust and maintainable:

# src/data_processing/preprocessing.py

import pandas as pd
from sklearn.model_selection import train_test_split
import mlflow
import yaml
import os

def load_config():
    """Load the preprocessing configuration."""
    with open("config/pipeline_config.yaml", "r") as f:
        return yaml.safe_load(f)

def load_raw_data(config):
    """Load data from the configured source."""
    data_path = config["data"]["source_path"]
    return pd.read_csv(data_path)

def validate_data(df, config):
    """Perform data validation checks."""
    # Check for missing values
    missing_pct = df.isnull().mean() * 100
    missing_threshold = config["data_validation"]["missing_threshold"]
    
    # Log validation results
    with mlflow.start_run(run_name="data_validation") as run:
        mlflow.log_params({
            "dataset_rows": len(df),
            "dataset_columns": len(df.columns),
            "missing_threshold": missing_threshold
        })
        
        # Log missing value percentages
        for col, pct in missing_pct.items():
            mlflow.log_metric(f"missing_pct_{col}", pct)
        
        # Check against thresholds
        columns_exceeding_threshold = missing_pct[missing_pct > missing_threshold].index.tolist()
        if columns_exceeding_threshold:
            mlflow.log_param("validation_status", "failed")
            mlflow.log_param("failed_columns", str(columns_exceeding_threshold))
            raise ValueError(f"Columns exceeding missing threshold: {columns_exceeding_threshold}")
        
        mlflow.log_param("validation_status", "passed")
    
    return df

def preprocess_features(df, config):
    """Apply feature engineering steps."""
    # Create new features
    if config["feature_engineering"]["create_date_features"]:
        df['date'] = pd.to_datetime(df[config["data"]["date_column"]])
        df['dayofweek'] = df['date'].dt.dayofweek
        df['month'] = df['date'].dt.month
        df['year'] = df['date'].dt.year
    
    # Handle categorical features
    categorical_cols = config["data"]["categorical_columns"]
    if config["feature_engineering"]["one_hot_encode"]:
        df = pd.get_dummies(df, columns=categorical_cols, drop_first=True)
    
    # Drop unnecessary columns
    if "columns_to_drop" in config["data"]:
        df = df.drop(columns=config["data"]["columns_to_drop"])
    
    return df

def split_dataset(df, config):
    """Split data into training and testing sets."""
    target_col = config["data"]["target_column"]
    test_size = config["data_split"]["test_size"]
    random_state = config["data_split"]["random_state"]
    
    X = df.drop(columns=[target_col])
    y = df[target_col]
    
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, 
        stratify=y if config["data_split"]["stratify"] else None
    )
    
    # Log the split details
    with mlflow.start_run(run_name="data_splitting"):
        mlflow.log_params({
            "test_size": test_size,
            "random_state": random_state,
            "stratify": config["data_split"]["stratify"],
            "train_samples": len(X_train),
            "test_samples": len(X_test)
        })
    
    # Save the splits
    output_path = config["data"]["processed_path"]
    os.makedirs(output_path, exist_ok=True)
    
    X_train.to_csv(f"{output_path}/X_train.csv", index=False)
    X_test.to_csv(f"{output_path}/X_test.csv", index=False)
    y_train.to_csv(f"{output_path}/y_train.csv", index=False)
    y_test.to_csv(f"{output_path}/y_test.csv", index=False)
    
    return X_train, X_test, y_train, y_test

def run_preprocessing_pipeline():
    """Run the full preprocessing pipeline."""
    config = load_config()
    df = load_raw_data(config)
    df = validate_data(df, config)
    df = preprocess_features(df, config)
    X_train, X_test, y_train, y_test = split_dataset(df, config)
    
    print("Preprocessing pipeline completed successfully.")
    return X_train, X_test, y_train, y_test

if __name__ == "__main__":
    run_preprocessing_pipeline()
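
The preprocessing script is entirely driven by config/pipeline_config.yaml. I haven't reproduced my exact file here, but based on the keys the code reads, a minimal version looks roughly like this (the column names and values are illustrative):

# config/pipeline_config.yaml (illustrative values)
data:
  source_path: data/raw/transactions.csv
  processed_path: data/processed
  date_column: transaction_date
  target_column: churned
  categorical_columns: [segment, region]
  columns_to_drop: [customer_id, transaction_date, date]

data_validation:
  missing_threshold: 20        # max % of missing values allowed per column

feature_engineering:
  create_date_features: true
  one_hot_encode: true

data_split:
  test_size: 0.2
  random_state: 42
  stratify: true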

The model training code followed a similar pattern of configuration-driven, trackable processes:

# src/model/train.py

import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
import yaml
import os
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

def load_config():
    """Load the model training configuration."""
    with open("config/model_params.yaml", "r") as f:
        return yaml.safe_load(f)

def load_training_data(config):
    """Load the preprocessed training data."""
    data_path = config["data"]["processed_path"]
    X_train = pd.read_csv(f"{data_path}/X_train.csv")
    y_train = pd.read_csv(f"{data_path}/y_train.csv").values.ravel()
    return X_train, y_train

def train_model(X_train, y_train, config):
    """Train the model with the specified configuration."""
    # Set up MLflow experiment
    mlflow.set_experiment(config["mlflow"]["experiment_name"])
    
    with mlflow.start_run(run_name=config["mlflow"]["run_name"]) as run:
        # Log parameters
        mlflow.log_params(config["model"]["params"])
        
        if config["model"]["hyperparameter_tuning"]["enabled"]:
            # Perform hyperparameter tuning
            param_grid = config["model"]["hyperparameter_tuning"]["param_grid"]
            cv = config["model"]["hyperparameter_tuning"]["cv"]
            
            base_model = RandomForestClassifier()
            grid_search = GridSearchCV(
                estimator=base_model,
                param_grid=param_grid,
                cv=cv,
                scoring='f1',
                n_jobs=-1
            )
            
            grid_search.fit(X_train, y_train)
            
            # Log hyperparameter tuning results
            best_params = grid_search.best_params_
            mlflow.log_params({f"best_{k}": v for k, v in best_params.items()})
            mlflow.log_metric("best_cv_score", grid_search.best_score_)
            
            # Use the best model
            model = grid_search.best_estimator_
        else:
            # Train with specified parameters
            model = RandomForestClassifier(
                n_estimators=config["model"]["params"]["n_estimators"],
                max_depth=config["model"]["params"]["max_depth"],
                min_samples_split=config["model"]["params"]["min_samples_split"],
                random_state=config["model"]["params"]["random_state"]
            )
            model.fit(X_train, y_train)
        
        # Log feature importance
        if hasattr(model, 'feature_importances_'):
            importances = model.feature_importances_
            feature_df = pd.DataFrame({
                'feature': X_train.columns,
                'importance': importances
            }).sort_values('importance', ascending=False)
            
            # Log as a CSV artifact
            feature_importance_path = "feature_importance.csv"
            feature_df.to_csv(feature_importance_path, index=False)
            mlflow.log_artifact(feature_importance_path)
            
            # Also log top features as parameters
            top_features = feature_df.head(5)['feature'].tolist()
            mlflow.log_param("top_features", str(top_features))
        
        # Log the model
        mlflow.sklearn.log_model(model, "model")
        
        # Save run ID for later reference
        run_id = run.info.run_id
        
        # Save model metadata
        metadata = {
            "run_id": run_id,
            "model_type": type(model).__name__,
            "feature_count": X_train.shape[1],
            "training_samples": X_train.shape[0],
            "timestamp": pd.Timestamp.now().isoformat()
        }
        
        import json
        with open("training_metadata.json", "w") as f:
            json.dump(metadata, f)
        
        # Log the metadata as an artifact
        mlflow.log_artifact("training_metadata.json")
        
        return model, run_id

def save_model(model, run_id, config):
    """Save the trained model to disk."""
    import joblib
    os.makedirs("models", exist_ok=True)
    model_path = "models/trained_model.pkl"
    joblib.dump(model, model_path)
    
    print(f"Model saved to {model_path}")
    print(f"MLflow run ID: {run_id}")
    return model_path

def run_training_pipeline():
    """Run the full model training pipeline."""
    config = load_config()
    X_train, y_train = load_training_data(config)
    model, run_id = train_model(X_train, y_train, config)
    model_path = save_model(model, run_id, config)
    
    print("Model training pipeline completed successfully.")
    return model, run_id, model_path

if __name__ == "__main__":
    run_training_pipeline()
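
Likewise, the training script only knows what config/model_params.yaml tells it. A hedged example matching the keys above (the numbers are placeholders, not my tuned values):

# config/model_params.yaml (placeholder values)
mlflow:
  experiment_name: /Users/me@example.com/mlops_experiments
  run_name: random_forest_training

data:
  processed_path: data/processed

model:
  params:
    n_estimators: 200
    max_depth: 8
    min_samples_split: 4
    random_state: 42
  hyperparameter_tuning:
    enabled: false
    cv: 5
    param_grid:
      n_estimators: [100, 200, 400]
      max_depth: [4, 8, 12]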

The Model Deployment and Monitoring Workflow

The final piece of my MLOps puzzle was model deployment and monitoring. This was perhaps the most challenging part, requiring careful orchestration between GitLab CI/CD, Databricks, and production systems. Here's a sequence diagram showing the deployment process:

[Sequence diagram: evaluation results checked against deployment criteria → model registered in the MLflow Model Registry → transitioned to Staging → Databricks validation job triggered → promotion to Production on success]
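
The deploy stage is driven by config/deployment_config.yaml, which holds the registered model name and the minimum metrics a candidate has to hit. A minimal sketch, assuming the evaluation step writes accuracy and f1_score into evaluation_results.json (names and thresholds are illustrative):

# config/deployment_config.yaml (illustrative thresholds)
deployment:
  model_name: churn_classifier
  criteria:
    accuracy: 0.85
    f1_score: 0.80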

The code for model deployment looked like this:

# deployment/databricks_deploy.py

import os
import requests
import json
import mlflow
import yaml
from mlflow.tracking import MlflowClient

def load_config():
    """Load deployment configuration."""
    with open("config/deployment_config.yaml", "r") as f:
        return yaml.safe_load(f)

def load_evaluation_results():
    """Load the model evaluation results."""
    with open("evaluation_results.json", "r") as f:
        return json.load(f)
    
def check_deployment_criteria(eval_results, config):
    """Check if the model meets deployment criteria."""
    criteria_met = True
    
    # Check each metric against threshold
    for metric, threshold in config["deployment"]["criteria"].items():
        if eval_results[metric] < threshold:
            print(f"Deployment criteria not met: {metric} = {eval_results[metric]}, threshold = {threshold}")
            criteria_met = False
    
    return criteria_met

def get_production_model_version(client, model_name):
    """Get current production model version if it exists."""
    try:
        production_model = client.get_latest_versions(model_name, stages=["Production"])
        if production_model:
            return production_model[0].version
        return None
    except Exception:
        return None

def deploy_model_to_databricks(config, eval_results):
    """Deploy the model to Databricks."""
    # Set MLflow tracking URI
    mlflow.set_tracking_uri("databricks")
    client = MlflowClient()
    
    # Load metadata to get run ID
    with open("training_metadata.json", "r") as f:
        metadata = json.load(f)
    
    run_id = metadata["run_id"]
    model_name = config["deployment"]["model_name"]
    
    # Register the model if deployment criteria are met
    if check_deployment_criteria(eval_results, config):
        print(f"Deployment criteria met. Registering model...")
        
        # Register the model in MLflow Model Registry
        model_uri = f"runs:/{run_id}/model"
        model_details = mlflow.register_model(model_uri, model_name)
        model_version = model_details.version
        
        print(f"Model registered as: {model_name} version {model_version}")
        
        # Transition to staging
        client.transition_model_version_stage(
            name=model_name,
            version=model_version,
            stage="Staging"
        )
        
        print(f"Model {model_name} version {model_version} transitioned to Staging")
        
        # Set up Databricks API request
        databricks_host = os.environ.get("DATABRICKS_HOST")
        databricks_token = os.environ.get("DATABRICKS_TOKEN")
        
        headers = {
            "Authorization": f"Bearer {databricks_token}",
            "Content-Type": "application/json"
        }
        
        # Create a job to validate the model in staging
        validation_job_config = {
            "name": f"Validate {model_name} v{model_version}",
            "tasks": [
                {
                    "task_key": "validate_model",
                    "notebook_task": {
                        "notebook_path": "/Shared/validation/validate_model",
                        "base_parameters": {
                            "model_name": model_name,
                            "model_version": model_version
                        }
                    },
                    "new_cluster": {
                        "spark_version": "10.4.x-scala2.12",
                        "node_type_id": "Standard_DS3_v2",
                        "num_workers": 1
                    }
                }
            ]
        }
        
        # Create and run validation job
        print("Creating validation job...")
        response = requests.post(
            f"{databricks_host}/api/2.1/jobs/create",
            headers=headers,
            json=validation_job_config
        )
        
        if response.status_code == 200:
            job_id = response.json()["job_id"]
            print(f"Created validation job with ID: {job_id}")
            
            # Run the job
            run_response = requests.post(
                f"{databricks_host}/api/2.1/jobs/run-now",
                headers=headers,
                json={"job_id": job_id}
            )
            
            if run_response.status_code == 200:
                # Use a distinct name so the MLflow run_id above isn't clobbered
                job_run_id = run_response.json()["run_id"]
                print(f"Started validation job run with ID: {job_run_id}")
                
                # Wait for job completion would be implemented here
                # For CI/CD purposes, we'd typically set up a separate job to check results
                
                print("Model deployment initiated successfully.")
                return True
            else:
                print(f"Failed to start validation job: {run_response.text}")
        else:
            print(f"Failed to create validation job: {response.text}")
    else:
        print("Model did not meet deployment criteria. Deployment aborted.")
    
    return False

def main():
    """Main deployment function."""
    config = load_config()
    eval_results = load_evaluation_results()
    
    success = deploy_model_to_databricks(config, eval_results)
    
    if success:
        print("Model deployment pipeline completed successfully.")
        exit(0)
    else:
        print("Model deployment failed.")
        exit(1)

if __name__ == "__main__":
    main()
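
One piece I haven't shown is src/evaluation/evaluate_model.py, the script the evaluate stage runs and that produces the evaluation_results.json consumed above. My version also logs results to MLflow; a stripped-down sketch, assuming the same config and artifact conventions (and the accuracy/f1_score keys used in the deployment criteria):

# src/evaluation/evaluate_model.py (stripped-down sketch)
import json
import joblib
import pandas as pd
import yaml
from sklearn.metrics import accuracy_score, f1_score

def load_config():
    """Load the pipeline configuration to find the processed data."""
    with open("config/pipeline_config.yaml", "r") as f:
        return yaml.safe_load(f)

def run_evaluation():
    config = load_config()
    data_path = config["data"]["processed_path"]

    # Load the held-out test split written by the preprocessing step
    X_test = pd.read_csv(f"{data_path}/X_test.csv")
    y_test = pd.read_csv(f"{data_path}/y_test.csv").values.ravel()

    # Load the model artifact produced by the training job
    model = joblib.load("models/trained_model.pkl")
    predictions = model.predict(X_test)

    results = {
        "accuracy": accuracy_score(y_test, predictions),
        "f1_score": f1_score(y_test, predictions),
    }

    # The deployment stage reads this file to check its criteria
    with open("evaluation_results.json", "w") as f:
        json.dump(results, f, indent=2)

    print(f"Evaluation results: {results}")
    return results

if __name__ == "__main__":
    run_evaluation()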

Lessons Learned as a Data Engineering Practitioner

Throughout my MLOps journey, I've learned several critical lessons that have shaped my practice as a data engineer:

1. Start with Strong Data Foundations

As a data engineer, I found that ML projects amplify data quality issues. My biggest success factor was investing heavily in data validation and quality controls. Prior to implementing MLOps, nearly 40% of model failures could be traced to data issues. After implementing robust data validation in the pipeline, this dropped to less than 10%.

2. Embrace Modularity

Making ML pipelines modular helps isolate issues and enables incremental improvements. I separate my pipelines into discrete steps:

  • Data ingestion

  • Validation

  • Preprocessing

  • Feature engineering

  • Model training

  • Evaluation

  • Deployment

This approach has reduced debugging time by 60% and made it easier to identify bottlenecks.

3. Automate Thoughtfully

Not everything should be automated immediately. I've found a phased approach works best:

  1. Start with automating the most error-prone manual tasks

  2. Add monitoring and alerting next

  3. Finally, implement automated retraining and deployment

4. Version Everything

In ML systems, versioning goes beyond code. I track:

  • Data versions (using DVC)

  • Model versions (using MLflow)

  • Environment configurations

  • Experiment parameters

This comprehensive versioning has been crucial for reproducing results and debugging production issues.

5. Monitor Not Just Performance, But Data Too

My most valuable lesson was learning to monitor input data distributions in production. Several times we caught data drift before it impacted model performance, by setting up monitoring for the following (a simplified sketch of the kind of check involved follows the list):

  • Feature distributions

  • Input data schema changes

  • Data quality metrics
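
A simplified sketch of one way to check a feature's distribution for drift, using a two-sample Kolmogorov-Smirnov test from scipy. My production checks are more involved, and the file paths, column name, and threshold here are placeholders:

# Minimal drift check: compare a production batch to the training reference
import pandas as pd
from scipy.stats import ks_2samp

def detect_feature_drift(reference: pd.Series, current: pd.Series, p_threshold: float = 0.01) -> dict:
    """Flag drift when the two samples are unlikely to come from the same distribution."""
    statistic, p_value = ks_2samp(reference.dropna(), current.dropna())
    return {
        "ks_statistic": float(statistic),
        "p_value": float(p_value),
        "drift_detected": p_value < p_threshold,
    }

# Example: compare the latest production batch against the training data
reference = pd.read_csv("data/processed/X_train.csv")["transaction_amount"]
current = pd.read_csv("data/production/latest_batch.csv")["transaction_amount"]
print(detect_feature_drift(reference, current))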

6. Collaboration Is Key

The most successful ML projects I've worked on involved close collaboration between data scientists and engineers, supported by practices like:

  • Shared GitLab repositories

  • Clear documentation

  • Standardized notebooks in Databricks

  • Regular sync meetings

These practices have dramatically improved the transition from experiment to production.

Challenges I Faced and How I Overcame Them

Challenge 1: Environment Inconsistency

Problem: Models would work in development but fail in production due to environment differences.

Solution: I implemented Docker containers for consistent environments and created detailed environment.yml files for Databricks clusters. This reduced environment-related failures by 90%.
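
The environment.yml in the deployment/ folder is what keeps local, CI, and cluster environments aligned. Mine pins more packages than this, but a trimmed example of the shape (versions are illustrative):

# deployment/environment.yml (trimmed, illustrative pins)
name: mlops-project
channels:
  - conda-forge
dependencies:
  - python=3.9
  - pandas=1.5.*
  - scikit-learn=1.2.*
  - pyyaml
  - pip
  - pip:
      - mlflow==2.*
      - joblib
      - requests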

Challenge 2: Long-Running Training Jobs Breaking CI/CD

Problem: Model training could take hours, making CI/CD pipelines impractical.

Solution: I separated the CI/CD pipeline into stages and used Databricks Jobs API to handle long-running training processes asynchronously. This kept most CI jobs under 10 minutes while still ensuring quality.
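
Concretely, the CI job only submits the Databricks run and exits; a later job (or a human) checks the result. A rough sketch of the submit-and-check pattern against the Jobs API, assuming the training job already exists in the workspace and its ID is passed in as a TRAINING_JOB_ID variable:

# Submit a long-running training job without blocking the CI pipeline
import os
import requests

HOST = os.environ["DATABRICKS_HOST"]
HEADERS = {"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"}

def submit_training_run(job_id: int) -> int:
    """Kick off an existing Databricks job and return the run ID immediately."""
    response = requests.post(
        f"{HOST}/api/2.1/jobs/run-now",
        headers=HEADERS,
        json={"job_id": job_id},
    )
    response.raise_for_status()
    return response.json()["run_id"]

def get_run_state(run_id: int) -> str:
    """Check a run's lifecycle state later, from a separate CI job or schedule."""
    response = requests.get(
        f"{HOST}/api/2.1/jobs/runs/get",
        headers=HEADERS,
        params={"run_id": run_id},
    )
    response.raise_for_status()
    return response.json()["state"]["life_cycle_state"]

if __name__ == "__main__":
    run_id = submit_training_run(int(os.environ["TRAINING_JOB_ID"]))
    print(f"Submitted training run {run_id}; CI job exits without waiting.")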

Challenge 3: Model Drift in Production

Problem: Models would silently degrade over time as data patterns shifted.

Solution: I implemented:

  • Statistical monitoring of input feature distributions

  • Performance monitoring with sliding windows

  • Automated retraining triggers when metrics dropped below thresholds

This approach has caught drift issues weeks before they would have impacted business metrics.
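
As a flavour of the sliding-window check, it amounts to little more than a rolling aggregate over logged predictions once ground truth arrives. A toy sketch, where the window size, threshold, and log format are placeholders:

# Rolling-accuracy check over logged predictions joined with ground truth
import pandas as pd

WINDOW = 500          # number of recent predictions to average over
THRESHOLD = 0.80      # retraining trigger

def needs_retraining(prediction_log: pd.DataFrame) -> bool:
    """prediction_log needs 'prediction' and 'actual' columns, ordered by time."""
    correct = (prediction_log["prediction"] == prediction_log["actual"]).astype(int)
    rolling_accuracy = correct.rolling(WINDOW).mean()
    latest = rolling_accuracy.iloc[-1]
    print(f"Rolling accuracy over last {WINDOW} predictions: {latest:.3f}")
    return bool(latest < THRESHOLD)

if __name__ == "__main__":
    log = pd.read_csv("data/monitoring/prediction_log.csv")
    if needs_retraining(log):
        print("Rolling accuracy below threshold; trigger the retraining job.")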

Conclusion: The Continuous MLOps Journey

My MLOps journey as a data engineer has transformed how I approach machine learning projects. The integration of Databricks for experimentation, GitLab for CI/CD, and MLflow for experiment tracking has created a robust, reproducible ML pipeline that balances flexibility with governance.

The key takeaway from my experience is that MLOps is not a destination but a continuous journey of improvement. Start small, focus on the highest-value problems, and gradually expand your MLOps capabilities.

For data engineers taking their first steps into MLOps, I recommend:

  1. Start with experiment tracking and model versioning

  2. Focus on data quality and validation

  3. Build modular pipelines that can evolve

  4. Implement monitoring early

  5. Collaborate closely with data scientists

I hope sharing my personal MLOps journey helps you on yours. What challenges are you facing in implementing MLOps in your organization? What tools and practices have you found most valuable? I'd love to hear about your experiences in the comments.


About the Author: A passionate data engineering practitioner with years of experience implementing MLOps solutions across various industries.
