Introduction

DeepSpeed is a deep learning optimization library that makes distributed training easy, efficient, and effective. It provides system innovations like ZeRO (Zero Redundancy Optimizer) to enable training massive models with trillions of parameters.

Key benefits:

  • Memory Efficiency: ZeRO reduces memory consumption by partitioning optimizer states, gradients, and model parameters
  • Speed: Achieves high training throughput through optimized kernels and communication
  • Scale: Enables training of models with billions/trillions of parameters
  • Ease of Use: Simple integration with existing PyTorch code

Installation

#| eval: false
# Install DeepSpeed
# (first of two alternative install routes; pick one)
pip install deepspeed

# Or install from source for latest features
# (second route: clone the repository, then install from inside the checkout)
git clone https://github.com/microsoft/DeepSpeed.git
cd DeepSpeed
# "." refers to the DeepSpeed checkout entered on the previous line
pip install .

# Verify installation
# NOTE(review): ds_report is expected to be on PATH after either install
# route above; confirm its exact output against the DeepSpeed docs.
ds_report

Basic Setup

Simple Model Training

import torch
import torch.nn as nn
import deepspeed
from torch.utils.data import DataLoader, Dataset

# Define a simple model
class SimpleModel(nn.Module):
    """Fully connected network: three linear layers with ReLU in between.

    Args:
        input_size: Width of the input features.
        hidden_size: Width shared by both hidden layers.
        output_size: Width of the final output.
    """

    def __init__(self, input_size=1000, hidden_size=2000, output_size=1000):
        super().__init__()
        # Built in forward order; wrapped in a single nn.Sequential stored as
        # ``layers`` so parameter names are ``layers.<index>.weight/bias``.
        stack = [
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        """Apply the layer stack to ``x`` and return its output."""
        result = self.layers(x)
        return result

# Dummy dataset
class DummyDataset(Dataset):
    """Synthetic dataset that yields freshly sampled random (input, target) pairs.

    Args:
        size: Number of samples reported by ``len()``.
        input_size: Length of each input vector.
        target_size: Length of each target vector.

    The defaults (1000 / 1000 / 1000) reproduce the previously hard-coded
    shapes, so existing ``DummyDataset()`` and ``DummyDataset(size=...)``
    calls behave as before.
    """

    def __init__(self, size=1000, input_size=1000, target_size=1000):
        self.size = size
        self.input_size = input_size
        self.target_size = target_size

    def __len__(self):
        """Return the configured number of samples."""
        return self.size

    def __getitem__(self, idx):
        """Return one random ``(input, target)`` pair.

        Raises:
            IndexError: If ``idx`` is outside ``[-size, size)``. Without this,
                plain ``for sample in dataset`` iteration (which relies on
                IndexError to stop) would never terminate.
        """
        if not -self.size <= idx < self.size:
            raise IndexError(
                f"index {idx} is out of range for dataset of size {self.size}"
            )
        return torch.randn(self.input_size), torch.randn(self.target_size)

# Initialize model and data
model = SimpleModel()
dataset = DummyDataset()
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params={
        "train_batch_size": 32,
        "optimizer": {
            "type": "Adam",
            "params": {"lr": 0.001}
        },
        "fp16": {"enabled": True}
    }
)

# fp16 is enabled above, so the batches coming out of the DataLoader (fp32,
# on the CPU) must be moved to the engine's device and cast to the dtype of
# the engine's parameters before the forward pass.
param_dtype = next(model_engine.parameters()).dtype

# Build the loss module once instead of once per step.
criterion = nn.MSELoss()

# Training loop
for epoch in range(10):
    for batch_idx, (data, target) in enumerate(dataloader):
        data = data.to(model_engine.device, dtype=param_dtype)
        target = target.to(model_engine.device)

        # Forward pass
        outputs = model_engine(data)
        # The loss is evaluated in fp32 so it matches the fp32 targets.
        loss = criterion(outputs.float(), target)

        # Backward pass
        model_engine.backward(loss)
        model_engine.step()

        if batch_idx % 10 == 0:
            print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')

Configuration File Approach

import deepspeed
import argparse

def main():
    """Train ``SimpleModel`` with a DeepSpeed engine configured from a JSON file.

    Command-line arguments:
        --local_rank: Local rank passed from the distributed launcher.
        --deepspeed_config: Path to the DeepSpeed config file
            (default ``ds_config.json``).

    Relies on ``SimpleModel`` and ``dataset`` defined earlier in this file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=-1,
                       help='local rank passed from distributed launcher')
    parser.add_argument('--deepspeed_config', type=str, default='ds_config.json',
                       help='deepspeed config file')
    args = parser.parse_args()

    # Initialize distributed training
    deepspeed.init_distributed()

    model = SimpleModel()

    # Initialize with config file
    model_engine, optimizer, trainloader, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=dataset
    )

    # Inputs have to match the engine's device and parameter dtype (the
    # config file may enable fp16/bf16).
    param_dtype = next(model_engine.parameters()).dtype
    criterion = nn.MSELoss()

    # Training loop
    # Each batch is an (input, target) pair; the model only consumes the
    # input, and the loss is computed from its output and the target rather
    # than treating the raw model output as the loss.
    for data, target in trainloader:
        data = data.to(model_engine.device, dtype=param_dtype)
        target = target.to(model_engine.device)

        outputs = model_engine(data)
        loss = criterion(outputs.float(), target.float())

        model_engine.backward(loss)
        model_engine.step()

if __name__ == '__main__':
    main()

Configuration Files

Basic Configuration

Create a file called ds_config.json:

{
  "train_batch_size": 64,
  "train_micro_batch_size_per_gpu": 16,
  "gradient_accumulation_steps": 1,
  "optimizer": {
    "type": "Adam",
    "params": {
      "lr": 3e-5,
      "betas": [0.8, 0.999],
      "eps": 1e-8,
      "weight_decay": 3e-7
    }
  },
  "scheduler": {
    "type": "WarmupLR",
    "params": {
      "warmup_min_lr": 0,
      "warmup_max_lr": 3e-5,
      "warmup_num_steps": 1000
    }
  },
  "fp16": {
    "enabled": true,
    "auto_cast": false,
    "loss_scale": 0,
    "initial_scale_power": 16,
    "loss_scale_window": 1000,
    "hysteresis": 2,
    "min_loss_scale": 1
  },
  "zero_optimization": {
    "stage": 2,
    "allgather_partitions": true,
    "allgather_bucket_size": 2e8,
    "overlap_comm": true,
    "reduce_scatter": true,
    "reduce_bucket_size": 2e8,
    "contiguous_gradients": true
  },
  "gradient_clipping": 1.0,
  "wall_clock_breakdown": false
}

Advanced Configuration with ZeRO Stage 3

{
  "train_batch_size": 64,
  "train_micro_batch_size_per_gpu": 4,
  "gradient_accumulation_steps": 4,
  "optimizer": {
    "type": "AdamW",
    "params": {
      "lr": 3e-4,
      "betas": [0.9, 0.95],
      "eps": 1e-8,
      "weight_decay": 0.1
    }
  },
  "fp16": {
    "enabled": true,
    "auto_cast": false,
    "loss_scale": 0,
    "initial_scale_power": 16,
    "loss_scale_window": 1000,
    "hysteresis": 2,
    "min_loss_scale": 1
  },
  "zero_optimization": {
    "stage": 3,
    "offload_optimizer": {
      "device": "cpu",
      "pin_memory": true
    },
    "offload_param": {
      "device": "cpu",
      "pin_memory": true
    },
    "overlap_comm": true,
    "contiguous_gradients": true,
    "sub_group_size": 1e9,
    "reduce_bucket_size": "auto",
    "stage3_prefetch_bucket_size": "auto",
    "stage3_param_persistence_threshold": "auto",
    "stage3_max_live_parameters": 1e9,
    "stage3_max_reuse_distance": 1e9,
    "stage3_gather_16bit_weights_on_model_save": true
  },
  "activation_checkpointing": {
    "partition_activations": false,
    "cpu_checkpointing": true,
    "contiguous_memory_optimization": false,
    "number_checkpoints": null,
    "synchronize_checkpoint_boundary": false,
    "profile": false
  }
}

ZeRO Optimizer States

ZeRO Stage 1: Optimizer State Partitioning

# Configuration for ZeRO Stage 1
# (the stage this section titles "Optimizer State Partitioning")
zero_stage1_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "zero_optimization": {"stage": 1, "reduce_bucket_size": 5e8},
    "fp16": {"enabled": True},
}

# Build the engine from the dict above; the two trailing return values of
# deepspeed.initialize are not needed here.
model_engine, optimizer, _, _ = deepspeed.initialize(
    config_params=zero_stage1_config,
    model=model,
    model_parameters=model.parameters(),
)

ZeRO Stage 2: Gradient + Optimizer State Partitioning

# Configuration for ZeRO Stage 2
# (the stage this section titles "Gradient + Optimizer State Partitioning")
zero_stage2_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "zero_optimization": {
        "stage": 2,
        # all-gather settings
        "allgather_partitions": True,
        "allgather_bucket_size": 2e8,
        # communication / reduction settings
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": True,
    },
    "fp16": {"enabled": True},
}

ZeRO Stage 3: Full Parameter Partitioning

# Configuration for ZeRO Stage 3
# (the stage this section titles "Full Parameter Partitioning")
zero_stage3_config = {
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 8,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "zero_optimization": {
        "stage": 3,
        "overlap_comm": True,
        "contiguous_gradients": True,
        "sub_group_size": 1e9,
        # left on "auto"
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        # explicit stage-3 limits
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
    },
    "fp16": {"enabled": True},
}

# Special handling for ZeRO Stage 3
# The model is constructed inside the zero.Init context (configured with the
# same dict) instead of being built beforehand.
with deepspeed.zero.Init(config_dict_or_path=zero_stage3_config):
    model = SimpleModel()

model_engine, optimizer, _, _ = deepspeed.initialize(
    config_params=zero_stage3_config,
    model=model,
)

Model Parallelism

Pipeline Parallelism

import deepspeed
from deepspeed.pipe import PipelineModule

class PipelineModel(nn.Module):
    """Stack of eight ``Linear(1000, 1000)`` layers, each followed by ReLU.

    Args:
        layers_per_stage: Accepted for the caller's convenience; this class
            does not read it.
    """

    def __init__(self, layers_per_stage=2):
        super().__init__()
        linear_blocks = [nn.Linear(1000, 1000) for _ in range(8)]
        self.layers = nn.ModuleList(linear_blocks)

    def forward(self, x):
        """Apply every linear layer in order with a ReLU after each one."""
        hidden = x
        for linear in self.layers:
            hidden = torch.relu(linear(hidden))
        return hidden

# Convert to pipeline model
def partition_layers():
    """Return a list of eight ``Linear(1000, 1000)`` + ``ReLU`` blocks.

    Each list entry is its own ``nn.Sequential``; the list is what gets
    handed to ``PipelineModule`` as its ``layers`` argument below.
    """
    return [
        nn.Sequential(nn.Linear(1000, 1000), nn.ReLU())
        for _ in range(8)
    ]

# Create pipeline
# PipelineModule needs the distributed backend while it is being constructed,
# so bring it up first (the call is a no-op if it is already initialized).
deepspeed.init_distributed()

model = PipelineModule(
    layers=partition_layers(),
    num_stages=4,  # Number of pipeline stages
    # Every entry returned by partition_layers() is an nn.Sequential, so the
    # previous 'type:Linear' setting matched none of the top-level layers and
    # could not balance them. 'uniform' places the same number of blocks
    # (8 blocks / 4 stages = 2) on every stage.
    partition_method='uniform'
)

# Pipeline-specific config
# NOTE(review): train_batch_size is expected to equal
# train_micro_batch_size_per_gpu * gradient_accumulation_steps * number of
# data-parallel replicas; 64 = 16 * 1 * 4 assumes four replicas — confirm
# against the actual launch topology.
pipeline_config = {
    "train_batch_size": 64,
    "train_micro_batch_size_per_gpu": 16,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "pipeline": {
        "stages": "auto",
        "partition": "balanced"
    },
    "fp16": {"enabled": True}
}

engine, _, _, _ = deepspeed.initialize(
    model=model,
    config_params=pipeline_config
)

Tensor Parallelism (with Megatron)

# Example using DeepSpeed with Megatron-style tensor parallelism
import deepspeed
from deepspeed.moe import MoE

class TensorParallelLinear(nn.Module):
    """Linear layer whose output columns are split across distributed ranks.

    Every rank owns ``output_size // world_size`` columns of the weight and
    the forward pass all-gathers the partial outputs into the full result.

    Args:
        input_size: Width of the input features.
        output_size: Total output width across all ranks.
        world_size: Number of ranks sharing the layer. The all-gather runs on
            the default process group, so this is expected to equal that
            group's size.

    Raises:
        ValueError: If ``world_size`` is not positive or ``output_size`` is
            not divisible by it (integer division would otherwise silently
            drop output columns).
    """

    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        if world_size <= 0 or output_size % world_size != 0:
            raise ValueError(
                f"output_size ({output_size}) must be divisible by a positive "
                f"world_size ({world_size})"
            )
        self.world_size = world_size
        self.rank = torch.distributed.get_rank()

        # Split output dimension across ranks
        self.output_size_per_partition = output_size // world_size
        self.weight = nn.Parameter(
            torch.randn(input_size, self.output_size_per_partition)
        )

    def forward(self, x):
        """Return the full-width output assembled from every rank's shard."""
        output = torch.matmul(x, self.weight)
        # All-gather outputs from all partitions
        gathered = [torch.zeros_like(output) for _ in range(self.world_size)]
        torch.distributed.all_gather(gathered, output)
        # all_gather fills plain buffers that are not part of the autograd
        # graph. Put this rank's own shard back so gradients reach
        # ``self.weight``; without this the layer never receives a gradient.
        # NOTE: gradients w.r.t. ``x`` still only flow through the local
        # shard; layers feeding this one need their input gradients summed
        # across ranks separately.
        gathered[self.rank] = output
        return torch.cat(gathered, dim=-1)

Mixed Precision Training

FP16 Configuration

# FP16 mixed-precision settings, spelled out key by key.
fp16_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "fp16": {
        "enabled": True,
        "auto_cast": False,
        # loss-scaling knobs
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "min_loss_scale": 1,
    },
}

# Custom loss scaling
def train_with_custom_scaling(model_engine, dataloader, loss_fn=None):
    """Run one pass over ``dataloader``, stepping ``model_engine`` per batch.

    Args:
        model_engine: Engine that is callable on a batch and exposes
            ``backward(loss)`` and ``step()``.
        dataloader: Iterable of batches.
        loss_fn: Callable ``loss_fn(outputs, batch)`` returning the loss.
            When omitted, a module-level ``compute_loss`` is used, which is
            what this function always relied on.

    Raises:
        NameError: If ``loss_fn`` is omitted and no ``compute_loss`` is
            defined at module level.
    """
    if loss_fn is None:
        try:
            loss_fn = compute_loss
        except NameError:
            raise NameError(
                "pass loss_fn=... or define compute_loss(outputs, batch) "
                "before calling train_with_custom_scaling"
            ) from None

    for batch in dataloader:
        outputs = model_engine(batch)
        loss = loss_fn(outputs, batch)

        # DeepSpeed handles scaling automatically
        model_engine.backward(loss)
        model_engine.step()

BF16 Configuration

# Same batch size and optimizer as the FP16 example, with the "bf16" section
# enabled instead of "fp16".
bf16_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "bf16": {"enabled": True},
}

Advanced Features

Activation Checkpointing

# Only "cpu_checkpointing" is switched on in the activation_checkpointing
# section; every other flag there is off and "number_checkpoints" is unset.
activation_checkpointing_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "activation_checkpointing": {
        "partition_activations": False,
        "cpu_checkpointing": True,
        "contiguous_memory_optimization": False,
        "number_checkpoints": None,
        "synchronize_checkpoint_boundary": False,
        "profile": False,
    },
    "fp16": {"enabled": True},
}

CPU Offloading

# ZeRO stage 3 with both the optimizer and the parameters offloaded to the
# CPU, each with pin_memory turned on.
cpu_offload_config = {
    "train_batch_size": 32,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {"device": "cpu", "pin_memory": True},
        "offload_param": {"device": "cpu", "pin_memory": True},
    },
    "fp16": {"enabled": True},
}

Mixture of Experts (MoE)

from deepspeed.moe import MoE

class MoEModel(nn.Module):
    """Embedding, then a Mixture-of-Experts layer, then a linear output head."""

    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(1000, 512)

        # Feed-forward network handed to the MoE layer as its expert
        expert_ffn = nn.Sequential(
            nn.Linear(512, 2048),
            nn.ReLU(),
            nn.Linear(2048, 512),
        )

        # MoE layer
        self.moe_layer = MoE(
            hidden_size=512,
            expert=expert_ffn,
            num_experts=8,
            k=2,  # Top-k routing
        )

        self.output = nn.Linear(512, 1000)

    def forward(self, x):
        """Embed ``x``, route it through the MoE layer and project the result."""
        embedded = self.embedding(x)
        # The MoE layer returns three values; only the first one is used here.
        routed, _, _ = self.moe_layer(embedded)
        return self.output(routed)

# Engine config used with the MoE model: batch size, Adam and fp16 only.
moe_config = {
    "train_batch_size": 64,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "fp16": {"enabled": True},
}

Model Saving and Loading

# Saving model
def save_model(model_engine, checkpoint_dir):
    """Save a checkpoint of ``model_engine`` under ``checkpoint_dir``.

    Thin wrapper around ``model_engine.save_checkpoint``; whatever that call
    returns is discarded and this function returns ``None``.
    """
    model_engine.save_checkpoint(checkpoint_dir)

# Loading model
def load_model(model_engine, checkpoint_dir):
    """Load a checkpoint from ``checkpoint_dir`` into ``model_engine``.

    ``model_engine.load_checkpoint`` yields a pair; the first element is
    ignored and the second (the client states) is returned.
    """
    load_result = model_engine.load_checkpoint(checkpoint_dir)
    _ignored, client_states = load_result
    return client_states

# Usage
# NOTE: relies on a `model_engine` created by an earlier
# deepspeed.initialize(...) call in this file.
checkpoint_dir = "./checkpoints"
save_model(model_engine, checkpoint_dir)

# Later, load the model
# load_model hands back the second element of load_checkpoint's result,
# i.e. the client states.
client_states = load_model(model_engine, checkpoint_dir)

Troubleshooting

Common Issues and Solutions

# 1. Memory issues
# Solution: Reduce batch size or enable CPU offloading

# 2. Slow training
# Check communication overlap settings
overlap_config = {
    "zero_optimization": {
        "stage": 2,
        "overlap_comm": True,
        "contiguous_gradients": True,
    },
}

# 3. Gradient explosion
# Enable gradient clipping
gradient_clip_config = {"gradient_clipping": 1.0}

# 4. Loss scaling issues with FP16
# Use automatic loss scaling
auto_loss_scale_config = {
    "fp16": {
        "enabled": True,
        # 0 means automatic
        "loss_scale": 0,
        "initial_scale_power": 16,
    },
}

Debugging Tools

# Enable profiling
# Both breakdown switches (wall clock and memory) are turned on.
profiling_config = {"wall_clock_breakdown": True, "memory_breakdown": True}

# Memory monitoring
def monitor_memory():
    """Print allocated and reserved CUDA memory in GB.

    Does nothing (and prints nothing) when CUDA is not available.
    """
    if not torch.cuda.is_available():
        return

    allocated_gb = torch.cuda.memory_allocated() / 1e9
    reserved_gb = torch.cuda.memory_reserved() / 1e9
    print(f"GPU Memory: {allocated_gb:.2f}GB")
    print(f"GPU Memory Cached: {reserved_gb:.2f}GB")

# Communication profiling
def profile_communication(step_fn=None):
    """Time one training step between two synchronization points.

    Args:
        step_fn: Optional zero-argument callable that runs the training step
            to be timed. When omitted, only the synchronization overhead is
            measured, which is what the original placeholder version did.

    Returns:
        Elapsed wall-clock time in seconds (also printed).

    The barriers are skipped when no process group is initialized, so the
    helper can also be used in a single-process run.
    """
    # Local import: ``time`` is not among the imports shown for this file.
    import time

    dist = torch.distributed
    use_barrier = dist.is_available() and dist.is_initialized()

    def _synchronize():
        # CUDA kernels run asynchronously; wait for them so the timer covers
        # the work itself and not just its launch.
        if torch.cuda.is_available():
            torch.cuda.synchronize()
        if use_barrier:
            dist.barrier()  # Synchronize all processes

    _synchronize()
    start_time = time.perf_counter()
    # Your training step here
    if step_fn is not None:
        step_fn()
    _synchronize()
    elapsed = time.perf_counter() - start_time
    print(f"Step time: {elapsed:.4f}s")
    return elapsed

Best Practices

1. Batch Size Tuning

# Find optimal batch size
def find_optimal_batch_size(model, start_batch_size=16, max_batch_size=None,
                            input_size=1000):
    """Double the micro-batch size until a training step runs out of memory.

    Args:
        model: Module to wrap with DeepSpeed for each trial.
        start_batch_size: First micro-batch size to try.
        max_batch_size: Optional upper bound; the search stops once the next
            candidate would exceed it. ``None`` keeps searching until an
            out-of-memory error occurs.
        input_size: Feature width of the dummy input (default 1000).

    Returns:
        The largest micro-batch size that completed a step, or ``None`` if
        even ``start_batch_size`` did not fit.

    Raises:
        RuntimeError: Any runtime error that is not an out-of-memory error is
            re-raised unchanged.
    """
    batch_size = start_batch_size
    largest_ok = None

    while max_batch_size is None or batch_size <= max_batch_size:
        model_engine = None
        try:
            config = {
                "train_micro_batch_size_per_gpu": batch_size,
                # 64 // batch_size reaches 0 once batch_size exceeds 64;
                # keep the value at a minimum of 1.
                "gradient_accumulation_steps": max(1, 64 // batch_size),
                "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
                "fp16": {"enabled": True}
            }

            model_engine, _, _, _ = deepspeed.initialize(
                model=model, config_params=config
            )

            # Test with dummy data
            # fp16 is enabled, so the input has to match the engine's
            # parameter dtype and device; an fp32 input would fail with a
            # dtype error instead of probing memory.
            param_dtype = next(model_engine.parameters()).dtype
            dummy_input = torch.randn(
                batch_size, input_size,
                device=model_engine.device, dtype=param_dtype
            )
            output = model_engine(dummy_input)
            loss = output.float().sum()
            model_engine.backward(loss)
            # NOTE(review): with gradient_accumulation_steps > 1 a single
            # step() may not reach the optimizer update, so optimizer-state
            # memory might not be included in this probe — confirm.
            model_engine.step()

        except RuntimeError as e:
            if "out of memory" not in str(e):
                raise
            break
        finally:
            # Drop the trial engine and return cached blocks before the next
            # attempt so earlier trials do not count against later ones.
            model_engine = None
            if torch.cuda.is_available():
                torch.cuda.empty_cache()

        print(f"Batch size {batch_size} works!")
        largest_ok = batch_size
        batch_size *= 2

    # Report the last size that actually succeeded (previously
    # ``batch_size // 2``, which was never tested when the first try failed).
    print(f"Max batch size: {largest_ok}")
    return largest_ok

2. Learning Rate Scaling

# Scale learning rate with batch size
def scale_learning_rate(base_lr, base_batch_size, actual_batch_size):
    """Scale ``base_lr`` linearly with the change in batch size.

    Returns ``base_lr`` multiplied by ``actual_batch_size / base_batch_size``.
    """
    batch_ratio = actual_batch_size / base_batch_size
    return base_lr * batch_ratio

# Example
# The learning rate is derived from a 3e-4 baseline at batch size 64, scaled
# to the batch size of 1024 configured here.
base_config = {
    "train_batch_size": 1024,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": scale_learning_rate(3e-4, 64, 1024)},
    },
}

3. Efficient Data Loading

class EfficientDataLoader:
    """Shuffling ``DataLoader`` wrapper that moves every batch to a device.

    Args:
        dataset: Dataset to draw batches from.
        batch_size: Number of samples per batch.
        num_workers: Worker processes used for loading (default 4).
        device: Target device for the yielded tensors (default ``"cuda"``,
            matching the previous hard-coded ``.cuda()`` call).
    """

    def __init__(self, dataset, batch_size, num_workers=4, device="cuda"):
        self.device = device
        self.dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
            # DataLoader rejects persistent_workers=True when there are no
            # worker processes, so only request it when workers exist.
            persistent_workers=num_workers > 0
        )

    def __len__(self):
        """Return the number of batches per pass."""
        return len(self.dataloader)

    def __iter__(self):
        """Yield each batch as a list of tensors on ``self.device``."""
        for batch in self.dataloader:
            # Move to the target device asynchronously
            yield [x.to(self.device, non_blocking=True) for x in batch]

4. Model Architecture Tips

# Use activation checkpointing for large models
class CheckpointedModel(nn.Module):
    """Deep ReLU MLP whose layers run in activation-checkpointed chunks.

    Args:
        num_layers: Number of square linear layers (default 100).
        hidden_size: Width of every layer (default 1000).
        chunk_size: Number of consecutive layers per checkpoint (default 10).

    Raises:
        ValueError: If ``chunk_size`` is smaller than 1.

    The defaults reproduce the previously hard-coded 100 x Linear(1000, 1000)
    model checkpointed every 10 layers.
    """

    def __init__(self, num_layers=100, hidden_size=1000, chunk_size=10):
        super().__init__()
        if chunk_size < 1:
            raise ValueError("chunk_size must be at least 1")
        self.chunk_size = chunk_size
        self.layers = nn.ModuleList([
            nn.Linear(hidden_size, hidden_size) for _ in range(num_layers)
        ])

    def _run_chunk(self, start_idx, x):
        """Apply layers ``start_idx .. start_idx + chunk_size - 1`` with ReLU."""
        stop_idx = min(start_idx + self.chunk_size, len(self.layers))
        for j in range(start_idx, stop_idx):
            x = torch.relu(self.layers[j](x))
        return x

    def forward(self, x):
        """Run all layers, checkpointing once per chunk of ``chunk_size`` layers."""
        # Imported here so the class does not depend on ``torch.utils.checkpoint``
        # already having been loaded by a plain ``import torch``.
        from torch.utils.checkpoint import checkpoint

        for start_idx in range(0, len(self.layers), self.chunk_size):
            # use_reentrant=False: the reentrant variant yields no parameter
            # gradients for a chunk whose input does not require grad, which
            # is the case for the first chunk when ``x`` is raw input data.
            x = checkpoint(self._run_chunk, start_idx, x, use_reentrant=False)
        return x

5. Multi-Node Training Script

# launch_script.py
import subprocess
import sys

def launch_distributed_training():
    """Run ``train.py`` through the ``deepspeed`` command-line launcher.

    Returns:
        The launcher's exit code, so callers can tell a failed run from a
        successful one (it was previously discarded).
    """
    cmd = [
        "deepspeed",
        "--num_gpus=8",
        "--num_nodes=4",
        "--master_addr=your_master_node",
        "--master_port=29500",
        "train.py",
        "--deepspeed_config=ds_config.json"
    ]

    # The command is passed as an argument list, so no shell is involved.
    completed = subprocess.run(cmd)
    return completed.returncode

if __name__ == "__main__":
    # Propagate the launcher's exit status to whoever invoked this script.
    sys.exit(launch_distributed_training())

Conclusion

This guide covers the essential aspects of using DeepSpeed with PyTorch. Remember to experiment with different configurations based on your specific model architecture and hardware setup. Start with simpler configurations (ZeRO Stage 1-2) and gradually move to more advanced features (ZeRO Stage 3, CPU offloading) as needed.

Tip: Getting Started
  1. Start with ZeRO Stage 1 or 2 for your first DeepSpeed experiments
  2. Use FP16 mixed precision to reduce memory usage
  3. Tune batch sizes to maximize GPU utilization
  4. Monitor memory usage and communication overhead
  5. Scale learning rates appropriately with batch size changes
Warning: Common Pitfalls
  • Not setting gradient_accumulation_steps correctly
  • Using too large batch sizes leading to OOM errors
  • Not enabling communication overlap for better performance
  • Forgetting to scale learning rates when changing batch sizes