DeepSpeed with PyTorch: Complete Code Guide

Introduction
DeepSpeed is a deep learning optimization library that makes distributed training easy, efficient, and effective. It provides system innovations like ZeRO (Zero Redundancy Optimizer) to enable training massive models with trillions of parameters.
Key benefits:
- Memory Efficiency: ZeRO reduces memory consumption by partitioning optimizer states, gradients, and model parameters
- Speed: Achieves high training throughput through optimized kernels and communication
- Scale: Enables training of models with billions/trillions of parameters
- Ease of Use: Simple integration with existing PyTorch code
Installation
#| eval: false
# Install DeepSpeed
pip install deepspeed
# Or install from source for latest features
git clone https://github.com/microsoft/DeepSpeed.git
cd DeepSpeed
pip install .
# Verify installation
ds_report
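If you prefer a quick check from Python (in addition to ds_report), the following should import the package and print its version:

import deepspeed
print(deepspeed.__version__)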
Basic Setup
Simple Model Training
import torch
import torch.nn as nn
import deepspeed
from torch.utils.data import DataLoader, Dataset

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self, input_size=1000, hidden_size=2000, output_size=1000):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        return self.layers(x)

# Dummy dataset
class DummyDataset(Dataset):
    def __init__(self, size=1000):
        self.size = size

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        return torch.randn(1000), torch.randn(1000)

# Initialize model and data
model = SimpleModel()
dataset = DummyDataset()
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params={
        "train_batch_size": 32,
        "optimizer": {
            "type": "Adam",
            "params": {"lr": 0.001}
        },
        "fp16": {"enabled": True}
    }
)

# Training loop
for epoch in range(10):
    for batch_idx, (data, target) in enumerate(dataloader):
        # Move the batch to the engine's device; with fp16 enabled the
        # inputs must also be half precision to match the model weights
        data = data.to(model_engine.device).half()
        target = target.to(model_engine.device).half()

        # Forward pass
        outputs = model_engine(data)
        loss = nn.MSELoss()(outputs, target)

        # Backward pass (DeepSpeed handles loss scaling and gradient sync)
        model_engine.backward(loss)
        model_engine.step()

        if batch_idx % 10 == 0:
            print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
Configuration File Approach
import torch
import torch.nn as nn
import deepspeed
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')
    parser.add_argument('--deepspeed_config', type=str, default='ds_config.json',
                        help='deepspeed config file')
    args = parser.parse_args()

    # Initialize distributed training
    deepspeed.init_distributed()

    model = SimpleModel()
    dataset = DummyDataset()

    # Initialize with the config file; passing training_data lets DeepSpeed
    # build a distributed dataloader for you
    model_engine, optimizer, trainloader, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=dataset
    )

    # Training loop
    for step, (data, target) in enumerate(trainloader):
        # ds_config.json (shown below) enables fp16, so cast inputs to half
        data = data.to(model_engine.device).half()
        target = target.to(model_engine.device).half()
        outputs = model_engine(data)
        loss = nn.MSELoss()(outputs, target)
        model_engine.backward(loss)
        model_engine.step()

if __name__ == '__main__':
    main()
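To actually run a script like this, use the deepspeed launcher rather than plain python, so that --local_rank and the distributed environment are set up for you. For example, on a single node with two GPUs (the GPU count here is just an illustration):

deepspeed --num_gpus=2 train.py --deepspeed_config=ds_config.json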
Configuration Files
Basic Configuration
Create a file called ds_config.json:
{
    "train_batch_size": 64,
    "train_micro_batch_size_per_gpu": 16,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 3e-5,
            "betas": [0.8, 0.999],
            "eps": 1e-8,
            "weight_decay": 3e-7
        }
    },
    "scheduler": {
        "type": "WarmupLR",
        "params": {
            "warmup_min_lr": 0,
            "warmup_max_lr": 3e-5,
            "warmup_num_steps": 1000
        }
    },
    "fp16": {
        "enabled": true,
        "auto_cast": false,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "zero_optimization": {
        "stage": 2,
        "allgather_partitions": true,
        "allgather_bucket_size": 2e8,
        "overlap_comm": true,
        "reduce_scatter": true,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": true
    },
    "gradient_clipping": 1.0,
    "wall_clock_breakdown": false
}
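Keep in mind that DeepSpeed enforces train_batch_size = train_micro_batch_size_per_gpu × gradient_accumulation_steps × number of GPUs, so the values above (64 = 16 × 1 × 4) implicitly assume a 4-GPU run; if your GPU count differs, adjust one of the three values accordingly.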
Advanced Configuration with ZeRO Stage 3
{
    "train_batch_size": 64,
    "train_micro_batch_size_per_gpu": 4,
    "gradient_accumulation_steps": 4,
    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 3e-4,
            "betas": [0.9, 0.95],
            "eps": 1e-8,
            "weight_decay": 0.1
        }
    },
    "fp16": {
        "enabled": true,
        "auto_cast": false,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "min_loss_scale": 1
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": true
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": true
        },
        "overlap_comm": true,
        "contiguous_gradients": true,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9,
        "stage3_gather_16bit_weights_on_model_save": true
    },
    "activation_checkpointing": {
        "partition_activations": false,
        "cpu_checkpointing": true,
        "contiguous_memory_optimization": false,
        "number_checkpoints": null,
        "synchronize_checkpoint_boundary": false,
        "profile": false
    }
}
ZeRO Optimizer States
ZeRO Stage 1: Optimizer State Partitioning
# Configuration for ZeRO Stage 1
zero_stage1_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 1,
        "reduce_bucket_size": 5e8
    },
    "fp16": {"enabled": True}
}

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=zero_stage1_config
)
ZeRO Stage 2: Gradient + Optimizer State Partitioning
# Configuration for ZeRO Stage 2
zero_stage2_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 2,
        "allgather_partitions": True,
        "allgather_bucket_size": 2e8,
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": True
    },
    "fp16": {"enabled": True}
}
ZeRO Stage 3: Full Parameter Partitioning
# Configuration for ZeRO Stage 3
zero_stage3_config = {
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 8,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 3,
        "overlap_comm": True,
        "contiguous_gradients": True,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9
    },
    "fp16": {"enabled": True}
}

# Special handling for ZeRO Stage 3: construct the model inside zero.Init
# so its parameters are partitioned across ranks at creation time
with deepspeed.zero.Init(config_dict_or_path=zero_stage3_config):
    model = SimpleModel()

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=zero_stage3_config
)
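Under Stage 3 each rank only holds a shard of every parameter, so inspecting the weights of model_engine.module directly shows partitioned tensors. If you temporarily need the full weights, for example for debugging, DeepSpeed's deepspeed.zero.GatheredParameters context manager gathers them for the duration of the block; a minimal sketch, reusing the SimpleModel defined earlier:

# Gather the full (unpartitioned) weight of the first linear layer on every rank
with deepspeed.zero.GatheredParameters(model_engine.module.layers[0].weight):
    if torch.distributed.get_rank() == 0:
        print(model_engine.module.layers[0].weight.shape)  # full shape, not the local shard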
Model Parallelism
Pipeline Parallelism
import torch
import torch.nn as nn
import deepspeed
from deepspeed.pipe import PipelineModule

# The same network written as a single monolithic module, for reference
class PipelineModel(nn.Module):
    def __init__(self, layers_per_stage=2):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(1000, 1000) for _ in range(8)
        ])

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return x

# Convert to a pipeline model: express the network as a flat list of layers
def partition_layers():
    layers = []
    for i in range(8):
        layers.append(nn.Sequential(
            nn.Linear(1000, 1000),
            nn.ReLU()
        ))
    return layers

# Create pipeline
model = PipelineModule(
    layers=partition_layers(),
    num_stages=4,  # Number of pipeline stages
    partition_method='type:Linear'
)

# Pipeline-specific config
pipeline_config = {
    "train_batch_size": 64,
    "train_micro_batch_size_per_gpu": 16,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "pipeline": {
        "stages": "auto",
        "partition": "balanced"
    },
    "fp16": {"enabled": True}
}

engine, _, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=pipeline_config
)
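With a PipelineModule you do not write the forward/backward/step loop yourself; the pipeline engine schedules micro-batches internally and exposes train_batch instead. A minimal sketch of the loop, assuming the PipelineModule above was also constructed with a loss_fn (e.g. loss_fn=nn.MSELoss()) and that dataloader yields (input, label) pairs:

import itertools

# train_batch pulls micro-batches from the iterator and runs forward,
# backward and the optimizer step for one full global batch
data_iter = itertools.cycle(dataloader)
for step in range(1000):
    loss = engine.train_batch(data_iter=data_iter)
    if engine.global_rank == 0 and step % 10 == 0:
        print(f'step {step}: loss {loss.item():.4f}')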
Tensor Parallelism (with Megatron)
# Example of Megatron-style tensor parallelism: a column-parallel linear layer
# (illustrative only; a real implementation also handles the backward pass of the gather)
import torch
import torch.nn as nn
import torch.distributed

class TensorParallelLinear(nn.Module):
    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        self.world_size = world_size
        self.rank = torch.distributed.get_rank()
        # Split the output dimension across ranks
        self.output_size_per_partition = output_size // world_size
        self.weight = nn.Parameter(
            torch.randn(input_size, self.output_size_per_partition)
        )

    def forward(self, x):
        output = torch.matmul(x, self.weight)
        # All-gather the partial outputs from all ranks and concatenate them
        gathered = [torch.zeros_like(output) for _ in range(self.world_size)]
        torch.distributed.all_gather(gathered, output)
        return torch.cat(gathered, dim=-1)
Mixed Precision Training
FP16 Configuration
fp16_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "fp16": {
        "enabled": True,
        "auto_cast": False,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "min_loss_scale": 1
    }
}

# Example training loop; DeepSpeed applies dynamic loss scaling automatically
def train_with_custom_scaling(model_engine, dataloader):
    for batch in dataloader:
        outputs = model_engine(batch)
        loss = compute_loss(outputs, batch)  # compute_loss: your task-specific loss
        # DeepSpeed scales the loss and unscales gradients for you
        model_engine.backward(loss)
        model_engine.step()
BF16 Configuration
bf16_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "bf16": {
        "enabled": True
    }
}
Unlike FP16, BF16 has the same dynamic range as FP32, so it does not need loss scaling; it does require hardware with native BF16 support (e.g., NVIDIA Ampere or newer GPUs).
Advanced Features
Activation Checkpointing
activation_checkpointing_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "activation_checkpointing": {
        "partition_activations": False,
        "cpu_checkpointing": True,
        "contiguous_memory_optimization": False,
        "number_checkpoints": None,
        "synchronize_checkpoint_boundary": False,
        "profile": False
    },
    "fp16": {"enabled": True}
}
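The config above only controls how checkpointed activations are stored (CPU vs. GPU, partitioned or not); your model code still decides which segments get recomputed. One way to do that is DeepSpeed's checkpoint wrapper, deepspeed.checkpointing.checkpoint, used like torch.utils.checkpoint.checkpoint. A minimal sketch with a hypothetical stack of blocks (depending on your setup you may also need to call deepspeed.checkpointing.configure with the same config):

import torch.nn as nn
import deepspeed

class CheckpointedBlocks(nn.Module):
    def __init__(self, num_blocks=4, width=1024):
        super().__init__()
        self.blocks = nn.ModuleList(
            [nn.Sequential(nn.Linear(width, width), nn.ReLU()) for _ in range(num_blocks)]
        )

    def forward(self, x):
        # Each block's activations are recomputed during backward instead of stored
        for block in self.blocks:
            x = deepspeed.checkpointing.checkpoint(block, x)
        return x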
CPU Offloading
cpu_offload_config = {
    "train_batch_size": 32,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        }
    },
    "fp16": {"enabled": True}
}
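If CPU memory is also a bottleneck, ZeRO-Infinity can push optimizer state and parameters out to NVMe storage instead. A sketch of such a config; the nvme_path values are placeholders for a fast local SSD mount:

nvme_offload_config = {
    "train_batch_size": 32,
    "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "nvme",
            "nvme_path": "/local_nvme",  # placeholder: fast local storage
            "pin_memory": True
        },
        "offload_param": {
            "device": "nvme",
            "nvme_path": "/local_nvme",
            "pin_memory": True
        }
    },
    "fp16": {"enabled": True}
}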
Mixture of Experts (MoE)
import torch.nn as nn
from deepspeed.moe import MoE

class MoEModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(1000, 512)
        # MoE layer: 8 experts, each a small feed-forward block
        self.moe_layer = MoE(
            hidden_size=512,
            expert=nn.Sequential(
                nn.Linear(512, 2048),
                nn.ReLU(),
                nn.Linear(2048, 512)
            ),
            num_experts=8,
            k=2  # Top-k routing
        )
        self.output = nn.Linear(512, 1000)

    def forward(self, x):
        x = self.embedding(x)
        x, _, _ = self.moe_layer(x)
        return self.output(x)

moe_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "fp16": {"enabled": True}
}
Model Saving and Loading
# Saving model
def save_model(model_engine, checkpoint_dir):
    model_engine.save_checkpoint(checkpoint_dir)

# Loading model
def load_model(model_engine, checkpoint_dir):
    _, client_states = model_engine.load_checkpoint(checkpoint_dir)
    return client_states

# Usage
checkpoint_dir = "./checkpoints"
save_model(model_engine, checkpoint_dir)

# Later, load the model
client_states = load_model(model_engine, checkpoint_dir)
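save_checkpoint writes engine-specific, possibly sharded files (one set per rank under ZeRO). If you want a plain PyTorch state dict for inference or sharing, DeepSpeed ships a consolidation helper (it is also written into every checkpoint directory as a standalone zero_to_fp32.py script); a sketch, assuming training has finished and checkpoint_dir is the directory used above:

from deepspeed.utils.zero_to_fp32 import get_fp32_state_dict_from_zero_checkpoint

# Merge the ZeRO shards into a single fp32 state dict on CPU
state_dict = get_fp32_state_dict_from_zero_checkpoint(checkpoint_dir)
model = SimpleModel()
model.load_state_dict(state_dict)
torch.save(model.state_dict(), "consolidated_model.pt")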
Troubleshooting
Common Issues and Solutions
# 1. Memory issues
# Solution: Reduce batch size or enable CPU offloading

# 2. Slow training
# Solution: Check communication overlap settings
overlap_config = {
    "zero_optimization": {
        "stage": 2,
        "overlap_comm": True,
        "contiguous_gradients": True
    }
}

# 3. Gradient explosion
# Solution: Enable gradient clipping
gradient_clip_config = {
    "gradient_clipping": 1.0
}

# 4. Loss scaling issues with FP16
# Solution: Use automatic loss scaling
auto_loss_scale_config = {
    "fp16": {
        "enabled": True,
        "loss_scale": 0,  # 0 means automatic/dynamic loss scaling
        "initial_scale_power": 16
    }
}
Debugging Tools
import time

# Enable profiling
profiling_config = {
    "wall_clock_breakdown": True,
    "memory_breakdown": True
}

# Memory monitoring
def monitor_memory():
    if torch.cuda.is_available():
        print(f"GPU Memory: {torch.cuda.memory_allocated() / 1e9:.2f}GB")
        print(f"GPU Memory Cached: {torch.cuda.memory_reserved() / 1e9:.2f}GB")

# Communication profiling
def profile_communication():
    torch.distributed.barrier()  # Synchronize all processes
    start_time = time.time()
    # Your training step here
    torch.distributed.barrier()
    end_time = time.time()
    print(f"Step time: {end_time - start_time:.4f}s")
Best Practices
1. Batch Size Tuning
# Find optimal batch size
def find_optimal_batch_size(model, start_batch_size=16):
    batch_size = start_batch_size
    while True:
        try:
            config = {
                "train_micro_batch_size_per_gpu": batch_size,
                "gradient_accumulation_steps": 64 // batch_size,
                "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
                "fp16": {"enabled": True}
            }
            model_engine, _, _, _ = deepspeed.initialize(
                model=model, config_params=config
            )
            # Test with dummy data (half precision to match the fp16 engine)
            dummy_input = torch.randn(batch_size, 1000).cuda().half()
            output = model_engine(dummy_input)
            loss = output.sum()
            model_engine.backward(loss)
            model_engine.step()
            print(f"Batch size {batch_size} works!")
            batch_size *= 2
        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"Max batch size: {batch_size // 2}")
                torch.cuda.empty_cache()  # release memory held by the failed attempt
                break
            else:
                raise e
2. Learning Rate Scaling
# Scale learning rate with batch size
def scale_learning_rate(base_lr, base_batch_size, actual_batch_size):
    return base_lr * (actual_batch_size / base_batch_size)

# Example
base_config = {
    "train_batch_size": 1024,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": scale_learning_rate(3e-4, 64, 1024)
        }
    }
}
3. Efficient Data Loading
class EfficientDataLoader:
    def __init__(self, dataset, batch_size, num_workers=4):
        self.dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
            persistent_workers=True
        )

    def __iter__(self):
        for batch in self.dataloader:
            # Move to GPU asynchronously
            batch = [x.cuda(non_blocking=True) for x in batch]
            yield batch
4. Model Architecture Tips
# Use activation checkpointing for large models
from torch.utils.checkpoint import checkpoint

class CheckpointedModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(1000, 1000) for _ in range(100)
        ])

    def forward(self, x):
        # Checkpoint every 10 layers
        for i in range(0, len(self.layers), 10):
            def create_forward(start_idx):
                def forward_chunk(x):
                    for j in range(start_idx, min(start_idx + 10, len(self.layers))):
                        x = torch.relu(self.layers[j](x))
                    return x
                return forward_chunk
            x = checkpoint(create_forward(i), x)
        return x
5. Multi-Node Training Script
# launch_script.py
import subprocess
import sys

def launch_distributed_training():
    cmd = [
        "deepspeed",
        "--num_gpus=8",
        "--num_nodes=4",
        "--master_addr=your_master_node",
        "--master_port=29500",
        "train.py",
        "--deepspeed_config=ds_config.json"
    ]
    subprocess.run(cmd)

if __name__ == "__main__":
    launch_distributed_training()
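Multi-node launches are usually driven by a hostfile that lists each node and the number of GPU slots it exposes; the hostnames below are placeholders. The launcher is then pointed at it with --hostfile:

# hostfile
worker-1 slots=8
worker-2 slots=8
worker-3 slots=8
worker-4 slots=8

deepspeed --hostfile=hostfile train.py --deepspeed_config=ds_config.json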
Conclusion
This guide covers the essential aspects of using DeepSpeed with PyTorch. Remember to experiment with different configurations based on your specific model architecture and hardware setup. Start with simpler configurations (ZeRO Stage 1-2) and gradually move to more advanced features (ZeRO Stage 3, CPU offloading) as needed.
Tip: Getting Started
- Start with ZeRO Stage 1 or 2 for your first DeepSpeed experiments
- Use FP16 mixed precision to reduce memory usage
- Tune batch sizes to maximize GPU utilization
- Monitor memory usage and communication overhead
- Scale learning rates appropriately with batch size changes
Warning: Common Pitfalls
- Not setting gradient_accumulation_steps correctly
- Using batch sizes that are too large, leading to OOM errors
- Not enabling communication overlap for better performance
- Forgetting to scale learning rates when changing batch sizes