Skip to content

Commit

Permalink
Polish api Program/CompiledProgram/ParallelEnv doc & code example (Pa…
Browse files Browse the repository at this point in the history
…ddlePaddle#27656)

* polish Program api doc & example

* polish CompiledProgram api doc & example

* polish ParallelEnv api doc & examples

* polish details, test=document_fix

* polish program doc details, test=document_fix

* polish details, test=document_fix

* fix note format error, test=document_fix

* add lost example, test=document_fix

* fix lost example, test=document_fix
  • Loading branch information
chenwhql authored Sep 29, 2020
1 parent b14ecb8 commit 199da96
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 259 deletions.
143 changes: 76 additions & 67 deletions python/paddle/fluid/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,42 +93,45 @@ class CompiledProgram(object):
for example, the operators' fusion in the computation graph, memory
optimization during the execution of the computation graph, etc.
For more information about build_strategy, please refer to
:code:`fluid.BuildStrategy`.
:code:`paddle.static.BuildStrategy`.
Args:
program_or_graph (Graph|Program): This parameter is the Program or Graph
program_or_graph (Graph|Program): This argument is the Program or Graph
being executed.
build_strategy(BuildStrategy): This parameter is used to compile the
build_strategy(BuildStrategy): This argument is used to compile the
program or graph with the specified options, such as operators' fusion
in the computational graph and memory optimization during the execution
of the computational graph. For more information about build_strategy,
please refer to :code:`fluid.BuildStrategy`. The default is None.
please refer to :code:`paddle.static.BuildStrategy`. The default is None.
Returns:
CompiledProgram
Example:
.. code-block:: python
import paddle.fluid as fluid
import numpy
import numpy
import paddle
import paddle.static as static
place = fluid.CUDAPlace(0) # fluid.CPUPlace()
exe = fluid.Executor(place)
paddle.enable_static()
data = fluid.data(name='X', shape=[None, 1], dtype='float32')
hidden = fluid.layers.fc(input=data, size=10)
loss = fluid.layers.mean(hidden)
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
place = paddle.CUDAPlace(0) # paddle.CPUPlace()
exe = static.Executor(place)
exe.run(fluid.default_startup_program())
compiled_prog = fluid.CompiledProgram(
fluid.default_main_program())
data = static.data(name='X', shape=[None, 1], dtype='float32')
hidden = static.nn.fc(input=data, size=10)
loss = paddle.mean(hidden)
paddle.optimizer.SGD(learning_rate=0.01).minimize(loss)
x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_prog,
feed={"X": x},
fetch_list=[loss.name])
exe.run(static.default_startup_program())
compiled_prog = static.CompiledProgram(
static.default_main_program())
x = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_prog,
feed={"X": x},
fetch_list=[loss.name])
"""

def __init__(self, program_or_graph, build_strategy=None):
Expand Down Expand Up @@ -169,13 +172,16 @@ def with_data_parallel(self,
exec_strategy to set some optimizations that can be applied during the construction
and computation of the Graph, such as reducing the number of AllReduce operations,
specifying the size of the thread pool used in the computation Graph running the model,
and so on. **Note: If build_strategy is specified when building CompiledProgram and calling
with_data_parallel, build_strategy in CompiledProgram will be overwritten, therefore,
if it is data parallel training, it is recommended to set build_strategy when calling
with_data_parallel interface.**
and so on.
.. note::
If build_strategy is specified when building CompiledProgram and calling
with_data_parallel, build_strategy in CompiledProgram will be overwritten, therefore,
if it is data parallel training, it is recommended to set build_strategy when calling
with_data_parallel interface.
Args:
loss_name (str): This parameter is the name of the loss variable of the model.
loss_name (str): This parameter is the name of the loss Tensor of the model.
**Note: If it is model training, you must set loss_name, otherwise the
result may be problematic**. The default is None.
build_strategy(BuildStrategy): This parameter is used to compile the
Expand All @@ -192,7 +198,7 @@ def with_data_parallel(self,
specified by share_vars_from. This parameter needs to be set when model testing
is required during model training, and the data parallel mode is used for
training and testing. Since CompiledProgram will only distribute parameter
variables to other devices when it is first executed, the CompiledProgram
Tensors to other devices when it is first executed, the CompiledProgram
specified by share_vars_from must be run before the current CompiledProgram.
The default is None.
places(list(CUDAPlace)|list(CPUPlace)|None): This parameter specifies the device
Expand All @@ -214,50 +220,53 @@ def with_data_parallel(self,
Example:
.. code-block:: python
import paddle.fluid as fluid
import numpy
import os
use_cuda = True
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
parallel_places = [fluid.CUDAPlace(0), fluid.CUDAPlace(1)] if use_cuda else [fluid.CPUPlace()] * 2
# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM, otherwise, fluid will use
# all the number of the logic core as the CPU_NUM,
# in that case, the batch size of the input should be
# greater than CPU_NUM, if not, the process will be
# failed by an exception.
if not use_cuda:
os.environ['CPU_NUM'] = str(2)
exe = fluid.Executor(place)
data = fluid.data(name='X', shape=[None, 1], dtype='float32')
hidden = fluid.layers.fc(input=data, size=10)
loss = fluid.layers.mean(hidden)
test_program = fluid.default_main_program().clone(for_test=True)
fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)
exe.run(fluid.default_startup_program())
compiled_train_prog = fluid.CompiledProgram(
fluid.default_main_program()).with_data_parallel(
loss_name=loss.name, places=parallel_places)
# NOTE: if not set share_vars_from=compiled_train_prog,
# the parameters used in test process are different with
# the parameters used by train process
compiled_test_prog = fluid.CompiledProgram(
test_program).with_data_parallel(
share_vars_from=compiled_train_prog,
places=parallel_places)
train_data = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_train_prog,
import numpy
import os
import paddle
import paddle.static as static
paddle.enable_static()
use_cuda = True
place = paddle.CUDAPlace(0) if use_cuda else paddle.CPUPlace()
parallel_places = [paddle.CUDAPlace(0), paddle.CUDAPlace(1)] if use_cuda else [paddle.CPUPlace()] * 2
# NOTE: If you use CPU to run the program, you need
# to specify the CPU_NUM, otherwise, paddle will use
# all the number of the logic core as the CPU_NUM,
# in that case, the batch size of the input should be
# greater than CPU_NUM, if not, the process will be
# failed by an exception.
if not use_cuda:
os.environ['CPU_NUM'] = str(2)
exe = static.Executor(place)
data = static.data(name='X', shape=[None, 1], dtype='float32')
hidden = static.nn.fc(input=data, size=10)
loss = paddle.mean(hidden)
test_program = static.default_main_program().clone(for_test=True)
paddle.optimizer.SGD(learning_rate=0.01).minimize(loss)
exe.run(static.default_startup_program())
compiled_train_prog = static.CompiledProgram(
static.default_main_program()).with_data_parallel(
loss_name=loss.name, places=parallel_places)
# NOTE: if not set share_vars_from=compiled_train_prog,
# the parameters used in test process are different with
# the parameters used by train process
compiled_test_prog = static.CompiledProgram(
test_program).with_data_parallel(
share_vars_from=compiled_train_prog,
places=parallel_places)
train_data = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_train_prog,
feed={"X": train_data},
fetch_list=[loss.name])
test_data = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_test_prog,
test_data = numpy.random.random(size=(10, 1)).astype('float32')
loss_data, = exe.run(compiled_test_prog,
feed={"X": test_data},
fetch_list=[loss.name])
"""
Expand Down
78 changes: 31 additions & 47 deletions python/paddle/fluid/dygraph/parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,60 +61,44 @@ def prepare_context(strategy=None):

class ParallelEnv(object):
"""
**Notes**:
**The old class name was Env and will be deprecated. Please use new class name ParallelEnv.**
.. note::
This API is not recommended, if you need to get rank and world_size,
it is recommended to use ``paddle.distributed.get_rank()`` and
``paddle.distributed.get_world_size()`` .
This class is used to obtain the environment variables required for
the parallel execution of dynamic graph model.
the parallel execution of ``paddle.nn.Layer`` in dynamic mode.
The dynamic graph parallel mode needs to be started using paddle.distributed.launch.
By default, the related environment variable is automatically configured by this module.
This class is generally used in with `fluid.dygraph.DataParallel` to configure dynamic graph models
to run in parallel.
The parallel execution in dynamic mode needs to be started using ``paddle.distributed.launch``
or ``paddle.distributed.spawn`` .
Examples:
.. code-block:: python
# This example needs to run with paddle.distributed.launch, The usage is:
# python -m paddle.distributed.launch --selected_gpus=0,1 example.py
# And the content of `example.py` is the code of following example.
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.dygraph as dygraph
from paddle.fluid.optimizer import AdamOptimizer
from paddle.fluid.dygraph.nn import Linear
from paddle.fluid.dygraph.base import to_variable
place = fluid.CUDAPlace(fluid.dygraph.ParallelEnv().dev_id)
with fluid.dygraph.guard(place=place):
# prepare the data parallel context
strategy=dygraph.prepare_context()
linear = Linear(1, 10, act="softmax")
adam = fluid.optimizer.AdamOptimizer()
# make the module become the data parallelism module
linear = dygraph.DataParallel(linear, strategy)
x_data = np.random.random(size=[10, 1]).astype(np.float32)
data = to_variable(x_data)
hidden = linear(data)
avg_loss = fluid.layers.mean(hidden)
# scale the loss according to the number of trainers.
avg_loss = linear.scale_loss(avg_loss)
avg_loss.backward()
# collect the gradients of trainers.
linear.apply_collective_grads()
adam.minimize(avg_loss)
linear.clear_gradients()
import paddle
import paddle.distributed as dist
def train():
# 1. initialize parallel environment
dist.init_parallel_env()
# 2. get current ParallelEnv
parallel_env = dist.ParallelEnv()
print("rank: ", parallel_env.rank)
print("world_size: ", parallel_env.world_size)
# print result in process 1:
# rank: 1
# world_size: 2
# print result in process 2:
# rank: 2
# world_size: 2
if __name__ == '__main__':
# 1. start by ``paddle.distributed.spawn`` (default)
dist.spawn(train, nprocs=2)
# 2. start by ``paddle.distributed.launch``
# train()
"""

def __init__(self):
Expand Down
Loading

0 comments on commit 199da96

Please sign in to comment.