Skip to content

Commit

Permalink
[Feature] Uniform layer-wise sampler (dmlc#416)
Browse files Browse the repository at this point in the history
* migrate to node-flow

* uniform layer sampler test cases

* more test cases

* documentations

* fix lint errors

* fix lint errors

* fix lint errors

* iota

* add asnumpy

* requested changes

* fix indptr error

* fix lint errors

* requested changes & fix lint errors

* fix lint errors

* fix LayerSampler unit test
  • Loading branch information
GaiYu0 authored and zheng-da committed Mar 3, 2019
1 parent a88f351 commit fb4246e
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 11 deletions.
1 change: 1 addition & 0 deletions include/dgl/graph_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ typedef dgl::runtime::NDArray IdArray;
typedef dgl::runtime::NDArray DegreeArray;
typedef dgl::runtime::NDArray BoolArray;
typedef dgl::runtime::NDArray IntArray;
typedef dgl::runtime::NDArray FloatArray;

struct Subgraph;
struct NodeFlow;
Expand Down
15 changes: 15 additions & 0 deletions include/dgl/sampler.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#ifndef DGL_SAMPLER_H_
#define DGL_SAMPLER_H_

#include <vector>
#include <string>
#include "graph_interface.h"

Expand Down Expand Up @@ -61,6 +62,20 @@ class SamplerOp {
int num_hops, int expand_factor,
const bool add_self_loop);

/*!
 * \brief Sample a NodeFlow from the seed vertices with layer-wise sampling.
 * Each layer is drawn with a uniform distribution.
 *
 * \param graph The immutable graph to sample from.
 * \param seed_array The vertex IDs the sampling starts from (layer 0 of the NodeFlow).
 * \param neigh_type The type of neighbor edges to traverse (e.g. "in").
 * \param layer_sizes The number of vertices to sample for each layer.
 * \return a NodeFlow graph.
 */
static NodeFlow LayerUniformSample(const ImmutableGraph *graph, IdArray seed_array,
                                   const std::string &neigh_type,
                                   const std::vector<size_t> &layer_sizes);

/*!
* \brief Batch-generate random walk traces
* \param seeds The array of starting vertex IDs
Expand Down
2 changes: 1 addition & 1 deletion python/dgl/contrib/sampling/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
from .sampler import NeighborSampler
from .sampler import NeighborSampler, LayerSampler
from .randomwalk import *
74 changes: 64 additions & 10 deletions python/dgl/contrib/sampling/sampler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,25 @@
except ImportError:
import queue

__all__ = ['NeighborSampler']
__all__ = ['NeighborSampler', 'LayerSampler']

class NSSubgraphLoader(object):
def __init__(self, g, batch_size, expand_factor, num_hops=1,
class SampledSubgraphLoader(object):
def __init__(self, g, batch_size, sampler,
expand_factor=None, num_hops=1, layer_sizes=None,
neighbor_type='in', node_prob=None, seed_nodes=None,
shuffle=False, num_workers=1, add_self_loop=False):
self._g = g
if not g._graph.is_readonly():
raise NotImplementedError("NodeFlow loader only support read-only graphs.")
self._batch_size = batch_size
self._expand_factor = expand_factor
self._num_hops = num_hops
self._sampler = sampler
if sampler == 'neighbor':
self._expand_factor = expand_factor
self._num_hops = num_hops
elif sampler == 'layer':
self._layer_sizes = layer_sizes
else:
raise NotImplementedError()
self._node_prob = node_prob
self._add_self_loop = add_self_loop
if self._node_prob is not None:
Expand Down Expand Up @@ -54,9 +61,13 @@ def _prefetch(self):
end = min((self._nflow_idx + 1) * self._batch_size, num_nodes)
seed_ids.append(utils.toindex(self._seed_nodes[start:end]))
self._nflow_idx += 1
sgi = self._g._graph.neighbor_sampling(seed_ids, self._expand_factor,
self._num_hops, self._neighbor_type,
self._node_prob, self._add_self_loop)
if self._sampler == 'neighbor':
sgi = self._g._graph.neighbor_sampling(seed_ids, self._expand_factor,
self._num_hops, self._neighbor_type,
self._node_prob, self._add_self_loop)
elif self._sampler == 'layer':
sgi = self._g._graph.layer_sampling(seed_ids, self._layer_sizes,
self._neighbor_type, self._node_prob)
nflows = [NodeFlow(self._g, i) for i in sgi]
self._nflows.extend(nflows)

Expand Down Expand Up @@ -264,8 +275,51 @@ def NeighborSampler(g, batch_size, expand_factor, num_hops=1,
generator
The generator of NodeFlows.
'''
loader = NSSubgraphLoader(g, batch_size, expand_factor, num_hops, neighbor_type, node_prob,
seed_nodes, shuffle, num_workers, add_self_loop)
loader = SampledSubgraphLoader(g, batch_size, 'neighbor',
expand_factor=expand_factor, num_hops=num_hops,
neighbor_type=neighbor_type, node_prob=node_prob,
seed_nodes=seed_nodes, shuffle=shuffle,
num_workers=num_workers)
if not prefetch:
return loader
else:
return _PrefetchingLoader(loader, num_prefetch=num_workers*2)

def LayerSampler(g, batch_size, layer_sizes,
neighbor_type='in', node_prob=None, seed_nodes=None,
shuffle=False, num_workers=1, prefetch=False):
'''Create a sampler that samples neighborhood.
This creates a NodeFlow loader that samples subgraphs from the input graph
with layer-wise sampling. This sampling method is implemented in C and can perform
sampling very efficiently.
The NodeFlow loader returns a list of NodeFlows.
The size of the NodeFlow list is the number of workers.
Parameters
----------
g: the DGLGraph where we sample NodeFlows.
batch_size: The number of NodeFlows in a batch.
layer_size: A list of layer sizes.
node_prob: the probability that a neighbor node is sampled.
Not implemented.
seed_nodes: a list of nodes where we sample NodeFlows from.
If it's None, the seed vertices are all vertices in the graph.
shuffle: indicates the sampled NodeFlows are shuffled.
num_workers: the number of worker threads that sample NodeFlows in parallel.
prefetch : bool, default False
Whether to prefetch the samples in the next batch.
Returns
-------
A NodeFlow iterator
The iterator returns a list of batched NodeFlows.
'''
loader = SampledSubgraphLoader(g, batch_size, 'layer', layer_sizes=layer_sizes,
neighbor_type=neighbor_type, node_prob=node_prob,
seed_nodes=seed_nodes, shuffle=shuffle,
num_workers=num_workers)
if not prefetch:
return loader
else:
Expand Down
42 changes: 42 additions & 0 deletions python/dgl/graph_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ._ffi.function import _init_api
from .base import DGLError
from . import backend as F
from . import ndarray as nd
from . import utils

GraphIndexHandle = ctypes.c_void_p
Expand Down Expand Up @@ -694,6 +695,25 @@ def neighbor_sampling(self, seed_ids, expand_factor, num_hops, neighbor_type,
utils.toindex(rst(num_subgs * 3 + i)),
utils.toindex(rst(num_subgs * 4 + i))) for i in range(num_subgs)]

def layer_sampling(self, seed_ids, layer_sizes, neighbor_type, node_prob=None):
    """Sample NodeFlows with layer-wise (uniform) sampling.

    Parameters
    ----------
    seed_ids : list of utils.Index
        One entry per requested NodeFlow; the seed vertices each flow
        starts from.
    layer_sizes : list of int
        The number of vertices to sample in each layer.
    neighbor_type : str
        Which neighbor edges to traverse (e.g. 'in').
    node_prob : optional
        Per-node sampling probability; only uniform sampling (None) is
        implemented.

    Returns
    -------
    list of NodeFlowIndex
        One index per sampled subgraph.
    """
    # Nothing to sample for an empty batch of seed sets.
    if len(seed_ids) == 0:
        return []

    # Convert indexes to DGL tensors for the C FFI call.
    seed_ids = [v.todgltensor() for v in seed_ids]
    # Pack the layer sizes into an NDArray so they cross the FFI boundary.
    layer_sizes = nd.from_dlpack(F.zerocopy_to_dlpack(F.tensor(layer_sizes)))
    if node_prob is None:
        rst = _layer_uniform_sampling(self, seed_ids, neighbor_type, layer_sizes)
    else:
        # Non-uniform layer sampling is not supported yet.
        raise NotImplementedError()

    # NOTE(review): _layer_uniform_sampling may pad `seed_ids` in place up to
    # the next supported arity, so num_subgs counted here includes padding.
    # That matches the stride of the packed return value (same pattern as
    # neighbor_sampling), but verify that callers tolerate the extra (empty)
    # NodeFlowIndex entries.
    num_subgs = len(seed_ids)
    # The packed result holds five per-subgraph arrays laid out with stride
    # num_subgs: the handle plus four index arrays consumed by NodeFlowIndex.
    return [NodeFlowIndex(rst(i), self, utils.toindex(rst(num_subgs + i)),
                          utils.toindex(rst(num_subgs * 2 + i)),
                          utils.toindex(rst(num_subgs * 3 + i)),
                          utils.toindex(rst(num_subgs * 4 + i))) for i in range(num_subgs)]


def random_walk(self, seeds, num_traces, num_hops):
"""Random walk sampling.
Expand Down Expand Up @@ -1151,3 +1171,25 @@ def _uniform_sampling(gidx, seed_ids, neigh_type, num_hops, expand_factor, add_s
return _NEIGHBOR_SAMPLING_APIS[len(seed_ids)](gidx._handle, *seed_ids, neigh_type,
num_hops, expand_factor, num_seeds,
add_self_loop)

_LAYER_SAMPLING_APIS = {
1: _CAPI_DGLGraphLayerUniformSampling,
2: _CAPI_DGLGraphLayerUniformSampling2,
4: _CAPI_DGLGraphLayerUniformSampling4,
8: _CAPI_DGLGraphLayerUniformSampling8,
16: _CAPI_DGLGraphLayerUniformSampling16,
32: _CAPI_DGLGraphLayerUniformSampling32,
64: _CAPI_DGLGraphLayerUniformSampling64,
128: _CAPI_DGLGraphLayerUniformSampling128,
}

def _layer_uniform_sampling(gidx, seed_ids, neigh_type, layer_sizes):
    """Dispatch layer-wise uniform sampling to the registered C API.

    The C API is only registered for power-of-two batch sizes (1, 2, 4, ...,
    128), so when the number of seed sets falls between two supported sizes
    the list is padded in place with empty arrays up to the next power of two.

    Parameters
    ----------
    gidx : GraphIndex
        The graph to sample from; its C handle is passed over the FFI.
    seed_ids : list
        DGL tensors of seed vertex IDs, one per requested NodeFlow.
        May be extended in place with empty padding arrays.
    neigh_type : str
        Which neighbor edges to traverse (e.g. 'in').
    layer_sizes : ndarray
        The number of vertices to sample per layer, packed for the FFI.

    Returns
    -------
    PackedFunc
        The C API's packed return value holding the sampled subgraphs.
    """
    num_seeds = len(seed_ids)
    empty_ids = []
    if len(seed_ids) > 1 and len(seed_ids) not in _LAYER_SAMPLING_APIS:
        # Pad with empty seed arrays up to the next power of two so that a
        # registered C API entry point exists for this arity.
        # Fix: the padding computation previously referenced `dgl_ids`, an
        # undefined name, raising NameError whenever this path was taken.
        remain = 2**int(math.ceil(math.log2(len(seed_ids)))) - len(seed_ids)
        empty_ids = _EMPTY_ARRAYS[0:remain]
        seed_ids.extend([empty.todgltensor() for empty in empty_ids])
    assert len(seed_ids) in _LAYER_SAMPLING_APIS
    # `num_seeds` tells the C side how many leading entries are real seeds;
    # the remainder are padding.
    return _LAYER_SAMPLING_APIS[len(seed_ids)](gidx._handle, *seed_ids, neigh_type,
                                               layer_sizes, num_seeds)
42 changes: 42 additions & 0 deletions src/graph/graph_apis.cc
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,48 @@ DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling64")
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphUniformSampling128")
.set_body(CAPI_NeighborUniformSample<128>);

/*!
 * \brief C API entry point for layer-wise uniform sampling over a batch of
 * num_seeds seed arrays.
 *
 * Argument layout: args[0] = graph handle; args[1..num_seeds] = seed ID
 * arrays; args[num_seeds+1] = neighbor type; args[num_seeds+2] = layer
 * sizes; args[num_seeds+3] = number of valid (non-padding) seed arrays.
 */
template<int num_seeds>
void CAPI_LayerUniformSample(DGLArgs args, DGLRetValue* rv) {
  GraphHandle ghandle = args[0];
  // Unpack the batch of seed arrays; entries beyond num_valid_seeds are
  // empty padding arrays supplied by the Python caller.
  std::vector<IdArray> seeds(num_seeds);
  for (size_t i = 0; i < seeds.size(); i++)
    seeds[i] = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[i + 1]));
  std::string neigh_type = args[num_seeds + 1];
  auto ls_array = IdArray::FromDLPack(CreateTmpDLManagedTensor(args[num_seeds + 2]));
  // NOTE(review): reinterprets the array's elements as size_t — assumes the
  // Python side packed layer_sizes with an integer dtype whose width matches
  // size_t on this platform; confirm behavior on 32-bit builds.
  size_t *ls_data = static_cast<size_t*>(ls_array->data);
  size_t ls_len = ls_array->shape[0];
  std::vector<size_t> layer_sizes;
  std::copy(ls_data, ls_data + ls_len, std::back_inserter(layer_sizes));
  const int num_valid_seeds = args[num_seeds + 3];
  const GraphInterface *ptr = static_cast<const GraphInterface *>(ghandle);
  // Sampling is only implemented for read-only (immutable) graphs.
  const ImmutableGraph *gptr = dynamic_cast<const ImmutableGraph*>(ptr);
  CHECK(gptr) << "sampling isn't implemented in mutable graph";
  CHECK(num_valid_seeds <= num_seeds);
  // One slot per template arity; slots past num_valid_seeds stay
  // default-constructed so the packed return value keeps a fixed stride.
  std::vector<NodeFlow> subgs(seeds.size());
  // Each seed set is sampled independently, so parallelize across the batch.
#pragma omp parallel for
  for (int i = 0; i < num_valid_seeds; i++) {
    subgs[i] = SamplerOp::LayerUniformSample(gptr, seeds[i], neigh_type, layer_sizes);
  }
  *rv = ConvertSubgraphToPackedFunc(subgs);
}

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling")
.set_body(CAPI_LayerUniformSample<1>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling2")
.set_body(CAPI_LayerUniformSample<2>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling4")
.set_body(CAPI_LayerUniformSample<4>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling8")
.set_body(CAPI_LayerUniformSample<8>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling16")
.set_body(CAPI_LayerUniformSample<16>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling32")
.set_body(CAPI_LayerUniformSample<32>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling64")
.set_body(CAPI_LayerUniformSample<64>);
DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphLayerUniformSampling128")
.set_body(CAPI_LayerUniformSample<128>);

DGL_REGISTER_GLOBAL("graph_index._CAPI_DGLGraphGetAdj")
.set_body([] (DGLArgs args, DGLRetValue* rv) {
GraphHandle ghandle = args[0];
Expand Down
Loading

0 comments on commit fb4246e

Please sign in to comment.