Skip to content

Commit

Permalink
[Perf] Accelerate block_compute when all nodes are invoked. (dmlc#434)
Browse files Browse the repository at this point in the history
* refactor.

* accelerate update_all in nodeflow.

* fix.

* refactor.

* fix lint.

* fix lint.

* reorganize.

* reorg.

* remove.

* add doc.

* impl block_incidence_matrix

* fix lint.

* fix.

* simple fix.

* fix test.

* fix interface.

* fix eid.

* fix comments.
  • Loading branch information
zheng-da authored Mar 7, 2019
1 parent ca2a7e1 commit 8651be5
Show file tree
Hide file tree
Showing 10 changed files with 510 additions and 85 deletions.
16 changes: 10 additions & 6 deletions include/dgl/immutable_graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -534,23 +534,27 @@ class ImmutableGraph: public GraphInterface {
return edge_list_;
}

protected:
DGLIdIters GetInEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
DGLIdIters GetOutEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;

/*!
* \brief Get the CSR array that represents the in-edges.
*
* This method copies data from std::vector to IdArray.
* Only the rows in the half-open range [start, end) are copied.
* \param start the first row to copy (inclusive).
* \param end one past the last row to copy (exclusive).
* \return the CSR array.
*/
CSRArray GetInCSRArray() const;
CSRArray GetInCSRArray(size_t start, size_t end) const;

/*!
* \brief Get the CSR array that represents the out-edges.
*
* This method copies data from std::vector to IdArray.
* Only the rows in the half-open range [start, end) are copied.
* \param start the first row to copy (inclusive).
* \param end one past the last row to copy (exclusive).
* \return the CSR array.
*/
CSRArray GetOutCSRArray() const;
CSRArray GetOutCSRArray(size_t start, size_t end) const;

protected:
DGLIdIters GetInEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;
DGLIdIters GetOutEdgeIdRef(dgl_id_t src, dgl_id_t dst) const;

/*!
* \brief Compact a subgraph.
Expand Down
86 changes: 86 additions & 0 deletions include/dgl/nodeflow.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*!
* Copyright (c) 2019 by Contributors
* \file dgl/nodeflow.h
* \brief DGL NodeFlow class.
*/
#ifndef DGL_NODEFLOW_H_
#define DGL_NODEFLOW_H_

#include <vector>
#include <string>

#include "graph_interface.h"

namespace dgl {

class ImmutableGraph;

/*!
* \brief A NodeFlow graph stores the sampling results for a sampler that samples
* nodes/edges in layers.
*
* Multiple layers of the sampling results are stored in a single graph, which
* results in a more compact format. Extra information is stored alongside the
* graph, such as the node and edge mapping from the NodeFlow graph to the
* parent graph.
*/
struct NodeFlow {
/*! \brief The single graph that holds all sampled layers. */
GraphPtr graph;
/*!
* \brief The offsets of each layer.
*
* NOTE(review): presumably layer i covers the node Ids
* [layer_offsets[i], layer_offsets[i + 1]) of `graph` -- confirm against the
* sampler that fills this array.
*/
IdArray layer_offsets;
/*!
* \brief The offsets of each flow.
*
* NOTE(review): presumably the edge-side counterpart of layer_offsets
* (one range per pair of adjacent layers) -- confirm.
*/
IdArray flow_offsets;
/*!
* \brief The node mapping from the NodeFlow graph to the parent graph.
*/
IdArray node_mapping;
/*!
* \brief The edge mapping from the NodeFlow graph to the parent graph.
*/
IdArray edge_mapping;
};

/*!
* \brief Get a slice on a graph that represents a NodeFlow.
*
* The entire block has to be taken as a slice. Users have to specify the
* correct starting and ending location of a layer.
*
* If remap is false, the returned arrays can be viewed as a sub-matrix slice
* of the adjmat of the input graph. Let the adjmat of the input graph be A,
* then the slice is equal to (in numpy syntax):
* A[layer1_start:layer1_end, layer0_start:layer0_end]
*
* NOTE(review): layer0_start/layer0_end are not parameters. Presumably layer0
* is the layer0_size nodes that immediately precede layer1_start, i.e.
* layer0_end == layer1_start -- confirm against the implementation.
*
* If remap is true, the returned arrays represent an adjacency matrix
* of shape NxM, where N is the number of nodes in layer1 and M is
* the number of nodes in layer0. Nodes in layer0 will be remapped to
* [0, M) and nodes in layer1 will be remapped to [0, N).
*
* A row of the returned adjacency matrix represents the destination
* of an edge and the column represents the source.
*
* If fmt == "csr", the function returns three arrays: indptr, indices, eid.
* If fmt == "coo", the function returns two arrays: idx, eid. Here, the idx array
* is the concatenation of src and dst node id arrays.
*
* NOTE(review): the Python callers in nodeflow.py treat the first half of idx
* as the row (destination) Ids and the second half as the column (source) Ids;
* confirm which order the implementation actually produces.
*
* \param graph An immutable graph.
* \param fmt the format of the returned adjacency matrix ("csr" or "coo").
* \param layer0_size the size of the first layer in the block.
* \param layer1_start the location where the second layer starts.
* \param layer1_end the location where the second layer ends (exclusive, as in
* the numpy slice above).
* \param remap Indicates whether to remap all vertex Ids and edge Ids to the
* local Id space.
* \return a vector of IdArrays.
*/
std::vector<IdArray> GetNodeFlowSlice(const ImmutableGraph &graph, const std::string &fmt,
size_t layer0_size, size_t layer1_start,
size_t layer1_end, bool remap);

} // namespace dgl

#endif // DGL_NODEFLOW_H_

30 changes: 1 addition & 29 deletions include/dgl/sampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,12 @@
#include <vector>
#include <string>
#include "graph_interface.h"
#include "nodeflow.h"

namespace dgl {

class ImmutableGraph;

/*!
* \brief A NodeFlow graph stores the sampling results for a sampler that samples
* nodes/edges in layers.
*
* Multiple layers of the sampling results are stored in a single graph, which
* results in a more compact format. Extra information is stored alongside the
* graph, such as the node and edge mapping from the NodeFlow graph to the
* parent graph.
*/
struct NodeFlow {
/*! \brief The single graph that holds all sampled layers. */
GraphPtr graph;
/*!
* \brief The offsets of each layer.
*
* NOTE(review): presumably layer i covers the node Ids
* [layer_offsets[i], layer_offsets[i + 1]) of `graph` -- confirm against the
* sampler that fills this array.
*/
IdArray layer_offsets;
/*!
* \brief The offsets of each flow.
*
* NOTE(review): presumably the edge-side counterpart of layer_offsets
* (one range per pair of adjacent layers) -- confirm.
*/
IdArray flow_offsets;
/*!
* \brief The node mapping from the NodeFlow graph to the parent graph.
*/
IdArray node_mapping;
/*!
* \brief The edge mapping from the NodeFlow graph to the parent graph.
*/
IdArray edge_mapping;
};

class SamplerOp {
public:
/*!
Expand Down
192 changes: 174 additions & 18 deletions python/dgl/nodeflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,161 @@ def block_parent_eid(self, block_id):
assert F.asnumpy(F.sum(ret == -1, 0)) == 0, "The eid in the parent graph is invalid."
return ret

def block_edges(self, block_id):
    """Return the source nodes, destination nodes and edge ids of a block.

    Parameters
    ----------
    block_id : int
        The specified block to return the edges.

    Returns
    -------
    Tensor
        The src nodes.
    Tensor
        The dst nodes.
    Tensor
        The edge ids.
    """
    offsets = self._layer_offsets
    src_layer_size = offsets[block_id + 1] - offsets[block_id]
    parts = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, "coo", src_layer_size,
                                      offsets[block_id + 1],
                                      offsets[block_id + 2])
    endpoints = utils.toindex(parts(0)).tousertensor()
    edge_index = utils.toindex(parts(1))
    # The endpoint array packs two equally long halves back to back,
    # one entry per edge in each half.
    total = len(endpoints)
    half = total // 2
    assert len(edge_index) == half
    # The second half is handed back as the src nodes, the first as the dst nodes.
    src_nodes = endpoints[half:total]
    dst_nodes = endpoints[0:half]
    return src_nodes, dst_nodes, edge_index.tousertensor()

def block_adjacency_matrix(self, block_id, ctx):
    """Build the sparse adjacency matrix of one block of the NodeFlow.

    A row of the returned adjacency matrix represents the destination
    of an edge and the column represents the source.

    Parameters
    ----------
    block_id : int
        The specified block to return the adjacency matrix.
    ctx : context
        The context of the returned matrix.

    Returns
    -------
    SparseTensor
        The adjacency matrix.
    Tensor
        A index for data shuffling due to sparse format change. Return None
        if shuffle is not required.
    """
    sparse_fmt = F.get_preferred_sparse_format()
    # A block spans two consecutive layers, so both have to be extracted.
    offsets = self._layer_offsets
    src_layer_size = offsets[block_id + 1] - offsets[block_id]
    parts = _CAPI_NodeFlowGetBlockAdj(self._graph._handle, sparse_fmt, src_layer_size,
                                      offsets[block_id + 1],
                                      offsets[block_id + 2])
    # Rows are the nodes of the upper layer, columns the nodes of the lower one.
    shape = (self.layer_size(block_id + 1), self.layer_size(block_id))

    if sparse_fmt == "csr":
        indptr = F.copy_to(utils.toindex(parts(0)).tousertensor(), ctx)
        indices = F.copy_to(utils.toindex(parts(1)).tousertensor(), ctx)
        edge_order = utils.toindex(parts(2))
        values = F.ones(indices.shape, dtype=F.float32, ctx=ctx)
        adj = F.sparse_matrix(values, ('csr', indices, indptr), shape)[0]
        # For CSR the third array from the C API is returned as the shuffle index.
        return adj, edge_order.tousertensor()

    if sparse_fmt == "coo":
        ## FIXME(minjie): data type
        flat_idx = F.copy_to(utils.toindex(parts(0)).tousertensor(), ctx)
        num_edges = self.block_size(block_id)
        coo_idx = F.reshape(flat_idx, (2, num_edges))
        values = F.ones((num_edges,), dtype=F.float32, ctx=ctx)
        adj, shuffle_idx = F.sparse_matrix(values, ('coo', coo_idx), shape)
        return adj, shuffle_idx

    raise Exception("unknown format")

def block_incidence_matrix(self, block_id, typestr, ctx):
    """Return the incidence matrix representation of the block.

    An incidence matrix is an n x m sparse matrix, where n is
    the number of nodes and m is the number of edges. Each nnz
    value indicates whether the edge is incident to the node
    or not.

    Two types of incidence matrix `I` are supported:

    * "in":
        - I[v, e] = 1 if e is the in-edge of v (or v is the dst node of e);
        - I[v, e] = 0 otherwise.
        - n is the size of layer ``block_id + 1``.
    * "out":
        - I[v, e] = 1 if e is the out-edge of v (or v is the src node of e);
        - I[v, e] = 0 otherwise.
        - n is the size of layer ``block_id``.

    "both" is rejected for a block. The "in" rows are sized by layer
    ``block_id + 1`` and the "out" rows by layer ``block_id``, so there is
    no single row dimension that covers both. The previous implementation
    of "both" could never succeed: it used a matrix shape that was never
    assigned.

    Parameters
    ----------
    block_id : int
        The specified block to return the incidence matrix.
    typestr : str
        Can be either "in" or "out".
    ctx : context
        The context of returned incidence matrix.

    Returns
    -------
    SparseTensor
        The incidence matrix.
    Tensor
        A index for data shuffling due to sparse format change. Return None
        if shuffle is not required.

    Raises
    ------
    DGLError
        If ``typestr`` is "both" or any other unsupported value.
    """
    # Validate first so that an unsupported type fails before any edge
    # extraction or device copy is attempted.
    if typestr == 'both':
        raise DGLError('Incidence matrix type "both" is not defined for a block '
                       'of a NodeFlow: the src and dst nodes belong to '
                       'different layers.')
    if typestr not in ('in', 'out'):
        raise DGLError('Invalid incidence matrix type: %s' % str(typestr))

    src, dst, eid = self.block_edges(block_id)
    if typestr == 'in':
        # Rows are the destination nodes, i.e. the upper layer of the block.
        node_ids = dst
        n = self.layer_size(block_id + 1)
    else:
        # Rows are the source nodes, i.e. the lower layer of the block.
        node_ids = src
        n = self.layer_size(block_id)
    m = self.block_size(block_id)

    # Only the endpoint array that is actually used is moved to ctx.
    row = F.unsqueeze(F.copy_to(node_ids, ctx), 0)
    col = F.unsqueeze(F.copy_to(eid, ctx), 0)
    idx = F.cat([row, col], dim=0)
    # FIXME(minjie): data type
    dat = F.ones((m,), dtype=F.float32, ctx=ctx)
    inc, shuffle_idx = F.sparse_matrix(dat, ('coo', idx), (n, m))
    return inc, shuffle_idx

def set_n_initializer(self, initializer, layer_id=ALL, field=None):
"""Set the initializer for empty node features.
Expand Down Expand Up @@ -651,12 +806,13 @@ def block_compute(self, block_id, message_func="default", reduce_func="default",
assert reduce_func is not None

if is_all(v):
dest_nodes = utils.toindex(self.layer_nid(block_id + 1))
u, v, _ = self._graph.in_edges(dest_nodes)
u = utils.toindex(self._glb2lcl_nid(u.tousertensor(), block_id))
v = utils.toindex(self._glb2lcl_nid(v.tousertensor(), block_id + 1))
dest_nodes = utils.toindex(F.arange(0, self.layer_size(block_id + 1)))
eid = utils.toindex(F.arange(0, self.block_size(block_id)))
with ir.prog() as prog:
scheduler.schedule_nodeflow_update_all(graph=self,
block_id=block_id,
message_func=message_func,
reduce_func=reduce_func,
apply_func=apply_node_func)
Runtime.run(prog)
else:
dest_nodes = utils.toindex(v)
u, v, eid = self._graph.in_edges(dest_nodes)
Expand All @@ -667,18 +823,18 @@ def block_compute(self, block_id, message_func="default", reduce_func="default",
block_id + 1))
eid = utils.toindex(self._glb2lcl_eid(eid.tousertensor(), block_id))

with ir.prog() as prog:
scheduler.schedule_nodeflow_compute(graph=self,
block_id=block_id,
u=u,
v=v,
eid=eid,
dest_nodes=dest_nodes,
message_func=message_func,
reduce_func=reduce_func,
apply_func=apply_node_func,
inplace=inplace)
Runtime.run(prog)
with ir.prog() as prog:
scheduler.schedule_nodeflow_compute(graph=self,
block_id=block_id,
u=u,
v=v,
eid=eid,
dest_nodes=dest_nodes,
message_func=message_func,
reduce_func=reduce_func,
apply_func=apply_node_func,
inplace=inplace)
Runtime.run(prog)

def prop_flow(self, message_funcs="default", reduce_funcs="default",
apply_node_funcs="default", flow_range=ALL, inplace=False):
Expand Down
Loading

0 comments on commit 8651be5

Please sign in to comment.