From 45ec21b033ce09f7f14a4d5a1081b6b34c5ee6ce Mon Sep 17 00:00:00 2001
From: Da Zheng
Date: Thu, 29 Apr 2021 14:38:41 +0800
Subject: [PATCH] [Distributed] fix docs for distributed training. (#2840)

* update distributed training doc.

* explain data split.

* fix message passing.

* id mapping.

* fix.

* test data reshuffling.

* fix a bug.

* fix test.

* Revert "fix test."

This reverts commit 907f1dd88994894db94ffcd77b5995279f76b489.

* Revert "fix a bug."

This reverts commit ff0a4d893bbe4911a75ecb04bcbd9bc65315166b.

* Revert "test data reshuffling."

This reverts commit 99bb2f6405de67fc73652220b02e7184461c3e5e.

Co-authored-by: Zheng
---
 docs/source/guide/distributed-apis.rst        | 22 ++++++-----
 .../guide/distributed-preprocessing.rst       | 37 +++++++++++++++----
 docs/source/guide/distributed.rst             |  2 -
 docs/source/guide/message-efficient.rst       |  2 +-
 4 files changed, 44 insertions(+), 19 deletions(-)

diff --git a/docs/source/guide/distributed-apis.rst b/docs/source/guide/distributed-apis.rst
index c043e22987b6..c59d181e539d 100644
--- a/docs/source/guide/distributed-apis.rst
+++ b/docs/source/guide/distributed-apis.rst
@@ -258,18 +258,22 @@ the same as single-process sampling.
 
 Split workloads
 ~~~~~~~~~~~~~~~
 
-Users need to split the training set so that each trainer works on its own subset. Similarly,
+To train a model, users first need to split the dataset into training, validation and test sets.
+For distributed training, this step is usually done before we invoke :func:`dgl.distributed.partition_graph`
+to partition a graph. We recommend storing the data split in boolean arrays as node data or edge data.
+For node classification tasks, the length of these boolean arrays is the number of nodes in a graph
+and each of their elements indicates whether the corresponding node belongs to the training/validation/test set.
+Similar boolean arrays should be used for link prediction tasks.
+:func:`dgl.distributed.partition_graph` splits these boolean arrays (because they are stored as
+the node data or edge data of the graph) based on the graph partitioning
+result and stores them with the graph partitions.
+
+During distributed training, users need to assign training nodes/edges to each trainer. Similarly,
 we also need to split the validation and test set in the same way.
-
-For distributed training and evaluation, the recommended approach is to use boolean arrays to
-indicate the training/validation/test set. For node classification tasks, the length of these
-boolean arrays is the number of nodes in a graph and each of their elements indicates the existence
-of a node in a training/validation/test set. Similar boolean arrays should be used for
-link prediction tasks.
-
 DGL provides :func:`~dgl.distributed.node_split` and :func:`~dgl.distributed.edge_split` to split
 the training, validation and test set at runtime for distributed training. The two functions
-take the boolean arrays as input, split them and return a portion for the local trainer.
+take the boolean arrays constructed before graph partitioning as input, split them and
+return a portion for the local trainer.
 By default, they ensure that all portions have the same number of nodes/edges. This is important
 for synchronous SGD, which assumes each trainer has the same number of mini-batches.
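For reference, a trainer typically obtains its local share of the training and validation sets as in the
following minimal sketch. It assumes the boolean masks were stored as `train_mask`/`val_mask` node data
before partitioning and that the cluster is described by an `ip_config.txt` file; both names are placeholders.

.. code:: python

    import dgl

    # Connect to the cluster and open the partitioned graph by its graph name.
    dgl.distributed.initialize('ip_config.txt')
    g = dgl.distributed.DistGraph('graph_name')

    # The boolean masks were saved as node data before partitioning, so they were
    # split and stored with the graph partitions. Each trainer reads them from the
    # DistGraph and takes only its own portion of the node IDs.
    train_nids = dgl.distributed.node_split(g.ndata['train_mask'])
    val_nids = dgl.distributed.node_split(g.ndata['val_mask'])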
diff --git a/docs/source/guide/distributed-preprocessing.rst b/docs/source/guide/distributed-preprocessing.rst
index 81994659e738..170194d7915b 100644
--- a/docs/source/guide/distributed-preprocessing.rst
+++ b/docs/source/guide/distributed-preprocessing.rst
@@ -54,7 +54,7 @@ the graph structure of the partition as well as some metadata on nodes and edges
     |-- graph.dgl
 
 Load balancing
-^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~
 
 When partitioning a graph, by default, Metis only balances the number of nodes in each partition.
 This can result in suboptimal configuration, depending on the task at hand. For example, in the case
@@ -79,9 +79,32 @@ the number of edges incident to the nodes of different types.
 The graph name will be used by :class:`dgl.distributed.DistGraph` to identify a distributed graph.
 A legal graph name should only contain alphabetic characters and underscores.
 
+ID mapping
+~~~~~~~~~~
+
+:func:`dgl.distributed.partition_graph` shuffles node IDs and edge IDs during the partitioning and shuffles
+node data and edge data accordingly. After training, we may need to save the computed node embeddings for
+any downstream tasks. Therefore, we need to reshuffle the saved node embeddings according to their original
+IDs.
+
+When `return_mapping=True`, :func:`dgl.distributed.partition_graph` returns the mappings between shuffled
+node/edge IDs and their original IDs. For a homogeneous graph, it returns two vectors. The first
+vector maps every shuffled node ID to its original ID; the second vector maps every shuffled edge ID to its
+original ID. For a heterogeneous graph, it returns two dictionaries of vectors. The first dictionary contains
+the mapping for each node type; the second dictionary contains the mapping for each edge type.
+
+.. code:: python
+
+    node_map, edge_map = dgl.distributed.partition_graph(g, 'graph_name', 4, '/tmp/test',
+                                                          balance_ntypes=g.ndata['train_mask'],
+                                                          return_mapping=True)
+    # Let's assume that node_emb is saved from the distributed training.
+    orig_node_emb = th.zeros(node_emb.shape, dtype=node_emb.dtype)
+    orig_node_emb[node_map] = node_emb
+
 7.1.1 Distributed partitioning
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 For a large graph, DGL uses `ParMetis `__ to partition
 a graph in a cluster of machines. This solution requires users to prepare data for ParMETIS and use a DGL script
@@ -90,7 +113,7 @@ a graph in a cluster of machines. This solution requires users to prepare data f
 **Note**: `convert_partition.py` uses the `pyarrow` package to load csv files. Please install `pyarrow`.
 
 ParMETIS Installation
-^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~
 
 ParMETIS requires METIS and GKLib. Please follow the instructions `here `__
 to compile and install GKLib. For compiling and installing METIS, please follow the instructions below to
@@ -124,7 +147,7 @@ Before running ParMETIS, we need to set two environment variables: `PATH` and `L
 
     export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HOME/local/lib/
 
 Input format for ParMETIS
-^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~
 
 The input graph for ParMETIS is stored in three files with the following names: `xxx_nodes.txt`,
 `xxx_edges.txt` and `xxx_stats.txt`, where `xxx` is a graph name.
@@ -198,7 +221,7 @@ separated by whitespace:
 * `num_node_weights` stores the number of node weights in the node file.
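To make the stats file concrete, the hypothetical snippet below writes `xxx_stats.txt` for a homogeneous
graph. It assumes the three whitespace-separated fields described above are, in order, the number of nodes,
the number of edges and the number of node weights stored in the node file.

.. code:: python

    import dgl

    # A random graph standing in for the real input graph.
    g = dgl.rand_graph(1000, 10000)
    num_node_weights = 1

    # Write the single-line stats file for a graph named 'xxx'. The assumed field
    # order is: num_nodes, num_edges, num_node_weights.
    with open('xxx_stats.txt', 'w') as f:
        f.write('{} {} {}\n'.format(g.num_nodes(), g.num_edges(), num_node_weights))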
 
 Run ParMETIS and output formats
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 ParMETIS contains a command called `pm_dglpart`, which loads the graph stored in the three files
 from the machine where `pm_dglpart` is invoked, distributes data to all machines in
@@ -247,7 +270,7 @@ processes to partition the graph named `xxx` into eight partitions (each process
 
     mpirun -np 4 pm_dglpart xxx 2
 
 Convert ParMETIS outputs to DGLGraph
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 DGL provides a script named `convert_partition.py`, located in the `tools` directory, to convert
 the data in the partition files into :class:`dgl.DGLGraph` objects and save them into files.
@@ -373,7 +396,7 @@ Below shows the demo code to construct the schema file.
 
     json.dump({'nid': nid_ranges, 'eid': eid_ranges}, outfile, indent=4)
 
 Construct node/edge features for a heterogeneous graph
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 :class:`dgl.DGLGraph` output by `convert_partition.py` stores a heterogeneous graph partition as
 a homogeneous graph. Its node data contains a field called `orig_id` to store the node IDs
diff --git a/docs/source/guide/distributed.rst b/docs/source/guide/distributed.rst
index 25d57dd73a44..9c1fb86654a7 100644
--- a/docs/source/guide/distributed.rst
+++ b/docs/source/guide/distributed.rst
@@ -67,8 +67,6 @@ to the cluster's machines and launch the training job on all machines.
 
 **Note**: The current distributed training API only supports the Pytorch backend.
 
-**Note**: The current implementation only supports graphs with one node type and one edge type.
-
 DGL implements a few distributed components to support distributed training. The figure below
 shows the components and their interactions.
 
diff --git a/docs/source/guide/message-efficient.rst b/docs/source/guide/message-efficient.rst
index 8e670ba73cb8..7b5f1c4a1111 100644
--- a/docs/source/guide/message-efficient.rst
+++ b/docs/source/guide/message-efficient.rst
@@ -32,7 +32,7 @@ implementation would be like:
 
     linear = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim * 2)))
 
     def concat_message_function(edges):
-        return {'cat_feat': torch.cat([edges.src.ndata['feat'], edges.dst.ndata['feat']])}
+        return {'cat_feat': torch.cat([edges.src['feat'], edges.dst['feat']], dim=-1)}
     g.apply_edges(concat_message_function)
     g.edata['out'] = g.edata['cat_feat'] * linear
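A more memory-efficient pattern in the same spirit is to apply the projection on the nodes first and then
combine the projected features on the edges with a built-in function, so the concatenated edge tensor is
never materialized. The sketch below assumes the same `g`, `node_feat_dim` and `feat` as in the snippet
above; note that it produces the sum of the two projected halves on each edge rather than their
concatenation, which is typically what a subsequent aggregation needs.

.. code:: python

    import torch
    import torch.nn as nn
    import dgl.function as fn

    # Split the weight into a source half and a destination half and apply
    # them on the node features before message passing.
    linear_src = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim)))
    linear_dst = nn.Parameter(torch.FloatTensor(size=(1, node_feat_dim)))
    out_src = g.ndata['feat'] * linear_src
    out_dst = g.ndata['feat'] * linear_dst
    g.srcdata.update({'out_src': out_src})
    g.dstdata.update({'out_dst': out_dst})

    # u_add_v combines the two projected halves on each edge without ever
    # building the (num_edges, 2 * node_feat_dim) concatenated tensor.
    g.apply_edges(fn.u_add_v('out_src', 'out_dst', 'out'))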