[BUGFIX] Fix bugs in shared mem graph store. (dmlc#630)
* fix graph store for PyTorch.

* add test.

* fix dtype error in test.

* disable test on GPU.

* avoid running the test on Windows.

* fix shared-memory test.

* use a script to control the testing environment.

* update test.

* enable all tests.

* fix test script.
zheng-da authored Jun 9, 2019
1 parent fc7775a commit 2e9949d
Showing 3 changed files with 43 additions and 32 deletions.
25 changes: 14 additions & 11 deletions python/dgl/contrib/graph_store.py
@@ -31,10 +31,13 @@ def _get_edata_path(graph_name, edata_name):
 def _get_graph_path(graph_name):
     return "/" + graph_name
 
+dtype_dict = F.data_type_dict
+dtype_dict = {dtype_dict[key]:key for key in dtype_dict}
+
 def _move_data_to_shared_mem_array(arr, name):
     dlpack = F.zerocopy_to_dlpack(arr)
     dgl_tensor = nd.from_dlpack(dlpack)
-    new_arr = empty_shared_mem(name, True, F.shape(arr), np.dtype(F.dtype(arr)).name)
+    new_arr = empty_shared_mem(name, True, F.shape(arr), dtype_dict[F.dtype(arr)])
     dgl_tensor.copyto(new_arr)
     dlpack = new_arr.to_dlpack()
     return F.zerocopy_from_dlpack(dlpack)
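
The heart of the fix is the dtype_dict pair of lines above: F.data_type_dict maps canonical string names such as 'float32' to backend dtype objects, and inverting it turns a backend dtype back into a plain string that can cross the RPC and shared-memory boundary. The old np.dtype(F.dtype(arr)).name only works when NumPy can interpret the backend's dtype object, which holds for MXNet but not for PyTorch. A minimal sketch of the trick, assuming PyTorch and illustrative names:

import torch

# Forward map, shaped like a backend's data_type_dict.
data_type_dict = {'float32': torch.float32, 'int64': torch.int64}
# Invert it: backend dtype object -> canonical string name.
dtype_dict = {v: k for k, v in data_type_dict.items()}

arr = torch.zeros(4, dtype=torch.float32)
print(dtype_dict[arr.dtype])   # 'float32', safe to serialize
# np.dtype(torch.float32) raises TypeError, which is the failure
# the old code hit under the PyTorch backend.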
@@ -344,7 +347,7 @@ def get_graph_info(graph_name):
 def init_ndata(init, ndata_name, shape, dtype):
     if ndata_name in self._graph.ndata:
         ndata = self._graph.ndata[ndata_name]
-        assert np.all(ndata.shape == tuple(shape))
+        assert np.all(tuple(F.shape(ndata)) == tuple(shape))
         return 0
 
     assert self._graph.number_of_nodes() == shape[0]
@@ -357,7 +360,7 @@ def init_ndata(init, ndata_name, shape, dtype):
 def init_edata(init, edata_name, shape, dtype):
     if edata_name in self._graph.edata:
         edata = self._graph.edata[edata_name]
-        assert np.all(edata.shape == tuple(shape))
+        assert np.all(tuple(F.shape(edata)) == tuple(shape))
         return 0
 
     assert self._graph.number_of_edges() == shape[0]
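
Both shape assertions change for the same reason: going through tuple(F.shape(...)) normalizes whatever shape object the backend returns (PyTorch's torch.Size, MXNet's plain tuple) and whatever arrives over RPC (often a list) into plain tuples before comparing. A small illustration of the pitfall, assuming PyTorch:

import torch

t = torch.zeros(3, 4)
print(type(t.shape))                     # <class 'torch.Size'>
print(t.shape == (3, 4))                 # True: torch.Size subclasses tuple
print(t.shape == [3, 4])                 # False: a list never equals a tuple
print(tuple(t.shape) == tuple([3, 4]))   # True once both sides are tuples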
@@ -369,12 +372,12 @@ def init_edata(init, edata_name, shape, dtype):
 # RPC command: get the names of all node embeddings.
 def list_ndata():
     ndata = self._graph.ndata
-    return [[key, F.shape(ndata[key]), np.dtype(F.dtype(ndata[key])).name] for key in ndata]
+    return [[key, tuple(F.shape(ndata[key])), dtype_dict[F.dtype(ndata[key])]] for key in ndata]
 
 # RPC command: get the names of all edge embeddings.
 def list_edata():
     edata = self._graph.edata
-    return [[key, F.shape(edata[key]), np.dtype(F.dtype(edata[key])).name] for key in edata]
+    return [[key, tuple(F.shape(edata[key])), dtype_dict[F.dtype(edata[key])]] for key in edata]
 
 # RPC command: notify the server of the termination of the client.
 def terminate():
@@ -560,16 +563,16 @@ def __init__(self, graph_name, port):
 # These two functions create initialized tensors on the server.
 def node_initializer(init, name, shape, dtype, ctx):
     init = self._init_manager.serialize(init)
-    dtype = np.dtype(dtype).name
-    self.proxy.init_ndata(init, name, shape, dtype)
+    dtype = dtype_dict[dtype]
+    self.proxy.init_ndata(init, name, tuple(shape), dtype)
     data = empty_shared_mem(_get_ndata_path(self._graph_name, name),
                             False, shape, dtype)
     dlpack = data.to_dlpack()
     return F.zerocopy_from_dlpack(dlpack)
 def edge_initializer(init, name, shape, dtype, ctx):
     init = self._init_manager.serialize(init)
-    dtype = np.dtype(dtype).name
-    self.proxy.init_edata(init, name, shape, dtype)
+    dtype = dtype_dict[dtype]
+    self.proxy.init_edata(init, name, tuple(shape), dtype)
     data = empty_shared_mem(_get_edata_path(self._graph_name, name),
                             False, shape, dtype)
     dlpack = data.to_dlpack()
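
Both initializers follow the same client-side pattern: ask the server to create and initialize the tensor, then attach to the same named shared-memory region locally (the False flag to empty_shared_mem means attach to an existing region rather than create one) and re-export it to the backend through DLPack, so client and server operate on one buffer with no copies. A rough sketch of that create-then-attach round trip, using only the standard library as a stand-in for DGL's empty_shared_mem:

import numpy as np
from multiprocessing import shared_memory

# "Server": create a named shared-memory region and initialize it.
shm = shared_memory.SharedMemory(name='demo_ndata', create=True, size=10 * 4)
server_view = np.ndarray((10,), dtype='float32', buffer=shm.buf)
server_view[:] = np.arange(10)

# "Client": attach to the existing region by name (create=False).
shm2 = shared_memory.SharedMemory(name='demo_ndata', create=False)
client_view = np.ndarray((10,), dtype='float32', buffer=shm2.buf)
assert client_view[3] == 3.0   # same buffer, no copy

shm2.close(); shm.close(); shm.unlink()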
@@ -665,7 +668,7 @@ def init_ndata(self, ndata_name, shape, dtype, ctx=F.cpu()):
         self._node_frame._frame._warn_and_set_initializer()
     init = self._node_frame.get_initializer(ndata_name)
     init = self._init_manager.serialize(init)
-    self.proxy.init_ndata(init, ndata_name, shape, dtype)
+    self.proxy.init_ndata(init, ndata_name, tuple(shape), dtype)
     self._init_ndata(ndata_name, shape, dtype)
 
 def init_edata(self, edata_name, shape, dtype, ctx=F.cpu()):
@@ -693,7 +696,7 @@ def init_edata(self, edata_name, shape, dtype, ctx=F.cpu()):
         self._edge_frame._frame._warn_and_set_initializer()
     init = self._edge_frame.get_initializer(edata_name)
     init = self._init_manager.serialize(init)
-    self.proxy.init_edata(init, edata_name, shape, dtype)
+    self.proxy.init_edata(init, edata_name, tuple(shape), dtype)
     self._init_edata(edata_name, shape, dtype)
 
 def get_n_repr(self, u=ALL):

@@ -3,9 +3,9 @@
 import random
 import time
 import numpy as np
+from numpy.testing import assert_array_equal
 from multiprocessing import Process, Manager
 from scipy import sparse as spsp
-import mxnet as mx
 import backend as F
 import unittest
 import dgl.function as fn
@@ -20,12 +20,12 @@
 def check_array_shared_memory(g, worker_id, arrays):
     if worker_id == 0:
         for i, arr in enumerate(arrays):
-            arr[0] = i
+            arr[0] = i + 10
         g._sync_barrier(60)
     else:
         g._sync_barrier(60)
         for i, arr in enumerate(arrays):
-            assert_almost_equal(arr[0].asnumpy(), i)
+            assert_almost_equal(F.asnumpy(arr[0]), i + 10)
 
 def create_graph_store(graph_name):
     for _ in range(10):
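
Two things change in check_array_shared_memory: the MXNet-only arr[0].asnumpy() becomes F.asnumpy(arr[0]) so the assertion runs under any backend, and the written value shifts from i to i + 10 so that worker 0's first array no longer writes 0, a value that could coincide with a freshly initialized buffer and mask a lost write. The backend-as-F shim used throughout the test is just a thin dispatch layer; a minimal sketch of the idea, with names assumed for illustration:

import os

# Hypothetical miniature of the tests' "import backend as F" module.
if os.environ.get('DGLBACKEND', 'pytorch') == 'pytorch':
    import torch
    def asnumpy(x): return x.detach().cpu().numpy()
    def ones(shape): return torch.ones(shape)
else:  # assume the MXNet backend otherwise
    import mxnet as mx
    def asnumpy(x): return x.asnumpy()
    def ones(shape): return mx.nd.ones(shape)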
@@ -53,22 +53,24 @@ def check_init_func(worker_id, graph_name, return_dict):

     src, dst = g.all_edges()
     coo = csr.tocoo()
-    assert F.array_equal(dst, F.tensor(coo.row))
-    assert F.array_equal(src, F.tensor(coo.col))
-    assert F.array_equal(g.nodes[0].data['feat'], F.tensor(np.arange(10), dtype=np.float32))
-    assert F.array_equal(g.edges[0].data['feat'], F.tensor(np.arange(10), dtype=np.float32))
+    assert_array_equal(F.asnumpy(dst), coo.row)
+    assert_array_equal(F.asnumpy(src), coo.col)
+    feat = F.asnumpy(g.nodes[0].data['feat'])
+    assert_array_equal(np.squeeze(feat), np.arange(10, dtype=feat.dtype))
+    feat = F.asnumpy(g.edges[0].data['feat'])
+    assert_array_equal(np.squeeze(feat), np.arange(10, dtype=feat.dtype))
     g.init_ndata('test4', (g.number_of_nodes(), 10), 'float32')
     g.init_edata('test4', (g.number_of_edges(), 10), 'float32')
     g._sync_barrier(60)
     check_array_shared_memory(g, worker_id, [g.nodes[:].data['test4'], g.edges[:].data['test4']])
 
     data = g.nodes[:].data['test4']
-    g.set_n_repr({'test4': mx.nd.ones((1, 10)) * 10}, u=[0])
-    assert_almost_equal(data[0].asnumpy(), np.squeeze(g.nodes[0].data['test4'].asnumpy()))
+    g.set_n_repr({'test4': F.ones((1, 10)) * 10}, u=[0])
+    assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.nodes[0].data['test4'])))
 
     data = g.edges[:].data['test4']
-    g.set_e_repr({'test4': mx.nd.ones((1, 10)) * 20}, edges=[0])
-    assert_almost_equal(data[0].asnumpy(), np.squeeze(g.edges[0].data['test4'].asnumpy()))
+    g.set_e_repr({'test4': F.ones((1, 10)) * 20}, edges=[0])
+    assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.edges[0].data['test4'])))
 
     g.destroy()
     return_dict[worker_id] = 0
@@ -87,8 +89,10 @@ def server_func(num_workers, graph_name):
                        False, edge_dir="in", port=rand_port)
     assert num_nodes == g._graph.number_of_nodes()
     assert num_edges == g._graph.number_of_edges()
-    g.ndata['feat'] = mx.nd.arange(num_nodes * 10).reshape((num_nodes, 10))
-    g.edata['feat'] = mx.nd.arange(num_edges * 10).reshape((num_edges, 10))
+    nfeat = np.arange(0, num_nodes * 10).astype('float32').reshape((num_nodes, 10))
+    efeat = np.arange(0, num_edges * 10).astype('float32').reshape((num_edges, 10))
+    g.ndata['feat'] = F.tensor(nfeat)
+    g.edata['feat'] = F.tensor(efeat)
     g.run()
 
 def test_init():
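
The server-side change follows the same backend-neutral recipe: build the features in NumPy with an explicit float32 cast (plain np.arange yields int64, whereas the old mx.nd.arange defaulted to float32, so the cast preserves the original dtype) and hand them to the backend via F.tensor. For instance, assuming F.tensor wraps torch.from_numpy:

import numpy as np
import torch

num_nodes = 5
# Without the astype, np.arange would produce int64 features and
# silently change dtype relative to the old mx.nd.arange behavior.
nfeat = np.arange(0, num_nodes * 10).astype('float32').reshape((num_nodes, 10))
t = torch.from_numpy(nfeat)   # zero-copy wrap, dtype preserved
print(t.dtype)                # torch.float32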
@@ -122,31 +126,31 @@ def check_compute_func(worker_id, graph_name, return_dict):
     # Test update all.
     g.update_all(fn.copy_src(src='feat', out='m'), fn.sum(msg='m', out='preprocess'))
     adj = g.adjacency_matrix()
-    tmp = mx.nd.dot(adj, g.nodes[:].data['feat'])
-    assert_almost_equal(g.nodes[:].data['preprocess'].asnumpy(), tmp.asnumpy())
+    tmp = F.spmm(adj, g.nodes[:].data['feat'])
+    assert_almost_equal(F.asnumpy(g.nodes[:].data['preprocess']), F.asnumpy(tmp))
     g._sync_barrier(60)
     check_array_shared_memory(g, worker_id, [g.nodes[:].data['preprocess']])
 
     # Test apply nodes.
     data = g.nodes[:].data['feat']
-    g.apply_nodes(func=lambda nodes: {'feat': mx.nd.ones((1, in_feats)) * 10}, v=0)
-    assert_almost_equal(data[0].asnumpy(), np.squeeze(g.nodes[0].data['feat'].asnumpy()))
+    g.apply_nodes(func=lambda nodes: {'feat': F.ones((1, in_feats)) * 10}, v=0)
+    assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.nodes[0].data['feat'])))
 
     # Test apply edges.
     data = g.edges[:].data['feat']
-    g.apply_edges(func=lambda edges: {'feat': mx.nd.ones((1, in_feats)) * 10}, edges=0)
-    assert_almost_equal(data[0].asnumpy(), np.squeeze(g.edges[0].data['feat'].asnumpy()))
+    g.apply_edges(func=lambda edges: {'feat': F.ones((1, in_feats)) * 10}, edges=0)
+    assert_almost_equal(F.asnumpy(data[0]), np.squeeze(F.asnumpy(g.edges[0].data['feat'])))
 
     g.init_ndata('tmp', (g.number_of_nodes(), 10), 'float32')
     data = g.nodes[:].data['tmp']
     # Test pull
     g.pull(1, fn.copy_src(src='feat', out='m'), fn.sum(msg='m', out='tmp'))
-    assert_almost_equal(data[1].asnumpy(), np.squeeze(g.nodes[1].data['preprocess'].asnumpy()))
+    assert_almost_equal(F.asnumpy(data[1]), np.squeeze(F.asnumpy(g.nodes[1].data['preprocess'])))
 
     # Test send_and_recv
     in_edges = g.in_edges(v=2)
     g.send_and_recv(in_edges, fn.copy_src(src='feat', out='m'), fn.sum(msg='m', out='tmp'))
-    assert_almost_equal(data[2].asnumpy(), np.squeeze(g.nodes[2].data['preprocess'].asnumpy()))
+    assert_almost_equal(F.asnumpy(data[2]), np.squeeze(F.asnumpy(g.nodes[2].data['preprocess'])))
 
     g.destroy()
     return_dict[worker_id] = 0
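
The compute test drops its last MXNet call the same way: mx.nd.dot on the adjacency matrix becomes F.spmm, the shim's sparse-dense matrix multiply, which dispatches to the right kernel per framework. A hedged sketch of what such a helper might look like under PyTorch (the real test backend's wrapper may differ):

import torch

def spmm(adj, dense):
    # adj: sparse (N, N) matrix; dense: (N, D) feature matrix.
    return torch.sparse.mm(adj, dense)

i = torch.tensor([[0, 1], [1, 0]])            # COO indices
v = torch.tensor([1.0, 1.0])
adj = torch.sparse_coo_tensor(i, v, (2, 2))   # permutation matrix
x = torch.arange(4, dtype=torch.float32).reshape(2, 2)
print(spmm(adj, x))                           # rows of x swapped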
4 changes: 4 additions & 0 deletions tests/scripts/task_unit_test.sh
@@ -23,3 +23,7 @@ export DGL_DOWNLOAD_DIR=${PWD}
 python3 -m nose -v --with-xunit tests/compute || fail "compute"
 python3 -m nose -v --with-xunit tests/graph_index || fail "graph_index"
 python3 -m nose -v --with-xunit tests/$DGLBACKEND || fail "backend-specific"
+export OMP_NUM_THREADS=1
+if [ $2 != "gpu" ]; then
+    python3 -m nose -v --with-xunit tests/distributed || fail "distributed"
+fi
