Skip to content

Commit

Permalink
[Feature] Add batch and unbatch for immutable graph (dmlc#539)
Browse files Browse the repository at this point in the history
* Add batch and unbatch for immutable graph

* fix line seprator

* fix lintr

* remove unnecessary include

* fix code review
  • Loading branch information
VoVAllen authored and jermainewang committed May 20, 2019
1 parent aa12952 commit 4b76157
Show file tree
Hide file tree
Showing 4 changed files with 241 additions and 37 deletions.
43 changes: 43 additions & 0 deletions include/dgl/graph_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include <vector>
#include "graph.h"
#include "immutable_graph.h"

namespace dgl {

Expand Down Expand Up @@ -66,6 +67,48 @@ class GraphOp {
*/
static std::vector<Graph> DisjointPartitionBySizes(const Graph* graph, IdArray sizes);

/*!
* \brief Return a readonly disjoint union of the input graphs.
*
* The new readonly graph will include all the nodes/edges in the given graphs.
* Nodes/Edges will be relabled in the given sequence order by batching over CSR Graphs.
* For example, giving input [g1, g2, g3], where
* they have 5, 6, 7 nodes respectively. Then node#2 of g2 will become node#7
* in the result graph. Edge ids are re-assigned similarly.
*
* \param ImmutableGraph A list of input graphs to be unioned.
* \return the disjoint union of the ImmutableGraph
*/
static ImmutableGraph DisjointUnion(std::vector<const ImmutableGraph*> graphs);

/*!
* \brief Partition the ImmutableGraph into several immutable subgraphs.
*
* This is a reverse operation of DisjointUnion. The graph will be partitioned
* into num graphs. This requires the given number of partitions to evenly
* divides the number of nodes in the graph.
*
* \param graph The ImmutableGraph to be partitioned.
* \param num The number of partitions.
* \return a list of partitioned ImmutableGraph
*/
static std::vector<ImmutableGraph> DisjointPartitionByNum(const ImmutableGraph *graph,
int64_t num);

/*!
* \brief Partition the ImmutableGraph into several immutable subgraphs.
*
* This is a reverse operation of DisjointUnion. The graph will be partitioned
* based on the given sizes. This requires the sum of the given sizes is equal
* to the number of nodes in the graph.
*
* \param graph The ImmutableGraph to be partitioned.
* \param sizes The number of partitions.
* \return a list of partitioned ImmutableGraph
*/
static std::vector<ImmutableGraph> DisjointPartitionBySizes(const ImmutableGraph *batched_graph,
IdArray sizes);

/*!
* \brief Map vids in the parent graph to the vids in the subgraph.
*
Expand Down
108 changes: 73 additions & 35 deletions src/graph/graph_apis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,57 @@ PackedFunc ConvertSubgraphToPackedFunc(const Subgraph& sg) {

} // namespace

namespace {
// This namespace contains template functions for batching
// and unbatching over graph and immutable graph
template<typename T>
void DGLDisjointPartitionByNum(const T *gptr, DGLArgs args, DGLRetValue *rv) {
int64_t num = args[1];
const std::vector<T> &&rst = GraphOp::DisjointPartitionByNum(gptr, num);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t *ptr_array_data = static_cast<int64_t *>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
T *ptr = new T();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
}
*rv = ptr_array;
}

template<typename T>
void DGLDisjointUnion(GraphHandle *inhandles, int list_size, DGLRetValue *rv) {
std::vector<const T *> graphs;
for (int i = 0; i < list_size; ++i) {
const GraphInterface *ptr = static_cast<const GraphInterface *>(inhandles[i]);
const T *gr = dynamic_cast<const T *>(ptr);
CHECK(gr) << "Error: Attempted to batch MutableGraph with ImmutableGraph";
graphs.push_back(gr);
}

T *gptr = new T();
*gptr = GraphOp::DisjointUnion(std::move(graphs));
GraphHandle ghandle = gptr;
*rv = ghandle;
}

template<typename T>
void DGLDisjointPartitionBySizes(const T *gptr, const IdArray sizes, DGLRetValue *rv) {
std::vector<T> &&rst = GraphOp::DisjointPartitionBySizes(gptr, sizes);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t *ptr_array_data = static_cast<int64_t *>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
T *ptr = new T();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
}
*rv = ptr_array;
}
} // namespace

///////////////////////////// Graph API ///////////////////////////////////

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphCreateMutable")
Expand Down Expand Up @@ -383,58 +434,45 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointUnion")
void* list = args[0];
GraphHandle* inhandles = static_cast<GraphHandle*>(list);
int list_size = args[1];
std::vector<const Graph*> graphs;
for (int i = 0; i < list_size; ++i) {
const GraphInterface *ptr = static_cast<const GraphInterface *>(inhandles[i]);
const Graph* gr = dynamic_cast<const Graph*>(ptr);
CHECK(gr) << "_CAPI_DGLDisjointUnion isn't implemented in immutable graph";
graphs.push_back(gr);
const GraphInterface *ptr = static_cast<const GraphInterface *>(inhandles[0]);
const ImmutableGraph *im_gr = dynamic_cast<const ImmutableGraph *>(ptr);
const Graph *gr = dynamic_cast<const Graph *>(ptr);
if (gr) {
DGLDisjointUnion<Graph>(inhandles, list_size, rv);
} else {
CHECK(im_gr) << "Args[0] is not a list of valid DGLGraph";
DGLDisjointUnion<ImmutableGraph>(inhandles, list_size, rv);
}
Graph* gptr = new Graph();
*gptr = GraphOp::DisjointUnion(std::move(graphs));
GraphHandle ghandle = gptr;
*rv = ghandle;
});

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointPartitionByNum")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const GraphInterface *ptr = static_cast<const GraphInterface *>(ghandle);
const Graph* gptr = dynamic_cast<const Graph*>(ptr);
CHECK(gptr) << "_CAPI_DGLDisjointPartitionByNum isn't implemented in immutable graph";
int64_t num = args[1];
std::vector<Graph>&& rst = GraphOp::DisjointPartitionByNum(gptr, num);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t* ptr_array_data = static_cast<int64_t*>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
Graph* ptr = new Graph();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
const ImmutableGraph* im_gptr = dynamic_cast<const ImmutableGraph*>(ptr);
if (gptr) {
DGLDisjointPartitionByNum(gptr, args, rv);
} else {
CHECK(im_gptr) << "Args[0] is not a valid DGLGraph";
DGLDisjointPartitionByNum(im_gptr, args, rv);
}
*rv = ptr_array;
});

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLDisjointPartitionBySizes")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
const IdArray sizes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
const GraphInterface *ptr = static_cast<const GraphInterface *>(ghandle);
const Graph* gptr = dynamic_cast<const Graph*>(ptr);
CHECK(gptr) << "_CAPI_DGLDisjointPartitionBySizes isn't implemented in immutable graph";
const IdArray sizes = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[1]));
std::vector<Graph>&& rst = GraphOp::DisjointPartitionBySizes(gptr, sizes);
// return the pointer array as an integer array
const int64_t len = rst.size();
NDArray ptr_array = NDArray::Empty({len}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t* ptr_array_data = static_cast<int64_t*>(ptr_array->data);
for (size_t i = 0; i < rst.size(); ++i) {
Graph* ptr = new Graph();
*ptr = std::move(rst[i]);
ptr_array_data[i] = reinterpret_cast<std::intptr_t>(ptr);
const ImmutableGraph* im_gptr = dynamic_cast<const ImmutableGraph*>(ptr);
if (gptr) {
DGLDisjointPartitionBySizes(gptr, sizes, rv);
} else {
CHECK(im_gptr) << "Args[0] is not a valid DGLGraph";
DGLDisjointPartitionBySizes(im_gptr, sizes, rv);
}
*rv = ptr_array;
});
});

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLineGraph")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
Expand Down
100 changes: 98 additions & 2 deletions src/graph/graph_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
* \brief Graph operation implementation
*/
#include <dgl/graph_op.h>
#include <dgl/immutable_graph.h>
#include <algorithm>
#include "../c_api_common.h"

namespace dgl {
namespace {
Expand Down Expand Up @@ -104,9 +106,103 @@ std::vector<Graph> GraphOp::DisjointPartitionBySizes(const Graph* graph, IdArray
return rst;
}


ImmutableGraph GraphOp::DisjointUnion(std::vector<const ImmutableGraph *> graphs) {
dgl_id_t num_nodes = 0;
dgl_id_t num_edges = 0;
for (const ImmutableGraph *gr : graphs) {
num_nodes += gr->NumVertices();
num_edges += gr->NumEdges();
}
ImmutableGraph::CSR::Ptr batched_csr_ptr = std::make_shared<ImmutableGraph::CSR>(num_nodes,
num_edges);
batched_csr_ptr->indptr[0] = 0;
dgl_id_t cum_num_nodes = 0;
dgl_id_t cum_num_edges = 0;
dgl_id_t indptr_idx = 1;
for (const ImmutableGraph *gr : graphs) {
const ImmutableGraph::CSR::Ptr &g_csrptr = gr->GetInCSR();
dgl_id_t g_num_nodes = g_csrptr->NumVertices();
dgl_id_t g_num_edges = g_csrptr->NumEdges();
ImmutableGraph::CSR::vector<dgl_id_t> &g_indices = g_csrptr->indices;
ImmutableGraph::CSR::vector<int64_t> &g_indptr = g_csrptr->indptr;
ImmutableGraph::CSR::vector<dgl_id_t> &g_edge_ids = g_csrptr->edge_ids;
for (dgl_id_t i = 1; i < g_indptr.size(); ++i) {
batched_csr_ptr->indptr[indptr_idx] = g_indptr[i] + cum_num_edges;
indptr_idx++;
}
for (dgl_id_t i = 0; i < g_indices.size(); ++i) {
batched_csr_ptr->indices.push_back(g_indices[i] + cum_num_nodes);
}

for (dgl_id_t i = 0; i < g_edge_ids.size(); ++i) {
batched_csr_ptr->edge_ids.push_back(g_edge_ids[i] + cum_num_edges);
}
cum_num_nodes += g_num_nodes;
cum_num_edges += g_num_edges;
}

return ImmutableGraph(batched_csr_ptr, nullptr);
}

std::vector<ImmutableGraph> GraphOp::DisjointPartitionByNum(const ImmutableGraph *graph,
int64_t num) {
CHECK(num != 0 && graph->NumVertices() % num == 0)
<< "Number of partitions must evenly divide the number of nodes.";
IdArray sizes = IdArray::Empty({num}, DLDataType{kDLInt, 64, 1}, DLContext{kDLCPU, 0});
int64_t *sizes_data = static_cast<int64_t *>(sizes->data);
std::fill(sizes_data, sizes_data + num, graph->NumVertices() / num);
return DisjointPartitionBySizes(graph, sizes);
}

std::vector<ImmutableGraph> GraphOp::DisjointPartitionBySizes(const ImmutableGraph *batched_graph,
IdArray sizes) {
const int64_t len = sizes->shape[0];
const int64_t *sizes_data = static_cast<int64_t *>(sizes->data);
std::vector<int64_t> cumsum;
cumsum.push_back(0);
for (int64_t i = 0; i < len; ++i) {
cumsum.push_back(cumsum[i] + sizes_data[i]);
}
CHECK_EQ(cumsum[len], batched_graph->NumVertices())
<< "Sum of the given sizes must equal to the number of nodes.";
std::vector<ImmutableGraph> rst;
const ImmutableGraph::CSR::Ptr &in_csr_ptr = batched_graph->GetInCSR();
ImmutableGraph::CSR::vector<int64_t> &bg_indptr = in_csr_ptr->indptr;
ImmutableGraph::CSR::vector<dgl_id_t> &bg_indices = in_csr_ptr->indices;
dgl_id_t cum_sum_edges = 0;
for (int64_t i = 0; i < len; ++i) {
int64_t start_pos = cumsum[i];
int64_t end_pos = cumsum[i + 1];
int64_t g_num_edges = bg_indptr[end_pos] - bg_indptr[start_pos];
ImmutableGraph::CSR::Ptr g_in_csr_ptr = std::make_shared<ImmutableGraph::CSR>(sizes_data[i],
g_num_edges);
ImmutableGraph::CSR::vector<int64_t> &g_indptr = g_in_csr_ptr->indptr;
ImmutableGraph::CSR::vector<dgl_id_t> &g_indices = g_in_csr_ptr->indices;
ImmutableGraph::CSR::vector<dgl_id_t> &g_edge_ids = g_in_csr_ptr->edge_ids;

for (int l = start_pos + 1; l < end_pos + 1; ++l) {
g_indptr[l - start_pos] = bg_indptr[l] - bg_indptr[start_pos];
}

for (int j = bg_indptr[start_pos]; j < bg_indptr[end_pos]; ++j) {
g_indices.push_back(bg_indices[j] - cumsum[i]);
}

for (int k = bg_indptr[start_pos]; k < bg_indptr[end_pos]; ++k) {
g_edge_ids.push_back(in_csr_ptr->edge_ids[k] - cum_sum_edges);
}

cum_sum_edges += g_num_edges;
ImmutableGraph graph(g_in_csr_ptr, nullptr);
rst.push_back(graph);
}
return rst;
}

IdArray GraphOp::MapParentIdToSubgraphId(IdArray parent_vids, IdArray query) {
CHECK(IsValidIdArray(parent_vids)) << "Invalid parent id array.";
CHECK(IsValidIdArray(query)) << "Invalid query id array.";
CHECK(dgl::IsValidIdArray(parent_vids)) << "Invalid parent id array.";
CHECK(dgl::IsValidIdArray(query)) << "Invalid query id array.";
const auto parent_len = parent_vids->shape[0];
const auto query_len = query->shape[0];
const dgl_id_t* parent_data = static_cast<dgl_id_t*>(parent_vids->data);
Expand Down
27 changes: 27 additions & 0 deletions tests/compute/test_batched_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,33 @@ def test_batch_unbatch1():
assert F.allclose(t2.ndata['h'], s3.ndata['h'])
assert F.allclose(t2.edata['h'], s3.edata['h'])

# Test batching readonly graphs
t1.readonly()
t2.readonly()
t1_was_readonly = t1.is_readonly
t2_was_readonly = t2.is_readonly
bg = dgl.batch([t1, t2])

assert t1.is_readonly == t1_was_readonly
assert t2.is_readonly == t2_was_readonly
assert bg.number_of_nodes() == 10
assert bg.number_of_edges() == 8
assert bg.batch_size == 2
assert bg.batch_num_nodes == [5, 5]
assert bg.batch_num_edges == [4, 4]

rs1, rs2 = dgl.unbatch(bg)
assert F.allclose(rs1.edges()[0], t1.edges()[0])
assert F.allclose(rs1.edges()[1], t1.edges()[1])
assert F.allclose(rs2.edges()[0], t2.edges()[0])
assert F.allclose(rs2.edges()[1], t2.edges()[1])
assert F.allclose(rs1.nodes(), t1.nodes())
assert F.allclose(rs2.nodes(), t2.nodes())
assert F.allclose(t1.ndata['h'], rs1.ndata['h'])
assert F.allclose(t1.edata['h'], rs1.edata['h'])
assert F.allclose(t2.ndata['h'], rs2.ndata['h'])
assert F.allclose(t2.edata['h'], rs2.edata['h'])

def test_batch_unbatch2():
# test setting/getting features after batch
a = dgl.DGLGraph()
Expand Down

0 comments on commit 4b76157

Please sign in to comment.