# pipelines/training_pipeline.py
from kfp import dsl
from kfp.dsl import Output, Model, Dataset, Metrics
@dsl.component(
    base_image='python:3.12-slim',
    packages_to_install=[
        'pandas==2.1.0',
        'scikit-learn==1.3.0',
        'joblib==1.3.2'
    ]
)
def load_and_validate_data(
    data_url: str,
    output_data: Output[Dataset]
):
    """Load, validate, and clean the raw sentiment dataset.

    Args:
        data_url: URL or path readable by ``pandas.read_csv``.
        output_data: Output artifact that receives the cleaned CSV.

    Raises:
        ValueError: If required columns are missing, or no rows
            remain after cleaning.
    """
    import pandas as pd
    from pathlib import Path

    # Download data
    df = pd.read_csv(data_url)

    # Validate schema explicitly. A bare `assert` is stripped under
    # `python -O`, so raise a real exception naming the missing columns.
    required_columns = ['text', 'sentiment']
    missing = [col for col in required_columns if col not in df.columns]
    if missing:
        raise ValueError(f"Input data missing required columns: {missing}")

    # Clean: drop duplicate texts and empty/whitespace-only texts.
    df = df.drop_duplicates(subset=['text'])
    df = df[df['text'].str.strip() != '']
    if df.empty:
        # Fail fast rather than handing downstream training an empty CSV.
        raise ValueError("No rows remaining after cleaning")

    # Save
    output_path = Path(output_data.path)
    df.to_csv(output_path, index=False)
    print(f"Processed {len(df)} rows")
@dsl.component(
    base_image='python:3.12-slim',
    packages_to_install=['pandas==2.1.0', 'scikit-learn==1.3.0', 'joblib==1.3.2']
)
def train_model(
    input_data: dsl.Input[Dataset],
    model_output: Output[Model],
    metrics: Output[Metrics],
    C: float = 1.0
):
    """Train a TF-IDF + logistic-regression sentiment classifier.

    Args:
        input_data: Cleaned dataset with 'text' and 'sentiment' columns.
        model_output: Artifact receiving the joblib-dumped bundle of
            model and fitted vectorizer.
        metrics: KFP metrics artifact for accuracy logging.
        C: Inverse regularization strength for LogisticRegression
            (kept capitalized for backward compatibility with callers).
    """
    import pandas as pd
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.linear_model import LogisticRegression
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    import joblib
    from pathlib import Path

    # Load data
    df = pd.read_csv(input_data.path)

    # Features: unigrams + bigrams with a capped vocabulary.
    vectorizer = TfidfVectorizer(max_features=10000, ngram_range=(1, 2))
    X = vectorizer.fit_transform(df['text'])
    y = df['sentiment']

    # Split (stratified so class balance is preserved in both splits).
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    # Train
    model = LogisticRegression(C=C, max_iter=1000, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate. Cast to plain float: accuracy_score can return numpy
    # scalars, which are not JSON-serializable when KFP persists
    # metrics and artifact metadata.
    train_acc = float(accuracy_score(y_train, model.predict(X_train)))
    test_acc = float(accuracy_score(y_test, model.predict(X_test)))
    print(f"Train Accuracy: {train_acc:.3f}")
    print(f"Test Accuracy: {test_acc:.3f}")

    # Log metrics
    metrics.log_metric('train_accuracy', train_acc)
    metrics.log_metric('test_accuracy', test_acc)
    metrics.log_metric('n_samples', len(df))

    # Save model and vectorizer together so inference uses the exact
    # vocabulary the model was trained on.
    model_data = {
        'model': model,
        'vectorizer': vectorizer
    }
    joblib.dump(model_data, Path(model_output.path))

    # Metadata consumed downstream by validate_model.
    model_output.metadata['accuracy'] = test_acc
    model_output.metadata['framework'] = 'sklearn'
@dsl.component(
    base_image='python:3.12-slim',
    packages_to_install=['scikit-learn==1.3.0', 'joblib==1.3.2']
)
def validate_model(
    model: dsl.Input[Model],
    accuracy_threshold: float = 0.85
) -> str:
    """Gate the pipeline on model quality.

    Reads the accuracy recorded in the model artifact's metadata and
    fails the step when it falls below ``accuracy_threshold``.
    """
    recorded_accuracy = model.metadata.get('accuracy', 0)
    if recorded_accuracy >= accuracy_threshold:
        print(f"Model validation passed: {recorded_accuracy:.3f}")
        return "PASSED"
    raise ValueError(
        f"Model accuracy {recorded_accuracy:.3f} below threshold {accuracy_threshold}"
    )
@dsl.pipeline(
    name='sentiment-training-pipeline',
    description='End-to-end sentiment model training'
)
def training_pipeline(
    data_url: str = 's3://my-bucket/reviews.csv',
    model_C: float = 1.0,
    accuracy_threshold: float = 0.85
) -> Model:
    """Complete training pipeline: load -> train -> validate.

    Args:
        data_url: Location of the raw reviews CSV.
        model_C: Inverse regularization strength passed to train_model.
        accuracy_threshold: Minimum test accuracy for validation to pass.

    Returns:
        The trained model artifact produced by train_model.
    """
    # Load and validate data
    data_task = load_and_validate_data(data_url=data_url)

    # Train model
    train_task = train_model(
        input_data=data_task.outputs['output_data'],
        C=model_C
    )

    # Validate model quality. NOTE(review): this gate runs as a sibling
    # of the returned output — it fails the run but does not block the
    # model artifact from being surfaced; confirm that is intended.
    validate_model(
        model=train_task.outputs['model_output'],
        accuracy_threshold=accuracy_threshold
    )

    # The KFP v2 compiler requires a return annotation (added above as
    # `-> Model`) for a pipeline that surfaces an output; the previous
    # un-annotated bare return failed compilation.
    return train_task.outputs['model_output']
if __name__ == '__main__':
    # Compile the pipeline definition into a deployable YAML package.
    from kfp import compiler

    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=training_pipeline,
        package_path='sentiment_pipeline.yaml'
    )
    print("Pipeline compiled successfully")