PyTorch Distributed
As models grow larger and datasets grow in size, a single GPU can no longer meet training requirements.
Distributed training enables parallel computation across multiple GPUs or even multiple machines, significantly reducing training time.
This section provides a detailed introduction to distributed training techniques in PyTorch, including DataParallel, DistributedDataParallel, and mixed-precision distributed training.
* * *
## 1. Fundamentals of Distributed Training
### 1.1 Why Distributed Training Is Needed
The scale of deep learning models has been increasing year by year, leading to longer training times. The core objective of distributed training is to distribute computational tasks across multiple computing units, enabling large-scale models and datasets to be trained within an acceptable timeframe.
Distributed training primarily addresses two issues:
* **Insufficient GPU memory**: Large models require substantial GPU memory for parameters, optimizer states, and gradients.
* **Excessively long training time**: Training large models on a single GPU may take weeks or even months.
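To make the memory point concrete, here is a back-of-the-envelope estimate. It is a sketch under stated assumptions: fp32 training with Adam, so each parameter stores a 4-byte weight, a 4-byte gradient, and Adam's two 4-byte state values (`exp_avg` and `exp_avg_sq` in PyTorch's implementation), while activations, buffers, and framework overhead are ignored. The helper `training_memory_gb` and the 1-billion-parameter example are hypothetical.

```python
def training_memory_gb(num_params, bytes_per_value=4, optimizer_states=2):
    """Rough memory (GB, 1 GB = 1e9 bytes) for weights, gradients and optimizer state.

    Assumes every parameter stores one weight, one gradient and `optimizer_states`
    extra values of the same dtype (Adam keeps two). Activations are not included.
    """
    values_per_param = 1 + 1 + optimizer_states  # weight + gradient + optimizer states
    return num_params * values_per_param * bytes_per_value / 1e9


# A hypothetical 1-billion-parameter model trained in fp32 with Adam
print(f"{training_memory_gb(1_000_000_000):.1f} GB")  # 16.0 GB
# The same model with plain SGD (no momentum) keeps no optimizer state
print(f"{training_memory_gb(1_000_000_000, optimizer_states=0):.1f} GB")  # 8.0 GB
```

Even before activations are counted, the Adam case is roughly the entire memory of a 16 GB GPU, which is where data parallelism alone stops helping and the model itself has to be split.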
### 1.2 Two Main Modes of Distributed Training
Distributed training mainly falls into two modes:
| Mode | Principle | Applicable Scenarios |
| --- | --- | --- |
| Data Parallel | Each computing unit holds a full copy of the model; different units process different data subsets | Model fits into a single GPU's memory; large dataset requiring faster training; small teams with limited hardware resources |
| Model Parallel | The model is split across multiple computing units; each unit holds only part of the model | Model is too large to fit on a single GPU; dedicated clusters for multi-machine training |
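For contrast with data parallelism, the model-parallel row can be sketched by placing different layers on different devices and copying activations between them. This is a minimal sketch of naive model parallelism: the class `TwoStageModel` and its layer sizes are hypothetical, and both stages fall back to the CPU when fewer than two GPUs are visible so the example runs anywhere.

```python
import torch
import torch.nn as nn


class TwoStageModel(nn.Module):
    """Hypothetical model split across two devices (naive model parallelism)."""

    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = dev0, dev1
        # First half of the network lives on dev0 ...
        self.stage1 = nn.Sequential(nn.Linear(128, 256), nn.ReLU()).to(dev0)
        # ... and the second half on dev1
        self.stage2 = nn.Linear(256, 10).to(dev1)

    def forward(self, x):
        x = self.stage1(x.to(self.dev0))
        # Activations are copied between devices at the split point
        return self.stage2(x.to(self.dev1))


if torch.cuda.device_count() >= 2:
    dev0, dev1 = torch.device("cuda:0"), torch.device("cuda:1")
else:
    dev0 = dev1 = torch.device("cpu")  # fallback so the sketch runs without GPUs

model = TwoStageModel(dev0, dev1)
out = model(torch.randn(4, 128))
print(out.shape, out.device)
```

Autograd handles the cross-device backward pass automatically. In this naive form only one device works at a time; pipeline parallelism addresses that by splitting each batch into micro-batches so the stages overlap.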
> In practical production environments, data parallelism is the most commonly used approach. PyTorch provides two implementations: DataParallel and DistributedDataParallel.
* * *
## 2. DataParallel Usage Guide
### 2.1 Single-Machine Multi-GPU: DataParallel
DataParallel (DP) is the simplest way to leverage multiple GPUs in PyTorch. It requires only minor code modifications to enable training across multiple GPUs on a single machine.
```python
import torch
import torch.nn as nn
import torch.optim as optim

# ── Model definition ──────────────────────────────────────
class SimpleModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(128, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 10),
        )

    def forward(self, x):
        return self.net(x)

# Check the number of available GPUs
print(f"Number of available GPUs: {torch.cuda.device_count()}")

# Create the model
model = SimpleModel()

# ── Method 1: DataParallel (simplest approach) ────────────
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

# Move to the GPU (with DataParallel, device_ids is handled automatically)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = model.to(device)

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# ── Training loop ─────────────────────────────────────────
def train_epoch_dp(model, loader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    for inputs, labels in loader:
        inputs = inputs.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        total_loss += loss.item() * inputs.size(0)
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    return total_loss / total, correct / total

# Mock data
train_loader = [
    (torch.randn(32, 128), torch.randint(0, 10, (32,))) for _ in range(10)
]

# Start training
for epoch in range(3):
    loss, acc = train_epoch_dp(model, train_loader, criterion, optimizer, device)
    print(f"Epoch {epoch+1}: Loss={loss:.4f}, Acc={acc:.4f}")

print("DataParallel training complete!")
```
### 2.2 How DataParallel Works
The DataParallel workflow consists of the following steps:
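1. **Model Replication**: At every forward pass, the model on the primary GPU (`device_ids[0]`) is copied to each GPU.
2. **Data Distribution**: The input batch is split along the batch dimension into one chunk per GPU.
3. **Parallel Computation**: Each GPU runs the forward pass on its chunk in a separate thread, and the outputs are gathered on `output_device`, where the loss is computed.
4. **Gradient Aggregation**: During the backward pass, the gradients from all replicas are summed into the parameters on the primary GPU.
5. **Parameter Update**: The optimizer updates the parameters on the primary GPU; the other GPUs receive the new weights when the model is replicated again at the next forward pass.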
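These steps can be imitated on the CPU to see why DataParallel yields the same gradients as a single-device pass over the whole batch. This is a simplified sketch, not DataParallel's actual implementation: `copy.deepcopy` stands in for replication, `torch.chunk` for scattering the batch, the replicas run one after another instead of in parallel threads, and `simulated_dp_backward` is a hypothetical helper.

```python
import copy

import torch
import torch.nn as nn

torch.manual_seed(0)
criterion = nn.CrossEntropyLoss()


def simulated_dp_backward(master, inputs, labels, num_devices):
    """Hypothetical CPU imitation of one DataParallel forward/backward pass."""
    # Data distribution: split the batch along dim 0 (may yield fewer chunks than devices)
    chunks = torch.chunk(inputs, num_devices, dim=0)
    # Model replication: one independent copy of the master model per chunk
    replicas = [copy.deepcopy(master) for _ in chunks]
    # Forward on each replica, then gather the outputs on the "output device"
    outputs = torch.cat([replica(chunk) for replica, chunk in zip(replicas, chunks)], dim=0)
    loss = criterion(outputs, labels)
    loss.backward()  # backward flows into every replica's parameters
    # Gradient aggregation: sum the replica gradients into the master parameters
    for master_param, *replica_params in zip(master.parameters(), *(r.parameters() for r in replicas)):
        master_param.grad = torch.stack([p.grad for p in replica_params]).sum(dim=0)
    return loss.item()


master = nn.Linear(8, 3)
reference = copy.deepcopy(master)  # identical weights, processed as one full batch
inputs, labels = torch.randn(10, 8), torch.randint(0, 3, (10,))

dp_loss = simulated_dp_backward(master, inputs, labels, num_devices=4)
ref_loss = criterion(reference(inputs), labels)
ref_loss.backward()

same = all(
    torch.allclose(a.grad, b.grad, atol=1e-6)
    for a, b in zip(master.parameters(), reference.parameters())
)
print(f"chunk sizes: {[c.shape[0] for c in torch.chunk(inputs, 4)]}, gradients match: {same}")
# Parameter update: an optimizer step on `master` would follow here
```

Because the loss is computed on the gathered outputs, the summed replica gradients equal the full-batch gradient; a batch of 10 split four ways gives chunks of 3, 3, 3 and 1.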
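```python
import torch
import torch.nn as nn

# Any nn.Module works; a small linear model is used here for illustration
module = nn.Linear(128, 10)

if torch.cuda.device_count() > 1:
    # DataParallel expects the module's parameters on device_ids[0]
    module = module.to("cuda:0")
    # DataParallel key parameters
    model = nn.DataParallel(
        module,                                             # Model to parallelize (required)
        device_ids=list(range(torch.cuda.device_count())),  # GPU IDs to use; defaults to all visible GPUs
        output_device=0,                                    # Device where outputs are gathered; defaults to device_ids[0]
        dim=0,                                              # Dimension along which inputs are split; defaults to the batch dimension
    )
    # Check the device holding the model parameters
    print(f"Model device: {next(model.parameters()).device}")
    # View the devices actually used
    print(f"Number of GPUs used: {len(model.device_ids)} ({model.device_ids})")
else:
    print("DataParallel needs more than one GPU; skipping.")
```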
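> DataParallel is simple and easy to use, but it has notable drawbacks: 1) GPU memory use is unbalanced, because outputs are gathered and gradients are accumulated on the primary GPU; 2) it runs as a single process with multiple threads, so it suffers from Python GIL contention and re-replicates the model on every forward pass, which adds communication overhead; 3) it only works on a single machine. For these reasons, the official PyTorch documentation recommends DistributedDataParallel instead.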
* * *
## 3. DistributedDataParallel Explained
### 3.1 DDP Fundamentals
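DistributedDataParallel (DDP) is PyTorch's recommended approach for distributed training. It runs one process per GPU and, compared to DataParallel, offers the following advantages:
* Each process computes gradients for its own model replica; there is no master GPU that collects outputs and gradients.
* Gradients are synchronized with an all-reduce collective (NCCL implements it with algorithms such as ring or tree all-reduce), and DDP groups gradients into buckets so communication overlaps with the backward pass.
* Supports multi-node, multi-GPU training.
* Avoids GIL contention and per-iteration model replication, so it is usually faster and uses GPU memory more evenly.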
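Ring all-reduce can be simulated in plain Python to see how every worker ends up with the same summed gradient. This is a teaching sketch under simplifying assumptions: `ring_allreduce` is a hypothetical function, gradients are lists of floats, all workers exchange messages in lockstep, and real backends such as NCCL choose their own algorithms and operate on GPU buffers.

```python
def ring_allreduce(buffers):
    """Sum equal-length lists across simulated workers with ring all-reduce.

    buffers[i] is worker i's local gradient; returns one result list per worker.
    """
    n = len(buffers)
    length = len(buffers[0])
    data = [list(b) for b in buffers]  # each worker works on its own copy
    # Split every buffer into n contiguous chunks (sizes may differ by one)
    bounds = [(k * length) // n for k in range(n + 1)]

    def chunk(k):
        return range(bounds[k], bounds[k + 1])

    # Phase 1: reduce-scatter. After n-1 steps, worker i holds the full sum of chunk (i+1) % n.
    for step in range(n - 1):
        # All workers send at the same time, so snapshot every message before applying any
        messages = []
        for i in range(n):
            c = (i - step) % n
            messages.append(((i + 1) % n, c, [data[i][j] for j in chunk(c)]))
        for dst, c, values in messages:
            for j, v in zip(chunk(c), values):
                data[dst][j] += v

    # Phase 2: all-gather. Pass the fully reduced chunks around the ring.
    for step in range(n - 1):
        messages = []
        for i in range(n):
            c = (i + 1 - step) % n
            messages.append(((i + 1) % n, c, [data[i][j] for j in chunk(c)]))
        for dst, c, values in messages:
            for j, v in zip(chunk(c), values):
                data[dst][j] = v
    return data


grads = [[1.0, 2.0, 3.0, 4.0], [10.0, 20.0, 30.0, 40.0], [100.0, 200.0, 300.0, 400.0]]
for worker, result in enumerate(ring_allreduce(grads)):
    print(worker, result)  # every worker: [111.0, 222.0, 333.0, 444.0]
```

Each worker sends 2(N - 1) messages of roughly 1/N of the buffer, so the data each worker transmits stays close to twice the buffer size however many workers join. DDP then divides the summed gradient by the world size, so every rank applies the same averaged gradient.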
### 3.2 Single-Machine Multi-GPU DDP Training
```python
import os

import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler, TensorDataset

# ── Initialize the distributed environment ────────────────
def setup(rank, world_size):
    """Set up the distributed environment."""
    # env:// reads the rendezvous address from these variables; default to this machine
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29500")
    # Bind the current process to its GPU
    torch.cuda.set_device(rank)
    # Initialize the process group
    dist.init_process_group(
        backend="nccl",         # NCCL backend (recommended for GPUs)
        init_method="env://",   # Initialize from environment variables
        world_size=world_size,  # Total number of processes
        rank=rank,              # Rank of the current process
    )

def cleanup():
    """Clean up the distributed environment."""
    dist.destroy_process_group()

# ── DataLoader with distributed support ───────────────────
def get_distributed_loader(batch_size, rank, world_size):
    """Create a DataLoader that gives each rank its own shard of the data."""
    # Mock dataset
    dataset = TensorDataset(
        torch.randn(100, 3, 32, 32),
        torch.randint(0, 10, (100,)),
    )
    # DistributedSampler splits the dataset across ranks automatically
    sampler = DistributedSampler(
        dataset,
        num_replicas=world_size,
        rank=rank,
        shuffle=True,
    )
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,  # the sampler shuffles; do not also pass shuffle=True
        num_workers=2,
        pin_memory=True,
    )
    return loader

# ── Model definition ──────────────────────────────────────
class ImageClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
        )
        self.classifier = nn.Linear(64, 10)

    def forward(self, x):
        x = self.features(x)
        x = self.classifier(x)
        return x

# ── DDP training function ─────────────────────────────────
def train_ddp(rank, world_size, epochs=3):
    """Main distributed training function (runs once in every process)."""
    # Initialization
    setup(rank, world_size)

    # Create the model and move it to this process's GPU
    model = ImageClassifier().cuda(rank)

    # Wrap it as a DDP model
    ddp_model = DDP(
        model,
        device_ids=[rank],             # GPU used by the current process
        output_device=rank,
        find_unused_parameters=False,  # set True only if some parameters receive no gradient (adds overhead)
    )

    # Loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3)

    # Get the data loader
    loader = get_distributed_loader(batch_size=16, rank=rank, world_size=world_size)

    # Training loop
    for epoch in range(epochs):
        # Set the epoch at the start of each epoch so the shuffle order changes between epochs
        loader.sampler.set_epoch(epoch)

        for batch_idx, (inputs, labels) in enumerate(loader):
            inputs = inputs.cuda(rank, non_blocking=True)
            labels = labels.cuda(rank, non_blocking=True)

            optimizer.zero_grad()
            outputs = ddp_model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()  # DDP averages gradients across ranks during backward
            optimizer.step()

            if rank == 0 and batch_idx % 5 == 0:
                print(f"Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.4f}")

    # Cleanup
    if rank == 0:
        print("Training complete!")
    cleanup()

# This code runs independently in each process, each with its own rank
if __name__ == "__main__":
    if "LOCAL_RANK" in os.environ:
        # Started by torchrun on one machine: it exports LOCAL_RANK, WORLD_SIZE, MASTER_ADDR, MASTER_PORT
        train_ddp(int(os.environ["LOCAL_RANK"]), int(os.environ["WORLD_SIZE"]))
    else:
        # Started with plain `python`: mp.spawn starts one process per GPU and passes its index as rank
        world_size = torch.cuda.device_count()
        mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True)
```
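`DistributedSampler` can be inspected without GPUs or a process group, because passing `num_replicas` and `rank` explicitly skips the lookup of the default group. The sketch below assumes a toy dataset of 10 samples split across 4 ranks and shows how indices are assigned: the index list is padded by repeating its first entries until it divides evenly, and each rank takes every `num_replicas`-th index starting at its own rank. The helper `shard` is hypothetical.

```python
import torch
from torch.utils.data import DistributedSampler, TensorDataset

dataset = TensorDataset(torch.arange(10))  # 10 samples, indices 0..9
world_size = 4                             # assumed number of processes

# Without shuffling: padded to 12 indices, then rank r takes indices r, r+4, r+8
for rank in range(world_size):
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
    print(rank, list(sampler))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]


def shard(rank, epoch):
    """Indices one rank would see in a given epoch when shuffling is on."""
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True)
    sampler.set_epoch(epoch)  # every rank seeds the same permutation with seed + epoch
    return list(sampler)


for epoch in range(2):
    shards = [shard(r, epoch) for r in range(world_size)]
    covered = sorted({i for s in shards for i in s}) == list(range(10))
    print(f"epoch {epoch}: {shards}, covers all samples: {covered}")
```

This is why skipping `set_epoch` makes every epoch reuse the same order, and why metrics computed over all shards can count the padded samples twice.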
### 3.3 Launching Distributed Training
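Distributed training scripts are usually started with a launcher that creates one process per GPU. `torchrun` is the recommended launcher; the older `torch.distributed.launch` still works but is deprecated. Common launch methods are shown below: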
```bash
# Method 1: torchrun (recommended; available since PyTorch 1.10)
# Script: train_ddp.py
torchrun --nproc_per_node=4 train_ddp.py

# Method 2: python -m torch.distributed.launch (legacy launcher, deprecated in favor of torchrun)
python -m torch.distributed.launch --nproc_per_node=4 train_ddp.py

# Method 3: multi-node, multi-GPU launch
# Machine 1 (master node)
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py
# Machine 2 (worker node)
torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py

# Parameter description:
# --nproc_per_node: number of processes per node (usually one per GPU)
# --nnodes: number of nodes
# --node_rank: rank of the current node, starting from 0
# --master_addr: IP address of the master node
# --master_port: free port on the master node used for rendezvous
```
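`torchrun` does not pass rank arguments on the command line; it exports them as environment variables (`RANK`, `LOCAL_RANK`, `WORLD_SIZE`, plus `MASTER_ADDR` and `MASTER_PORT`), which `init_method="env://"` reads. The sketch below reads them with single-process defaults and lists the rank layout we would expect for the two-node, four-GPU launch above. Both helper names are hypothetical, and the `node_rank * nproc_per_node + local_rank` layout is an assumption that typically holds for equal-sized nodes using the `--master_addr`/`--master_port` form; check `RANK` at runtime rather than relying on it.

```python
import os


def read_dist_env(environ=os.environ):
    """Read the variables torchrun exports; defaults describe a single, non-distributed process."""
    return {
        "rank": int(environ.get("RANK", 0)),
        "local_rank": int(environ.get("LOCAL_RANK", 0)),
        "world_size": int(environ.get("WORLD_SIZE", 1)),
    }


def expected_rank_layout(nnodes, nproc_per_node):
    """Hypothetical helper: (node_rank, local_rank) -> global rank, assuming node-by-node layout."""
    return {
        (node, local): node * nproc_per_node + local
        for node in range(nnodes)
        for local in range(nproc_per_node)
    }


# Simulated environment of the second process on machine 2 in the 2 x 4 launch above
fake_env = {"RANK": "5", "LOCAL_RANK": "1", "WORLD_SIZE": "8"}
print(read_dist_env(fake_env))             # {'rank': 5, 'local_rank': 1, 'world_size': 8}
print(expected_rank_layout(2, 4)[(1, 1)])  # 5
```

On multiple machines, `LOCAL_RANK` selects the GPU (`torch.cuda.set_device`) while `RANK` identifies the process in the group; they only coincide on a single node.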
### 3.4 Complete DDP Training Script
```python
# Complete DDP training script (save as train_ddp.py)
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler
import argparse
import os

def parse_args():
    parser = argparse.ArgumentParser()
    # torch.distributed.launch passes --local-rank (PyTorch 2.0+) or --local_rank (older versions);
    # torchrun passes no such argument and sets the LOCAL_RANK environment variable instead
    parser.add_argument("--local-rank", "--local_rank", type=int, default=-1,
                        help="Local rank passed by torch.distributed.launch")
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--lr", type=float, default=1e-3)
    return parser.parse_args()

class TrainModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(3, 64, 3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1),
            nn.ReLU(),
            nn.AdaptiveAvgPool2d(1),
            nn.Flatten(),
            nn.Linear(128, 10),
        )

    def forward(self, x):
        return self.net(x)

def setup(rank, world_size):
    os.environ["MASTER_ADDR"] = "localhost"
    # ...
```