[Distributed] fix docs for distributed training. (dmlc#2840)
* update distributed training doc.

* explain data split.

* fix message passing.

* id mapping.

* fix.

* test data reshuffling.

* fix a bug.

* fix test.

* Revert "fix test."

This reverts commit 907f1dd.

* Revert "fix a bug."

This reverts commit ff0a4d8.

* Revert "test data reshuffling."

This reverts commit 99bb2f6.

Co-authored-by: Zheng <[email protected]>
zheng-da and Zheng authored Apr 29, 2021
1 parent 703d4b9 commit 45ec21b
Showing 4 changed files with 44 additions and 19 deletions.
22 changes: 13 additions & 9 deletions docs/source/guide/distributed-apis.rst
@@ -258,18 +258,22 @@ the same as single-process sampling.
Split workloads
~~~~~~~~~~~~~~~

Users need to split the training set so that each trainer works on its own subset. Similarly,
To train a model, users first need to split the dataset into training, validation and test sets.
For distributed training, this step is usually done before we invoke :func:`dgl.distributed.partition_graph`
to partition a graph. We recommend storing the data split in boolean arrays as node data or edge data.
For node classification tasks, the length of these boolean arrays is the number of nodes in a graph
and each element indicates whether the corresponding node belongs to the training/validation/test set.
Similar boolean arrays should be used for link prediction tasks.
:func:`dgl.distributed.partition_graph` splits these boolean arrays (because they are stored as
the node data or edge data of the graph) based on the graph partitioning
result and stores them with the graph partitions.
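
Below is a minimal sketch of this step for node classification; the 80/10/10 split ratio, the
mask names and the output path are illustrative assumptions, not part of the original text.

.. code:: python

    import torch as th
    import dgl

    # g is the DGLGraph to be partitioned (assumed to be constructed already).
    num_nodes = g.num_nodes()
    ids = th.randperm(num_nodes)
    train_size = int(num_nodes * 0.8)
    val_size = int(num_nodes * 0.1)

    # Boolean arrays marking the training/validation/test nodes.
    train_mask = th.zeros(num_nodes, dtype=th.bool)
    val_mask = th.zeros(num_nodes, dtype=th.bool)
    test_mask = th.zeros(num_nodes, dtype=th.bool)
    train_mask[ids[:train_size]] = True
    val_mask[ids[train_size:train_size + val_size]] = True
    test_mask[ids[train_size + val_size:]] = True

    # Store the split as node data so that partition_graph() splits it
    # together with the rest of the node data.
    g.ndata['train_mask'] = train_mask
    g.ndata['val_mask'] = val_mask
    g.ndata['test_mask'] = test_mask

    dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test')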

During distributed training, users need to assign training nodes/edges to each trainer. Similarly,
we also need to split the validation and test set in the same way.

For distributed training and evaluation, the recommended approach is to use boolean arrays to
indicate the training/validation/test set. For node classification tasks, the length of these
boolean arrays is the number of nodes in a graph and each of their elements indicates the existence
of a node in a training/validation/test set. Similar boolean arrays should be used for
link prediction tasks.

DGL provides :func:`~dgl.distributed.node_split` and :func:`~dgl.distributed.edge_split` to
split the training, validation and test set at runtime for distributed training. The two functions
take the boolean arrays as input, split them and return a portion for the local trainer.
take the boolean arrays constructed before graph partitioning as input, split them and
return a portion for the local trainer.
By default, they ensure that all portions have the same number of nodes/edges. This is
important for synchronous SGD, which assumes each trainer has the same number of mini-batches.
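
As an illustration, each trainer can retrieve its share of the training and validation node IDs
roughly as follows (a sketch; the graph name, the IP config file and the mask names carry over
the assumptions from the sketch above).

.. code:: python

    import dgl

    dgl.distributed.initialize('ip_config.txt')
    g = dgl.distributed.DistGraph('graph_name')

    # Each trainer receives a disjoint subset of the nodes whose mask is True.
    train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
    val_nids = dgl.distributed.node_split(g.ndata['val_mask'])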

37 changes: 30 additions & 7 deletions docs/source/guide/distributed-preprocessing.rst
@@ -54,7 +54,7 @@ the graph structure of the partition as well as some metadata on nodes and edges
|-- graph.dgl
Load balancing
^^^^^^^^^^^^^^
~~~~~~~~~~~~~~

When partitioning a graph, by default, METIS only balances the number of nodes in each partition.
This can result in a suboptimal configuration, depending on the task at hand. For example, in the case
@@ -79,9 +79,32 @@ the number of edges incident to the nodes of different types.
The graph name will be used by :class:`dgl.distributed.DistGraph` to identify a distributed graph.
A legal graph name should only contain alphabetic characters and underscores.

ID mapping
~~~~~~~~~~

:func:`dgl.distributed.partition_graph` shuffles node IDs and edge IDs during partitioning and shuffles
node data and edge data accordingly. After training, we may need to save the computed node embeddings for
downstream tasks. Therefore, we need to map the saved node embeddings back to the original node IDs.

When `return_mapping=True`, :func:`dgl.distributed.partition_graph` returns the mappings between shuffled
node/edge IDs and their original IDs. For a homogeneous graph, it returns two vectors. The first
vector maps every shuffled node ID to its original ID; the second vector maps every shuffled edge ID to its
original ID. For a heterogeneous graph, it returns two dictionaries of vectors. The first dictionary contains
the mapping for each node type; the second dictionary contains the mapping for each edge type.

.. code:: python

    node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
                                                          balance_ntypes=g.ndata['train_mask'],
                                                          return_mapping=True)
    # Let's assume that node_emb is saved from the distributed training.
    orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
    orig_node_emb[node_map] = node_emb
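
For a heterogeneous graph, `node_map` and `edge_map` are dictionaries keyed by node/edge type, so the
same reordering is applied per type. A sketch, assuming the trained embeddings are kept in a per-type
dictionary `node_emb` (an assumption for illustration):

.. code:: python

    orig_node_emb = {}
    for ntype, emb in node_emb.items():
        # node_map[ntype] maps each shuffled node ID of this type to its original ID.
        orig_node_emb[ntype] = th.zeros(emb.shape, dtype=emb.dtype)
        orig_node_emb[ntype][node_map[ntype]] = emb
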
7.1.1 Distributed partitioning
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

For a large graph, DGL uses `ParMETIS <http://glaros.dtc.umn.edu/gkhome/metis/parmetis/overview>`__ to partition
a graph in a cluster of machines. This solution requires users to prepare data for ParMETIS and use a DGL script
@@ -90,7 +113,7 @@ a graph in a cluster of machines. This solution requires users to prepare data f
**Note**: `convert_partition.py` uses the `pyarrow` package to load CSV files. Please install `pyarrow`.

ParMETIS Installation
^^^^^^^^^^^^^^^^^^^^^
~~~~~~~~~~~~~~~~~~~~~

ParMETIS requires METIS and GKLib. Please follow the instructions `here <https://github.com/KarypisLab/GKlib>`__
to compile and install GKLib. To compile and install METIS, please follow the instructions below to
@@ -124,7 +147,7 @@ Before running ParMETIS, we need to set two environment variables: `PATH` and `L
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
Input format for ParMETIS
^^^^^^^^^^^^^^^^^^^^^^^^^
~~~~~~~~~~~~~~~~~~~~~~~~~

The input graph for ParMETIS is stored in three files with the following names: `xxx_nodes.txt`,
`xxx_edges.txt` and `xxx_stats.txt`, where `xxx` is a graph name.
@@ -198,7 +221,7 @@ separated by whitespace:
* `num_node_weights` stores the number of node weights in the node file.

Run ParMETIS and output formats
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

ParMETIS contains a command called `pm_dglpart`, which loads the graph stored in the three
files from the machine where `pm_dglpart` is invoked, distributes data to all machines in
@@ -247,7 +270,7 @@ processes to partition the graph named `xxx` into eight partitions (each process
mpirun -np 4 pm_dglpart xxx 2
Convert ParMETIS outputs to DGLGraph
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

DGL provides a script named `convert_partition.py`, located in the `tools` directory, to convert the data
in the partition files into :class:`dgl.DGLGraph` objects and save them into files.
@@ -373,7 +396,7 @@ Below shows the demo code to construct the schema file.
json.dump({'nid': nid_ranges, 'eid': eid_ranges}, outfile, indent=4)
Construct node/edge features for a heterogeneous graph
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

:class:`dgl.DGLGraph` output by `convert_partition.py` stores a heterogeneous graph partition
as a homogeneous graph. Its node data contains a field called `orig_id` to store the node IDs
2 changes: 0 additions & 2 deletions docs/source/guide/distributed.rst
@@ -67,8 +67,6 @@ to the cluster's machines and launch the training job on all machines.

**Note**: The current distributed training API only supports the PyTorch backend.

**Note**: The current implementation only supports graphs with one node type and one edge type.

DGL implements a few distributed components to support distributed training. The figure below
shows the components and their interactions.

2 changes: 1 addition & 1 deletion docs/source/guide/message-efficient.rst
@@ -32,7 +32,7 @@ implementation would be like:
linear = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim * 2)))
def concat_message_function(edges):
-    return {'cat_feat': torch.cat([edges.src.ndata['feat'], edges.dst.ndata['feat']])}
+    return {'cat_feat': torch.cat([edges.src['feat'], edges.dst['feat']])}
g.apply_edges(concat_message_function)
g.edata['out'] = g.edata['cat_feat'] * linear
