# MLflow Best Practices for Computer Vision (Deep Learning)

This guide covers practical patterns for tracking, registering, and deploying computer-vision models with MLflow in production.

Compatibility: MLflow ≥ 2.4 · PyTorch ≥ 2.0 · Python ≥ 3.10

## Experiment Tracking

### Structure Experiments Hierarchically

Organise experiments to mirror your project structure. Avoid dumping all runs into a single experiment.
```python
import mlflow

# One experiment per model family or research objective
mlflow.set_experiment("resnet-backbone-ablations")
mlflow.set_experiment("yolov8-object-detection-v2")
```

### Use Nested Runs for Multi-Stage Pipelines
CV pipelines typically consist of preprocessing → training → evaluation → post-processing. Model each stage as a child run.
```python
with mlflow.start_run(run_name="full-pipeline") as parent_run:
    with mlflow.start_run(run_name="data-augmentation", nested=True):
        mlflow.log_params({"augment_strategy": "mosaic", "img_size": 640})
    with mlflow.start_run(run_name="training", nested=True):
        mlflow.log_params({"epochs": 100, "optimizer": "AdamW"})
    with mlflow.start_run(run_name="evaluation", nested=True):
        mlflow.log_metrics({"mAP50": 0.87, "mAP50-95": 0.63})
```

### Tag Runs Consistently
Tags are queryable — use them as first-class metadata for filtering and governance.
```python
import os

mlflow.set_tags({
    "task": "object-detection",
    "backbone": "ResNet50",
    "dataset": "COCO-2017",
    "env": "production",
    "team": "cv-platform",
    "git_commit": os.getenv("GIT_COMMIT_SHA", "unknown"),
})
```

Recommended tag schema:
| Tag Key | Example Value | Purpose |
|---|---|---|
| `task` | segmentation | CV task type |
| `backbone` | EfficientNetV2-L | Architecture family |
| `dataset` | COCO-2017 | Dataset identifier |
| `env` | staging / production | Deployment stage |
| `git_commit` | a3f8c12 | Code reproducibility |
| `hardware` | A100-80GB | Training hardware |
## Model Logging and Registration

### Log Models with Signatures and Input Examples
Always include a model signature and a representative input example. This is critical for serving CV models correctly — it prevents type/shape mismatches at inference time.
```python
import mlflow.pytorch
import numpy as np
import torch

# Define signature from a real sample
sample_input = np.random.rand(1, 3, 224, 224).astype(np.float32)
sample_output = model(torch.tensor(sample_input)).detach().numpy()
signature = mlflow.models.infer_signature(sample_input, sample_output)

mlflow.pytorch.log_model(
    pytorch_model=model,
    artifact_path="model",
    signature=signature,
    input_example=sample_input,
    registered_model_name="cv-resnet50-classifier",
)
```

### Use the Model Registry with Stage Transitions
The Model Registry enforces promotion gates: None → Staging → Production → Archived.
```python
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Transition a validated model to Production
client.transition_model_version_stage(
    name="cv-resnet50-classifier",
    version=3,
    stage="Production",
    archive_existing_versions=True,  # Auto-archive the old Production version
)
```

Always archive old production versions. Never leave two versions in Production simultaneously unless you are intentionally running A/B traffic splits.
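To enforce the single-Production rule in CI, a small guard can fail the pipeline early. A sketch under stated assumptions — the helper is our own, not an MLflow API; it expects objects exposing a `current_stage` attribute, as `ModelVersion` objects do:

```python
def assert_single_production(versions) -> None:
    """Raise if more than one model version sits in the Production stage."""
    prod = [v for v in versions if v.current_stage == "Production"]
    if len(prod) > 1:
        raise RuntimeError(
            f"{len(prod)} versions in Production; archive all but one"
        )
```

Run it against `client.search_model_versions("name='cv-resnet50-classifier'")` as a pre-deployment check.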
### Custom PyFuncs for Pre/Post-Processing
Wrap preprocessing (resize, normalise, augment) and postprocessing (NMS, softmax, decode boxes) into the model artifact itself using mlflow.pyfunc. This avoids serving-time pipeline drift.
```python
class CVModelWrapper(mlflow.pyfunc.PythonModel):
    def load_context(self, context):
        import torch
        self.model = torch.load(context.artifacts["model_path"])
        self.model.eval()

    def predict(self, context, model_input):
        import torch
        # Preprocess (single-value normalisation for brevity; use the
        # per-channel ImageNet mean/std in practice)
        tensor = torch.tensor(model_input).float() / 255.0
        tensor = (tensor - 0.485) / 0.229
        # Inference
        with torch.no_grad():
            logits = self.model(tensor)
        # Postprocess
        return logits.softmax(dim=-1).numpy()

mlflow.pyfunc.log_model(
    artifact_path="cv-model-wrapped",
    python_model=CVModelWrapper(),
    artifacts={"model_path": "path/to/model.pt"},
)
```

## Artifact Management
### What to Log as Artifacts (CV-Specific)
| Artifact | When to Log | Why |
|---|---|---|
| Sample predictions (images) | End of each epoch | Visual debugging of model behaviour |
| Confusion matrix (as PNG) | Post-evaluation | Class-level error analysis |
| PR / ROC curves | Post-evaluation | Threshold selection guidance |
| Augmentation samples | Pre-training | Verify augmentation pipeline |
| Class activation maps (Grad-CAM) | Debugging runs | Explainability |
| ONNX / TorchScript exports | Release candidates | Deployment-ready formats |
| Training config YAML | Every run | Full reproducibility |
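As a starting point for the confusion-matrix row above, the raw count matrix is easy to build before handing it to a plotting library. A plain-Python sketch (function name is illustrative):

```python
def confusion_matrix(labels, preds, num_classes):
    """cm[i][j] = number of samples of true class i predicted as class j."""
    cm = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(labels, preds):
        cm[t][p] += 1
    return cm

cm = confusion_matrix([0, 0, 1, 1], [0, 1, 1, 1], num_classes=2)
# cm == [[1, 1], [0, 2]]: one class-0 sample misclassified as class 1
```

Render `cm` as a heatmap, save it to PNG, and log it with `mlflow.log_artifact` as described above.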
```python
import matplotlib.pyplot as plt

# Log a batch of predictions as an image grid
fig, axes = plt.subplots(2, 4, figsize=(16, 8))
for i, ax in enumerate(axes.flat):
    ax.imshow(pred_images[i])
    ax.set_title(f"Pred: {pred_labels[i]}")
plt.tight_layout()
plt.savefig("/tmp/predictions_epoch_10.png")
mlflow.log_artifact("/tmp/predictions_epoch_10.png", artifact_path="visualisations")
```

### Log Config Files, Not Just Parameters
Log the full YAML/JSON config alongside individual parameters. This is your single source of truth for reproducibility.
```python
mlflow.log_artifact("configs/train_config.yaml", artifact_path="configs")
```

Logging the config file ensures you can fully reconstruct the training environment even if individual `log_params` calls are incomplete or inconsistent.
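When you also want the config's contents as searchable params, flattening the nested dict before `mlflow.log_params` keeps key names consistent. A sketch — the dotted separator and helper name are our own choices:

```python
def flatten(cfg: dict, prefix: str = "") -> dict:
    """Flatten a nested config dict into dotted keys for mlflow.log_params."""
    out = {}
    for k, v in cfg.items():
        key = f"{prefix}{k}"
        if isinstance(v, dict):
            out.update(flatten(v, prefix=f"{key}."))
        else:
            out[key] = v
    return out

params = flatten({"optim": {"name": "AdamW", "lr": 1e-4}, "epochs": 100})
# params == {"optim.name": "AdamW", "optim.lr": 0.0001, "epochs": 100}
# mlflow.log_params(params)
```

This way the artifact remains the source of truth, while every value is still filterable in the UI.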
## Dataset Versioning & Lineage

### Use `mlflow.log_input()` (MLflow ≥ 2.4)
Track exact dataset versions to make runs reproducible and auditable.
```python
dataset = mlflow.data.from_numpy(
    features=X_train,
    targets=y_train,
    name="coco-detection-train",
    source="s3://your-bucket/datasets/coco/2017/train/",
)

with mlflow.start_run():
    mlflow.log_input(dataset, context="training")
```

### Record Dataset Hashes
For local or cached datasets, compute and log a SHA-256 checksum:
```python
import hashlib

def dataset_hash(path: str) -> str:
    h = hashlib.sha256()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(65536), b""):
            h.update(chunk)
    return h.hexdigest()

mlflow.log_param("train_dataset_sha256", dataset_hash("/data/train.tar.gz"))
```

## Hyperparameter Management
### Log All Hyperparameters — Including Implicit Ones
Don’t log only the obvious params. CV training has many implicit settings that affect results.
```python
mlflow.log_params({
    # Optimiser
    "optimizer": "AdamW",
    "lr": 1e-4,
    "weight_decay": 1e-2,
    "lr_scheduler": "cosine_annealing",
    "warmup_epochs": 5,
    # Data
    "img_size": 640,
    "batch_size": 32,
    "num_workers": 8,
    "augment_mosaic": True,
    "augment_mixup": 0.1,
    "augment_hsv_h": 0.015,
    # Architecture
    "backbone": "EfficientNetV2-L",
    "pretrained": True,
    "freeze_backbone_epochs": 10,
    "dropout": 0.2,
    # Training
    "epochs": 200,
    "early_stopping_patience": 15,
    "amp": True,  # Automatic mixed precision
    "gradient_clip": 10.0,
    "seed": 42,
})
```

### Integrate with Optuna / Ray Tune for HPO
When running hyperparameter optimisation sweeps, each trial should be its own MLflow run, nested under a parent sweep run.
```python
import optuna

def objective(trial):
    lr = trial.suggest_float("lr", 1e-5, 1e-2, log=True)
    dropout = trial.suggest_float("dropout", 0.1, 0.5)
    with mlflow.start_run(nested=True):
        mlflow.log_params({"lr": lr, "dropout": dropout})
        val_map = train_and_evaluate(lr=lr, dropout=dropout)
        mlflow.log_metric("val_mAP50", val_map)
    return val_map

with mlflow.start_run(run_name="hpo-sweep"):
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=50)
```

## Metrics & Evaluation
### Log Metrics at the Right Granularity
Log per-step metrics for loss curves, per-epoch metrics for validation scores, and summary metrics at run end.
```python
for epoch in range(num_epochs):
    train_loss = run_training_epoch(...)
    val_map, val_map95 = run_validation(...)
    mlflow.log_metrics({
        "train/loss": train_loss,
        "val/mAP50": val_map,
        "val/mAP50-95": val_map95,
        "lr": scheduler.get_last_lr()[0],
    }, step=epoch)

# Summary at end of training
mlflow.log_metrics({
    "best_val_mAP50": best_map,
    "best_epoch": best_epoch,
    "total_train_time_hrs": elapsed / 3600,
})
```

### Log Task-Specific CV Metrics
| Task | Key Metrics to Log |
|---|---|
| Classification | top1_acc, top5_acc, per_class_f1 |
| Object Detection | mAP50, mAP50-95, precision, recall, FPS |
| Semantic Segmentation | mIoU, pixel_acc, per_class_IoU |
| Instance Segmentation | mask_AP, bbox_AP |
| Anomaly Detection | AUROC, AUPRC, F1@best_threshold |
| Depth Estimation | AbsRel, RMSE, delta_1 |
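To make one row of the table concrete: for semantic segmentation, mIoU follows directly from a pixel-level confusion matrix. A plain-Python sketch, averaging IoU over classes that actually appear:

```python
def mean_iou(conf):
    """conf[i][j] = pixels of true class i predicted as class j."""
    n = len(conf)
    ious = []
    for c in range(n):
        tp = conf[c][c]
        fp = sum(conf[r][c] for r in range(n)) - tp
        fn = sum(conf[c]) - tp
        denom = tp + fp + fn
        if denom:  # skip classes absent from both labels and predictions
            ious.append(tp / denom)
    return sum(ious) / len(ious)

# Two classes with symmetric errors: each has IoU = 3 / (3 + 1 + 1) = 0.6
miou = mean_iou([[3, 1], [1, 3]])
```

Log the result with `mlflow.log_metric("mIoU", miou)` alongside the per-class IoUs.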
### Use `mlflow.evaluate()` for Standardised Post-Training Evaluation
```python
result = mlflow.evaluate(
    model=f"runs:/{run_id}/model",
    data=test_dataset,
    targets="labels",
    model_type="classifier",
    evaluators=["default"],
    extra_metrics=[
        mlflow.metrics.precision_score(average="macro"),
        mlflow.metrics.recall_score(average="macro"),
    ],
)
print(result.metrics)
```

## Model Serving & Deployment
### Export to ONNX and Log as an Artifact
For production inference, ONNX enables hardware-agnostic deployment (TensorRT, OpenVINO, ONNX Runtime).
```python
import torch

dummy_input = torch.randn(1, 3, 640, 640)
torch.onnx.export(
    model,
    dummy_input,
    "/tmp/model.onnx",
    opset_version=17,
    input_names=["images"],
    output_names=["output"],
    dynamic_axes={"images": {0: "batch_size"}, "output": {0: "batch_size"}},
)

with mlflow.start_run():
    mlflow.log_artifact("/tmp/model.onnx", artifact_path="onnx")
```

### Load Production Models by Stage, Not by Run ID
Never hardcode a run_id in serving code. Always load by registry stage.
```python
# ✅ Correct — stage-based loading
model = mlflow.pytorch.load_model("models:/cv-resnet50-classifier/Production")

# ❌ Avoid — brittle, ties serving code to a specific run
model = mlflow.pytorch.load_model("runs:/abc123xyz/model")
```

### Log Inference Latency as a Metric
Track per-batch and per-image latency as part of your evaluation run:
```python
import time
import numpy as np

latencies = []
for batch in test_loader:
    t0 = time.perf_counter()
    _ = model(batch)
    latencies.append((time.perf_counter() - t0) * 1000)  # ms per batch

mlflow.log_metrics({
    "p50_latency_ms": np.percentile(latencies, 50),
    "p95_latency_ms": np.percentile(latencies, 95),
    "p99_latency_ms": np.percentile(latencies, 99),
    "throughput_imgs_per_sec": len(test_loader.dataset) / (sum(latencies) / 1000),
})
```

## CI/CD Integration
### Gate Promotions on Metric Thresholds
Never promote a model to production manually. Automate stage transitions with metric gates.
```python
from mlflow.tracking import MlflowClient

client = MlflowClient()
run = client.get_run(candidate_run_id)
metrics = run.data.metrics

PRODUCTION_GATE = {
    "val/mAP50": 0.85,       # Quality: must be at least this high
    "p95_latency_ms": 50.0,  # Latency: must be at most this high
}

passed = all(
    metrics.get(k, 0) >= v if "latency" not in k
    else metrics.get(k, 9999) <= v
    for k, v in PRODUCTION_GATE.items()
)

if passed:
    client.transition_model_version_stage(
        name="cv-detector",
        version=candidate_version,
        stage="Production",
        archive_existing_versions=True,
    )
    print("✅ Promoted to Production")
else:
    print("❌ Failed promotion gate")
```

### Automate Comparison Against Current Champion
Before any promotion, compare the challenger against the current champion model on a held-out test set.
```python
champion = client.get_latest_versions("cv-detector", stages=["Production"])[0]
champion_metrics = client.get_run(champion.run_id).data.metrics
challenger_metrics = client.get_run(challenger_run_id).data.metrics

if challenger_metrics["val/mAP50"] > champion_metrics["val/mAP50"] + 0.005:
    print("Challenger beats champion — proceed with promotion")
else:
    print("Challenger did not improve sufficiently — reject")
```

## Environment Reproducibility
Always log the full environment alongside the model:
```python
import subprocess

# Log pip freeze
pip_freeze = subprocess.check_output(["pip", "freeze"]).decode()
with open("/tmp/requirements.txt", "w") as f:
    f.write(pip_freeze)
mlflow.log_artifact("/tmp/requirements.txt", artifact_path="environment")

# MLflow also auto-captures conda.yaml / python_env.yaml when using log_model
```

## Governance, Reproducibility & Compliance
### Seed Everything
Log all random seeds. In CV, augmentation pipelines use multiple RNGs (NumPy, PyTorch, Albumentations).
```python
import random

import numpy as np
import torch

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

mlflow.log_param("global_seed", SEED)
```

### Record Hardware and Framework Versions
```python
import platform

import torch
import torchvision

mlflow.log_params({
    "python_version": platform.python_version(),
    "pytorch_version": torch.__version__,
    "torchvision_version": torchvision.__version__,
    "cuda_version": torch.version.cuda,
    "cudnn_version": str(torch.backends.cudnn.version()),
    "gpu_model": torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU",
    "num_gpus": torch.cuda.device_count(),
})
```

### Store Model Cards as Artifacts
Document each registered model version with a model card (intended use, limitations, training data, fairness notes).
```python
model_card = """
# Model Card: cv-resnet50-classifier v3

## Intended Use
- Binary defect classification for manufacturing QC
- Input: 224x224 RGB images

## Limitations
- Not validated on night-time imagery
- Class imbalance: defect rate ~3%

## Training Data
- Source: Internal dataset, 2023-01 to 2024-06
- 85k training / 15k validation images
- SHA256: a3f8c12...

## Performance
- val/top1_acc: 96.4%
- p95_latency_ms: 12.3ms (A100)
"""

with open("/tmp/MODEL_CARD.md", "w") as f:
    f.write(model_card)
mlflow.log_artifact("/tmp/MODEL_CARD.md", artifact_path="governance")
```

## Performance & Scalability
### Avoid Logging Inside the Training Loop
Excessive per-step metric logging adds I/O overhead. Batch or throttle your logging.
```python
# ❌ Too frequent — logs every step
for step, batch in enumerate(train_loader):
    loss = train_step(batch)
    mlflow.log_metric("train/loss", loss, step=step)  # Bottleneck!

# ✅ Log every N steps
LOG_INTERVAL = 50
for step, batch in enumerate(train_loader):
    loss = train_step(batch)
    if step % LOG_INTERVAL == 0:
        mlflow.log_metric("train/loss", loss, step=step)
```

### Use Autologging Selectively
mlflow.pytorch.autolog() is convenient but can log too much noise in CV contexts. Prefer manual logging for control, and use autolog only as a baseline during exploration.
```python
# Exploration: enable autolog
mlflow.pytorch.autolog(log_every_n_epoch=1, log_models=False)

# Production: disable autolog, log explicitly
mlflow.pytorch.autolog(disable=True)
```

### Backend Storage Recommendations
| Scale | Tracking Server | Artifact Store |
|---|---|---|
| Local/solo | Local filesystem | Local filesystem |
| Team | PostgreSQL + MLflow Server | S3 / GCS / Azure Blob |
| Enterprise | Managed MLflow (Databricks) | Object store + CDN |
```python
import mlflow

# Point to a remote tracking server
mlflow.set_tracking_uri("http://your-mlflow-server:5000")
```