Adapted from pytorch-cifar to make it Horovod-compatible.
I'm playing with PyTorch on the CIFAR10 dataset.
- Python 3.6+
- PyTorch 1.0+
- Horovod with PyTorch support
```bash
# Start training with:

# You can manually resume the training with:
python --resume --lr=0.01
```
| Model | Acc. |
| --- | --- |
| VGG16 | 92.64% |
| ResNet18 | 93.02% |
| ResNet50 | 93.62% |
| ResNet101 | 93.75% |
| RegNetX_200MF | 94.24% |
| RegNetY_400MF | 94.29% |
| MobileNetV2 | 94.43% |
| ResNeXt29(32x4d) | 94.73% |
| ResNeXt29(2x64d) | 94.82% |
| SimpleDLA | 94.89% |
| DenseNet121 | 95.04% |
| PreActResNet18 | 95.11% |
| DPN92 | 95.16% |
| DLA | 95.47% |
This tutorial walks you step by step through the changes required to run the existing training code across multiple GPUs using Horovod.
The final script, with all the changes, is included in the repository.
Add the following code after `from utils import progress_bar`:

```python
import horovod.torch as hvd
```
Add the following code after `args = parser.parse_args()`:

```python
# Horovod: initialize Horovod.
hvd.init()
```
With Horovod, the usual setup is one process per GPU, with each process pinned to a single GPU; this keeps distributed training across processes simple.
Comment out or remove the following device/GPU allocation code:

```python
device = 'cuda' if torch.cuda.is_available() else 'cpu'
```
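Add the following code after `hvd.init()`:

```python
# Get the local rank of the current process (its index among the processes on this machine)
device = hvd.local_rank()
# Pin the GPU to be used to the process's local rank (one GPU per process)
torch.cuda.set_device(device)
```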
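Why pin by local rank rather than by global rank: `hvd.rank()` numbers processes from 0 to `hvd.size() - 1` across all machines, while GPU indices restart at 0 on every machine. The pure-Python sketch below models a hypothetical two-host launch with 4 GPUs per host, assuming ranks are assigned host by host; `assign_ranks` is an illustrative helper, not part of Horovod, and Horovod's real placement may differ.

```python
def assign_ranks(slots_per_host):
    """Return (host, global_rank, local_rank) tuples.

    Assumes ranks are filled host by host, in order. This mimics, but is
    not, Horovod's actual placement logic.
    """
    placements = []
    global_rank = 0
    for host, slots in enumerate(slots_per_host):
        for local_rank in range(slots):
            placements.append((host, global_rank, local_rank))
            global_rank += 1
    return placements


if __name__ == "__main__":
    for host, rank, local_rank in assign_ranks([4, 4]):
        # Each process pins GPU `local_rank`, which always exists on its host;
        # pinning GPU `rank` would fail for ranks 4-7 on a 4-GPU machine.
        print(f"host {host}: rank {rank} -> cuda:{local_rank}")
```

On a single machine the two numbers coincide, which is why the difference only shows up in multi-node runs.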
For distributed training, it is efficient to have each copy of the model (one per process) work on a mutually exclusive subset of the training dataset.
For this reason, we add a `DistributedSampler` to sample the training examples. Notice that we pass the sampler as an argument to the `DataLoader` and drop `shuffle=True` from it: `DataLoader` does not accept `shuffle=True` together with a custom sampler, so the sampler does the shuffling instead.
Replace the following lines:

```python
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=128, shuffle=True, num_workers=2)
```

with:

```python
# Partition dataset among workers using DistributedSampler
train_sampler = torch.utils.data.distributed.DistributedSampler(
    trainset, shuffle=True, num_replicas=hvd.size(), rank=hvd.rank())
trainloader = torch.utils.data.DataLoader(
    trainset, batch_size=128, num_workers=2, sampler=train_sampler)
```
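To see how the sampler keeps the shards apart, here is a simplified pure-Python model of what `DistributedSampler` does with its default `drop_last=False`: pad the index list (by repeating indices from the start) to a multiple of `num_replicas`, then give rank `r` every `num_replicas`-th index starting at `r`. Shuffling is omitted, and `shard_indices` is a hypothetical helper for illustration, not the PyTorch source. A related detail: with `shuffle=True`, PyTorch's documentation recommends calling `train_sampler.set_epoch(epoch)` at the start of each epoch; otherwise every epoch uses the same ordering.

```python
import math


def shard_indices(dataset_len, num_replicas, rank):
    """Simplified model of DistributedSampler (shuffle=False, drop_last=False).

    Assumes dataset_len > 0.
    """
    num_samples = math.ceil(dataset_len / num_replicas)  # per-rank sample count
    total_size = num_samples * num_replicas
    indices = list(range(dataset_len))
    # Pad by repeating indices from the start so every rank gets num_samples.
    padding = total_size - dataset_len
    indices += (indices * math.ceil(padding / dataset_len))[:padding]
    # Rank r takes indices r, r + num_replicas, r + 2 * num_replicas, ...
    return indices[rank:total_size:num_replicas]


if __name__ == "__main__":
    for r in range(4):
        print(r, shard_indices(10, num_replicas=4, rank=r))
```

With 10 examples and 4 ranks, each rank gets 3 indices and indices 0 and 1 appear twice: the shards are disjoint except for the padding.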
Similarly, we can distribute the evaluation load across processes during the validation phase.
Replace the following lines:

```python
testloader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2)
```

with:

```python
# Partition val dataset among workers using DistributedSampler
val_sampler = torch.utils.data.distributed.DistributedSampler(
    testset, shuffle=True, num_replicas=hvd.size(), rank=hvd.rank())
testloader = torch.utils.data.DataLoader(
    testset, batch_size=100, shuffle=False, num_workers=2, sampler=val_sampler)
```
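One consequence for validation: because the sampler pads the dataset to a multiple of `hvd.size()` (assuming the default `drop_last=False`), a few CIFAR-10 test images are evaluated twice when 10,000 is not divisible by the number of processes, which slightly skews the aggregated accuracy. The arithmetic, as a small hypothetical helper:

```python
import math


def eval_padding(dataset_len, world_size):
    """Return (per-rank sample count, number of duplicated samples).

    Assumes DistributedSampler's default drop_last=False padding behaviour.
    """
    per_rank = math.ceil(dataset_len / world_size)
    duplicated = per_rank * world_size - dataset_len
    return per_rank, duplicated


if __name__ == "__main__":
    # CIFAR-10 has 10,000 test images.
    for world_size in (1, 2, 3, 4, 6, 8):
        print(world_size, eval_padding(10_000, world_size))
```

For example, 3 processes evaluate 3,334 images each, so 2 images are counted twice; with 1, 2, 4 or 8 processes nothing is duplicated.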
Instead of loading the checkpoint in every worker process, it is more efficient to load it in a single process (typically the root, rank 0) and broadcast it to the others.
This usually goes hand in hand with checkpoint saving, which should likewise be done by a single process.
Replace the following code:

```python
if args.resume:
    # Load checkpoint.
    print('==> Resuming from checkpoint..')
```

with:

```python
if args.resume and hvd.rank() == 0:
    # Load checkpoint.
    print('==> Resuming from checkpoint..')
```
As described above, the checkpoint values and model parameters are broadcast from the root process so that all processes start from the same state.
Add the following lines after the checkpoint-loading code:

```python
# Send rank 0's start_epoch to every process, then convert the tensor back to int
start_epoch = hvd.broadcast(torch.Tensor(1).fill_(start_epoch)[0], name="start_epoch", root_rank=0)
start_epoch = int(start_epoch)
```
Also add:

```python
# Sync model weights from rank 0 to all other processes
hvd.broadcast_parameters(net.state_dict(), 0)
```
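The effect of these broadcasts can be modelled without GPUs: before the broadcast only rank 0 holds the checkpoint values, and afterwards every rank holds a copy of them. Because every rank then applies the same averaged gradient, ranks that started from different weights would stay out of sync, which is why the initial broadcast matters. The sketch below is a toy single-process simulation; the `simulate_broadcast` helper and the per-rank dictionaries are hypothetical, while in the real script Horovod performs the copy between processes via `hvd.broadcast` and `hvd.broadcast_parameters`.

```python
import copy


def simulate_broadcast(per_rank_state, root_rank=0):
    """Toy broadcast: every rank receives a copy of the root rank's state."""
    root_state = per_rank_state[root_rank]
    return [copy.deepcopy(root_state) for _ in per_rank_state]


if __name__ == "__main__":
    # Rank 0 resumed from a checkpoint; ranks 1-3 still hold their defaults,
    # each with its own random initialization (hypothetical values).
    states = [{"start_epoch": 57, "weights": [0.3, -1.2]}] + [
        {"start_epoch": 0, "weights": [0.1 * r, -0.1 * r]} for r in range(1, 4)
    ]
    synced = simulate_broadcast(states)
    print(synced)  # every rank now starts at epoch 57 with identical weights
```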
Horovod's `DistributedOptimizer` averages gradients across workers, so each optimization step effectively uses a batch `hvd.size()` times larger than the per-process batch. A common heuristic is to scale the learning rate by the same factor, taking larger steps along this averaged, lower-variance gradient.
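Replace:

```python
optimizer = optim.SGD(net.parameters(), lr=args.lr, momentum=0.9, weight_decay=5e-4)
```

with:

```python
# Scale the learning rate by the number of processes
optimizer = optim.SGD(net.parameters(), lr=args.lr * hvd.size(),
                      momentum=0.9, weight_decay=5e-4)
# Wrap with Horovod's distributed optimizer (averages gradients across processes)
optimizer = hvd.DistributedOptimizer(optimizer, named_parameters=net.named_parameters())
```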
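Why the effective batch grows: if each of N workers computes the mean gradient over its own B examples (equal shard sizes assumed), averaging those N gradients gives exactly the mean gradient over all N×B examples, so one step behaves like a single-process step with an N-times larger batch. Scaling the learning rate linearly with N is a common heuristic for that larger batch, not a guarantee. The pure-Python check below uses a hypothetical 1-D least-squares loss; the data, base learning rate and helper names are illustrative.

```python
def mean_grad(w, xs, ys):
    """Mean gradient of 0.5 * (w*x - y)**2 with respect to w over a batch."""
    return sum((w * x - y) * x for x, y in zip(xs, ys)) / len(xs)


def averaged_worker_grad(w, shards):
    """What an averaging allreduce yields: the mean of per-worker mean gradients."""
    grads = [mean_grad(w, xs, ys) for xs, ys in shards]
    return sum(grads) / len(grads)


if __name__ == "__main__":
    xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
    ys = [2.1, 3.9, 6.2, 7.8, 10.1, 12.0, 13.8, 16.3]
    world_size = 4
    per_worker_batch = len(xs) // world_size
    # Equal-size, disjoint shards, one per hypothetical worker.
    shards = [(xs[i::world_size], ys[i::world_size]) for i in range(world_size)]

    w = 0.5
    print(mean_grad(w, xs, ys), averaged_worker_grad(w, shards))  # equal up to rounding

    base_lr = 0.1  # hypothetical single-process learning rate
    print("effective batch:", world_size * per_worker_batch,
          "scaled lr:", base_lr * world_size)
```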
Add the following line after `scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)`:

```python
# Sync optimizer state (e.g. momentum buffers) from rank 0 to all processes
hvd.broadcast_optimizer_state(optimizer, 0)
```
As described in the dataset-partitioning step above, every process works on its own subset of the training set. Rather than having each process report its own training progress, we want a single process to report aggregate training metrics.
To compute those aggregates, each metric has to be summed across all processes.
Add the following lines after `correct += predicted.eq(targets).sum().item()`:

```python
# Sum each metric over all processes (op=hvd.Sum; the default op averages)
train_loss_sum_across_batches_multiprocess = hvd.allreduce(torch.Tensor(1).fill_(train_loss)[0],
                                                           name="train_loss_sum_multiprocess", op=hvd.Sum)
total_sum_across_batches_multiprocess = hvd.allreduce(torch.Tensor(1).fill_(total)[0],
                                                      name="total_sum_multiprocess", op=hvd.Sum)
correct_sum_across_batches_multiprocess = hvd.allreduce(torch.Tensor(1).fill_(correct)[0],
                                                        name="correct_sum_multiprocess", op=hvd.Sum)
```
These lines sum the per-process loss, number of examples, and number of correct predictions across all processes, storing the results in new variables that are used for reporting.
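Summing the raw counts and dividing once, rather than averaging each process's accuracy, gives the correct global accuracy even in the general case where processes have seen different numbers of examples. The loss in the reporting code is the summed cumulative loss divided by `(batch_idx + 1) * hvd.size()`, i.e. by the total number of batch losses added up across all processes. The pure-Python sketch below stands in for the `op=hvd.Sum` allreduce with a plain `sum`; the helpers and per-rank numbers are hypothetical.

```python
def allreduce_sum(values):
    """Stand-in for hvd.allreduce(..., op=hvd.Sum): every rank gets the total."""
    return sum(values)


def aggregate_metrics(per_rank, batches_seen):
    """per_rank: list of (cumulative_loss, total, correct) tuples, one per process.

    batches_seen is batch_idx + 1, assumed equal on every rank.
    Returns (average loss per batch, accuracy in percent).
    """
    world_size = len(per_rank)
    loss_sum = allreduce_sum([loss for loss, _, _ in per_rank])
    total_sum = allreduce_sum([total for _, total, _ in per_rank])
    correct_sum = allreduce_sum([correct for _, _, correct in per_rank])
    avg_loss = loss_sum / (batches_seen * world_size)
    accuracy = 100.0 * correct_sum / total_sum
    return avg_loss, accuracy


if __name__ == "__main__":
    # Hypothetical cumulative numbers for two processes.
    per_rank = [(3.0, 100, 90), (5.0, 50, 40)]
    print(aggregate_metrics(per_rank, batches_seen=2))
```

Here the aggregate accuracy is 130/150 ≈ 86.67%, whereas averaging the per-rank accuracies (90% and 80%) would report 85%.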
To report from a single (root) process only, replace the following code:

```python
progress_bar(batch_idx, len(trainloader), 'Loss: %.3f | Acc: %.3f%% (%d/%d)'
             % (train_loss/(batch_idx+1), 100.*correct/total, correct, total))
```

with:

```python
# Only the root process (global rank 0) prints the aggregated metrics
if hvd.rank() == 0:
    progress_bar(batch_idx, len(trainloader), '[Train] Average(all procs) Loss : %.3f | Average(all procs) Acc: %.3f%% (%d/%d)'
                 % (train_loss_sum_across_batches_multiprocess/((batch_idx+1) * hvd.size()),
                    100. * correct_sum_across_batches_multiprocess / total_sum_across_batches_multiprocess,
                    correct_sum_across_batches_multiprocess, total_sum_across_batches_multiprocess))
```
A similar aggregation is performed for validation (see lines 179-190 in the `test` function of the final script).
Assuming the libraries listed under the prerequisites are installed in your Python environment, launch training with:

```bash
horovodrun -np <num-gpus> python <args>
```