From 9af38c02765b2070b217f50dd20ae5b129584b16 Mon Sep 17 00:00:00 2001
From: Luo Mai
Date: Sat, 19 Oct 2019 14:30:08 +0100
Subject: [PATCH] Enable NCCL in Sync SGD optimizer. (#159)

* Enable NCCL in Sync SGD optimizer.

* update names

* Fix

* Minor change

* fix lint

* fix

* bypass np=1

* Improve naming

* Extra check for NCCL.

* Fix bug.

* Add a Kungfu option
---
 .../synchronisation/benchmark_kungfu.py       |  7 ++-
 .../modelarts/kungfu/run_benchmark_kungfu.sh  |  4 +-
 .../cpp/src/tensorflow/ops/collective_gpu.cpp |  8 ++--
 srcs/python/kungfu/ops/__init__.py            |  7 ++-
 srcs/python/kungfu/ops/collective.py          | 48 +++++++++----------
 srcs/python/kungfu/optimizers/core.py         |  3 +-
 srcs/python/kungfu/optimizers/sync_sgd.py     | 31 ++++++++++--
 7 files changed, 68 insertions(+), 40 deletions(-)

diff --git a/benchmarks/synchronisation/benchmark_kungfu.py b/benchmarks/synchronisation/benchmark_kungfu.py
index e73f057a9..a10d57abb 100755
--- a/benchmarks/synchronisation/benchmark_kungfu.py
+++ b/benchmarks/synchronisation/benchmark_kungfu.py
@@ -78,8 +78,13 @@
 elif args.kungfu == 'async-sgd':
     from kungfu.optimizers import PeerModelAveragingOptimizer
     opt = PeerModelAveragingOptimizer(opt)
+elif args.kungfu == 'sync-sgd-nccl':
+    from kungfu.optimizers import SyncSGDOptimizer
+    opt = SyncSGDOptimizer(opt, nccl=True, nccl_fusion=True)
+elif args.kungfu == 'ideal':
+    opt = opt
 else:
-    pass
+    raise Exception('Unknown kungfu option')
 
 data = tf.random_uniform([args.batch_size, 224, 224, 3])
 target = tf.random_uniform([args.batch_size, 1],
diff --git a/benchmarks/synchronisation/modelarts/kungfu/run_benchmark_kungfu.sh b/benchmarks/synchronisation/modelarts/kungfu/run_benchmark_kungfu.sh
index e40836ccc..9a359a2c8 100644
--- a/benchmarks/synchronisation/modelarts/kungfu/run_benchmark_kungfu.sh
+++ b/benchmarks/synchronisation/modelarts/kungfu/run_benchmark_kungfu.sh
@@ -8,6 +8,7 @@ BATCH_SIZE=64
 NUM_WORKERS=16
 MODEL="ResNet50"
 NUM_ITERS=100
+KUNGFU="sync-sgd"
 
 # Assume each host has 8 GPUs.
WORKER_HOSTS="169.254.128.207:8,169.254.128.185:8" @@ -27,4 +28,5 @@ export TF_CPP_MIN_LOG_LEVEL=1 run_experiment $NUM_WORKERS python3 $SCRIPT_PATH \ --batch-size=$BATCH_SIZE \ --model=$MODEL \ - --num-iters=$NUM_ITERS + --num-iters=$NUM_ITERS \ + --kungfu=$KUNGFU diff --git a/srcs/cpp/src/tensorflow/ops/collective_gpu.cpp b/srcs/cpp/src/tensorflow/ops/collective_gpu.cpp index bbdc781e1..2602d3c98 100644 --- a/srcs/cpp/src/tensorflow/ops/collective_gpu.cpp +++ b/srcs/cpp/src/tensorflow/ops/collective_gpu.cpp @@ -9,9 +9,9 @@ namespace tensorflow { -REGISTER_OP("StartGpuGroup").Input("input: string"); +REGISTER_OP("StartNcclScheduler").Input("input: string"); -class StartGpuGroup : public OpKernel +class StartNcclScheduler : public OpKernel { using OpKernel::OpKernel; @@ -28,8 +28,8 @@ class StartGpuGroup : public OpKernel } }; -REGISTER_KERNEL_BUILDER(Name("StartGpuGroup").Device(DEVICE_CPU), - StartGpuGroup); +REGISTER_KERNEL_BUILDER(Name("StartNcclScheduler").Device(DEVICE_CPU), + StartNcclScheduler); REGISTER_OP("AllReduceGpu") .Attr("T: {int32, int64, float16, float32, float64}") diff --git a/srcs/python/kungfu/ops/__init__.py b/srcs/python/kungfu/ops/__init__.py index 7d08593af..42135ce56 100644 --- a/srcs/python/kungfu/ops/__init__.py +++ b/srcs/python/kungfu/ops/__init__.py @@ -1,7 +1,6 @@ from .adapt import get_init_checkpoint, resize_cluster -from .collective import (all_reduce, all_reduce_gpu, barrier, broadcast, - cpu_group_all_reduce, gpu_group_all_reduce, - group_all_reduce) +from .collective import (all_reduce, barrier, broadcast, group_all_reduce, + group_nccl_all_reduce) from .loader import _has_gpu, _init_lib, _op_lib from .local import save_variable, save_variables from .monitor import global_noise_scale @@ -288,7 +287,7 @@ def partial_exchange_with_gpu_allreduce(ts, for i, partition in enumerate(groups): negotiated_partition = tf.cond( tf.equal(tf.mod(gs - 1, num_partitions), i), - lambda partition=partition: gpu_group_all_reduce(partition), + lambda partition=partition: group_nccl_all_reduce(partition), lambda partition=partition: partition) if len(partition) == 1: negotiated_partition = [negotiated_partition] diff --git a/srcs/python/kungfu/ops/collective.py b/srcs/python/kungfu/ops/collective.py index 24fb2b3bd..239613094 100644 --- a/srcs/python/kungfu/ops/collective.py +++ b/srcs/python/kungfu/ops/collective.py @@ -14,38 +14,36 @@ def all_reduce(t): return _op_lib.all_reduce(t, input_tensor_name=t.name) -def all_reduce_gpu(t): - return _op_lib.all_reduce_gpu(t, input_tensor_name=t.name) - - -def start_gpu_group(*args, **kwargs): - return _op_lib.start_gpu_group(*args, **kwargs) +def _maybe_group_all_reduce(ts, group_all_reduce_fn): + # a helper function to bypass all_reduce for np = 1 + _rank, np = peer_info() + import tensorflow as tf + return tf.cond(np > 1, lambda: group_all_reduce_fn(ts), + lambda: [tf.identity(t) for t in ts]) -def cpu_group_all_reduce(ts): +def group_all_reduce(ts): return [all_reduce(t) for t in ts] -def gpu_group_all_reduce(ts): - names = [t.name for t in ts] - names = list(sorted(names)) # FIXME: use topsort - import tensorflow as tf - with tf.control_dependencies([ - start_gpu_group(names), - ]): - return [all_reduce_gpu(t) for t in ts] +def _nccl_all_reduce(t): + return _op_lib.all_reduce_gpu(t, input_tensor_name=t.name) -def _group_all_reduce(ts, use_nccl): - if use_nccl: - print('Try to use GPU NCCL to perform all-reduce') - return gpu_group_all_reduce(ts) - print('Try to use KungFu MPI to perform all-reduce') - return 
+def _start_nccl_scheduler(*args, **kwargs):
+    if hasattr(_op_lib, 'start_nccl_scheduler'):
+        return _op_lib.start_nccl_scheduler(*args, **kwargs)
+    else:
+        raise RuntimeError("KungFu is not installed with NCCL.")
 
 
-def group_all_reduce(ts, use_nccl=False):
-    _rank, np = peer_info()
+def group_nccl_all_reduce(ts):
+    names = [t.name for t in ts]
+    if len(names) > 1:
+        print("WARNING: Please fuse tensors before using NCCL.")
+    names = list(sorted(names))  # FIXME: use topsort
     import tensorflow as tf
-    return tf.cond(np > 1, lambda: _group_all_reduce(ts, use_nccl),
-                   lambda: [tf.identity(t) for t in ts])
+    with tf.control_dependencies([
+            _start_nccl_scheduler(names),
+    ]):
+        return [_nccl_all_reduce(t) for t in ts]
diff --git a/srcs/python/kungfu/optimizers/core.py b/srcs/python/kungfu/optimizers/core.py
index 9f1103a24..09e5b0bcf 100644
--- a/srcs/python/kungfu/optimizers/core.py
+++ b/srcs/python/kungfu/optimizers/core.py
@@ -46,7 +46,8 @@ def compute_gradients(self, *args, **kwargs):
 
         # An optimizer could minimize variables other than tf.trainable_variables.
        # It is safer to get the correct list of variables that need synchronisation here.
-        self._model_variables = [v for g, v in grads_and_vars]
+        if self._model_variables is None:
+            self._model_variables = [v for g, v in grads_and_vars]
 
         return grads_and_vars
diff --git a/srcs/python/kungfu/optimizers/sync_sgd.py b/srcs/python/kungfu/optimizers/sync_sgd.py
index 05e306b3f..b04ddf947 100644
--- a/srcs/python/kungfu/optimizers/sync_sgd.py
+++ b/srcs/python/kungfu/optimizers/sync_sgd.py
@@ -1,8 +1,8 @@
 import tensorflow as tf
 from kungfu.ops import (broadcast, global_noise_scale, group_all_reduce,
-                        peer_info)
+                        group_nccl_all_reduce, peer_info)
 
-from .core import KungFuOptimizer, fuse
+from .core import KungFuOptimizer, defuse, fuse
 
 
 class SyncSGDOptimizer(KungFuOptimizer):
@@ -19,22 +19,45 @@ class SyncSGDOptimizer(KungFuOptimizer):
       Optional name prefix for the operations created when applying
       gradients. Defaults to "Distributed" followed by the provided
       optimizer type.
+    nccl:
+      Optional flag for using NCCL to perform all-reduce.
+    nccl_fusion:
+      Optional flag to fuse all gradients before launching the NCCL
+      all-reduce. This is useful to amortise the cost of NCCL calls.
     use_locking:
       Whether to use locking when updating variables.
       See Optimizer.__init__ for more info.
     """
 
-    def __init__(self, optimizer, name=None, use_locking=False):
+    def __init__(self,
+                 optimizer,
+                 nccl=False,
+                 nccl_fusion=True,
+                 name=None,
+                 use_locking=False):
         super(SyncSGDOptimizer, self).__init__(optimizer,
                                                name,
                                                use_locking=use_locking)
         _rank, np = peer_info()
         # FIXME: use type of gradient
         self._num_workers = tf.cast(np, tf.float32)
+        self._nccl = nccl
+        self._nccl_fusion = nccl_fusion
 
     def apply_gradients(self, grads_and_vars, **kwargs):
         gradients, variables = list(zip(*grads_and_vars))
-        summed_gradients = group_all_reduce(gradients)
+
+        if self._nccl:
+            if self._nccl_fusion:
+                fused_grad = fuse(gradients)
+                summed_fused_gradients = group_nccl_all_reduce([fused_grad])
+                summed_gradients = defuse(summed_fused_gradients[0],
+                                          [g.shape for g in gradients])
+            else:
+                summed_gradients = group_nccl_all_reduce(gradients)
+        else:
+            summed_gradients = group_all_reduce(gradients)
+
         reduced_grads = [g / self._num_workers for g in summed_gradients]
         reduced_grads_and_vars = zip(reduced_grads, variables)
         return self._optimizer.apply_gradients(reduced_grads_and_vars,
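
Usage sketch for the new flags (SyncSGDOptimizer and its nccl/nccl_fusion
arguments are taken from this patch; the toy model, loss, and learning rate
are illustrative only):

    import tensorflow as tf
    from kungfu.optimizers import SyncSGDOptimizer

    # A toy TF1-style model: a single variable with a quadratic loss.
    w = tf.Variable(tf.random_uniform([10, 10]))
    loss = tf.reduce_mean(tf.square(w))

    # nccl=True routes the gradient all-reduce through group_nccl_all_reduce;
    # nccl_fusion=True fuses all gradients into one tensor first, so each step
    # schedules a single NCCL call instead of one per gradient.
    opt = tf.train.GradientDescentOptimizer(0.1)
    opt = SyncSGDOptimizer(opt, nccl=True, nccl_fusion=True)
    train_op = opt.minimize(loss)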
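
The bodies of fuse and defuse live in .core and are not shown in this patch.
A minimal sketch of what such helpers typically do, assuming gradients with
fully-defined static shapes (these function bodies are an assumption, not
the patch's implementation):

    import tensorflow as tf

    def fuse(ts):
        # Flatten each tensor and concatenate the results into one 1-D
        # tensor, so the whole gradient set needs a single all-reduce.
        return tf.concat([tf.reshape(t, [-1]) for t in ts], axis=0)

    def defuse(fused, shapes):
        # Invert fuse: split by element count, then restore each shape.
        sizes = [shape.num_elements() for shape in shapes]
        parts = tf.split(fused, sizes)
        return [tf.reshape(p, shape.as_list()) for p, shape in zip(parts, shapes)]

This matches how apply_gradients uses the pair above: defuse receives the
summed fused tensor plus [g.shape for g in gradients] and must return the
tensors in their original order and shapes.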
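
A round-trip check for the sketch above (TF1 session style; reuses the
fuse/defuse definitions and the tensorflow import from the previous snippet;
the data values are arbitrary):

    import numpy as np

    xs = [tf.constant(np.arange(6, dtype=np.float32).reshape(2, 3)),
          tf.constant(np.zeros([4], dtype=np.float32))]
    ys = defuse(fuse(xs), [x.shape for x in xs])
    with tf.Session() as sess:
        for x, y in zip(sess.run(xs), sess.run(ys)):
            assert x.shape == y.shape and np.allclose(x, y)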