Enable NCCL in Sync SGD optimizer. (lsds#159)
* Enable NCCL in Sync SGD optimizer.

* update names

* Fix

* Minor change

* fix lint

* fix

* bypass np=1

* Improve naming

* Extra check for NCCL.

* Fix bug.

* Add a Kungfu option
luomai authored Oct 19, 2019
1 parent 46e1fae commit 9af38c0
Showing 7 changed files with 68 additions and 40 deletions.
7 changes: 6 additions & 1 deletion benchmarks/synchronisation/benchmark_kungfu.py
@@ -78,8 +78,13 @@
elif args.kungfu == 'async-sgd':
from kungfu.optimizers import PeerModelAveragingOptimizer
opt = PeerModelAveragingOptimizer(opt)
elif args.kungfu == 'sync-sgd-nccl':
from kungfu.optimizers import SyncSGDOptimizer
opt = SyncSGDOptimizer(opt, nccl=True, nccl_fusion=True)
elif args.kungfu == 'ideal':
opt = opt
else:
pass
raise Exception('Unknown kungfu option')

data = tf.random_uniform([args.batch_size, 224, 224, 3])
target = tf.random_uniform([args.batch_size, 1],
@@ -8,6 +8,7 @@ BATCH_SIZE=64
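The new `sync-sgd-nccl` option simply wraps the base optimizer with `SyncSGDOptimizer(opt, nccl=True, nccl_fusion=True)`. A minimal, self-contained usage sketch in the same spirit (the toy model, learning rate and session loop are illustrative assumptions, not part of this commit; the script is expected to be launched through the repository's run scripts with one KungFu worker process per GPU so NCCL has a device per worker):

```python
# Minimal usage sketch (not part of this commit): wrap a plain TF 1.x
# optimizer the same way the benchmark does. The toy model below is an
# assumption for illustration only.
import tensorflow as tf
from kungfu.optimizers import SyncSGDOptimizer

x = tf.random_uniform([32, 10])
y = tf.layers.dense(x, 1)
loss = tf.reduce_mean(tf.square(y))

opt = tf.train.GradientDescentOptimizer(0.01)
# nccl=True routes the gradient all-reduce through NCCL;
# nccl_fusion=True fuses all gradients into one tensor before the NCCL call.
opt = SyncSGDOptimizer(opt, nccl=True, nccl_fusion=True)
train_op = opt.minimize(loss)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    sess.run(train_op)
```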
NUM_WORKERS=16
MODEL="ResNet50"
NUM_ITERS=100
KUNGFU="sync-sgd"

# Assume each host has 8 GPUs.
WORKER_HOSTS="169.254.128.207:8,169.254.128.185:8"
@@ -27,4 +28,5 @@ export TF_CPP_MIN_LOG_LEVEL=1
run_experiment $NUM_WORKERS python3 $SCRIPT_PATH \
--batch-size=$BATCH_SIZE \
--model=$MODEL \
--num-iters=$NUM_ITERS
--num-iters=$NUM_ITERS \
--kungfu=$KUNGFU
8 changes: 4 additions & 4 deletions srcs/cpp/src/tensorflow/ops/collective_gpu.cpp
@@ -9,9 +9,9 @@
namespace tensorflow
{

REGISTER_OP("StartGpuGroup").Input("input: string");
REGISTER_OP("StartNcclScheduler").Input("input: string");

class StartGpuGroup : public OpKernel
class StartNcclScheduler : public OpKernel
{
using OpKernel::OpKernel;

@@ -28,8 +28,8 @@ class StartGpuGroup : public OpKernel
}
};

REGISTER_KERNEL_BUILDER(Name("StartGpuGroup").Device(DEVICE_CPU),
StartGpuGroup);
REGISTER_KERNEL_BUILDER(Name("StartNcclScheduler").Device(DEVICE_CPU),
StartNcclScheduler);

REGISTER_OP("AllReduceGpu")
.Attr("T: {int32, int64, float16, float32, float64}")
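The rename matters on the Python side because TensorFlow exposes a C++ op registered as `StartNcclScheduler` under the snake_case name `start_nccl_scheduler` on the loaded op library, which is exactly what `collective.py` probes with `hasattr` further below. A hedged sketch of that mechanism (the shared-library path is an assumption; KungFu's real loading code lives in `kungfu/ops/loader.py` and is not shown in this diff):

```python
# Sketch only: how a C++ op registered as "StartNcclScheduler" surfaces in
# Python. The .so path is an assumption for illustration.
import tensorflow as tf

_op_lib = tf.load_op_library('libkungfu_tensorflow_ops.so')

# CamelCase op names become snake_case attributes on the library handle.
start_op = _op_lib.start_nccl_scheduler(['grad_0:0', 'grad_1:0'])
```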
7 changes: 3 additions & 4 deletions srcs/python/kungfu/ops/__init__.py
@@ -1,7 +1,6 @@
from .adapt import get_init_checkpoint, resize_cluster
from .collective import (all_reduce, all_reduce_gpu, barrier, broadcast,
cpu_group_all_reduce, gpu_group_all_reduce,
group_all_reduce)
from .collective import (all_reduce, barrier, broadcast, group_all_reduce,
group_nccl_all_reduce)
from .loader import _has_gpu, _init_lib, _op_lib
from .local import save_variable, save_variables
from .monitor import global_noise_scale
@@ -288,7 +287,7 @@ def partial_exchange_with_gpu_allreduce(ts,
for i, partition in enumerate(groups):
negotiated_partition = tf.cond(
tf.equal(tf.mod(gs - 1, num_partitions), i),
lambda partition=partition: gpu_group_all_reduce(partition),
lambda partition=partition: group_nccl_all_reduce(partition),
lambda partition=partition: partition)
if len(partition) == 1:
negotiated_partition = [negotiated_partition]
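The `lambda partition=partition:` idiom above pins each partition to its own `tf.cond` branch: default arguments are evaluated when the lambda is defined, which sidesteps Python's late-binding closures inside the loop. A standalone illustration with toy tensors (the values are made up):

```python
# Standalone illustration of the default-argument idiom used above.
# Without `p=p`, every lambda would close over the same loop variable and
# all branches would see the last partition.
import tensorflow as tf

parts = [tf.constant([1.0]), tf.constant([2.0]), tf.constant([3.0])]
step = tf.constant(4)

selected = [
    tf.cond(tf.equal(tf.mod(step - 1, len(parts)), i),
            lambda p=p: p * 10.0,  # branch taken by the selected partition
            lambda p=p: p)         # identity branch for the others
    for i, p in enumerate(parts)
]
```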
48 changes: 23 additions & 25 deletions srcs/python/kungfu/ops/collective.py
@@ -14,38 +14,36 @@ def all_reduce(t):
return _op_lib.all_reduce(t, input_tensor_name=t.name)


def all_reduce_gpu(t):
return _op_lib.all_reduce_gpu(t, input_tensor_name=t.name)


def start_gpu_group(*args, **kwargs):
return _op_lib.start_gpu_group(*args, **kwargs)
def _maybe_group_all_reduce(ts, group_all_reduce_fn):
# helper that bypasses all-reduce when there is only one peer (np == 1)
_rank, np = peer_info()
import tensorflow as tf
return tf.cond(np > 1, lambda: group_all_reduce_fn(ts),
lambda: [tf.identity(t) for t in ts])


def cpu_group_all_reduce(ts):
def group_all_reduce(ts):
return [all_reduce(t) for t in ts]


def gpu_group_all_reduce(ts):
names = [t.name for t in ts]
names = list(sorted(names)) # FIXME: use topsort
import tensorflow as tf
with tf.control_dependencies([
start_gpu_group(names),
]):
return [all_reduce_gpu(t) for t in ts]
def _nccl_all_reduce(t):
return _op_lib.all_reduce_gpu(t, input_tensor_name=t.name)


def _group_all_reduce(ts, use_nccl):
if use_nccl:
print('Try to use GPU NCCL to perform all-reduce')
return gpu_group_all_reduce(ts)
print('Try to use KungFu MPI to perform all-reduce')
return cpu_group_all_reduce(ts)
def _start_nccl_scheduler(*args, **kwargs):
if hasattr(_op_lib, 'start_nccl_scheduler'):
return _op_lib.start_nccl_scheduler(*args, **kwargs)
else:
raise RuntimeError("KungFu is not installed with NCCL.")


def group_all_reduce(ts, use_nccl=False):
_rank, np = peer_info()
def group_nccl_all_reduce(ts):
names = [t.name for t in ts]
if len(names) > 1:
print("WARNING: Please fuse tensors before using NCCL.")
names = list(sorted(names)) # FIXME: use topsort
import tensorflow as tf
return tf.cond(np > 1, lambda: _group_all_reduce(ts, use_nccl),
lambda: [tf.identity(t) for t in ts])
with tf.control_dependencies([
_start_nccl_scheduler(names),
]):
return [_nccl_all_reduce(t) for t in ts]
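
`group_nccl_all_reduce` fails loudly at graph-construction time when the installed KungFu op library has no NCCL support, so callers can fall back to the default MPI/CPU path. A hedged usage sketch (the gradient tensors are toys; with more than one tensor the NCCL path prints the fusion warning above):

```python
# Hedged usage sketch: prefer the NCCL all-reduce when available, otherwise
# fall back to group_all_reduce. The tensors below stand in for per-layer
# gradients.
import tensorflow as tf
from kungfu.ops import group_all_reduce, group_nccl_all_reduce

grads = [tf.random_uniform([4]), tf.random_uniform([2, 3])]
try:
    # Raises RuntimeError while building the graph if KungFu was installed
    # without NCCL.
    summed = group_nccl_all_reduce(grads)
except RuntimeError:
    summed = group_all_reduce(grads)
```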
3 changes: 2 additions & 1 deletion srcs/python/kungfu/optimizers/core.py
@@ -46,7 +46,8 @@ def compute_gradients(self, *args, **kwargs):

# An optimizer could minimize variables other than tf.trainable_variables
# It is safer to get the correct list of variables that need synchronisation here
self._model_variables = [v for g, v in grads_and_vars]
if self._model_variables is None:
self._model_variables = [v for g, v in grads_and_vars]

return grads_and_vars

31 changes: 27 additions & 4 deletions srcs/python/kungfu/optimizers/sync_sgd.py
@@ -1,8 +1,8 @@
import tensorflow as tf
from kungfu.ops import (broadcast, global_noise_scale, group_all_reduce,
peer_info)
group_nccl_all_reduce, peer_info)

from .core import KungFuOptimizer, fuse
from .core import KungFuOptimizer, defuse, fuse


class SyncSGDOptimizer(KungFuOptimizer):
@@ -19,22 +19,45 @@ class SyncSGDOptimizer(KungFuOptimizer):
Optional name prefix for the operations created when applying
gradients. Defaults to "Distributed" followed by the provided
optimizer type.
nccl:
Optional flag for using NCCL to perform all-reduce.
nccl_fusion:
Optional flag to fuse all gradients before launching the NCCL all-reduce.
This is useful to amortise the cost of NCCL calls.
use_locking:
Whether to use locking when updating variables.
See Optimizer.__init__ for more info.
"""
def __init__(self, optimizer, name=None, use_locking=False):
def __init__(self,
optimizer,
nccl=False,
nccl_fusion=True,
name=None,
use_locking=False):
super(SyncSGDOptimizer, self).__init__(optimizer,
name,
use_locking=use_locking)
_rank, np = peer_info()
# FIXME: use type of gradient
self._num_workers = tf.cast(np, tf.float32)
self._nccl = nccl
self._nccl_fusion = nccl_fusion

def apply_gradients(self, grads_and_vars, **kwargs):
gradients, variables = list(zip(*grads_and_vars))
summed_gradients = group_all_reduce(gradients)

if self._nccl:
if self._nccl_fusion:
fused_grad = fuse(gradients)
summed_fused_gradients = group_nccl_all_reduce([fused_grad])
summed_gradients = defuse(summed_fused_gradients[0],
[g.shape for g in gradients])
else:
summed_gradients = group_nccl_all_reduce(gradients)
else:
summed_gradients = group_all_reduce(gradients)

reduced_grads = [g / self._num_workers for g in summed_gradients]
reduced_grads_and_vars = zip(reduced_grads, variables)
return self._optimizer.apply_gradients(reduced_grads_and_vars,
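`fuse` and `defuse` are imported from `kungfu/optimizers/core.py`, whose implementation is not part of this diff. A plausible sketch of what the fusion path relies on, labelled as an assumption: flatten and concatenate the gradients so that a single NCCL all-reduce is launched, then split and reshape the result back.

```python
# Assumption: a plausible stand-in for the fuse/defuse helpers imported from
# kungfu/optimizers/core.py (their real implementations are not in this diff).
import tensorflow as tf


def fuse(tensors):
    # Flatten every tensor and concatenate into one 1-D buffer.
    return tf.concat([tf.reshape(t, [-1]) for t in tensors], axis=0)


def defuse(fused, shapes):
    # Split the buffer back into per-tensor pieces and restore the shapes.
    sizes = [shape.num_elements() for shape in shapes]
    parts = tf.split(fused, sizes)
    return [tf.reshape(p, shape.as_list()) for p, shape in zip(parts, shapes)]


grads = [tf.zeros([2, 3]), tf.ones([4])]
restored = defuse(fuse(grads), [g.shape for g in grads])
```

Fusing into one buffer amortises the per-call latency of NCCL, which is why `nccl_fusion` defaults to True in the optimizer above.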
