import torch
import torch.nn as nn
import deepspeed
from torch.utils.data import DataLoader, Dataset

# Define a simple model
class SimpleModel(nn.Module):
    def __init__(self, input_size=1000, hidden_size=2000, output_size=1000):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, output_size)
        )

    def forward(self, x):
        return self.layers(x)

# Dummy dataset
class DummyDataset(Dataset):
    def __init__(self, size=1000):
        self.size = size

    def __len__(self):
        return self.size

    def __getitem__(self, idx):
        return torch.randn(1000), torch.randn(1000)

# Initialize model and data
model = SimpleModel()
dataset = DummyDataset()
dataloader = DataLoader(dataset, batch_size=32, shuffle=True)

# Initialize DeepSpeed
model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params={
        "train_batch_size": 32,
        "optimizer": {
            "type": "Adam",
            "params": {"lr": 0.001}
        },
        "fp16": {"enabled": True}
    }
)

# Training loop
for epoch in range(10):
    for batch_idx, (data, target) in enumerate(dataloader):
        # Move the batch to the engine's device; with fp16 enabled the
        # inputs also need to be cast to half precision
        data = data.to(model_engine.device).half()
        target = target.to(model_engine.device).half()

        # Forward pass
        outputs = model_engine(data)
        loss = nn.MSELoss()(outputs, target)

        # Backward pass
        model_engine.backward(loss)
        model_engine.step()

        if batch_idx % 10 == 0:
            print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item():.4f}')
DeepSpeed with PyTorch: Complete Code Guide
Introduction
DeepSpeed is a deep learning optimization library that makes distributed training easy, efficient, and effective. It provides system innovations like ZeRO (Zero Redundancy Optimizer) to enable training massive models with trillions of parameters.
Key benefits:
- Memory Efficiency: ZeRO reduces memory consumption by partitioning optimizer states, gradients, and model parameters
- Speed: Achieves high training throughput through optimized kernels and communication
- Scale: Enables training of models with billions/trillions of parameters
- Ease of Use: Simple integration with existing PyTorch code
Installation
# Install DeepSpeed
pip install deepspeed
# Or install from source for latest features
git clone https://github.com/microsoft/DeepSpeed.git
cd DeepSpeed
pip install .
# Verify installation
ds_report
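You can also confirm the installation from Python; this minimal check just imports the package and prints its version:

import deepspeed
print(deepspeed.__version__)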
Basic Setup
Simple Model Training
Configuration File Approach
import deepspeed
import argparse

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--local_rank', type=int, default=-1,
                        help='local rank passed from distributed launcher')
    parser.add_argument('--deepspeed_config', type=str, default='ds_config.json',
                        help='deepspeed config file')
    args = parser.parse_args()

    # Initialize distributed training
    deepspeed.init_distributed()

    model = SimpleModel()

    # Initialize with config file; passing training_data lets DeepSpeed
    # build a distributed dataloader for us
    model_engine, optimizer, trainloader, _ = deepspeed.initialize(
        args=args,
        model=model,
        model_parameters=model.parameters(),
        training_data=dataset
    )

    # Training loop (assumes the model's forward returns the loss)
    for step, batch in enumerate(trainloader):
        loss = model_engine(batch)
        model_engine.backward(loss)
        model_engine.step()

if __name__ == '__main__':
    main()
Configuration Files
Basic Configuration
Create a file called ds_config.json:
{
"train_batch_size": 64,
"train_micro_batch_size_per_gpu": 16,
"gradient_accumulation_steps": 1,
"optimizer": {
"type": "Adam",
"params": {
"lr": 3e-5,
"betas": [0.8, 0.999],
"eps": 1e-8,
"weight_decay": 3e-7
}
},
"scheduler": {
"type": "WarmupLR",
"params": {
"warmup_min_lr": 0,
"warmup_max_lr": 3e-5,
"warmup_num_steps": 1000
}
},
"fp16": {
"enabled": true,
"auto_cast": false,
"loss_scale": 0,
"initial_scale_power": 16,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 2,
"allgather_partitions": true,
"allgather_bucket_size": 2e8,
"overlap_comm": true,
"reduce_scatter": true,
"reduce_bucket_size": 2e8,
"contiguous_gradients": true
},
"gradient_clipping": 1.0,
"wall_clock_breakdown": false
}
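When launching with the deepspeed launcher, this file is usually picked up via --deepspeed_config. If you prefer to manage the config yourself, a minimal sketch (reusing the SimpleModel and dataset from the example at the top of this guide) loads the JSON and passes it to deepspeed.initialize:

import json
import deepspeed

# Load the config file and hand it to DeepSpeed as a dictionary
with open('ds_config.json') as f:
    ds_config = json.load(f)

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=ds_config
)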
Advanced Configuration with ZeRO Stage 3
{
"train_batch_size": 64,
"train_micro_batch_size_per_gpu": 4,
"gradient_accumulation_steps": 4,
"optimizer": {
"type": "AdamW",
"params": {
"lr": 3e-4,
"betas": [0.9, 0.95],
"eps": 1e-8,
"weight_decay": 0.1
}
},
"fp16": {
"enabled": true,
"auto_cast": false,
"loss_scale": 0,
"initial_scale_power": 16,
"loss_scale_window": 1000,
"hysteresis": 2,
"min_loss_scale": 1
},
"zero_optimization": {
"stage": 3,
"offload_optimizer": {
"device": "cpu",
"pin_memory": true
},
"offload_param": {
"device": "cpu",
"pin_memory": true
},
"overlap_comm": true,
"contiguous_gradients": true,
"sub_group_size": 1e9,
"reduce_bucket_size": "auto",
"stage3_prefetch_bucket_size": "auto",
"stage3_param_persistence_threshold": "auto",
"stage3_max_live_parameters": 1e9,
"stage3_max_reuse_distance": 1e9,
"stage3_gather_16bit_weights_on_model_save": true
},
"activation_checkpointing": {
"partition_activations": false,
"cpu_checkpointing": true,
"contiguous_memory_optimization": false,
"number_checkpoints": null,
"synchronize_checkpoint_boundary": false,
"profile": false
}
}
ZeRO Optimizer States
ZeRO Stage 1: Optimizer State Partitioning
# Configuration for ZeRO Stage 1
zero_stage1_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 1,
        "reduce_bucket_size": 5e8
    },
    "fp16": {"enabled": True}
}

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=zero_stage1_config
)
ZeRO Stage 2: Gradient + Optimizer State Partitioning
# Configuration for ZeRO Stage 2
zero_stage2_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 2,
        "allgather_partitions": True,
        "allgather_bucket_size": 2e8,
        "overlap_comm": True,
        "reduce_scatter": True,
        "reduce_bucket_size": 2e8,
        "contiguous_gradients": True
    },
    "fp16": {"enabled": True}
}
ZeRO Stage 3: Full Parameter Partitioning
# Configuration for ZeRO Stage 3
zero_stage3_config = {
    "train_batch_size": 32,
    "train_micro_batch_size_per_gpu": 8,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 3,
        "overlap_comm": True,
        "contiguous_gradients": True,
        "sub_group_size": 1e9,
        "reduce_bucket_size": "auto",
        "stage3_prefetch_bucket_size": "auto",
        "stage3_param_persistence_threshold": "auto",
        "stage3_max_live_parameters": 1e9,
        "stage3_max_reuse_distance": 1e9
    },
    "fp16": {"enabled": True}
}

# Special handling for ZeRO Stage 3: zero.Init allocates parameters in a
# partitioned layout so the full model never has to fit on one device
with deepspeed.zero.Init(config_dict_or_path=zero_stage3_config):
    model = SimpleModel()

model_engine, optimizer, _, _ = deepspeed.initialize(
    model=model,
    config_params=zero_stage3_config
)
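Under ZeRO Stage 3 the parameters themselves are sharded across ranks, so reading a layer's weights outside the forward pass only shows the local shard. A minimal sketch of gathering them temporarily with DeepSpeed's GatheredParameters context manager, here inspecting the first layer of the SimpleModel defined earlier:

# Temporarily reassemble the full partitioned weight on every rank (read-only);
# it is re-partitioned again when the context exits
with deepspeed.zero.GatheredParameters(model_engine.module.layers[0].weight):
    print(model_engine.module.layers[0].weight.shape)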
Model Parallelism
Pipeline Parallelism
import deepspeed
from deepspeed.pipe import PipelineModule

class PipelineModel(nn.Module):
    def __init__(self, layers_per_stage=2):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(1000, 1000) for _ in range(8)
        ])

    def forward(self, x):
        for layer in self.layers:
            x = torch.relu(layer(x))
        return x

# Express the model as a flat list of layers so it can be partitioned
def partition_layers():
    layers = []
    for i in range(8):
        layers.append(nn.Sequential(
            nn.Linear(1000, 1000),
            nn.ReLU()
        ))
    return layers

# Create pipeline
model = PipelineModule(
    layers=partition_layers(),
    num_stages=4,  # Number of pipeline stages
    partition_method='type:Linear'
)

# Pipeline-specific config
pipeline_config = {
    "train_batch_size": 64,
    "train_micro_batch_size_per_gpu": 16,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "pipeline": {
        "stages": "auto",
        "partition": "balanced"
    },
    "fp16": {"enabled": True}
}

engine, _, _, _ = deepspeed.initialize(
    model=model,
    model_parameters=model.parameters(),
    config_params=pipeline_config
)
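Pipeline engines are not driven with a manual forward/backward loop; train_batch pulls micro-batches from an iterator and runs forward, backward, and the optimizer step for one global batch across all stages. A minimal sketch, assuming the PipelineModule above was built with a loss_fn (for example loss_fn=nn.MSELoss()) and that dataloader yields (input, label) pairs:

# One call to train_batch consumes enough micro-batches to make up a full
# train_batch_size, then applies the optimizer step
data_iter = iter(dataloader)
for step in range(100):
    loss = engine.train_batch(data_iter=data_iter)
    if step % 10 == 0:
        print(f"step {step}, loss {loss.item():.4f}")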
Tensor Parallelism (with Megatron)
# Example using DeepSpeed with Megatron-style tensor parallelism
import deepspeed

class TensorParallelLinear(nn.Module):
    def __init__(self, input_size, output_size, world_size):
        super().__init__()
        self.world_size = world_size
        self.rank = torch.distributed.get_rank()

        # Split the output dimension across ranks (column parallelism)
        self.output_size_per_partition = output_size // world_size
        self.weight = nn.Parameter(
            torch.randn(input_size, self.output_size_per_partition)
        )

    def forward(self, x):
        # Each rank computes only its partition of the output
        output = torch.matmul(x, self.weight)

        # All-gather outputs from all partitions
        gathered = [torch.zeros_like(output) for _ in range(self.world_size)]
        torch.distributed.all_gather(gathered, output)
        return torch.cat(gathered, dim=-1)
Mixed Precision Training
FP16 Configuration
fp16_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "fp16": {
        "enabled": True,
        "auto_cast": False,
        "loss_scale": 0,
        "initial_scale_power": 16,
        "loss_scale_window": 1000,
        "hysteresis": 2,
        "min_loss_scale": 1
    }
}

# No custom loss scaling is needed: with "loss_scale": 0 DeepSpeed
# scales the loss dynamically inside backward()
def train_with_custom_scaling(model_engine, dataloader):
    for batch in dataloader:
        outputs = model_engine(batch)
        loss = compute_loss(outputs, batch)

        # DeepSpeed handles scaling automatically
        model_engine.backward(loss)
        model_engine.step()
BF16 Configuration
BF16 keeps the dynamic range of FP32, so no loss scaling is needed; it does require hardware support (e.g. NVIDIA Ampere or newer GPUs).
bf16_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "bf16": {
        "enabled": True
    }
}
Advanced Features
Activation Checkpointing
activation_checkpointing_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "activation_checkpointing": {
        "partition_activations": False,
        "cpu_checkpointing": True,
        "contiguous_memory_optimization": False,
        "number_checkpoints": None,
        "synchronize_checkpoint_boundary": False,
        "profile": False
    },
    "fp16": {"enabled": True}
}
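The config above only sets the checkpointing options; the layers you want recomputed still have to be wrapped explicitly. A minimal sketch using deepspeed.checkpointing.checkpoint, assuming a module that stores its blocks in a list called layers as in the earlier examples:

import deepspeed

def checkpointed_forward(layers, x):
    # Recompute each layer's activations during backward instead of
    # keeping them in memory for the whole forward pass
    for layer in layers:
        x = deepspeed.checkpointing.checkpoint(layer, x)
    return x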
CPU Offloading
cpu_offload_config = {
    "train_batch_size": 32,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "zero_optimization": {
        "stage": 3,
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True
        },
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        }
    },
    "fp16": {"enabled": True}
}
Mixture of Experts (MoE)
from deepspeed.moe.layer import MoE

class MoEModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(1000, 512)

        # MoE layer: 8 experts with top-2 routing
        self.moe_layer = MoE(
            hidden_size=512,
            expert=nn.Sequential(
                nn.Linear(512, 2048),
                nn.ReLU(),
                nn.Linear(2048, 512)
            ),
            num_experts=8,
            k=2  # Top-k routing
        )
        self.output = nn.Linear(512, 1000)

    def forward(self, x):
        x = self.embedding(x)
        x, _, _ = self.moe_layer(x)
        return self.output(x)

moe_config = {
    "train_batch_size": 64,
    "optimizer": {
        "type": "Adam",
        "params": {"lr": 0.001}
    },
    "fp16": {"enabled": True}
}
Model Saving and Loading
# Saving model
def save_model(model_engine, checkpoint_dir):
    model_engine.save_checkpoint(checkpoint_dir)

# Loading model
def load_model(model_engine, checkpoint_dir):
    _, client_states = model_engine.load_checkpoint(checkpoint_dir)
    return client_states

# Usage
checkpoint_dir = "./checkpoints"
save_model(model_engine, checkpoint_dir)

# Later, load the model
client_states = load_model(model_engine, checkpoint_dir)
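save_checkpoint also accepts a tag and an arbitrary client_state dictionary, which makes it easy to resume at the right point; the epoch bookkeeping below is illustrative:

# Save with an explicit tag plus extra training state
epoch = 3
model_engine.save_checkpoint(checkpoint_dir, tag=f"epoch_{epoch}",
                             client_state={"epoch": epoch})

# Load the same checkpoint and read the extra state back
_, client_state = model_engine.load_checkpoint(checkpoint_dir, tag=f"epoch_{epoch}")
start_epoch = client_state["epoch"] + 1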
Troubleshooting
Common Issues and Solutions
# 1. Memory issues
# Solution: Reduce batch size or enable CPU offloading

# 2. Slow training
# Check communication overlap settings
overlap_config = {
    "zero_optimization": {
        "stage": 2,
        "overlap_comm": True,
        "contiguous_gradients": True
    }
}

# 3. Gradient explosion
# Enable gradient clipping
gradient_clip_config = {
    "gradient_clipping": 1.0
}

# 4. Loss scaling issues with FP16
# Use automatic loss scaling
auto_loss_scale_config = {
    "fp16": {
        "enabled": True,
        "loss_scale": 0,  # 0 means automatic
        "initial_scale_power": 16
    }
}
Debugging Tools
# Enable profiling
profiling_config = {
    "wall_clock_breakdown": True,
    "memory_breakdown": True
}

# Memory monitoring
def monitor_memory():
    if torch.cuda.is_available():
        print(f"GPU Memory: {torch.cuda.memory_allocated() / 1e9:.2f}GB")
        print(f"GPU Memory Cached: {torch.cuda.memory_reserved() / 1e9:.2f}GB")

# Communication profiling
import time

def profile_communication():
    # Synchronize all processes before timing
    torch.distributed.barrier()
    start_time = time.time()

    # Your training step here

    torch.distributed.barrier()
    end_time = time.time()
    print(f"Step time: {end_time - start_time:.4f}s")
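DeepSpeed also ships a flops profiler that is switched on from the config rather than from code; a sketch of the relevant block (field names as documented by DeepSpeed, values here are illustrative):

flops_profiler_config = {
    "flops_profiler": {
        "enabled": True,
        "profile_step": 5,      # which training step to profile
        "module_depth": -1,     # profile modules at every depth
        "top_modules": 3,       # report the most expensive modules
        "detailed": True
    }
}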
Best Practices
1. Batch Size Tuning
# Find optimal batch size
def find_optimal_batch_size(model, start_batch_size=16):
    batch_size = start_batch_size
    while True:
        try:
            config = {
                "train_micro_batch_size_per_gpu": batch_size,
                # Keep at least one accumulation step as the micro-batch grows
                "gradient_accumulation_steps": max(1, 64 // batch_size),
                "optimizer": {"type": "Adam", "params": {"lr": 0.001}},
                "fp16": {"enabled": True}
            }

            model_engine, _, _, _ = deepspeed.initialize(
                model=model,
                model_parameters=model.parameters(),
                config_params=config
            )

            # Test with dummy data (half precision to match the fp16 engine)
            dummy_input = torch.randn(batch_size, 1000).half().cuda()
            output = model_engine(dummy_input)
            loss = output.sum()

            model_engine.backward(loss)
            model_engine.step()

            print(f"Batch size {batch_size} works!")
            batch_size *= 2

        except RuntimeError as e:
            if "out of memory" in str(e):
                print(f"Max batch size: {batch_size // 2}")
                break
            else:
                raise e
2. Learning Rate Scaling
# Scale learning rate with batch size
def scale_learning_rate(base_lr, base_batch_size, actual_batch_size):
    return base_lr * (actual_batch_size / base_batch_size)

# Example
base_config = {
    "train_batch_size": 1024,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": scale_learning_rate(3e-4, 64, 1024)
        }
    }
}
3. Efficient Data Loading
class EfficientDataLoader:
    def __init__(self, dataset, batch_size, num_workers=4):
        self.dataloader = DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=True,
            num_workers=num_workers,
            pin_memory=True,
            persistent_workers=True
        )

    def __iter__(self):
        for batch in self.dataloader:
            # Move to GPU asynchronously
            batch = [x.cuda(non_blocking=True) for x in batch]
            yield batch
4. Model Architecture Tips
# Use activation checkpointing for large models
class CheckpointedModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.layers = nn.ModuleList([
            nn.Linear(1000, 1000) for _ in range(100)
        ])

    def forward(self, x):
        # Checkpoint every 10 layers
        for i in range(0, len(self.layers), 10):
            def create_forward(start_idx):
                def forward_chunk(x):
                    for j in range(start_idx, min(start_idx + 10, len(self.layers))):
                        x = torch.relu(self.layers[j](x))
                    return x
                return forward_chunk

            x = torch.utils.checkpoint.checkpoint(create_forward(i), x)
        return x
5. Multi-Node Training Script
# launch_script.py
import subprocess
import sys

def launch_distributed_training():
    cmd = [
        "deepspeed",
        "--num_gpus=8",
        "--num_nodes=4",
        "--master_addr=your_master_node",
        "--master_port=29500",
        "train.py",
        "--deepspeed_config=ds_config.json"
    ]
    subprocess.run(cmd)

if __name__ == "__main__":
    launch_distributed_training()
Conclusion
This guide covers the essential aspects of using DeepSpeed with PyTorch. Remember to experiment with different configurations based on your specific model architecture and hardware setup. Start with simpler configurations (ZeRO Stage 1-2) and gradually move to more advanced features (ZeRO Stage 3, CPU offloading) as needed.
Getting Started
- Start with ZeRO Stage 1 or 2 for your first DeepSpeed experiments
- Use FP16 mixed precision to reduce memory usage
- Tune batch sizes to maximize GPU utilization
- Monitor memory usage and communication overhead
- Scale learning rates appropriately with batch size changes
Common Pitfalls
- Not setting gradient_accumulation_steps correctly (see the sanity check below)
- Using batch sizes that are too large, leading to OOM errors
- Not enabling communication overlap for better performance
- Forgetting to scale learning rates when changing batch sizes
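DeepSpeed expects train_batch_size = train_micro_batch_size_per_gpu × gradient_accumulation_steps × number of GPUs, and initialization fails when the values are inconsistent. A quick sanity check (the world size of 4 is just an example):

def check_batch_config(train_batch_size, micro_batch_per_gpu,
                       grad_accum_steps, world_size):
    # The three knobs must multiply to the global batch size
    expected = micro_batch_per_gpu * grad_accum_steps * world_size
    assert train_batch_size == expected, (
        f"train_batch_size={train_batch_size}, but "
        f"{micro_batch_per_gpu} * {grad_accum_steps} * {world_size} = {expected}"
    )

check_batch_config(train_batch_size=64, micro_batch_per_gpu=16,
                   grad_accum_steps=1, world_size=4)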