YouTip LogoYouTip

Pytorch Distributed

When models become larger and data increases, a single GPU can no longer meet training requirements. Distributed training enables parallel computation across multiple GPUs or even multiple machines, significantly reducing training time. This section provides a detailed introduction to distributed training techniques in PyTorch, including DataParallel, DistributedDataParallel, and mixed-precision distributed training. * * * ## 1. Fundamentals of Distributed Training ### 1.1 Why Distributed Training Is Needed The scale of deep learning models has been increasing year by year, leading to longer training times. The core objective of distributed training is to distribute computational tasks across multiple computing units, enabling large-scale models and datasets to be trained within an acceptable timeframe. Distributed training primarily addresses two issues: * **Insufficient GPU memory**: Large models require substantial GPU memory for parameters, optimizer states, and gradients. * **Excessively long training time**: Training large models on a single GPU may take weeks or even months. ### 1.2 Two Main Modes of Distributed Training Distributed training mainly falls into two modes: | Mode | Principle | Applicable Scenarios | | --- | --- | --- | | Data Parallel | Each computing unit holds a full copy of the model; different units process different data subsets | Model fits into a single GPU’s memory; large dataset requiring faster training; small teams with limited hardware resources | | Model Parallel | The model is split across multiple computing units; each unit holds only part of the model | Model is too large to fit on a single GPU; dedicated clusters for multi-machine training | > In practical production environments, data parallelism is the most commonly used approach. PyTorch provides two implementations: DataParallel and DistributedDataParallel. * * * ## 2. DataParallel Usage Guide ### 2.1 Single-Machine Multi-GPU: DataParallel DataParallel (DP) is the simplest way to leverage multiple GPUs in PyTorch. It requires only minor code modifications to enable training across multiple GPUs on a single machine. ## Examples import torch import torch.nn as nn import torch.optim as optim # ── Model Definition ────────────────────────────────────── class SimpleModel(nn.Module): def __init__ (self): super(). __init__ () self.net= nn.Sequential( nn.Linear(128,256), nn.ReLU(), nn.Linear(256,256), nn.ReLU(), nn.Linear(256,10) ) def forward(self, x): return self.net(x) # Check Number of Available GPUs print(f"Number of Available GPUs: {torch.cuda.device_count()}") # Create Model and Move to GPU model = SimpleModel() # ── Method 1: Using DataParallel (Simplest Approach)─────── if torch.cuda.device_count()>1: model = nn.DataParallel(model) # Move to GPU (If Using DataParallel, device_ids Will Be Handled Automatically) device = torch.device("cuda"if torch.cuda.is_available()else"cpu") model = model.to(device) # Loss function and optimizer criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=1e-3) # ── Training Loop ────────────────────────────────────── def train_epoch_dp(model, loader, criterion, optimizer, device): model.train() total_loss =0 correct =0 total =0 for inputs, labels in loader: inputs = inputs.to(device, non_blocking=True) labels = labels.to(device, non_blocking=True) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() total_loss += loss.item() * inputs.size(0) _, predicted = outputs.max(1) correct += predicted.eq(labels).sum().item() total += labels.size(0) return total_loss / total, correct / total # Mock Data train_loader =[ (torch.randn(32,128), torch.randint(0,10,(32,)))for _ in range(10) ] # Start Training for epoch in range(3): loss, acc = train_epoch_dp(model, train_loader, criterion, optimizer, device) print(f"Epoch {epoch+1}: Loss={loss:.4f}, Acc={acc:.4f}") print("DataParallel Training Complete!") ### 2.2 How DataParallel Works The DataParallel workflow consists of the following steps: 1. **Model Replication**: Copy the model to each GPU. 2. **Data Distribution**: Split the batch data evenly across GPUs. 3. **Parallel Computation**: Each GPU independently performs forward propagation. 4. **Gradient Aggregation**: Gather gradients from all GPUs to the primary (master) GPU. 5. **Parameter Update**: The master GPU updates parameters and synchronizes them to other GPUs. ## Examples # DataParallel Key Parameter Description model = nn.DataParallel( module,# Model to Parallelize (Required) device_ids=[0,1,2,3],# GPU Device IDs to Use, Defaults to All GPUs output_device=0,# Device to Aggregate Output Results, Default is First GPU dim=0# Dimension for Data Splitting, Default is Batch Dimension ) # Check Current Model Device print(f"Model Device: {next(model.parameters()).device}") # View Actually Used Devices print(f"Number of GPUs Used: {model.device_ids if hasattr(model, 'device_ids') else 'N/A'}") > DataParallel is simple and easy to use, but has two main drawbacks: 1) high GPU memory pressure on the master GPU (due to gradient aggregation); 2) relatively low communication efficiency among GPUs. Therefore, the official recommendation is to use DistributedDataParallel. * * * ## 3. DistributedDataParallel Explained ### 3.1 DDP Fundamentals DistributedDataParallel (DDP) is PyTorch’s recommended approach for distributed training. Compared to DataParallel, DDP offers the following advantages: * Each GPU computes gradients independentlyβ€”no need to aggregate gradients on a master GPU. * Uses efficient gradient synchronization algorithms (Ring AllReduce). * Supports multi-node, multi-GPU training. * Delivers faster training speed and more efficient GPU memory utilization. ### 3.2 Single-Machine Multi-GPU DDP Training ## Examples import torch import torch.nn as nn import torch.optim as optim import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP import os # ── InitializationDistributed Environment ──────────────────────────── def setup(rank, world_size): """Set Up Distributed Environment""" # Set GPU for Current Process torch.cuda.set_device(rank) # InitializationProcess Group dist.init_process_group( backend="nccl",# Use NCCL Backend (Recommended for GPUs) init_method="env://",# Environment Variable Initialization Method world_size=world_size,# Total Number of Processes rank=rank # Current Process Rank ) def cleanup(): """CleanupDistributed Environment""" dist.destroy_process_group() # ── DataLoader (Distributed Supported)─────────────────────── def get_distributed_loader(batch_size, world_size): """Create Distributed-Supported DataLoader""" from torch.utils.data import DataLoader, DistributedSampler # Mock DataSet dataset = torch.utils.data.TensorDataset( torch.randn(100,3,32,32), torch.randint(0,10,(100,)) ) # DistributedSampler Automatically Splits Data sampler = DistributedSampler( dataset, num_replicas=world_size, rank=rank, shuffle=True ) loader = DataLoader( dataset, batch_size=batch_size, sampler=sampler, num_workers=2, pin_memory=True ) return loader # ── Model Definition ────────────────────────────────────── class ImageClassifier(nn.Module): def __init__ (self): super(). __init__ () self.features= nn.Sequential( nn.Conv2d(3,32,3, padding=1), nn.ReLU(), nn.MaxPool2d(2), nn.Conv2d(32,64,3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d(1), nn.Flatten() ) self.classifier= nn.Linear(64,10) def forward(self, x): x =self.features(x) x =self.classifier(x) return x # ── DDP Training Function ────────────────────────────────── def train_ddp(rank, world_size, epochs=3): """Distributed TrainingMain Function""" # Initialization setup(rank, world_size) # Create Model and Move to Corresponding GPU model = ImageClassifier().cuda(rank) # Wrap as DDP Model ddp_model = DDP( model, device_ids=,# Specify Device for Current Process output_device=rank, find_unused_parameters=False# Detect Unused Parameters ) # Loss function and optimizer criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(ddp_model.parameters(), lr=1e-3) # Get Data Loader loader = get_distributed_loader(batch_size=16, world_size=world_size) # Training Loop for epoch in range(epochs): # Set epoch No. at the beginning of each epoch.(for shuffling data) loader.sampler.set_epoch(epoch) for batch_idx,(inputs, labels)in enumerate(loader): inputs = inputs.cuda(rank, non_blocking=True) labels = labels.cuda(rank, non_blocking=True) optimizer.zero_grad() outputs = ddp_model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() if rank ==0 and batch_idx % 5==0: print(f"Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.4f}") # Cleanup if rank ==0: print("Training Complete!") cleanup() # Note: rank and world_size must be specified via command-line arguments during actual execution # This Code Needs to Run Independently in Each Process ### 3.3 Launching Distributed Training PyTorch distributed training must be launched using `torchrun` or `torch.distributed.launch`. Common launch methods are shown below: ## Examples # Method 1: Using torchrun (Recommended, PyTorch 2.0+οΌ‰ # File: train_ddp.py """ Usage: torchrun --nproc_per_node=4 train_ddp.py """ # Method 2: Using python -m (Traditional Method) """ python -m torch.distributed.launch --nproc_per_node=4 train_ddp.py """ # Method 3: Multi-Node Multi-GPU Launch # Machine 1 (Master Node) torchrun --nnodes=2 --nproc_per_node=4 --node_rank=0 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py # Machine 2 (From node) torchrun --nnodes=2 --nproc_per_node=4 --node_rank=1 --master_addr=192.168.1.1 --master_port=29500 train_ddp.py # Parameter Description: # --nproc_per_node: Number of GPUs per Node # --nnodes: Number of Nodes # --node_rank: Current node No..(From 0 Start) # --master_addr: Master Node IP Address # --master_port: Master node port No.. ### 3.4 Complete DDP Training Script ## Examples # Complete DDP Training Script (Save as train_ddp.pyοΌ‰ import torch import torch.nn as nn import torch.optim as optim import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data import DataLoader, DistributedSampler import argparse import os def parse_args(): parser= argparse.ArgumentParser() parser.add_argument("--local_rank",type=int, default=-1,help="by torchrun Automatic propagation") parser.add_argument("--epochs",type=int, default=10) parser.add_argument("--batch_size",type=int, default=32) parser.add_argument("--lr",type=float, default=1e-3) return parser.parse_args() class TrainModel(nn.Module): def __init__ (self): super(). __init__ () self.net= nn.Sequential( nn.Conv2d(3,64,3, padding=1), nn.ReLU(), nn.MaxPool2d(2), nn.Conv2d(64,128,3, padding=1), nn.ReLU(), nn.AdaptiveAvgPool2d(1), nn.Flatten(), nn.Linear(128,10) ) def forward(self, x): return self.net(x) def setup(rank, world_size): os.environ="localhost
← Vue3 Is AttrPytorch Amp β†’