Kubeflow Deep Learning Guide with PyTorch

code · mlops · intermediate

Author: Krishnatheja Vanka
Published: May 31, 2025


Table of Contents

  1. Introduction
  2. Prerequisites
  3. Setting Up Your Environment
  4. Creating PyTorch Training Jobs
  5. Distributed Training
  6. Hyperparameter Tuning with Katib
  7. Model Serving with KServe
  8. Complete Pipeline Example
  9. Best Practices

Introduction

Kubeflow is a machine learning toolkit for Kubernetes that makes deploying ML workflows simple, portable, and scalable. This guide focuses on using Kubeflow with PyTorch for deep learning tasks.

Key Kubeflow Components for Deep Learning:

  • Training Operator: For distributed training jobs
  • Katib: For hyperparameter tuning and neural architecture search
  • KServe: For model serving and inference
  • Pipelines: For ML workflow orchestration
  • Notebooks: For interactive development

Prerequisites

Before starting, ensure you have:

  • Kubernetes cluster with Kubeflow installed
  • kubectl configured to access your cluster
  • Docker for building container images
  • Basic knowledge of PyTorch and Kubernetes
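
A quick way to confirm both cluster access and a healthy Kubeflow installation is to list the pods in the kubeflow namespace. The sketch below uses the official kubernetes Python client; running kubectl get pods -n kubeflow achieves the same thing.

from kubernetes import client, config

# Load the same kubeconfig that kubectl uses
config.load_kube_config()

v1 = client.CoreV1Api()
for pod in v1.list_namespaced_pod(namespace="kubeflow").items:
    print(pod.metadata.name, pod.status.phase)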

Setting Up Your Environment

1. Create a Namespace

apiVersion: v1
kind: Namespace
metadata:
  name: pytorch-training
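
Apply it with kubectl apply -f namespace.yaml before creating any of the resources that follow.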

2. Base Docker Image for PyTorch

Create a Dockerfile for your PyTorch environment:

FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime

WORKDIR /app

# Install additional dependencies
RUN pip install --no-cache-dir \
    torchvision \
    tensorboard \
    scikit-learn \
    pandas \
    numpy \
    matplotlib \
    seaborn

# Copy your training code
COPY . /app/

# Set the default command
CMD ["python", "train.py"]

Creating PyTorch Training Jobs

Simple Training Job

Create a basic PyTorch training script (train.py):

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import argparse
import os

class SimpleNet(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )
    
    def forward(self, x):
        return self.features(x)

def train_epoch(model, device, train_loader, optimizer, criterion, epoch):
    model.train()
    total_loss = 0
    correct = 0
    
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        if batch_idx % 100 == 0:
            print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}] '
                  f'Loss: {loss.item():.6f}')
    
    accuracy = 100. * correct / len(train_loader.dataset)
    avg_loss = total_loss / len(train_loader)
    print(f'Train Epoch: {epoch}, Average Loss: {avg_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return avg_loss, accuracy

def test(model, device, test_loader, criterion):
    model.eval()
    test_loss = 0
    correct = 0
    
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    
    test_loss /= len(test_loader)
    accuracy = 100. * correct / len(test_loader.dataset)
    print(f'Test set: Average loss: {test_loss:.4f}, Accuracy: {accuracy:.2f}%')
    return test_loss, accuracy

def main():
    parser = argparse.ArgumentParser(description='PyTorch MNIST Training')
    parser.add_argument('--batch-size', type=int, default=64, metavar='N',
                        help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
                        help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N',
                        help='number of epochs to train (default: 10)')
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
                        help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
                        help='SGD momentum (default: 0.5)')
    parser.add_argument('--no-cuda', action='store_true', default=False,
                        help='disables CUDA training')
    parser.add_argument('--seed', type=int, default=1, metavar='S',
                        help='random seed (default: 1)')
    parser.add_argument('--model-dir', type=str, default='/tmp/model',
                        help='directory to save the model')
    
    args = parser.parse_args()
    
    torch.manual_seed(args.seed)
    device = torch.device("cuda" if torch.cuda.is_available() and not args.no_cuda else "cpu")
    
    # Data loading
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    
    train_dataset = datasets.MNIST('/tmp/data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST('/tmp/data', train=False, transform=transform)
    
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=args.test_batch_size, shuffle=False)
    
    # Model, loss, and optimizer
    model = SimpleNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
    
    # Training loop
    for epoch in range(1, args.epochs + 1):
        train_loss, train_acc = train_epoch(model, device, train_loader, optimizer, criterion, epoch)
        test_loss, test_acc = test(model, device, test_loader, criterion)
    
    # Save model
    os.makedirs(args.model_dir, exist_ok=True)
    torch.save(model.state_dict(), f'{args.model_dir}/model.pth')
    print(f'Model saved to {args.model_dir}/model.pth')

if __name__ == '__main__':
    main()
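
Before containerizing the script, you can smoke-test it locally, for example with python train.py --epochs 1 --no-cuda.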

PyTorchJob YAML Configuration

Create a pytorchjob.yaml file:

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-mnist-training
  namespace: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        metadata:
          annotations:
            sidecar.istio.io/inject: "false"
        spec:
          containers:
          - name: pytorch
            image: your-registry/pytorch-mnist:latest
            imagePullPolicy: Always
            command:
            - python
            - train.py
            args:
            - --epochs=20
            - --batch-size=64
            - --lr=0.01
            - --model-dir=/mnt/model
            resources:
              requests:
                memory: "2Gi"
                cpu: "1"
              limits:
                memory: "4Gi"
                cpu: "2"
                nvidia.com/gpu: "1"
            volumeMounts:
            - name: model-storage
              mountPath: /mnt/model
          volumes:
          - name: model-storage
            persistentVolumeClaim:
              claimName: model-pvc
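
The manifest assumes a PersistentVolumeClaim named model-pvc already exists in the pytorch-training namespace for persisting the trained weights. Submit the job with kubectl apply -f pytorchjob.yaml, or programmatically as sketched below with the official kubernetes Python client (the file name and namespace match the manifest above).

import yaml
from kubernetes import client, config

config.load_kube_config()  # use config.load_incluster_config() inside the cluster
api = client.CustomObjectsApi()

# Create the PyTorchJob custom resource from the manifest
with open("pytorchjob.yaml") as f:
    job = yaml.safe_load(f)

api.create_namespaced_custom_object(
    group="kubeflow.org",
    version="v1",
    namespace="pytorch-training",
    plural="pytorchjobs",
    body=job,
)

# Check the latest job condition (Created, Running, Succeeded, Failed)
status = api.get_namespaced_custom_object_status(
    group="kubeflow.org",
    version="v1",
    namespace="pytorch-training",
    plural="pytorchjobs",
    name="pytorch-mnist-training",
)
print(status["status"]["conditions"][-1]["type"])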

Distributed Training

For distributed training across multiple GPUs or nodes:

Distributed Training Script

import os

import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler
from torchvision import datasets, transforms

from train import SimpleNet  # reuse the model from train.py; adjust to your project layout

def setup(rank, world_size):
    """Initialize the distributed environment."""
    os.environ['MASTER_ADDR'] = os.environ.get('MASTER_ADDR', 'localhost')
    os.environ['MASTER_PORT'] = os.environ.get('MASTER_PORT', '12355')
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    """Clean up the distributed environment."""
    dist.destroy_process_group()

def train_distributed(rank, world_size, args):
    setup(rank, world_size)
    
    # Each pod in the PyTorchJob below has a single GPU, so use the local rank
    # (cuda:0 by default) rather than the global rank to pick the device
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    device = torch.device(f"cuda:{local_rank}")
    torch.cuda.set_device(device)
    
    # Create model, move it to the GPU, and wrap it with DDP
    model = SimpleNet().to(device)
    model = DDP(model, device_ids=[local_rank])
    
    # Dataset and distributed sampler so each rank trains on a distinct shard
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    train_dataset = datasets.MNIST('/tmp/data', train=True, download=True, transform=transform)
    train_sampler = DistributedSampler(train_dataset,
                                       num_replicas=world_size,
                                       rank=rank)
    
    train_loader = DataLoader(train_dataset,
                              batch_size=args.batch_size,
                              sampler=train_sampler,
                              pin_memory=True)
    
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr)
    
    # Training loop
    for epoch in range(args.epochs):
        train_sampler.set_epoch(epoch)  # reshuffle the shards each epoch
        
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device, non_blocking=True), target.to(device, non_blocking=True)
            
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
            
            if rank == 0 and batch_idx % 100 == 0:
                print(f"Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item():.6f}")
    
    # Save model only on rank 0
    if rank == 0:
        os.makedirs(args.model_dir, exist_ok=True)
        torch.save(model.module.state_dict(), f'{args.model_dir}/distributed_model.pth')
    
    cleanup()
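
When this script runs under the Kubeflow Training Operator, every replica receives WORLD_SIZE, RANK, MASTER_ADDR and MASTER_PORT as environment variables, so a minimal entry point only needs to read them and call train_distributed. The sketch below assumes command-line flags that mirror those used in train.py.

import argparse

def parse_args():
    parser = argparse.ArgumentParser(description='Distributed MNIST training')
    parser.add_argument('--epochs', type=int, default=10)
    parser.add_argument('--batch-size', type=int, default=32)
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--model-dir', type=str, default='/mnt/model')
    return parser.parse_args()

if __name__ == '__main__':
    args = parse_args()
    # Provided by the Training Operator; fall back to single-process defaults
    world_size = int(os.environ.get('WORLD_SIZE', 1))
    rank = int(os.environ.get('RANK', 0))
    train_distributed(rank, world_size, args)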

Distributed PyTorchJob YAML

apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: pytorch-distributed-training
  namespace: pytorch-training
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: your-registry/pytorch-distributed:latest
            command:
            - python
            - distributed_train.py
            args:
            - --epochs=50
            - --batch-size=32
            resources:
              limits:
                nvidia.com/gpu: "1"
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: your-registry/pytorch-distributed:latest
            command:
            - python
            - distributed_train.py
            args:
            - --epochs=50
            - --batch-size=32
            resources:
              limits:
                nvidia.com/gpu: "1"
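
With one Master and three Worker replicas, the Training Operator sets WORLD_SIZE to 4 and injects each pod's RANK, MASTER_ADDR and MASTER_PORT, which is exactly what the setup function and entry point above read.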

Hyperparameter Tuning with Katib

Katib Experiment Configuration

Create a katib-experiment.yaml:

apiVersion: kubeflow.org/v1beta1
kind: Experiment
metadata:
  name: pytorch-hyperparameter-tuning
  namespace: pytorch-training
spec:
  algorithm:
    algorithmName: random
  objective:
    type: maximize
    goal: 0.95
    objectiveMetricName: accuracy
  parameters:
  - name: lr
    parameterType: double
    feasibleSpace:
      min: "0.001"
      max: "0.1"
  - name: batch-size
    parameterType: int
    feasibleSpace:
      min: "16"
      max: "128"
  - name: momentum
    parameterType: double
    feasibleSpace:
      min: "0.1"
      max: "0.9"
  trialTemplate:
    primaryContainerName: training-container
    trialParameters:
    - name: learningRate
      description: Learning rate for the optimizer
      reference: lr
    - name: batchSize
      description: Mini-batch size for training
      reference: batch-size
    - name: momentum
      description: SGD momentum
      reference: momentum
    trialSpec:
      apiVersion: kubeflow.org/v1
      kind: PyTorchJob
      spec:
        pytorchReplicaSpecs:
          Master:
            replicas: 1
            restartPolicy: OnFailure
            template:
              spec:
                containers:
                - name: training-container
                  image: your-registry/pytorch-katib:latest
                  command:
                  - python
                  - train_with_metrics.py
                  args:
                  - --lr=${trialParameters.learningRate}
                  - --batch-size=${trialParameters.batchSize}
                  - --momentum=${trialParameters.momentum}
                  - --epochs=10
  parallelTrialCount: 3
  maxTrialCount: 20
  maxFailedTrialCount: 3

Training Script with Metrics for Katib
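
Katib's default StdOut metrics collector scrapes the trial's log for lines of the form name=value, so the script reports its final accuracy in exactly that format.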

# train_with_metrics.py
import argparse

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms

from train import SimpleNet  # reuse the model architecture defined in train.py

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--lr', type=float, default=0.01)
    parser.add_argument('--batch-size', type=int, default=64)
    parser.add_argument('--momentum', type=float, default=0.5)
    parser.add_argument('--epochs', type=int, default=10)
    args = parser.parse_args()
    
    # Setup device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    
    # Data loading
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
    
    train_dataset = datasets.MNIST('/tmp/data', train=True, download=True, transform=transform)
    test_dataset = datasets.MNIST('/tmp/data', train=False, transform=transform)
    
    train_loader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=1000, shuffle=False)
    
    # Model
    model = SimpleNet().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=args.lr, momentum=args.momentum)
    
    # Training
    for epoch in range(args.epochs):
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
    
    # Evaluation
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            outputs = model(data)
            _, predicted = torch.max(outputs.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    
    accuracy = correct / total
    
    # Report metrics for Katib as name=value lines on stdout
    print(f"accuracy={accuracy:.4f}")
    print(f"error-rate={1 - accuracy:.4f}")

if __name__ == '__main__':
    main()

Model Serving with KServe

Create a Model Server

First, create a custom predictor (predictor.py):

import torch
import torch.nn as nn
from torchvision import transforms
import kserve
from typing import Dict
import numpy as np
from PIL import Image
import io
import base64

class SimpleNet(nn.Module):
    def __init__(self, num_classes=10):
        super(SimpleNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, 1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(9216, 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )
    
    def forward(self, x):
        return self.features(x)

class PyTorchMNISTPredictor(kserve.Model):
    def __init__(self, name: str):
        super().__init__(name)
        self.name = name
        self.model = None
        self.transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])
        self.ready = False

    def load(self):
        self.model = SimpleNet()
        self.model.load_state_dict(torch.load('/mnt/models/model.pth', map_location='cpu'))
        self.model.eval()
        self.ready = True

    def predict(self, payload: Dict) -> Dict:
        if not self.ready:
            raise RuntimeError("Model not loaded")
        
        # Decode base64 image
        image_data = base64.b64decode(payload["instances"][0]["image"])
        image = Image.open(io.BytesIO(image_data)).convert('L')
        
        # Preprocess
        input_tensor = self.transform(image).unsqueeze(0)
        
        # Predict
        with torch.no_grad():
            output = self.model(input_tensor)
            probabilities = torch.softmax(output, dim=1)
            predicted_class = torch.argmax(probabilities, dim=1).item()
            confidence = probabilities[0][predicted_class].item()
        
        return {
            "predictions": [{
                "class": predicted_class,
                "confidence": confidence,
                "probabilities": probabilities[0].tolist()
            }]
        }

if __name__ == "__main__":
    model = PyTorchMNISTPredictor("pytorch-mnist-predictor")
    model.load()
    kserve.ModelServer().start([model])
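
Package the predictor into its own image (installing kserve, torch, torchvision and pillow, and starting predictor.py as the container command) and push it to your registry. The InferenceService below mounts the same model-pvc used for training, so the trained weights saved earlier are available at /mnt/models/model.pth.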

InferenceService YAML

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: pytorch-mnist-predictor
  namespace: pytorch-training
spec:
  predictor:
    containers:
    - name: kserve-container
      image: your-registry/pytorch-predictor:latest
      ports:
      - containerPort: 8080
        protocol: TCP
      volumeMounts:
      - name: model-storage
        mountPath: /mnt/models
      resources:
        requests:
          cpu: "100m"
          memory: "1Gi"
        limits:
          cpu: "1"
          memory: "2Gi"
    volumes:
    - name: model-storage
      persistentVolumeClaim:
        claimName: model-pvc
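
Once the InferenceService is ready, you can send a request in the format the predictor expects: a base64-encoded grayscale image under instances[0].image. The sketch below assumes a local test image named digit.png; replace SERVICE_URL with the URL reported by kubectl get inferenceservice pytorch-mnist-predictor -n pytorch-training.

import base64
import requests

SERVICE_URL = "http://pytorch-mnist-predictor.pytorch-training.example.com"  # placeholder; use your InferenceService URL

with open("digit.png", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")

payload = {"instances": [{"image": encoded}]}

# KServe custom predictors expose the V1 prediction protocol
response = requests.post(
    f"{SERVICE_URL}/v1/models/pytorch-mnist-predictor:predict",
    json=payload,
)
print(response.json())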

Complete Pipeline Example

Kubeflow Pipeline with PyTorch

# Uses the KFP v1 SDK; dsl.ContainerOp is not available in KFP v2
import kfp
from kfp import dsl

def preprocess_data_op():
    return dsl.ContainerOp(
        name='preprocess-data',
        image='your-registry/data-preprocessing:latest',
        command=['python', 'preprocess.py'],
        file_outputs={'dataset_path': '/tmp/dataset_path.txt'}
    )

def train_model_op(dataset_path, lr: float = 0.01, batch_size: int = 64):
    return dsl.ContainerOp(
        name='train-model',
        image='your-registry/pytorch-training:latest',
        command=['python', 'train.py'],
        arguments=[
            '--data-path', dataset_path,
            '--lr', lr,
            '--batch-size', batch_size,
            '--model-dir', '/tmp/model'
        ],
        file_outputs={'model_path': '/tmp/model_path.txt'}
    )

def evaluate_model_op(model_path, dataset_path):
    return dsl.ContainerOp(
        name='evaluate-model',
        image='your-registry/pytorch-evaluation:latest',
        command=['python', 'evaluate.py'],
        arguments=[
            '--model-path', model_path,
            '--data-path', dataset_path
        ],
        file_outputs={'metrics': '/tmp/metrics.json'}
    )

def deploy_model_op(model_path):
    return dsl.ContainerOp(
        name='deploy-model',
        image='your-registry/model-deployment:latest',
        command=['python', 'deploy.py'],
        arguments=['--model-path', model_path]
    )

@dsl.pipeline(
    name='PyTorch Training Pipeline',
    description='Complete PyTorch training and deployment pipeline'
)
def pytorch_training_pipeline(
    lr: float = 0.01,
    batch_size: int = 64,
    epochs: int = 10
):
    # Data preprocessing
    preprocess_task = preprocess_data_op()
    
    # Model training
    train_task = train_model_op(
        dataset_path=preprocess_task.outputs['dataset_path'],
        lr=lr,
        batch_size=batch_size
    )
    
    # Model evaluation
    evaluate_task = evaluate_model_op(
        model_path=train_task.outputs['model_path'],
        dataset_path=preprocess_task.outputs['dataset_path']
    )
    
    # Conditional deployment: assumes the evaluation step writes a plain
    # numeric accuracy value to its 'metrics' output
    with dsl.Condition(evaluate_task.outputs['metrics'] > 0.9):
        deploy_task = deploy_model_op(
            model_path=train_task.outputs['model_path']
        )

# Compile and run the pipeline
if __name__ == '__main__':
    kfp.compiler.Compiler().compile(pytorch_training_pipeline, 'pytorch_pipeline.yaml')
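
The compiled pytorch_pipeline.yaml can be uploaded through the Kubeflow Pipelines UI, or the pipeline can be submitted directly from the same script with the KFP v1 client. The host shown below is the in-cluster ML Pipeline API service and may differ in your installation; adjust the arguments as needed.

# Submit a run directly (KFP v1 client); extends the script above
client = kfp.Client(host="http://ml-pipeline.kubeflow.svc.cluster.local:8888")
client.create_run_from_pipeline_func(
    pytorch_training_pipeline,
    arguments={"lr": 0.01, "batch_size": 64, "epochs": 10},
)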

Best Practices

1. Resource Management

  • Always specify resource requests and limits
  • Use GPU resources efficiently with proper scheduling
  • Implement proper cleanup procedures

2. Data Management

  • Use persistent volumes for model storage
  • Implement data versioning
  • Use distributed storage for large datasets

3. Monitoring and Logging

  • Implement comprehensive logging
  • Use metrics collection for model performance
  • Set up alerts for training failures

4. Security

  • Use proper RBAC configurations
  • Secure container images
  • Implement secrets management for sensitive data

5. Scalability

  • Design for horizontal scaling
  • Use distributed training for large models
  • Implement efficient data loading pipelines

6. Model Versioning

  • Tag and version your models
  • Implement A/B testing for model deployments
  • Use model registries for tracking

7. Error Handling

  • Implement robust error handling in training scripts
  • Use appropriate restart policies
  • Set up proper monitoring and alerting

This guide provides a comprehensive foundation for using Kubeflow with PyTorch for deep learning workflows. Adapt the examples to your specific use cases and requirements.