Skip to content

Commit

Permalink
[Distributed] adjust various APIs. (dmlc#1993)
Browse files (browse the repository at this point in the history)
* rename get_data_size.

* remove g from DistTensor.

* remove g from DistEmbedding.

* clean up API of graph partition book.

* fix DistGraph

* fix lint.

* collect all part policies.

* fix.

* fix.

* support distributed sampler.

* remove partition.py
  • Loading branch information
zheng-da authored Aug 12, 2020
1 parent a6b44e7 commit d1cf5c3
Show file tree
Hide file tree
Showing 14 changed files with 147 additions and 182 deletions.
6 changes: 3 additions & 3 deletions examples/pytorch/graphsage/experimental/train_dist.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ def inference(self, g, x, batch_size, device):
# TODO: can we standardize this?
nodes = dgl.distributed.node_split(np.arange(g.number_of_nodes()),
g.get_partition_book(), force_even=True)
y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_hidden), th.float32, 'h',
y = dgl.distributed.DistTensor((g.number_of_nodes(), self.n_hidden), th.float32, 'h',
persistent=True)
for l, layer in enumerate(self.layers):
if l == len(self.layers) - 1:
y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_classes),
y = dgl.distributed.DistTensor((g.number_of_nodes(), self.n_classes),
th.float32, 'h_last', persistent=True)

sampler = NeighborSampler(g, [-1], dgl.distributed.sample_neighbors, device)
Expand Down Expand Up @@ -263,7 +263,7 @@ def main(args):
dgl.distributed.initialize(args.ip_config, args.num_servers, num_workers=args.num_workers)
if not args.standalone:
th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph(args.ip_config, args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
print('rank:', g.rank())

pb = g.get_partition_book()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,11 +195,11 @@ def inference(self, g, x, batch_size, device):
# TODO: can we standardize this?
nodes = dgl.distributed.node_split(np.arange(g.number_of_nodes()),
g.get_partition_book(), force_even=True)
y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_hidden), th.float32, 'h',
y = dgl.distributed.DistTensor((g.number_of_nodes(), self.n_hidden), th.float32, 'h',
persistent=True)
for l, layer in enumerate(self.layers):
if l == len(self.layers) - 1:
y = dgl.distributed.DistTensor(g, (g.number_of_nodes(), self.n_classes),
y = dgl.distributed.DistTensor((g.number_of_nodes(), self.n_classes),
th.float32, 'h_last', persistent=True)

sampler = PosNeighborSampler(g, [-1], dgl.distributed.sample_neighbors)
Expand Down Expand Up @@ -421,7 +421,7 @@ def main(args):
dgl.distributed.initialize(args.ip_config, args.num_servers, num_workers=args.num_workers)
if not args.standalone:
th.distributed.init_process_group(backend='gloo')
g = dgl.distributed.DistGraph(args.ip_config, args.graph_name, part_config=args.part_config)
g = dgl.distributed.DistGraph(args.graph_name, part_config=args.part_config)
print('rank:', g.rank())
print('number of edges', g.number_of_edges())

Expand Down
14 changes: 10 additions & 4 deletions python/dgl/dataloading/neighbor.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Data loading components for neighbor sampling"""
from .dataloader import BlockSampler
from .. import sampling, subgraph
from .. import sampling, subgraph, distributed

class MultiLayerNeighborSampler(BlockSampler):
"""Sampler that builds computational dependency of node representations via
Expand Down Expand Up @@ -59,10 +59,16 @@ def __init__(self, fanouts, replace=False, return_eids=False):

def sample_frontier(self, block_id, g, seed_nodes):
fanout = self.fanouts[block_id]
if fanout is None:
frontier = subgraph.in_subgraph(g, seed_nodes)
if isinstance(g, distributed.DistGraph):
if fanout is None:
frontier = distributed.in_subgraph(g, seed_nodes)
else:
frontier = distributed.sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
else:
frontier = sampling.sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
if fanout is None:
frontier = subgraph.in_subgraph(g, seed_nodes)
else:
frontier = sampling.sample_neighbors(g, seed_nodes, fanout, replace=self.replace)
return frontier

class MultiLayerFullNeighborSampler(MultiLayerNeighborSampler):
Expand Down
58 changes: 46 additions & 12 deletions python/dgl/distributed/dist_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
from .. import backend as F
from ..base import NID, EID
from .kvstore import KVServer, get_kvstore
from .standalone_kvstore import KVClient as SA_KVClient
from .._ffi.ndarray import empty_shared_mem
from ..frame import infer_scheme
from .partition import load_partition, load_partition_book
Expand Down Expand Up @@ -142,7 +141,7 @@ def __init__(self, g):
name1 = _get_data_name(name, policy.policy_str)
dtype, shape, _ = g._client.get_data_meta(name1)
# We create a wrapper on the existing tensor in the kvstore.
self._data[name] = DistTensor(g, shape, dtype, name, part_policy=policy)
self._data[name] = DistTensor(shape, dtype, name, part_policy=policy)

def _get_names(self):
return list(self._data.keys())
Expand Down Expand Up @@ -188,7 +187,7 @@ def __init__(self, g):
name1 = _get_data_name(name, policy.policy_str)
dtype, shape, _ = g._client.get_data_meta(name1)
# We create a wrapper on the existing tensor in the kvstore.
self._data[name] = DistTensor(g, shape, dtype, name, part_policy=policy)
self._data[name] = DistTensor(shape, dtype, name, part_policy=policy)

def _get_names(self):
return list(self._data.keys())
Expand Down Expand Up @@ -321,23 +320,20 @@ class DistGraph:
Parameters
----------
ip_config : str
Path of IP configuration file.
graph_name : str
The name of the graph. This name has to be the same as the one used in DistGraphServer.
gpb : PartitionBook
The partition book object
part_config : str
The partition config file. It's used in the standalone mode.
'''
def __init__(self, ip_config, graph_name, gpb=None, part_config=None):
self.ip_config = ip_config
def __init__(self, graph_name, gpb=None, part_config=None):
self.graph_name = graph_name
self._gpb_input = gpb
if os.environ.get('DGL_DIST_MODE', 'standalone') == 'standalone':
assert part_config is not None, \
'When running in the standalone model, the partition config file is required'
self._client = SA_KVClient()
self._client = get_kvstore()
# Load graph partition data.
g, node_feats, edge_feats, self._gpb, _ = load_partition(part_config, 0)
assert self._gpb.num_partitions() == 1, \
Expand All @@ -349,6 +345,7 @@ def __init__(self, ip_config, graph_name, gpb=None, part_config=None):
self._client.add_data(_get_data_name(name, NODE_PART_POLICY), node_feats[name])
for name in edge_feats:
self._client.add_data(_get_data_name(name, EDGE_PART_POLICY), edge_feats[name])
self._client.map_shared_data(self._gpb)
rpc.set_num_client(1)
else:
self._init()
Expand Down Expand Up @@ -377,10 +374,10 @@ def _init(self):
self._client.map_shared_data(self._gpb)

def __getstate__(self):
return self.ip_config, self.graph_name, self._gpb
return self.graph_name, self._gpb

def __setstate__(self, state):
self.ip_config, self.graph_name, self._gpb_input = state
self.graph_name, self._gpb_input = state
self._init()

self._ndata = NodeDataView(self)
Expand Down Expand Up @@ -428,6 +425,43 @@ def edata(self):
"""
return self._edata

@property
def idtype(self):
    """The dtype of graph index.

    The value is read from the ``idtype`` attribute of the graph object
    held in ``self._g``.

    Returns
    -------
    backend dtype object
        th.int32/th.int64 or tf.int32/tf.int64 etc.

    See Also
    --------
    long
    int

    Notes
    -----
    NOTE(review): ``long`` and ``int`` are listed under "See Also", but
    this view of the file does not show methods with those names on this
    class -- confirm the references resolve, or drop them.
    """
    return self._g.idtype

@property
def device(self):
    """Get the device context of this graph.

    The value is read from the ``device`` attribute of the graph object
    held in ``self._g``.

    Examples
    --------
    The following example uses PyTorch backend.

    NOTE(review): the example below builds a graph with ``dgl.bipartite``
    and moves it with ``g.to('cuda:0')`` rather than using a distributed
    graph; it looks like it was carried over from a non-distributed graph
    class -- confirm that it applies to this class before relying on it.

    >>> g = dgl.bipartite(([0, 1, 1, 2], [0, 0, 2, 1]), 'user', 'plays', 'game')
    >>> print(g.device)
    device(type='cpu')
    >>> g = g.to('cuda:0')
    >>> print(g.device)
    device(type='cuda', index=0)

    Returns
    -------
    Device context object
    """
    return self._g.device

@property
def ntypes(self):
"""Return the list of node types of this graph.
Expand All @@ -439,7 +473,7 @@ def ntypes(self):
Examples
--------
>>> g = DistGraph("ip_config.txt", "test")
>>> g = DistGraph("test")
>>> g.ntypes
['_U']
"""
Expand All @@ -457,7 +491,7 @@ def etypes(self):
Examples
--------
>>> g = DistGraph("ip_config.txt", "test")
>>> g = DistGraph("test")
>>> g.etypes
['_E']
"""
Expand Down
36 changes: 20 additions & 16 deletions python/dgl/distributed/dist_tensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import os

from .graph_partition_book import PartitionPolicy, NODE_PART_POLICY, EDGE_PART_POLICY
from .dist_context import is_initialized
from ..base import DGLError
from .kvstore import get_kvstore
from .. import utils
from .. import backend as F

Expand Down Expand Up @@ -35,8 +34,6 @@ class DistTensor:
Parameters
----------
g : DistGraph
The distributed graph object.
shape : tuple
The shape of the tensor
dtype : dtype
Expand All @@ -50,27 +47,34 @@ class DistTensor:
persistent : bool
Whether the created tensor is persistent.
'''
def __init__(self, g, shape, dtype, name=None, init_func=None, part_policy=None,
def __init__(self, shape, dtype, name=None, init_func=None, part_policy=None,
persistent=False):
self.kvstore = g._client
self.kvstore = get_kvstore()
self._shape = shape
self._dtype = dtype

part_policies = self.kvstore.all_possible_part_policy
# If a user doesn't provide a partition policy, we should find one based on
# the input shape.
if part_policy is None:
assert shape[0] != g.number_of_nodes() or shape[0] != g.number_of_edges(), \
for policy_name in part_policies:
policy = part_policies[policy_name]
if policy.get_size() == shape[0]:
# If multiple partition policies match the input shape, we cannot
# decide which is the right one automatically. We should ask users
# to provide one.
assert part_policy is None, \
'Multiple partition policies match the input shape. ' \
+ 'Please provide a partition policy explicitly.'
part_policy = policy
assert part_policy is not None, \
'Cannot determine the partition policy. Please provide it.'
if shape[0] == g.number_of_nodes():
part_policy = PartitionPolicy(NODE_PART_POLICY, g.get_partition_book())
elif shape[0] == g.number_of_edges():
part_policy = PartitionPolicy(EDGE_PART_POLICY, g.get_partition_book())
else:
raise DGLError('Cannot determine the partition policy. Please provide it.')

self._part_policy = part_policy

if init_func is None:
init_func = _default_init_data
exist_names = g._client.data_name_list()
exist_names = self.kvstore.data_name_list()
# If a user doesn't provide a name, we generate a name ourselves.
# We need to generate the name in a deterministic way.
if name is None:
Expand All @@ -79,11 +83,11 @@ def __init__(self, g, shape, dtype, name=None, init_func=None, part_policy=None,
self._name = _get_data_name(name, part_policy.policy_str)
self._persistent = persistent
if self._name not in exist_names:
g._client.init_data(self._name, shape, dtype, part_policy, init_func)
self.kvstore.init_data(self._name, shape, dtype, part_policy, init_func)
self._owner = True
else:
self._owner = False
dtype1, shape1, _ = g._client.get_data_meta(self._name)
dtype1, shape1, _ = self.kvstore.get_data_meta(self._name)
assert dtype == dtype1, 'The dtype does not match with the existing tensor'
assert shape == shape1, 'The shape does not match with the existing tensor'

Expand Down
Loading

0 comments on commit d1cf5c3

Please sign in to comment.