Clean up the redundant files and unify the launch interface. (PaddleP…
gongweibao authored Nov 26, 2020
1 parent 47af5c3 commit 1358397
Showing 22 changed files with 745 additions and 679 deletions.
25 changes: 23 additions & 2 deletions python/paddle/distributed/cloud_utils.py
@@ -14,7 +14,7 @@

import os
import paddle
from paddle.distributed.utils import get_cluster, logger
from paddle.distributed.utils import get_cluster, logger, get_gpus, get_cluster_from_args


def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
@@ -94,5 +94,26 @@ def get_cloud_cluster(args_node_ips, args_node_ip, args_port, selected_gpus):
return cluster, cluster.pods[node_rank]


def get_trainers_num():
def _get_trainers_num():
return int(os.getenv("PADDLE_TRAINERS_NUM", "1"))


def get_cluster_and_pod(args):
# parse arguments, used for cloud-single-machine and local
selected_gpus = get_gpus(args.selected_gpus)
trainers_num = _get_trainers_num()
logger.debug("parsed from args trainerss_num:{} selected_gpus:{}".format(
trainers_num, selected_gpus))

cluster = None
pod = None

if args.use_paddlecloud and trainers_num != 1:
cluster, pod = get_cloud_cluster(args.cluster_node_ips, args.node_ip,
args.started_port, selected_gpus)
logger.info("get cluster from cloud:{}".format(cluster))
else:
cluster, pod = get_cluster_from_args(args, selected_gpus)
logger.info("get cluster from args:{}".format(cluster))

return cluster, pod
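
As an illustration (not part of this commit), the new get_cluster_and_pod helper can be driven with an argparse-style namespace; the field names below simply mirror the attributes the function reads above, and the underlying get_cluster_from_args may consult additional arguments.

from argparse import Namespace
from paddle.distributed.cloud_utils import get_cluster_and_pod

# Hypothetical single-machine arguments; get_gpus() parses the GPU id string.
args = Namespace(
    selected_gpus="0,1",            # two local GPUs, so two trainer processes
    use_paddlecloud=False,          # False: fall through to get_cluster_from_args
    cluster_node_ips="127.0.0.1",
    node_ip="127.0.0.1",
    started_port=6170,
)

cluster, pod = get_cluster_and_pod(args)
print(cluster.trainers_endpoints(), pod.rank)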
13 changes: 8 additions & 5 deletions python/paddle/distributed/fleet/cloud_utils.py
@@ -17,9 +17,12 @@
from paddle.distributed.fleet.launch_utils import get_cluster, logger


def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
def get_cloud_cluster(args_node_ips,
device_mode,
devices_per_proc,
args_port=6170):
"""
args_node_ips:string, selected_gpus:list, args_port: int
args_node_ips: string, device_mode: DeviceMode(IntEnum), devices_per_proc: list, args_port: int
"""
# You can automatically get ip info when using paddlecloud multi-node mode.
node_ips = os.getenv("PADDLE_TRAINERS")
@@ -55,7 +58,7 @@ def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
paddle_port = int(os.getenv("PADDLE_PORT", ""))

if paddle_ports_num >= len(
selected_gpus) and paddle_port != args_port:
devices_per_proc) and paddle_port != args_port:
logger.warning("Use Cloud specified port:{}.".format(
paddle_port))
started_port = paddle_port
@@ -67,7 +70,7 @@ def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
if started_port is None:
started_port = 6170
ports = [
x for x in range(started_port, started_port + len(selected_gpus))
x for x in range(started_port, started_port + len(devices_per_proc))
]
trainer_endpoints = []
for ip in node_ips:
@@ -85,7 +88,7 @@ def get_cloud_cluster(args_node_ips, selected_gpus, args_port=6170):
.format(node_ips, node_ip, node_rank, trainer_endpoints))

cluster, pod = get_cluster(node_ips, node_ip, trainer_endpoints,
selected_gpus)
device_mode, devices_per_proc)
return cluster, cluster.pods[node_rank]
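
A small sketch with assumed values (not from the commit) of the port-range computation above: one endpoint is reserved per local process, whether each devices_per_proc entry is a single device id or a group of ids.

# e.g. 4 GPUs split across 2 processes (see get_device_proc_info in launch_utils.py)
started_port = 6170
devices_per_proc = [[0, 1], [2, 3]]
ports = [x for x in range(started_port, started_port + len(devices_per_proc))]
assert ports == [6170, 6171]  # one port per process, not per GPU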


39 changes: 27 additions & 12 deletions python/paddle/distributed/fleet/launch.py
@@ -68,7 +68,9 @@
from argparse import ArgumentParser, REMAINDER
import paddle
import paddle.fluid as fluid
from paddle.distributed.fleet import launch_utils

# TODO(danleifeng): Don't import * from a module
from paddle.distributed.fleet.launch_utils import *
import paddle.distributed.fleet.cloud_utils as cloud_utils

@@ -98,12 +100,21 @@ def _parse_args():
help="The path for each process's log.If it's not set, the log will printed to default pipe."
)

base_group.add_argument(
"--nproc_per_node",
type=int,
default=None,
help="The number of processes to launch on a node."
"In gpu training, it should be less or equal to the gpus number of you system(or you set by --gpus). And so each process can"
" bound to one or average number of gpus.")

base_group.add_argument(
"--gpus",
type=str,
default=None,
help="It's for gpu training and the training process will run on the gpus,"
"each process is bound to a single GPU. And if it's not set, this module will use all the gpu cards for training."
help="It's for gpu training."
"For example:"
"--gpus=\"0,1,2,3\" will launch four training processes each bound to one gpu."
)

base_group.add_argument(
@@ -146,14 +157,13 @@ def _parse_args():
return parser.parse_args()


def get_cluster_from_args(args, gpus):
def get_cluster_from_args(args, device_mode, devices_per_proc):
node_ips = [x.strip() for x in args.ips.split(',')]
if len(node_ips) == 1:
node_ip = node_ips[0]
else:
_, node_ip = get_host_name_ip()

# node_ip = args.node_ip
assert node_ip in node_ips, "Can't find your local ip {%s} in node_ips: {%s}" \
% (node_ip, node_ips)
node_rank = node_ips.index(node_ip)
@@ -164,28 +174,31 @@ def get_cluster_from_args(args, gpus):
free_ports = None
if not cloud_utils.use_paddlecloud() and len(
node_ips) <= 1 and os.environ.get('FLAGS_START_PORT') is None:
free_ports = find_free_ports(len(gpus))
free_ports = find_free_ports(len(devices_per_proc))
if free_ports is not None:
free_ports = list(free_ports)
else:
start_port = 6070
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = int(os.environ.get('FLAGS_START_PORT'))

free_ports = [x for x in range(start_port, start_port + len(gpus))]
free_ports = [
x for x in range(start_port, start_port + len(devices_per_proc))
]

trainer_endpoints = []
for ip in node_ips:
trainer_endpoints.append(["%s:%d" % (ip, port) for port in free_ports])
return get_cluster(node_ips, node_ip, trainer_endpoints, gpus)
return get_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
devices_per_proc)


def launch_collective(args):
# parse arguments, used for cloud-single-machine and local
gpus = get_gpus(args.gpus)
(device_mode, devices_per_proc) = launch_utils.get_device_proc_info(args)
trainers_num = cloud_utils.get_trainers_num()
logger.debug("parsed from args trainerss_num:{} gpus:{}".format(
trainers_num, gpus))
logger.debug("parsed from args trainerss_num:{} mode:{} devices:{}".format(
trainers_num, device_mode, devices_per_proc))

cluster = None
pod = None
@@ -194,11 +207,13 @@ def launch_collective(args):
if os.environ.get('FLAGS_START_PORT') is not None:
start_port = os.environ.get('FLAGS_START_PORT')
if cloud_utils.use_paddlecloud() and trainers_num != 1:
cluster, pod = cloud_utils.get_cloud_cluster(args.ips, gpus, start_port)
cluster, pod = cloud_utils.get_cloud_cluster(
args.ips, device_mode, devices_per_proc, start_port)
logger.debug("get cluster from cloud:{}".format(cluster))
else:
# trainers_num = 1 or not use paddlecloud ips="a,b"
cluster, pod = get_cluster_from_args(args, gpus)
cluster, pod = get_cluster_from_args(args, device_mode,
devices_per_proc)
logger.debug("get cluster from args:{}".format(cluster))

global_envs = copy.copy(os.environ.copy())
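
To tie the new flags together, here is a hedged usage sketch; it assumes the fleetrun console script that wraps this launcher is on PATH and that train.py is your own training script, and it only uses the --gpus and --nproc_per_node flags added in this file.

import subprocess

# 4 visible GPUs and 2 launched processes: each process gets 2 GPUs bound to it.
subprocess.check_call([
    "fleetrun",
    "--gpus", "0,1,2,3",
    "--nproc_per_node", "2",
    "train.py",
])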
75 changes: 68 additions & 7 deletions python/paddle/distributed/fleet/launch_utils.py
@@ -26,14 +26,16 @@
from contextlib import closing
import socket
import warnings
import six
from enum import IntEnum

import paddle
import paddle.fluid as fluid
logger = logging.getLogger("root")
logger.propagate = False


class DistributeMode:
class DistributeMode(IntEnum):
"""
There are various modes for fleetrun, each designed for a different model.
"""
@@ -42,6 +44,16 @@ class DistributeMode:
PS_HETER = 2


class DeviceMode(IntEnum):
"""
Training devices type
"""
CPU = 0
GPU = 1
KUNLUN = 2
UNKNOWN = 3
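
A short aside (illustrative only, mirroring the enum above): switching DistributeMode and DeviceMode to IntEnum keeps the members interchangeable with the plain integer constants used elsewhere in the launcher.

from enum import IntEnum

class DeviceMode(IntEnum):  # same members as the class defined above
    CPU = 0
    GPU = 1
    KUNLUN = 2
    UNKNOWN = 3

assert DeviceMode.GPU == 1                 # compares equal to a plain int
assert DeviceMode(2) is DeviceMode.KUNLUN  # can be rebuilt from a stored int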


class Cluster(object):
def __init__(self, hdfs):
self.job_server = None
@@ -243,7 +255,8 @@ def get_logger(log_level=20, name="root"):
return logger


def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
def get_cluster(node_ips, node_ip, trainer_endpoints, device_mode,
devices_per_proc):
assert type(trainer_endpoints) is list, "trainer_endpoints must be list"
cluster = Cluster(hdfs=None)
trainer_rank = 0
@@ -252,13 +265,17 @@ def get_cluster(node_ips, node_ip, trainer_endpoints, selected_gpus):
pod.rank = node_rank
pod.addr = ip
cur_node_endpoints = trainer_endpoints[node_rank]
# when use paddlecloud, endpoints may > selected_gpus(user_defined)
# when using paddlecloud, endpoints may be more than devices_per_proc (user defined)
assert len(cur_node_endpoints) >= len(
selected_gpus
devices_per_proc
), "current trainer_endpoints size should be greater equal than selected_gpus size."
for i in range(len(selected_gpus)):
for i in range(len(devices_per_proc)):
trainer = Trainer()
trainer.gpus.append(selected_gpus[i])
if device_mode == DeviceMode.GPU:
if isinstance(devices_per_proc[i], (list, tuple)):
trainer.gpus.extend(devices_per_proc[i])
else:
trainer.gpus.append(devices_per_proc[i])
trainer.endpoint = "%s" % (cur_node_endpoints[i])
trainer.rank = trainer_rank
trainer_rank += 1
@@ -432,13 +449,16 @@ def start_local_trainers(cluster,
procs = []
for idx, t in enumerate(pod.trainers):
proc_env = {
"FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
"PADDLE_TRAINER_ID": "%d" % t.rank,
"PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
"PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
"PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints())
}

if len(t.gpus) > 0:
proc_env["FLAGS_selected_gpus"] = "%s" % ",".join(
[str(g) for g in t.gpus])

current_env.update(proc_env)

cmd = [sys.executable, "-u", training_script] + training_script_args
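
As a quick illustration with assumed values (not part of this diff) of the proc_env change above: FLAGS_selected_gpus is now exported only for trainers that actually have GPUs bound, so CPU-only processes no longer receive an empty GPU list.

trainer_gpus = []  # a CPU-mode trainer; a GPU trainer would have e.g. [0, 1]
proc_env = {
    "PADDLE_TRAINER_ID": "0",
    "PADDLE_CURRENT_ENDPOINT": "127.0.0.1:6070",
    "PADDLE_TRAINERS_NUM": "1",
    "PADDLE_TRAINER_ENDPOINTS": "127.0.0.1:6070",
}
if len(trainer_gpus) > 0:
    proc_env["FLAGS_selected_gpus"] = ",".join(str(g) for g in trainer_gpus)

assert "FLAGS_selected_gpus" not in proc_env  # nothing is set in CPU mode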
@@ -565,6 +585,47 @@ def get_gpus(gpus):
return res_gpus


def get_device_mode():
# TODO(gongwb): Add XPU support
if not fluid.core.is_compiled_with_cuda(
) or fluid.core.get_cuda_device_count() <= 0:
print("launch train in CPU mode")
return DeviceMode.CPU

print("launch train in GPU mode")
return DeviceMode.GPU


def get_device_proc_info(args):
# device_mode
device_mode = get_device_mode()

# devices
devices_per_proc = []
if device_mode == DeviceMode.GPU:
gpus = get_gpus(args.gpus)
if args.nproc_per_node is not None:
assert (len(gpus) % int(args.nproc_per_node)) == 0, \
"the number of gpus:{} must be divisible by args.nproc_per_node:{}".format(len(gpus), args.nproc_per_node)

n = int(len(gpus) / int(args.nproc_per_node))
devices_per_proc = [
gpus[i:i + n] for i in six.moves.range(0, len(gpus), n)
]
else:
devices_per_proc = gpus
elif device_mode == DeviceMode.CPU:
if args.nproc_per_node is None:
devices_per_proc = [0]
else:
devices_per_proc = [x for x in range(0, args.nproc_per_node)]
else:
assert False, "Can't support device_mode:{}, support only cpu and gpu now.".format(
device_mode)

return (device_mode, devices_per_proc)
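
A minimal sketch, assuming 4 GPUs and --nproc_per_node=2 (values chosen for illustration), of the grouping get_device_proc_info performs above: the GPU list is split into equal chunks, one chunk per local process.

import six

gpus = [0, 1, 2, 3]
nproc_per_node = 2
n = len(gpus) // nproc_per_node  # GPUs assigned to each process
devices_per_proc = [gpus[i:i + n] for i in six.moves.range(0, len(gpus), n)]
assert devices_per_proc == [[0, 1], [2, 3]]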


def direct_start(args):
# run ps-cpu mode on paddlecloud, using given envs
cmd = [sys.executable, "-u", args.training_script] + \