ML Pipeline Reference Architecture
Overview
This reference architecture demonstrates an end-to-end machine learning pipeline on Azure, integrating Azure Machine Learning, Azure Databricks with MLflow, and Synapse Analytics for scalable model training, deployment, and monitoring in production.
Business Drivers
- ML Operations (MLOps): Automate model lifecycle from training to deployment
- Feature Engineering: Centralized feature store for consistency
- Model Governance: Track experiments, lineage, and compliance
- Scalability: Train models on petabytes of data
- Real-Time Scoring: Low-latency predictions (<100ms)
- Monitoring: Detect model drift and performance degradation
- Collaboration: Enable data scientists and engineers to work together
Key Capabilities
- Distributed training on Spark clusters
- Automated hyperparameter tuning (AutoML)
- Model versioning and registry
- A/B testing and champion/challenger patterns
- Batch and real-time inference
- Feature store with point-in-time correctness
- Explainable AI and bias detection
- CI/CD for ML models
Architecture Diagram
graph TB
subgraph "Data Sources"
SYNAPSE[Synapse SQL Pool<br/>Historical Data]
ADLS[Data Lake<br/>Raw Features]
STREAMING[Event Hubs<br/>Real-time Events]
EXTERNAL[External APIs<br/>Third-party Data]
end
subgraph "Feature Engineering"
FEATURE_ENG[Databricks<br/>Feature Engineering]
FEATURE_STORE[(Feature Store<br/>Offline + Online)]
VALIDATION[Data Validation<br/>Great Expectations]
end
subgraph "Model Training - Experimentation"
NOTEBOOK[Notebooks<br/>Exploration]
AUTOML[Azure ML AutoML<br/>Model Selection]
TRAINING[Distributed Training<br/>Horovod/Ray]
HPO[Hyperparameter Tuning<br/>Optuna/Hyperopt]
end
subgraph "Experiment Tracking"
MLFLOW[MLflow Tracking<br/>Experiments & Metrics]
MODEL_REGISTRY[MLflow Registry<br/>Versioned Models]
ARTIFACTS[Artifact Store<br/>Model Files]
end
subgraph "Model Evaluation"
EVAL[Model Evaluation<br/>Validation Metrics]
EXPLAIN[Explainability<br/>SHAP/LIME]
FAIRNESS[Bias Detection<br/>Fairlearn]
CHAMPION[Champion/Challenger<br/>A/B Testing]
end
subgraph "Model Deployment"
AML_ENDPOINT[Azure ML Endpoint<br/>Managed Online]
ACI[Azure Container Instances<br/>Dev/Test]
AKS[Azure Kubernetes Service<br/>Production]
BATCH[Batch Inference<br/>Spark/Synapse]
end
subgraph "Serving & APIs"
API_MGMT[API Management<br/>Gateway]
FUNCTION[Azure Functions<br/>Event-driven]
STREAM_ANALYTICS[Stream Analytics<br/>Real-time Scoring]
end
subgraph "Monitoring & Ops"
DATA_DRIFT[Data Drift Monitor<br/>Input Distribution]
MODEL_PERF[Model Performance<br/>Accuracy Metrics]
APP_INSIGHTS[Application Insights<br/>Telemetry]
ALERTS[Alerting<br/>Anomalies]
end
subgraph "MLOps Automation"
DEVOPS[Azure DevOps<br/>CI/CD Pipelines]
GIT[Git/GitHub<br/>Version Control]
TERRAFORM[Terraform<br/>Infrastructure as Code]
end
%% Data Flow
SYNAPSE --> FEATURE_ENG
ADLS --> FEATURE_ENG
STREAMING --> FEATURE_ENG
EXTERNAL --> FEATURE_ENG
FEATURE_ENG --> VALIDATION
VALIDATION --> FEATURE_STORE
FEATURE_STORE --> NOTEBOOK
FEATURE_STORE --> AUTOML
FEATURE_STORE --> TRAINING
NOTEBOOK --> MLFLOW
AUTOML --> MLFLOW
TRAINING --> MLFLOW
HPO --> MLFLOW
MLFLOW --> MODEL_REGISTRY
MODEL_REGISTRY --> ARTIFACTS
ARTIFACTS --> EVAL
EVAL --> EXPLAIN
EVAL --> FAIRNESS
EVAL --> CHAMPION
CHAMPION --> |Deploy| AML_ENDPOINT
CHAMPION --> ACI
CHAMPION --> AKS
CHAMPION --> BATCH
AML_ENDPOINT --> API_MGMT
AKS --> API_MGMT
BATCH --> ADLS
API_MGMT --> FUNCTION
STREAMING --> STREAM_ANALYTICS
STREAM_ANALYTICS --> API_MGMT
AML_ENDPOINT --> DATA_DRIFT
AML_ENDPOINT --> MODEL_PERF
API_MGMT --> APP_INSIGHTS
MODEL_PERF --> ALERTS
GIT --> DEVOPS
DEVOPS --> TRAINING
DEVOPS --> AKS
TERRAFORM --> AKS
style FEATURE_STORE fill:#ffd700
style MODEL_REGISTRY fill:#90ee90
style AML_ENDPOINT fill:#87ceeb
style DATA_DRIFT fill:#ff6b6b
Azure Service Mapping
| Component | Azure Service | Purpose | ML Capability |
|---|---|---|---|
| Feature Engineering | Azure Databricks | Large-scale data transformation | Unity Catalog feature store |
| AutoML | Azure Machine Learning | Automated model selection | 100+ algorithms, neural architecture search |
| Distributed Training | Azure ML Compute | GPU/CPU clusters | PyTorch, TensorFlow, scikit-learn |
| Experiment Tracking | MLflow on Databricks | Log metrics, parameters, artifacts | Integration with Databricks |
| Model Registry | MLflow Model Registry | Version control for models | Stage transitions (Dev/Staging/Prod) |
| Managed Endpoints | Azure ML Online Endpoints | Real-time scoring | Auto-scaling, A/B testing |
| Batch Inference | Synapse Spark Pools | Large-scale batch scoring | Integration with data lake |
| API Gateway | Azure API Management | Unified API layer | Rate limiting, OAuth, caching |
| Stream Scoring | Azure Stream Analytics | Real-time predictions | Event-driven inference |
| Model Monitoring | Azure ML Model Monitor | Drift and performance tracking | Data distribution comparison |
| Explainability | Azure ML Responsible AI | Model interpretability | SHAP, LIME, error analysis |
| CI/CD | Azure DevOps / GitHub Actions | Automated deployment | ML pipeline orchestration |
| Infrastructure | Terraform / Bicep | IaC for ML resources | Reproducible environments |
Feature Store Architecture
Feature Engineering Pipeline
# Databricks feature engineering with Delta Live Tables
from databricks import feature_store
from databricks.feature_store import FeatureStoreClient
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import dlt
# Initialize feature store
fs = FeatureStoreClient()
# Define feature table
@dlt.table(
name="customer_features",
comment="Customer behavioral features for churn prediction",
table_properties={
"quality": "gold",
"pipelines.autoOptimize.managed": "true"
}
)
def create_customer_features():
"""
Compute customer features with point-in-time correctness
"""
# Read transactions
transactions = spark.table("silver.transactions")
    # Trailing 30/90-day aggregates are computed with date filters inside the
    # groupBy below, so no range-window specs are needed here
features = transactions.groupBy("customer_id").agg(
# Recency
datediff(current_date(), max("transaction_date")).alias("days_since_last_purchase"),
# Frequency
count("transaction_id").alias("total_transactions"),
        countDistinct(date_format("transaction_date", "yyyy-MM")).alias("active_months"),
# Monetary
sum("amount").alias("lifetime_value"),
avg("amount").alias("avg_transaction_amount"),
max("amount").alias("max_transaction_amount"),
# Trends
sum(when(col("transaction_date") >= date_sub(current_date(), 30), col("amount"))
.otherwise(0)).alias("amount_last_30d"),
count(when(col("transaction_date") >= date_sub(current_date(), 30), 1)
.otherwise(None)).alias("transactions_last_30d"),
sum(when(col("transaction_date") >= date_sub(current_date(), 90), col("amount"))
.otherwise(0)).alias("amount_last_90d"),
# Product diversity
countDistinct("product_category").alias("unique_categories_purchased"),
# Channel preferences
sum(when(col("channel") == "online", 1).otherwise(0)).alias("online_transactions"),
sum(when(col("channel") == "mobile", 1).otherwise(0)).alias("mobile_transactions"),
sum(when(col("channel") == "store", 1).otherwise(0)).alias("store_transactions")
)
# Calculate derived features
features = features.withColumn(
"avg_amount_per_month",
col("lifetime_value") / greatest(col("active_months"), lit(1))
).withColumn(
"amount_trend",
(col("amount_last_30d") - col("amount_last_90d") / 3) / greatest(col("amount_last_90d") / 3, lit(1))
).withColumn(
"online_preference_ratio",
col("online_transactions") / greatest(col("total_transactions"), lit(1))
)
# Add timestamp for point-in-time lookup
features = features.withColumn("feature_timestamp", current_timestamp())
return features
# Create feature table
fs.create_table(
name="ml_features.customer_features",
primary_keys=["customer_id"],
timestamp_keys=["feature_timestamp"],
df=create_customer_features(),
description="Customer behavioral features for predictive modeling"
)
# Product features
@dlt.table(name="product_features")
def create_product_features():
"""Product performance and characteristics"""
products = spark.table("silver.products")
sales = spark.table("silver.transaction_lines")
product_stats = sales.groupBy("product_id").agg(
count("*").alias("total_sales_count"),
sum("quantity").alias("total_units_sold"),
sum("line_total").alias("total_revenue"),
avg("unit_price").alias("avg_selling_price"),
countDistinct("transaction_id").alias("unique_transactions"),
        percentile_approx("unit_price", 0.5).alias("median_price")
)
product_features = products.join(product_stats, "product_id", "left").select(
"product_id",
"category",
"brand",
"list_price",
"cost",
col("list_price") - col("cost").alias("margin"),
(col("list_price") - col("cost")) / col("list_price").alias("margin_pct"),
coalesce("total_sales_count", lit(0)).alias("sales_count"),
coalesce("total_revenue", lit(0)).alias("revenue"),
coalesce("avg_selling_price", col("list_price")).alias("avg_price"),
current_timestamp().alias("feature_timestamp")
)
return product_features
# Online feature store (for low-latency serving)
def publish_to_online_store():
"""Sync features to online store (Cosmos DB)"""
from azure.cosmos import CosmosClient
# Read latest features
customer_features = spark.table("ml_features.customer_features")
# Write to Cosmos DB for low-latency serving
customer_features.write \
.format("cosmos.oltp") \
.option("spark.cosmos.accountEndpoint", cosmos_endpoint) \
.option("spark.cosmos.accountKey", cosmos_key) \
.option("spark.cosmos.database", "features") \
.option("spark.cosmos.container", "customer_features") \
.mode("overwrite") \
.save()
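For serving, online features are read back with point lookups. A minimal sketch, assuming the Cosmos DB container uses customer_id as both item id and partition key and the same cosmos_endpoint/cosmos_key settings as above:
# Low-latency point lookup from the online store (sketch)
from azure.cosmos import CosmosClient

def get_online_features(customer_id: str) -> dict:
    """Fetch the latest feature row for one customer at serving time."""
    client = CosmosClient(cosmos_endpoint, credential=cosmos_key)
    container = client.get_database_client("features") \
                      .get_container_client("customer_features")
    # Assumes the item id equals customer_id and the container is
    # partitioned by /customer_id
    return container.read_item(item=customer_id, partition_key=customer_id)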
Model Training Pipeline
Distributed Training with Horovod
# Distributed deep learning training
import horovod.torch as hvd
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from azureml.core import Run
import mlflow
from databricks.feature_store import FeatureStoreClient
# Initialize Horovod
hvd.init()
torch.cuda.set_device(hvd.local_rank())
# Get run context
run = Run.get_context()
class CustomerChurnModel(nn.Module):
"""Neural network for churn prediction"""
def __init__(self, input_dim, hidden_dims=[256, 128, 64]):
super().__init__()
layers = []
prev_dim = input_dim
for hidden_dim in hidden_dims:
layers.extend([
nn.Linear(prev_dim, hidden_dim),
nn.BatchNorm1d(hidden_dim),
nn.ReLU(),
nn.Dropout(0.3)
])
prev_dim = hidden_dim
layers.append(nn.Linear(prev_dim, 1))
layers.append(nn.Sigmoid())
self.network = nn.Sequential(*layers)
def forward(self, x):
return self.network(x)
def train_distributed():
"""Train model with Horovod"""
# Load features from feature store
fs = FeatureStoreClient()
training_df = fs.read_table(name="ml_features.customer_features")
# Prepare data
feature_columns = [
"days_since_last_purchase", "total_transactions", "lifetime_value",
"avg_transaction_amount", "amount_last_30d", "transactions_last_30d",
"unique_categories_purchased", "online_preference_ratio"
]
X = training_df.select(feature_columns).toPandas().values
y = training_df.select("churned").toPandas().values
# Create data loaders
dataset = torch.utils.data.TensorDataset(
torch.FloatTensor(X),
torch.FloatTensor(y)
)
# Partition dataset among workers
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=hvd.size(),
rank=hvd.rank()
)
dataloader = DataLoader(
dataset,
batch_size=256,
sampler=sampler
)
# Initialize model
model = CustomerChurnModel(input_dim=len(feature_columns))
model = model.cuda()
# Horovod: scale learning rate by number of workers
optimizer = torch.optim.Adam(model.parameters(), lr=0.001 * hvd.size())
# Horovod: wrap optimizer with DistributedOptimizer
optimizer = hvd.DistributedOptimizer(
optimizer,
named_parameters=model.named_parameters()
)
# Horovod: broadcast parameters & optimizer state
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Training loop
criterion = nn.BCELoss()
for epoch in range(100):
model.train()
epoch_loss = 0
for batch_idx, (data, target) in enumerate(dataloader):
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
optimizer.step()
epoch_loss += loss.item()
# Log metrics (only rank 0)
if hvd.rank() == 0:
avg_loss = epoch_loss / len(dataloader)
mlflow.log_metric("train_loss", avg_loss, step=epoch)
run.log("train_loss", avg_loss)
print(f"Epoch {epoch}: Loss = {avg_loss:.4f}")
# Save model (only rank 0)
    if hvd.rank() == 0:
        mlflow.pytorch.log_model(model, "model")
        torch.save(model.state_dict(), "model.pth")
        run.upload_file("outputs/model.pth", "model.pth")
return model
if __name__ == "__main__":
# Start MLflow run
mlflow.start_run()
# Train model
model = train_distributed()
mlflow.end_run()
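Launching this script on Azure ML is done through an MPI job. A minimal submission sketch (SDK v1), assuming the script above is saved as src/ml/train_horovod.py, a four-node GPU cluster, and a curated PyTorch environment (the environment name below is an example; pin one available in your workspace):
# Submit the script above as a distributed MPI job on Azure ML (SDK v1)
from azureml.core import Workspace, Experiment, ScriptRunConfig, Environment
from azureml.core.runconfig import MpiConfiguration

ws = Workspace.from_config()
env = Environment.get(ws, name="AzureML-pytorch-1.10-ubuntu18.04-py38-cuda11-gpu")

# One worker process per node; hvd.init() in the script picks up the ranks
distr_config = MpiConfiguration(process_count_per_node=1, node_count=4)

src = ScriptRunConfig(
    source_directory="src/ml",
    script="train_horovod.py",
    compute_target="ml-compute-cluster",
    environment=env,
    distributed_job_config=distr_config,
)

run = Experiment(ws, "churn-prediction-horovod").submit(src)
run.wait_for_completion(show_output=True)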
AutoML for Model Selection
# Automated machine learning with Azure ML
from azureml.core import Workspace, Dataset, Experiment
from azureml.train.automl import AutoMLConfig
from azureml.automl.core.featurization import FeaturizationConfig
from databricks.feature_store import FeatureStoreClient, FeatureLookup
# Load training data from feature store
fs = FeatureStoreClient()
# Create training dataset
training_set = fs.create_training_set(
    df=spark.table("silver.customers"),
    feature_lookups=[
        FeatureLookup(
            table_name="ml_features.customer_features",
            lookup_key="customer_id"
        ),
        FeatureLookup(
            table_name="ml_features.product_features",
            lookup_key="product_id"
        )
    ],
    label="churned",
    exclude_columns=["customer_id"]
)
training_df = training_set.load_df()
# Convert to Pandas for Azure ML
training_pd = training_df.toPandas()
# Register dataset
ws = Workspace.from_config()
dataset = Dataset.Tabular.register_pandas_dataframe(
training_pd,
target=(ws.get_default_datastore(), "training_data"),
name="churn_training_data"
)
# Configure AutoML
automl_config = AutoMLConfig(
task='classification',
primary_metric='AUC_weighted',
training_data=dataset,
label_column_name='churned',
compute_target='ml-compute-cluster',
n_cross_validations=5,
enable_early_stopping=True,
experiment_timeout_hours=3,
max_concurrent_iterations=10,
max_cores_per_iteration=-1,
enable_onnx_compatible_models=True,
model_explainability=True,
# Feature engineering
featurization=FeaturizationConfig(
dataset_language='en',
blocked_transformers=['LabelEncoder'],
drop_columns=['customer_id'],
imputation_strategies={
'days_since_last_purchase': 'mean',
'amount_last_30d': 'zero'
}
),
# Ensemble settings
enable_voting_ensemble=True,
enable_stack_ensemble=True,
stack_meta_learner_type='LogisticRegressionCV',
# Model selection
allowed_models=[
'XGBoostClassifier',
'LightGBM',
'RandomForest',
'ExtremeRandomTrees',
'GradientBoosting',
'LogisticRegression'
]
)
# Run AutoML experiment
experiment = Experiment(ws, 'churn-prediction-automl')
run = experiment.submit(automl_config, show_output=True)
# Get best model
best_run, fitted_model = run.get_output()
# Register model
model = best_run.register_model(
model_name='churn-prediction-model',
model_path='outputs/model.pkl',
model_framework='ScikitLearn',
tags={
'algorithm': fitted_model.__class__.__name__,
'auc': best_run.get_metrics()['AUC_weighted'],
'feature_store_version': 'v1'
}
)
# Log to MLflow
import mlflow
from mlflow.tracking import MlflowClient
mlflow.set_tracking_uri("databricks")  # assumes Databricks CLI/profile auth is configured
mlflow.set_experiment("/Shared/churn-prediction")
with mlflow.start_run():
mlflow.log_param("framework", "AutoML")
mlflow.log_param("primary_metric", "AUC_weighted")
    # Log only numeric metrics (get_metrics can include non-scalar entries)
    mlflow.log_metrics({k: v for k, v in best_run.get_metrics().items()
                        if isinstance(v, (int, float))})
mlflow.sklearn.log_model(fitted_model, "model")
# Register in MLflow Model Registry
    model_version = mlflow.register_model(
        f"runs:/{mlflow.active_run().info.run_id}/model",
        "churn-prediction"
    )
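The registry stage transitions noted in the service-mapping table gate promotion. A minimal sketch, using the version returned by register_model above and the MlflowClient already imported:
# Promote the new version through registry stages once evaluation passes
client = MlflowClient()
client.transition_model_version_stage(
    name="churn-prediction",
    version=model_version.version,
    stage="Staging",
)
# After the champion/challenger gate (next section), move to Production
client.transition_model_version_stage(
    name="churn-prediction",
    version=model_version.version,
    stage="Production",
    archive_existing_versions=True,
)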
Model Evaluation & Explainability
Comprehensive Model Evaluation
# Model evaluation with multiple metrics
from sklearn.metrics import *
import matplotlib.pyplot as plt
import seaborn as sns
from azureml.interpret import ExplanationClient
import shap
import mlflow
import numpy as np
import pandas as pd
class ModelEvaluator:
"""Comprehensive model evaluation"""
def __init__(self, model, X_test, y_test):
self.model = model
self.X_test = X_test
self.y_test = y_test
self.y_pred = model.predict(X_test)
self.y_pred_proba = model.predict_proba(X_test)[:, 1]
def evaluate_classification_metrics(self):
"""Calculate classification metrics"""
metrics = {
"accuracy": accuracy_score(self.y_test, self.y_pred),
"precision": precision_score(self.y_test, self.y_pred),
"recall": recall_score(self.y_test, self.y_pred),
"f1_score": f1_score(self.y_test, self.y_pred),
"auc_roc": roc_auc_score(self.y_test, self.y_pred_proba),
"avg_precision": average_precision_score(self.y_test, self.y_pred_proba),
"log_loss": log_loss(self.y_test, self.y_pred_proba)
}
# Log to MLflow
mlflow.log_metrics(metrics)
return metrics
def plot_confusion_matrix(self):
"""Visualize confusion matrix"""
cm = confusion_matrix(self.y_test, self.y_pred)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
mlflow.log_figure(plt.gcf(), "confusion_matrix.png")
plt.close()
def plot_roc_curve(self):
"""Plot ROC curve"""
fpr, tpr, thresholds = roc_curve(self.y_test, self.y_pred_proba)
auc_score = auc(fpr, tpr)
plt.figure(figsize=(8, 6))
plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {auc_score:.3f})')
plt.plot([0, 1], [0, 1], 'k--', label='Random')
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend()
mlflow.log_figure(plt.gcf(), "roc_curve.png")
plt.close()
def plot_precision_recall_curve(self):
"""Plot precision-recall curve"""
precision, recall, thresholds = precision_recall_curve(
self.y_test,
self.y_pred_proba
)
plt.figure(figsize=(8, 6))
plt.plot(recall, precision)
plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curve')
mlflow.log_figure(plt.gcf(), "precision_recall_curve.png")
plt.close()
def explain_model_shap(self, feature_names):
"""Generate SHAP explanations"""
# Create SHAP explainer
        explainer = shap.TreeExplainer(self.model)
        shap_values = explainer.shap_values(self.X_test)
        # Tree explainers may return one array per class; keep the positive class
        if isinstance(shap_values, list):
            shap_values = shap_values[1]
# Summary plot
plt.figure(figsize=(10, 8))
shap.summary_plot(shap_values, self.X_test, feature_names=feature_names, show=False)
mlflow.log_figure(plt.gcf(), "shap_summary.png")
plt.close()
# Feature importance
feature_importance = pd.DataFrame({
'feature': feature_names,
'importance': np.abs(shap_values).mean(axis=0)
}).sort_values('importance', ascending=False)
plt.figure(figsize=(10, 6))
sns.barplot(data=feature_importance.head(20), x='importance', y='feature')
plt.title('Top 20 Feature Importances (SHAP)')
mlflow.log_figure(plt.gcf(), "feature_importance.png")
plt.close()
return shap_values
def check_fairness(self, sensitive_feature):
"""Evaluate model fairness"""
from fairlearn.metrics import MetricFrame, selection_rate
# Calculate metrics by sensitive group
metric_frame = MetricFrame(
metrics={
'accuracy': accuracy_score,
'precision': precision_score,
'recall': recall_score,
'selection_rate': selection_rate
},
y_true=self.y_test,
y_pred=self.y_pred,
sensitive_features=sensitive_feature
)
print("Fairness Metrics by Group:")
print(metric_frame.by_group)
# Log fairness metrics
mlflow.log_dict(metric_frame.by_group.to_dict(), "fairness_metrics.json")
return metric_frame
# Usage
evaluator = ModelEvaluator(model, X_test, y_test)
metrics = evaluator.evaluate_classification_metrics()
evaluator.plot_confusion_matrix()
evaluator.plot_roc_curve()
evaluator.plot_precision_recall_curve()
shap_values = evaluator.explain_model_shap(feature_names)
fairness_metrics = evaluator.check_fairness(sensitive_feature)  # group labels, e.g. an age-band column
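The champion/challenger step in the architecture can gate promotion on these evaluation metrics. Below is a minimal sketch, assuming sklearn-flavored models in the registry and that challenger_version, X_test, and y_test are supplied by the surrounding pipeline; the 0.01 AUC margin is an illustrative threshold, not a recommendation.
# Champion/challenger promotion gate (sketch)
import mlflow.sklearn
from mlflow.tracking import MlflowClient
from sklearn.metrics import roc_auc_score

def registry_auc(model_uri, X, y):
    """Load a registered sklearn model and compute held-out AUC."""
    model = mlflow.sklearn.load_model(model_uri)
    return roc_auc_score(y, model.predict_proba(X)[:, 1])

champion_auc = registry_auc("models:/churn-prediction/Production", X_test, y_test)
challenger_auc = registry_auc(f"models:/churn-prediction/{challenger_version}", X_test, y_test)

# Promote only on a clear win over the current champion
if challenger_auc > champion_auc + 0.01:
    MlflowClient().transition_model_version_stage(
        name="churn-prediction",
        version=challenger_version,
        stage="Production",
        archive_existing_versions=True,
    )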
Model Deployment
Real-Time Endpoint Deployment
# Deploy model to Azure ML managed endpoint
from azureml.core import Workspace, Model, Environment
from azureml.core.model import InferenceConfig
from azureml.core.webservice import AciWebservice, AksWebservice
from azureml.core.compute import AksCompute, ComputeTarget
# Get workspace
ws = Workspace.from_config()
# Get registered model
model = Model(ws, name='churn-prediction-model')
# Create inference environment
env = Environment.from_conda_specification(
name='churn-inference-env',
file_path='conda_env.yml'
)
# Create scoring script
scoring_script = """
import json
import numpy as np
import pandas as pd
from azureml.core.model import Model
import joblib
def init():
global model
model_path = Model.get_model_path('churn-prediction-model')
model = joblib.load(model_path)
def run(raw_data):
try:
data = json.loads(raw_data)
df = pd.DataFrame(data['data'])
# Get predictions
predictions = model.predict_proba(df)[:, 1]
# Get feature importance (if supported)
if hasattr(model, 'feature_importances_'):
importance = model.feature_importances_.tolist()
else:
importance = None
return {
'predictions': predictions.tolist(),
'feature_importance': importance
}
except Exception as e:
return {'error': str(e)}
"""
with open('score.py', 'w') as f:
f.write(scoring_script)
# Create inference config
inference_config = InferenceConfig(
entry_script='score.py',
environment=env
)
# Deploy to AKS for production
aks_name = 'ml-inference-cluster'
aks_target = ComputeTarget(ws, aks_name)
# Production deployment config
deployment_config = AksWebservice.deploy_configuration(
cpu_cores=2,
memory_gb=4,
autoscale_enabled=True,
autoscale_min_replicas=2,
autoscale_max_replicas=10,
autoscale_target_utilization=70,
auth_enabled=True,
enable_app_insights=True,
scoring_timeout_ms=5000,
collect_model_data=True # Enable model monitoring
)
# Deploy service
service = Model.deploy(
workspace=ws,
name='churn-prediction-service',
models=[model],
inference_config=inference_config,
deployment_config=deployment_config,
deployment_target=aks_target,
overwrite=True
)
service.wait_for_deployment(show_output=True)
print(f"Scoring URI: {service.scoring_uri}")
print(f"Swagger URI: {service.swagger_uri}")
Batch Inference Pipeline
# Batch scoring with Synapse Spark
from pyspark.sql.functions import *
import mlflow
import mlflow.pyfunc
from databricks.feature_store import FeatureStoreClient, FeatureLookup
# Load model from MLflow
model_uri = "models:/churn-prediction/Production"
loaded_model = mlflow.pyfunc.spark_udf(
spark,
model_uri=model_uri,
result_type="double"
)
# Load features for scoring
fs = FeatureStoreClient()
customers_to_score = spark.table("silver.customers") \
.where(col("churn_score_date").isNull())
# Get features from feature store
scoring_df = fs.create_training_set(
    df=customers_to_score,
    feature_lookups=[
        FeatureLookup(
            table_name="ml_features.customer_features",
            lookup_key="customer_id"
        )
    ],
    label=None,  # scoring only; keep customer_id for the output select below
    exclude_columns=["churn_score_date"]
).load_df()
# Prepare feature vector
feature_cols = [
"days_since_last_purchase", "total_transactions", "lifetime_value",
"avg_transaction_amount", "amount_last_30d", "transactions_last_30d",
"unique_categories_purchased", "online_preference_ratio"
]
# Apply model
predictions = scoring_df.withColumn(
"churn_probability",
loaded_model(*feature_cols)
).withColumn(
"churn_risk_category",
when(col("churn_probability") >= 0.75, "High")
.when(col("churn_probability") >= 0.50, "Medium")
.when(col("churn_probability") >= 0.25, "Low")
.otherwise("Very Low")
).withColumn(
"score_date",
current_date()
)
# Write predictions
(predictions
.select("customer_id", "churn_probability", "churn_risk_category", "score_date")
.write
.format("delta")
.mode("append")
.partitionBy("score_date")
.saveAsTable("gold.customer_churn_scores")
)
# Flag scored customers in the source table (MERGE is the idiomatic Delta pattern)
spark.sql("""
    MERGE INTO silver.customers c
    USING (
        SELECT DISTINCT customer_id
        FROM gold.customer_churn_scores
        WHERE score_date = CURRENT_DATE
    ) s
    ON c.customer_id = s.customer_id
    WHEN MATCHED THEN UPDATE SET c.churn_score_date = CURRENT_DATE
""")
Model Monitoring
Data Drift Detection
# Monitor data drift using Azure ML
from azureml.datadrift import DataDriftDetector
from azureml.core import Workspace, Dataset
from datetime import datetime
ws = Workspace.from_config()
# Create baseline dataset (training data)
baseline_dataset = Dataset.get_by_name(ws, 'churn_training_data')
# Create target dataset (recent production data)
target_dataset = Dataset.get_by_name(ws, 'churn_scoring_data_recent')
# Create data drift detector
monitor = DataDriftDetector.create_from_datasets(
workspace=ws,
name='churn_model_datadrift',
baseline_data_set=baseline_dataset,
target_data_set=target_dataset,
compute_target='ml-compute-cluster',
frequency='Week',
feature_list=[
'days_since_last_purchase', 'total_transactions', 'lifetime_value',
'avg_transaction_amount', 'amount_last_30d', 'transactions_last_30d'
],
drift_threshold=0.3,
latency=24 # Hours
)
# Run drift detection
drift_run = monitor.run(target_date=datetime.now())
# Get drift results
drift_metrics = drift_run.get_metrics()
print(f"Overall drift score: {drift_metrics['drift_coefficient']}")
print(f"Drifted features:")
for feature, drift_score in drift_metrics['feature_drift_scores'].items():
if drift_score > 0.3:
print(f" {feature}: {drift_score:.3f}")
# Track model performance over time
from azureml.core import Run
from datetime import datetime, timedelta
import pandas as pd
class ModelPerformanceMonitor:
"""Monitor deployed model performance"""
def __init__(self, workspace, model_name):
self.ws = workspace
self.model_name = model_name
def collect_prediction_data(self, days=7):
"""Collect predictions and actuals from Application Insights"""
        from azure.monitor.query import LogsQueryClient
        from azure.identity import DefaultAzureCredential
        logs_client = LogsQueryClient(credential=DefaultAzureCredential())
# Query predictions
predictions_query = f"""
requests
| where timestamp > ago({days}d)
| where name == "{self.model_name}"
| extend prediction = toreal(customDimensions["prediction"])
| extend customer_id = tostring(customDimensions["customer_id"])
| project timestamp, customer_id, prediction
"""
        # app_insights_workspace_id is assumed to be configured elsewhere
        response = logs_client.query_workspace(
            workspace_id=app_insights_workspace_id,
            query=predictions_query,
            timespan=timedelta(days=days)
        )
        # Convert the Logs query result into a DataFrame for the merge below
        table = response.tables[0]
        predictions = pd.DataFrame(data=table.rows, columns=table.columns)
# Get actuals (from feedback/labels)
actuals_df = spark.sql(f"""
SELECT customer_id, churned as actual, churn_date
FROM silver.customers
            WHERE churn_date >= DATE_SUB(CURRENT_DATE(), {days})
""").toPandas()
# Join predictions with actuals
df = predictions.merge(actuals_df, on='customer_id', how='inner')
return df
def calculate_performance_metrics(self, df):
"""Calculate ongoing performance metrics"""
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score
y_true = df['actual']
y_pred = (df['prediction'] >= 0.5).astype(int)
y_pred_proba = df['prediction']
metrics = {
"timestamp": datetime.now(),
"sample_size": len(df),
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, zero_division=0),
"recall": recall_score(y_true, y_pred, zero_division=0),
"auc_roc": roc_auc_score(y_true, y_pred_proba) if len(y_true.unique()) > 1 else None
}
return metrics
def detect_performance_degradation(self, current_metrics, baseline_metrics):
"""Alert if performance degrades"""
degradation_threshold = 0.05 # 5% drop
alerts = []
for metric, current_value in current_metrics.items():
if metric in baseline_metrics and current_value is not None:
baseline_value = baseline_metrics[metric]
if isinstance(baseline_value, (int, float)) and isinstance(current_value, (int, float)):
drop = baseline_value - current_value
if drop > degradation_threshold:
alerts.append({
"metric": metric,
"baseline": baseline_value,
"current": current_value,
"drop": drop,
"severity": "High" if drop > 0.10 else "Medium"
})
return alerts
def trigger_retraining(self):
"""Trigger model retraining pipeline"""
        import json
        from azure.devops.connection import Connection
        from msrest.authentication import BasicAuthentication
        # Trigger the Azure DevOps retraining pipeline; azure_devops_pat, org_url,
        # retrain_pipeline_id, and project_name come from configuration/Key Vault
        credentials = BasicAuthentication('', azure_devops_pat)
        connection = Connection(base_url=org_url, creds=credentials)
build_client = connection.clients.get_build_client()
build = build_client.queue_build(
build={
"definition": {"id": retrain_pipeline_id},
"parameters": json.dumps({
"reason": "performance_degradation",
"trigger_time": datetime.now().isoformat()
})
},
project=project_name
)
print(f"Retraining pipeline triggered: Build ID {build.id}")
return build.id
# Usage
monitor = ModelPerformanceMonitor(ws, 'churn-prediction-service')
data = monitor.collect_prediction_data(days=7)
current_metrics = monitor.calculate_performance_metrics(data)
# Compare with baseline
baseline_metrics = {
"accuracy": 0.85,
"precision": 0.82,
"recall": 0.78,
"auc_roc": 0.89
}
alerts = monitor.detect_performance_degradation(current_metrics, baseline_metrics)
if alerts:
print("Performance degradation detected:")
for alert in alerts:
print(f" {alert['metric']}: {alert['baseline']:.3f} -> {alert['current']:.3f} ({alert['drop']:.3f} drop)")
# Trigger retraining
monitor.trigger_retraining()
MLOps CI/CD Pipeline
Azure DevOps YAML Pipeline
# azure-pipelines-ml.yml
trigger:
branches:
include:
- main
- develop
paths:
include:
- src/ml/**
- tests/ml/**
variables:
- group: ml-pipeline-variables
- name: python.version
value: '3.9'
stages:
- stage: DataValidation
displayName: 'Validate Training Data'
jobs:
- job: ValidateData
pool:
vmImage: 'ubuntu-latest'
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '$(python.version)'
- script: |
pip install great-expectations databricks-cli
great_expectations checkpoint run training_data_checkpoint
displayName: 'Run Data Quality Checks'
- stage: ModelTraining
displayName: 'Train ML Model'
dependsOn: DataValidation
jobs:
- job: TrainModel
pool:
vmImage: 'ubuntu-latest'
steps:
- task: UsePythonVersion@0
inputs:
versionSpec: '$(python.version)'
- script: |
pip install azureml-sdk databricks-cli mlflow
python src/ml/train.py \
--experiment-name churn-prediction \
--compute-target ml-compute-cluster
displayName: 'Train Model'
env:
AZURE_ML_WORKSPACE: $(ML_WORKSPACE_NAME)
AZURE_SUBSCRIPTION_ID: $(AZURE_SUBSCRIPTION_ID)
- task: PublishTestResults@2
inputs:
testResultsFiles: '**/test-results.xml'
testRunTitle: 'Model Training Tests'
- stage: ModelEvaluation
displayName: 'Evaluate Model'
dependsOn: ModelTraining
jobs:
- job: EvaluateModel
pool:
vmImage: 'ubuntu-latest'
steps:
- script: |
python src/ml/evaluate.py \
--model-name churn-prediction-model \
--baseline-auc 0.85
displayName: 'Evaluate Model Performance'
- script: |
python src/ml/explain.py \
--model-name churn-prediction-model
displayName: 'Generate Model Explanations'
- task: PublishBuildArtifacts@1
inputs:
pathToPublish: 'outputs/evaluation'
artifactName: 'model-evaluation'
- stage: DeployStaging
displayName: 'Deploy to Staging'
dependsOn: ModelEvaluation
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/develop'))
jobs:
- deployment: DeployStaging
environment: 'ml-staging'
strategy:
runOnce:
deploy:
steps:
- script: |
python src/ml/deploy.py \
--model-name churn-prediction-model \
--environment staging \
--compute-type aci
displayName: 'Deploy to Staging'
- stage: IntegrationTests
displayName: 'Integration Tests'
dependsOn: DeployStaging
jobs:
- job: TestEndpoint
pool:
vmImage: 'ubuntu-latest'
steps:
- script: |
python tests/integration/test_endpoint.py \
--endpoint-url $(STAGING_ENDPOINT_URL)
displayName: 'Test Staging Endpoint'
- stage: DeployProduction
displayName: 'Deploy to Production'
dependsOn: IntegrationTests
condition: and(succeeded(), eq(variables['Build.SourceBranch'], 'refs/heads/main'))
jobs:
- deployment: DeployProduction
environment: 'ml-production'
strategy:
runOnce:
deploy:
steps:
- script: |
python src/ml/deploy.py \
--model-name churn-prediction-model \
--environment production \
--compute-type aks \
--min-replicas 2 \
--max-replicas 10
displayName: 'Deploy to Production AKS'
- script: |
python src/ml/enable_monitoring.py \
--service-name churn-prediction-service \
--enable-data-drift true \
--enable-model-perf true
displayName: 'Enable Monitoring'
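The IntegrationTests stage above invokes tests/integration/test_endpoint.py. A minimal sketch of what that script might contain, assuming the staging endpoint key is exposed as ENDPOINT_KEY in the pipeline environment:
# tests/integration/test_endpoint.py (sketch): smoke test for the staging endpoint
import argparse
import json
import os
import sys

import requests

parser = argparse.ArgumentParser()
parser.add_argument("--endpoint-url", required=True)
args = parser.parse_args()

payload = {"data": [{
    "days_since_last_purchase": 30, "total_transactions": 10,
    "lifetime_value": 500.0, "avg_transaction_amount": 50.0,
    "amount_last_30d": 100.0, "transactions_last_30d": 2,
    "unique_categories_purchased": 3, "online_preference_ratio": 0.5,
}]}

resp = requests.post(
    args.endpoint_url,
    data=json.dumps(payload),
    headers={"Content-Type": "application/json",
             "Authorization": f"Bearer {os.environ['ENDPOINT_KEY']}"},
    timeout=30,
)

assert resp.status_code == 200, f"Endpoint returned {resp.status_code}"
preds = resp.json().get("predictions", [])
assert preds and 0.0 <= preds[0] <= 1.0, f"Unexpected response: {resp.text}"
print("Endpoint smoke test passed")
sys.exit(0)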
Cost Optimization
ML Infrastructure Costs
| Component | Configuration | Monthly Cost | Usage Pattern |
|---|---|---|---|
| Databricks (Training) | 8-node GPU cluster (NC6s_v3) | $12,000 | 8 hours/day, 70% spot instances |
| Azure ML Compute | 10-node CPU cluster (D13_v2) | $3,600 | Auto-scaling, training only |
| AKS (Inference) | 3 nodes (D4s_v3), auto-scale to 10 | $900 | Production inference cluster |
| MLflow Tracking | Databricks managed | Included | Part of Databricks workspace |
| Feature Store | Delta Lake on ADLS Gen2 | $500 | 5TB storage, frequent access |
| Model Registry | Cosmos DB (1K RU/s) | $60 | Metadata storage |
| Monitoring | Application Insights | $250 | 50GB/month telemetry |
| Total | | ~$17,000/month | |
Cost Optimization Strategies
# 1. Use spot instances for training
databricks_cluster_config = {
"cluster_name": "ml-training-cluster",
"spark_version": "11.3.x-gpu-ml-scala2.12",
"node_type_id": "Standard_NC6s_v3",
"num_workers": 8,
"azure_attributes": {
"availability": "SPOT_WITH_FALLBACK_AZURE",
"spot_bid_max_price": -1 # Pay up to on-demand price
},
"autotermination_minutes": 30 # Auto-terminate when idle
}
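# Create the cluster defined above via the Databricks Clusters REST API
# (sketch; DATABRICKS_HOST and DATABRICKS_TOKEN come from the environment)
import os
import requests

resp = requests.post(
    f"{os.environ['DATABRICKS_HOST']}/api/2.0/clusters/create",
    headers={"Authorization": f"Bearer {os.environ['DATABRICKS_TOKEN']}"},
    json=databricks_cluster_config,
    timeout=30,
)
resp.raise_for_status()
print(f"Created cluster: {resp.json()['cluster_id']}")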
# 2. Use low-priority VMs for Azure ML compute
from azureml.core.compute import AmlCompute, ComputeTarget
compute_config = AmlCompute.provisioning_configuration(
vm_size='STANDARD_D13_V2',
min_nodes=0, # Scale to zero when idle
max_nodes=10,
    vm_priority='lowpriority',  # preemptible capacity, discounted up to ~80%
idle_seconds_before_scaledown=300
)
# 3. Cache intermediate results
@dlt.table(
name="cached_features",
table_properties={
"pipelines.cache.enabled": "true"
}
)
def expensive_features():
# Expensive feature engineering
pass
# 4. Use serverless SQL for ad-hoc queries
# Instead of dedicated pool
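# 4 (cont.): ad-hoc query via Synapse serverless SQL (sketch; the server name,
# storage path, and auth mode below are illustrative placeholders)
import pyodbc

conn = pyodbc.connect(
    "Driver={ODBC Driver 18 for SQL Server};"
    "Server=myworkspace-ondemand.sql.azuresynapse.net;"
    "Database=master;Authentication=ActiveDirectoryInteractive;"
)
query = """
    SELECT TOP 100 *
    FROM OPENROWSET(
        BULK 'https://mydatalake.dfs.core.windows.net/gold/customer_churn_scores/',
        FORMAT = 'DELTA'
    ) AS scores
"""
for row in conn.execute(query).fetchall():
    print(row)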
Next Steps
- Review Retail Analytics Architecture
- Explore Financial Services Architecture
- Learn about Healthcare Analytics
- Implement Enterprise Data Warehouse