Kubeflow Deep Learning Guide with PyTorch
Introduction
Kubeflow is a machine learning toolkit for Kubernetes that makes deployments of ML workflows on Kubernetes simple, portable, and scalable. This guide focuses on using Kubeflow with PyTorch for deep learning tasks.
Key Kubeflow Components for Deep Learning:
- Training Operator: For distributed training jobs
- Katib: For hyperparameter tuning and neural architecture search
- KServe: For model serving and inference
- Pipelines: For ML workflow orchestration
- Notebooks: For interactive development
Prerequisites
Before starting, ensure you have:
- Kubernetes cluster with Kubeflow installed
- kubectl configured to access your cluster
- Docker for building container images
- Basic knowledge of PyTorch and Kubernetes
Setting Up Your Environment
1. Create a Namespace
apiVersion: v1
kind: Namespace
metadata:
name: pytorch-training
2. Base Docker Image for PyTorch
Create a Dockerfile for your PyTorch environment:
FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime
WORKDIR /app
# Install additional dependencies
RUN pip install --no-cache-dir \
    torchvision \
    tensorboard \
    scikit-learn \
    pandas \
    numpy \
    matplotlib \
    seaborn
# Copy your training code
COPY . /app/
# Set the default command
CMD ["python", "train.py"]
Creating PyTorch Training Jobs
Simple Training Job
Create a basic PyTorch training script (train.py):
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import argparse
import os
class SimpleNet(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.features(x)


def train_epoch(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    total_loss = 0
    correct = 0

    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)

        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()

        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] '
                  f'Loss: {loss.item():.6f}')

    accuracy = 100. * correct / len(train_loader.dataset)
    avg_loss = total_loss / len(train_loader)
    print(f'Train Epoch: {epoch}, Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy


def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0

    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return test_loss, accuracy


def main():
    parser = argparse.ArgumentParser(description='PyTorch MNIST Training')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--model-dir', type=str, default='/tmp/model',
                        help='directory to save the model')
    args = parser.parse_args()

    torch.manual_seed(args.seed)
    device = torch.device("cuda" if torch.cuda.is_available() and not args.no_cuda else "cpu")

    # Data loading
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dataset = datasets.MNIST('/tmp/data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST('/tmp/data', train=False, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=args.test_batch_size, shuffle=False)

    # Model, loss, and optimizer
    model = SimpleNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # Training loop
    for epoch in range(1, args.epochs + 1):
        train_loss, train_acc = train_epoch(model, device, train_loader, optimizer, criterion, epoch)
        test_loss, test_acc = test(model, device, test_loader, criterion)

    # Save model
    os.makedirs(args.model_dir, exist_ok=True)
    torch.save(model.state_dict(), f'{args.model_dir}/model.pth')
    print(f'Model saved to {args.model_dir}/model.pth')


if __name__ == '__main__':
    main()
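Because the base image already installs TensorBoard, the training loop in main() can optionally log per-epoch metrics. The snippet below is a minimal sketch of that extension, assuming the train_epoch and test functions above; the log directory under args.model_dir is an arbitrary choice.

# Optional: log per-epoch metrics to TensorBoard (already installed in the base image).
# A sketch only -- assumes the variables created earlier in main().
from torch.utils.tensorboard import SummaryWriter

writer = SummaryWriter(log_dir=os.path.join(args.model_dir, 'tensorboard'))

for epoch in range(1, args.epochs + 1):
    train_loss, train_acc = train_epoch(model, device, train_loader, optimizer, criterion, epoch)
    test_loss, test_acc = test(model, device, test_loader, criterion)

    writer.add_scalar('loss/train', train_loss, epoch)
    writer.add_scalar('loss/test', test_loss, epoch)
    writer.add_scalar('accuracy/train', train_acc, epoch)
    writer.add_scalar('accuracy/test', test_acc, epoch)

writer.close()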
PyTorchJob YAML Configuration
Create a pytorchjob.yaml file:
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-mnist-training
namespace: pytorch-training
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
metadata:
annotations:
sidecar.istio.io/inject: "false"
spec:
containers:
- name: pytorch
image: your-registry/pytorch-mnist:latest
imagePullPolicy: Always
command:
- python
- train.py
args:
- --epochs=20
- --batch-size=64
- --lr=0.01
- --model-dir=/mnt/model
resources:
requests:
memory: "2Gi"
cpu: "1"
limits:
memory: "4Gi"
cpu: "2"
nvidia.com/gpu: "1"
volumeMounts:
- name: model-storage
mountPath: /mnt/model
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: model-pvc
Distributed Training
For distributed training across multiple GPUs or nodes:
Distributed Training Script
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# SimpleNet and train_dataset are assumed to be defined (or imported) as in train.py.


def setup(rank, world_size):
    """Initialize the distributed environment."""
    os.environ['MASTER_ADDR'] = os.environ.get('MASTER_ADDR', 'localhost')
    os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', '12355')
    dist.init_process_group("nccl", rank=rank, world_size=world_size)


def cleanup():
    """Clean up the distributed environment."""
    dist.destroy_process_group()


def train_distributed(rank, world_size, args):
    setup(rank, world_size)

    # Map the global rank onto a local GPU; with one GPU per pod this resolves to cuda:0.
    device = torch.device(f"cuda:{rank % torch.cuda.device_count()}")
    torch.cuda.set_device(device)

    # Create model and move to GPU
    model = SimpleNet().to(device)
    model = DDP(model, device_ids=[device.index])

    # Create distributed sampler
    train_sampler = DistributedSampler(train_dataset,
                                       num_replicas=world_size,
                                       rank=rank)

    train_loader = DataLoader(train_dataset,
                              batch_size=args.batch_size,
                              sampler=train_sampler,
                              pin_memory=True)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr)

    # Training loop
    for epoch in range(args.epochs):
        train_sampler.set_epoch(epoch)
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            if rank == 0 and batch_idx % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}")

    # Save model only on rank 0
    if rank == 0:
        torch.save(model.module.state_dict(), f'{args.model_dir}/distributed_model.pth')

    cleanup()
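When this script runs as a PyTorchJob, the Training Operator injects MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK into every replica (master and workers alike), so the entry point only needs to read those variables and call train_distributed. The sketch below illustrates that wiring under those assumptions; parse_args is a hypothetical helper mirroring the argparse setup from train.py.

# Minimal entry point for distributed_train.py under a Kubeflow PyTorchJob.
# Assumes WORLD_SIZE and RANK are injected by the Training Operator (setup() above
# already falls back to MASTER_ADDR/MASTER_PORT defaults for local runs).
# parse_args() is a hypothetical helper mirroring the argparse setup in train.py.
if __name__ == "__main__":
    args = parse_args()

    world_size = int(os.environ.get("WORLD_SIZE", 1))
    rank = int(os.environ.get("RANK", 0))

    train_distributed(rank, world_size, args)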
Distributed PyTorchJob YAML
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
name: pytorch-distributed-training
namespace: pytorch-training
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: your-registry/pytorch-distributed:latest
command:
- python
- distributed_train.py
args:
- --epochs=50
- --batch-size=32
resources:
limits:
nvidia.com/gpu: "1"
Worker:
replicas: 3
restartPolicy: OnFailure
template:
spec:
containers:
- name: pytorch
image: your-registry/pytorch-distributed:latest
command:
- python
- distributed_train.py
args:
- --epochs=50
- --batch-size=32
resources:
limits:
nvidia.com/gpu: "1"
Hyperparameter Tuning with Katib
Katib Experiment Configuration
Create a katib-experiment.yaml file:
apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
name: pytorch-hyperparameter-tuning
namespace: pytorch-training
spec:
algorithm:
algorithmName: random
objective:
type: maximize
goal: 0.95
objectiveMetricName: accuracy
parameters:
- name: lr
parameterType: double
feasibleSpace:
min: "0.001"
max: "0.1"
- name: batch-size
parameterType: int
feasibleSpace:
min: "16"
max: "128"
- name: momentum
parameterType: double
feasibleSpace:
min: "0.1"
max: "0.9"
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
      - name: lr
        description: Learning rate
        reference: lr
      - name: batch-size
        description: Mini-batch size
        reference: batch-size
      - name: momentum
        description: SGD momentum
        reference: momentum
    trialSpec:
apiVersion: kubeflow.org/v1
kind: PyTorchJob
spec:
pytorchReplicaSpecs:
Master:
replicas: 1
restartPolicy: OnFailure
template:
spec:
containers:
- name: training-container
image: your-registry/pytorch-katib:latest
command:
- python
- train_with_metrics.py
args:
- --lr=${trialParameters.lr}
- --batch-size=${trialParameters.batch-size}
- --momentum=${trialParameters.momentum}
- --epochs=10
parallelTrialCount: 3
maxTrialCount: 20
maxFailedTrialCount: 3
Training Script with Metrics for Katib
# train_with_metrics.py
import argparse

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from train import SimpleNet  # reuse the model definition; train.py is copied into the image


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--momentum', type=float, default=0.5)
    parser.add_argument('--epochs', type=int, default=10)
    args = parser.parse_args()

    # Setup device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Data loading
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])

    train_dataset = datasets.MNIST('/tmp/data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST('/tmp/data', train=False, transform=transform)

    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)

    # Model
    model = SimpleNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)

    # Training
    for epoch in range(args.epochs):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)

            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

    # Evaluation
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    accuracy = correct / total

    # Print metrics in "name=value" form so Katib's default StdOut metrics collector can parse them
    print(f"accuracy={accuracy:.4f}")
    print(f"loss={1 - accuracy:.4f}")


if __name__ == '__main__':
    main()
Model Serving with KServe
Create a Model Server
First, create a custom predictor (predictor.py):
import torch
import torch.nn as nn
from torchvision import transforms
import kserve
from typing import Dict
import numpy as np
from PIL import Image
import io
import base64
class SimpleNet(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        return self.features(x)


class PyTorchMNISTPredictor(kserve.Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
        self.ready = False

    def load(self):
        self.model = SimpleNet()
        self.model.load_state_dict(torch.load('/mnt/models/model.pth', map_location='cpu'))
        self.model.eval()
        self.ready = True

    def predict(self, payload: Dict) -> Dict:
        if not self.ready:
            raise RuntimeError("Model not loaded")

        # Decode base64 image
        image_data = base64.b64decode(payload["instances"][0]["image"])
        image = Image.open(io.BytesIO(image_data)).convert('L')

        # Preprocess
        input_tensor = self.transform(image).unsqueeze(0)

        # Predict
        with torch.no_grad():
            output = self.model(input_tensor)
            probabilities = torch.softmax(output, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][predicted_class].item()

        return {
            "predictions": [{
                "class": predicted_class,
                "confidence": confidence,
                "probabilities": probabilities[0].tolist()
            }]
        }


if __name__ == "__main__":
    model = PyTorchMNISTPredictor("pytorch-mnist-predictor")
    model.load()
    kserve.ModelServer().start([model])
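Before building the serving image, you can exercise the predictor class locally. The snippet below is a minimal sketch: it assumes a trained model.pth is available at /mnt/models (or adjust the path in load()), and the random 28x28 image only exercises the request path rather than producing a meaningful prediction.

# Local smoke test for the predictor class above (run outside the model server).
# Assumes /mnt/models/model.pth exists; the random image only exercises the code path.
import base64
import io

import numpy as np
from PIL import Image

img = Image.fromarray((np.random.rand(28, 28) * 255).astype('uint8'), mode='L')
buf = io.BytesIO()
img.save(buf, format='PNG')

payload = {"instances": [{"image": base64.b64encode(buf.getvalue()).decode('utf-8')}]}

predictor = PyTorchMNISTPredictor("pytorch-mnist-predictor")
predictor.load()
print(predictor.predict(payload))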
InferenceService YAML
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
name: pytorch-mnist-predictor
namespace: pytorch-training
spec:
predictor:
containers:
- name: kserve-container
image: your-registry/pytorch-predictor:latest
ports:
- containerPort: 8080
protocol: TCP
volumeMounts:
- name: model-storage
mountPath: /mnt/models
resources:
requests:
cpu: "100m"
memory: "1Gi"
limits:
cpu: "1"
memory: "2Gi"
volumes:
- name: model-storage
persistentVolumeClaim:
claimName: model-pvc
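Once the InferenceService is ready, clients can call it over HTTP. The snippet below is a sketch of such a request: the hostname is a placeholder (use the URL reported by kubectl get inferenceservice in your cluster), and the payload shape matches what the custom predictor above expects.

# Sketch of a client request to the deployed InferenceService.
# The URL below is a placeholder; KServe custom predictors expose the v1 REST path
# /v1/models/<model-name>:predict on the service's external or cluster-local host.
import base64
import requests

with open('digit.png', 'rb') as f:  # any 28x28 grayscale MNIST-style image
    encoded = base64.b64encode(f.read()).decode('utf-8')

payload = {"instances": [{"image": encoded}]}

url = ("http://pytorch-mnist-predictor.pytorch-training.example.com"
       "/v1/models/pytorch-mnist-predictor:predict")

response = requests.post(url, json=payload)
print(response.json())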
Complete Pipeline Example
Kubeflow Pipeline with PyTorch
import kfp
from kfp import dsl
from kfp.components import create_component_from_func
def preprocess_data_op():
    return dsl.ContainerOp(
        name='preprocess-data',
        image='your-registry/data-preprocessing:latest',
        command=['python', 'preprocess.py'],
        file_outputs={'dataset_path': '/tmp/dataset_path.txt'}
    )


def train_model_op(dataset_path, lr: float = 0.01, batch_size: int = 64):
    return dsl.ContainerOp(
        name='train-model',
        image='your-registry/pytorch-training:latest',
        command=['python', 'train.py'],
        arguments=[
            '--data-path', dataset_path,
            '--lr', lr,
            '--batch-size', batch_size,
            '--model-dir', '/tmp/model'
        ],
        file_outputs={'model_path': '/tmp/model_path.txt'}
    )


def evaluate_model_op(model_path, dataset_path):
    return dsl.ContainerOp(
        name='evaluate-model',
        image='your-registry/pytorch-evaluation:latest',
        command=['python', 'evaluate.py'],
        arguments=[
            '--model-path', model_path,
            '--data-path', dataset_path
        ],
        file_outputs={'metrics': '/tmp/metrics.json'}
    )


def deploy_model_op(model_path):
    return dsl.ContainerOp(
        name='deploy-model',
        image='your-registry/model-deployment:latest',
        command=['python', 'deploy.py'],
        arguments=['--model-path', model_path]
    )


@dsl.pipeline(
    name='PyTorch Training Pipeline',
    description='Complete PyTorch training and deployment pipeline'
)
def pytorch_training_pipeline(
    lr: float = 0.01,
    batch_size: int = 64,
    epochs: int = 10
):
    # Data preprocessing
    preprocess_task = preprocess_data_op()

    # Model training
    train_task = train_model_op(
        dataset_path=preprocess_task.outputs['dataset_path'],
        lr=lr,
        batch_size=batch_size
    )

    # Model evaluation
    evaluate_task = evaluate_model_op(
        model_path=train_task.outputs['model_path'],
        dataset_path=preprocess_task.outputs['dataset_path']
    )

    # Conditional deployment based on accuracy
    # (assumes the evaluation step writes a single numeric accuracy value as its metrics output)
    with dsl.Condition(evaluate_task.outputs['metrics'] > 0.9):
        deploy_task = deploy_model_op(
            model_path=train_task.outputs['model_path']
        )


# Compile the pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(pytorch_training_pipeline, 'pytorch_pipeline.yaml')
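To run the compiled pipeline, submit it through the KFP SDK client. The sketch below shows one way to do this; the host URL and experiment name are placeholders for your Kubeflow Pipelines endpoint.

# Sketch of submitting the compiled pipeline to the Kubeflow Pipelines API.
# The host and experiment name are placeholders; point the client at your
# ml-pipeline endpoint (e.g. via port-forward or the cluster ingress).
import kfp

client = kfp.Client(host='http://localhost:8080/pipeline')  # hypothetical endpoint

run = client.create_run_from_pipeline_package(
    'pytorch_pipeline.yaml',
    arguments={'lr': 0.01, 'batch_size': 64, 'epochs': 10},
    run_name='pytorch-training-run',
    experiment_name='pytorch-experiments',
)
print(f'Started run {run.run_id}')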
Best Practices
1. Resource Management
- Always specify resource requests and limits
- Use GPU resources efficiently with proper scheduling
- Implement proper cleanup procedures
2. Data Management
- Use persistent volumes for model storage
- Implement data versioning
- Use distributed storage for large datasets
3. Monitoring and Logging
- Implement comprehensive logging
- Use metrics collection for model performance
- Set up alerts for training failures
4. Security
- Use proper RBAC configurations
- Secure container images
- Implement secrets management for sensitive data
5. Scalability
- Design for horizontal scaling
- Use distributed training for large models
- Implement efficient data loading pipelines
6. Model Versioning
- Tag and version your models
- Implement A/B testing for model deployments
- Use model registries for tracking
7. Error Handling
- Implement robust error handling in training scripts
- Use appropriate restart policies, and checkpoint training state so restarted jobs can resume (see the sketch below)
- Set up proper monitoring and alerting
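The checkpoint/resume pattern referenced above pairs naturally with restartPolicy: OnFailure: if a training pod is restarted, it can pick up from the last saved epoch instead of starting over. A minimal sketch, assuming the model and optimizer objects from the training scripts above and a persistent volume mounted at /mnt/model:

# Checkpoint/resume sketch to pair with restartPolicy: OnFailure.
# Paths are illustrative; /mnt/model is assumed to be backed by a persistent volume.
import os
import torch

CHECKPOINT_PATH = '/mnt/model/checkpoint.pth'


def save_checkpoint(model, optimizer, epoch):
    """Persist enough state to resume training after a pod restart."""
    torch.save({
        'epoch': epoch,
        'model_state': model.state_dict(),
        'optimizer_state': optimizer.state_dict(),
    }, CHECKPOINT_PATH)


def load_checkpoint(model, optimizer):
    """Return the epoch to resume from (1 if no checkpoint exists)."""
    if not os.path.exists(CHECKPOINT_PATH):
        return 1
    checkpoint = torch.load(CHECKPOINT_PATH, map_location='cpu')
    model.load_state_dict(checkpoint['model_state'])
    optimizer.load_state_dict(checkpoint['optimizer_state'])
    return checkpoint['epoch'] + 1

Call load_checkpoint once before the training loop to pick the starting epoch, and save_checkpoint at the end of each epoch.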
This guide provides a comprehensive foundation for using Kubeflow with PyTorch for deep learning workflows. Adapt the examples to your specific use cases and requirements.