Skip to content

Commit

Permalink
Initialize gloo for low level collective apis (PaddlePaddle#27356)
Browse files Browse the repository at this point in the history
* add gloo initializer, test=develop
  • Loading branch information
lilong12 authored Sep 28, 2020
1 parent bf99bc4 commit fa73e4a
Show file tree
Hide file tree
Showing 13 changed files with 116 additions and 93 deletions.
4 changes: 2 additions & 2 deletions paddle/fluid/platform/gloo_context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ void GlooParallelContext::Init() {
auto gloo_ptr = paddle::framework::GlooWrapper::GetInstance();
gloo_ptr->SetRank(strategy_.rank);
gloo_ptr->SetSize(strategy_.rank_num);
gloo_ptr->SetPrefix(strategy_.prefix);
gloo_ptr->SetIface(strategy_.iface);
gloo_ptr->SetTimeoutSeconds(strategy_.init_seconds, strategy_.run_seconds);
gloo_ptr->SetHdfsStore(strategy_.path, strategy_.fs_name, strategy_.fs_ugi);
gloo_ptr->SetHttpStore(strategy_.ip_address, strategy_.ip_port,
strategy_.scope);
gloo_ptr->Init();
}
#endif
Expand Down
7 changes: 3 additions & 4 deletions paddle/fluid/platform/gloo_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@ struct GlooParallelStrategy {
int rank{0};
int rank_num{1};
std::string iface;
std::string prefix;
int init_seconds{9999999};
int run_seconds{9999999};
std::string path;
std::string fs_name;
std::string fs_ugi;
std::string ip_address;
int ip_port;
std::string scope{"worker"};
};

class GlooParallelContext {
Expand Down
32 changes: 11 additions & 21 deletions paddle/fluid/pybind/gloo_context_py.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,6 @@ void BindGlooContext(py::module *m) {
[](platform::GlooParallelStrategy &self, const std::string &iface) {
self.iface = iface;
})
.def_property("prefix",
[](const platform::GlooParallelStrategy &self) {
return self.prefix;
},
[](platform::GlooParallelStrategy &self,
const std::string &prefix) { self.prefix = prefix; })
.def_property("init_seconds",
[](const platform::GlooParallelStrategy &self) {
return self.init_seconds;
Expand All @@ -83,23 +77,19 @@ void BindGlooContext(py::module *m) {
self.run_seconds = run_seconds;
})
.def_property(
"path",
[](const platform::GlooParallelStrategy &self) { return self.path; },
[](platform::GlooParallelStrategy &self, const std::string &path) {
self.path = path;
})
.def_property("fs_name",
[](const platform::GlooParallelStrategy &self) {
return self.fs_name;
},
[](platform::GlooParallelStrategy &self,
const std::string &fs_name) { self.fs_name = fs_name; })
.def_property("fs_ugi",
"ip_address",
[](const platform::GlooParallelStrategy &self) {
return self.ip_address;
},
[](platform::GlooParallelStrategy &self,
const std::string &ip_address) { self.ip_address = ip_address; })
.def_property("ip_port",
[](const platform::GlooParallelStrategy &self) {
return self.fs_ugi;
return self.ip_port;
},
[](platform::GlooParallelStrategy &self,
const std::string &fs_ugi) { self.fs_ugi = fs_ugi; });
[](platform::GlooParallelStrategy &self, int ip_port) {
self.ip_port = ip_port;
});

py::class_<platform::GlooParallelContext> gloo_ctx(*m, "GlooParallelContext");
gloo_ctx.def(py::init<const platform::GlooParallelStrategy &>())
Expand Down
3 changes: 1 addition & 2 deletions python/paddle/distributed/fleet/base/role_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,7 @@ def __start_kv_server(http_server_d, size_d):
http_server = KVServer(port, size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
while http_server_d.get("running", False):
time.sleep(wait_seconds)
http_server.stop()

Expand Down
2 changes: 1 addition & 1 deletion python/paddle/distributed/fleet/utils/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ def stop(self):
self.listen_thread.join()
self.http_server.server_close()

def shoud_stop(self):
def should_stop(self):
"""
return whether the server should stop.
Expand Down
79 changes: 76 additions & 3 deletions python/paddle/distributed/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import os
import six
import warnings
from multiprocessing import Process, Manager
import netifaces
import time
import sys

from paddle import compat as cpt

Expand All @@ -29,6 +33,39 @@
ParallelStrategy = core.ParallelStrategy


def _start_kv_server(port, http_server_d):
from paddle.distributed.fleet.utils.http_server import KVServer
http_server = KVServer(int(port))
http_server.start()
wait_seconds = 5
while http_server_d.get("running", False):
time.sleep(wait_seconds)
http_server.stop()


def _get_iface_by_ip(ip_address):
"""
Get network interface name by its ip address.
Args:
ip_address (string): ip address
Returns:
Name of the network interface which has the ip address 'ip_address',
or None if the ip_address is not found.
"""
nicfaces = netifaces.interfaces()
for nicface in nicfaces:
message = netifaces.ifaddresses(nicface)
iface_info = message.get(netifaces.AF_INET)
if iface_info:
iface_dict = iface_info[0]
ipaddr = iface_dict.get('addr')
if ipaddr == ip_address:
return nicface
return None


def init_parallel_env():
"""
Initialize parallel training environment in dynamic graph mode.
Expand Down Expand Up @@ -110,16 +147,52 @@ def _check_var_exists(var_name):
_check_var_exists("PADDLE_TRAINERS_NUM")
_check_var_exists("PADDLE_TRAINER_ENDPOINTS")

# 3. init NCCL ParallelStrategy
if ParallelEnv().world_size < 2:
return

# 3: init gloo context
ep_rank_0 = ParallelEnv().trainer_endpoints[0].split(":")
ep_rank = ParallelEnv().trainer_endpoints[ParallelEnv().rank].split(":")
manager = Manager()
# glboal dict to store status
http_server_d = manager.dict()
http_server_d["running"] = False
if ParallelEnv().rank == 0:
http_server = Process(
target=_start_kv_server, args=(int(ep_rank_0[1]), http_server_d))
http_server.daemon = True
http_server_d["running"] = True
http_server.start()

iface = _get_iface_by_ip(ep_rank[0])
if iface is None:
raise ValueError("No network interface associated with the "
"ip address: {}.".format(ep_rank_0[0]))
gloo_strategy = core.GlooParallelStrategy()
gloo_strategy.rank = ParallelEnv().rank
gloo_strategy.rank_num = ParallelEnv().world_size
gloo_strategy.ip_address = ep_rank_0[0]
gloo_strategy.ip_port = int(ep_rank_0[1])
gloo_strategy.iface = iface
default_init_timeout_seconds = 3600
default_run_timeout_seconds = 9999999
gloo_strategy.init_seconds = default_init_timeout_seconds
gloo_strategy.run_seconds = default_run_timeout_seconds
gloo = core.GlooParallelContext(gloo_strategy)
gloo.init()
if ParallelEnv().rank == 0:
http_server_d["running"] = False
http_server.join()

# 4. init NCCL ParallelStrategy
strategy = ParallelStrategy()
if parallel_helper._is_parallel_ctx_initialized():
warnings.warn("The parallel environment has been initialized.")
strategy.nranks = ParallelEnv().world_size
strategy.local_rank = ParallelEnv().rank
strategy.trainer_endpoints = ParallelEnv().trainer_endpoints
strategy.current_endpoint = ParallelEnv().current_endpoint
if strategy.nranks < 2:
return

# NOTE(chenweihang): [ why config global place here? ]
# the dygraph mode will be set to default mode,
# users will not call `dygraph.guard` or `enable_dygraph`
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/distributed/spawn.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ def train(print_result=False):
device = get_device()
if device == 'cpu':
# TODO: not supports cpu parallel now
nprocs = _cpu_num
nprocs = _cpu_num()
else:
nprocs = core.get_cuda_device_count()

Expand Down
3 changes: 1 addition & 2 deletions python/paddle/fluid/incubate/fleet/base/role_maker.py
Original file line number Diff line number Diff line change
Expand Up @@ -982,8 +982,7 @@ def __start_kv_server(self, http_server_d, size_d):
http_server = KVServer(int(self._http_ip_port[1]), size_d)
http_server.start()
wait_seconds = 5
while http_server_d.get("running",
False) and not http_server.shoud_stop():
while http_server_d.get("running", False):
time.sleep(wait_seconds)
http_server.stop()

Expand Down
2 changes: 1 addition & 1 deletion python/paddle/fluid/incubate/fleet/utils/http_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ def stop(self):
self.listen_thread.join()
self.http_server.server_close()

def shoud_stop(self):
def should_stop(self):
"""
return whether the server should stop.
Expand Down
12 changes: 6 additions & 6 deletions python/paddle/fluid/tests/unittests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ list(APPEND DIST_TEST_OPS test_parallel_dygraph_sparse_embedding)
list(APPEND DIST_TEST_OPS test_parallel_dygraph_transformer)
list(APPEND DIST_TEST_OPS test_listen_and_serv_op)
list(APPEND DIST_TEST_OPS test_fleet_graph_execution_meta_optimizer)
list(APPEND DIST_TEST_OPS test_collective_reduce_api)
list(APPEND DIST_TEST_OPS test_collective_scatter_api)
list(APPEND DIST_TEST_OPS test_collective_barrier_api)
list(APPEND DIST_TEST_OPS test_collective_allreduce_api)
list(APPEND DIST_TEST_OPS test_collective_broadcast_api)
list(APPEND DIST_TEST_OPS test_collective_allgather_api)
set(MIXED_DIST_TEST_OPS ${DIST_TEST_OPS})
#remove distribute unittests.
list(APPEND MIXED_DIST_TEST_OPS test_dgc_op)
Expand Down Expand Up @@ -62,12 +68,6 @@ if(NOT WITH_GPU OR WIN32)
LIST(REMOVE_ITEM TEST_OPS test_broadcast)
LIST(REMOVE_ITEM TEST_OPS test_collective_reduce)
LIST(REMOVE_ITEM TEST_OPS test_collective_scatter)
LIST(REMOVE_ITEM TEST_OPS test_collective_reduce_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_scatter_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_barrier_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_allreduce_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_broadcast_api)
LIST(REMOVE_ITEM TEST_OPS test_collective_allgather_api)
LIST(REMOVE_ITEM TEST_OPS test_reducescatter)
LIST(REMOVE_ITEM TEST_OPS test_reducescatter_api)
endif()
Expand Down
55 changes: 9 additions & 46 deletions python/paddle/fluid/tests/unittests/test_collective_api_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import pickle
from contextlib import closing
from six import string_types
import paddle
import paddle.fluid as fluid
import paddle.fluid.unique_name as nameGen
from paddle.fluid import core
Expand Down Expand Up @@ -60,38 +61,6 @@ def wait_server_ready(self, endpoints):
else:
break

def initCommunicator(self, program, rank, nranks, wait_port,
current_endpoint, endpoints):
other_endpoints = endpoints[:]
other_endpoints.remove(current_endpoint)
if rank == 0 and wait_port:
self.wait_server_ready(other_endpoints)
block = program.global_block()
nccl_id_var = block.create_var(
name=nameGen.generate('nccl_id'),
persistable=True,
type=core.VarDesc.VarType.RAW)

block.append_op(
type='c_gen_nccl_id',
inputs={},
outputs={'Out': nccl_id_var},
attrs={
'rank': rank,
'endpoint': current_endpoint,
'other_endpoints': other_endpoints
})

block.append_op(
type='c_comm_init',
inputs={'X': nccl_id_var},
outputs={},
attrs={
'nranks': nranks,
'rank': rank,
'ring_id': self.global_ring_id
})

def run_trainer(self, args):
train_prog = fluid.Program()
startup_prog = fluid.Program()
Expand All @@ -100,23 +69,12 @@ def run_trainer(self, args):
current_endpoint = args["currentendpoint"]
nranks = 2
result = self.get_model(train_prog, startup_prog, rank)
paddle.distributed.init_parallel_env()
if args['backend'] == 'nccl':
self.initCommunicator(startup_prog, rank, nranks, True,
current_endpoint, endpoints)
device_id = int(os.getenv("FLAGS_selected_gpus", "0"))
place = fluid.CUDAPlace(
device_id) #if args.use_gpu else fluid.CPUPlace()
else:
strategy = fluid.core.GlooParallelStrategy()
strategy.rank = rank
strategy.rank_num = nranks
strategy.prefix = ""
strategy.iface = "lo"
strategy.init_seconds = 999999
strategy.run_seconds = 999999
strategy.path = "/tmp/tmp%d" % args['path_id']
gloo = fluid.core.GlooParallelContext(strategy)
gloo.init()
place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup_prog)
Expand Down Expand Up @@ -199,8 +157,8 @@ def _run_cluster(self, model_file, envs):
tr_cmd = "%s %s"
tr0_cmd = tr_cmd % (self._python_interp, model_file)
tr1_cmd = tr_cmd % (self._python_interp, model_file)
tr0_pipe = open("/tmp/tr0_err.log", "wb")
tr1_pipe = open("/tmp/tr1_err.log", "wb")
tr0_pipe = open("/tmp/tr0_err.log", "w")
tr1_pipe = open("/tmp/tr1_err.log", "w")
#print(tr0_cmd)
tr0_proc = subprocess.Popen(
tr0_cmd.strip().split(),
Expand All @@ -221,6 +179,10 @@ def _run_cluster(self, model_file, envs):
# close trainer file
tr0_pipe.close()
tr1_pipe.close()
with open("/tmp/tr0_err.log", "r") as f:
sys.stderr.write('trainer 0 stderr file: %s\n' % f.read())
with open("/tmp/tr1_err.log", "r") as f:
sys.stderr.write('trainer 1 stderr file: %s\n' % f.read())
return pickle.loads(tr0_out), pickle.loads(
tr1_out), tr0_proc.pid, tr1_proc.pid

Expand All @@ -247,6 +209,7 @@ def check_with_place(self,
if check_error_log:
required_envs["GLOG_v"] = "3"
required_envs["GLOG_logtostderr"] = "1"
required_envs["GLOO_LOG_LEVEL"] = "TRACE"
tr0_out, tr1_out, pid0, pid1 = self._run_cluster(model_file,
required_envs)
np.random.seed(pid0)
Expand Down
6 changes: 3 additions & 3 deletions python/paddle/fluid/tests/unittests/test_dist_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def run_trainer(self, args):
fluid.default_main_program().random_seed = seed
np.random.seed(seed)
import random
random.seed = seed
random.seed(seed)
model, train_reader, opt = self.get_model()
nranks = len(args.endpoints.split(",")) if args.endpoints else 1

Expand Down Expand Up @@ -456,7 +456,7 @@ def run_trainer_with_spawn(self, args):
paddle.static.default_startup_program().random_seed = seed
paddle.static.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed = seed
random.seed(seed)
# get trainer id
args.trainer_id = paddle.distributed.get_rank()

Expand Down Expand Up @@ -499,7 +499,7 @@ def run_gpu_fleet_api_trainer(self, args):
paddle.static.default_startup_program().random_seed = seed
paddle.static.default_main_program().random_seed = seed
np.random.seed(seed)
random.seed = seed
random.seed(seed)
# get trainer id
args.trainer_id = paddle.distributed.get_rank()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def __init__(self):
h.log_message("666")
s.get_deleted_size("haha")
s1 = TmpS()
s1.shoud_stop()
s1.should_stop()


if __name__ == "__main__":
Expand Down

0 comments on commit fa73e4a

Please sign in to comment.