Skip to content

Commit

Permalink
add utils, update README, align implementations to the new README
Browse files Browse the repository at this point in the history
  • Loading branch information
jiguanglizipao committed May 11, 2022
1 parent 2e36df6 commit 4e77f77
Show file tree
Hide file tree
Showing 29 changed files with 1,016 additions and 842 deletions.
68 changes: 38 additions & 30 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,12 @@ link_directories(${Boost_LIBRARY_DIRS})
link_libraries(${Boost_LIBRARIES})

find_package(PkgConfig)
pkg_check_modules(UCX REQUIRED ucx)
include_directories(${UCX_INCLUDE_DIRS})
link_directories(${UCX_LIBRARY_DIRS})
link_libraries(${UCX_LIBRARIES})
pkg_check_modules(UCX ucx>=1.8)
if(UCX_FOUND)
include_directories(${UCX_INCLUDE_DIRS})
link_directories(${UCX_LIBRARY_DIRS})
link_libraries(${UCX_LIBRARIES})
endif()

include(CheckCXXCompilerFlag)
CHECK_CXX_COMPILER_FLAG("-march=native" COMPILER_SUPPORTS_MARCH_NATIVE)
Expand Down Expand Up @@ -86,6 +88,10 @@ if (ENABLE_WAL)
add_definitions("-DENABLE_WAL")
endif()

add_executable(convert_to_binary utils/convert_to_binary.cpp)
add_executable(convert_to_binary_random utils/convert_to_binary_random.cpp)
add_executable(convert_to_binary_timestamp utils/convert_to_binary_timestamp.cpp)

add_executable(wcc src/wcc.cpp)
add_executable(wcc_inc src/wcc_inc.cpp)
add_executable(wcc_inc_batch src/wcc_inc_batch.cpp)
Expand All @@ -106,37 +112,42 @@ add_executable(sssp_inc_rt src/sssp_inc_rt.cpp)

add_executable(sswp src/sswp.cpp)
add_executable(sswp_inc src/sswp_inc.cpp)
add_executable(sswp_inc_batch src/sswp_inc_batch.cpp)
add_executable(sswp_inc_clf src/sswp_inc_clf.cpp)
add_executable(sswp_inc_rt src/sswp_inc_rt.cpp)

add_executable(pagerank src/pagerank.cpp)
add_executable(pagerank_inc src/pagerank_inc.cpp)
if(UCX_FOUND)

add_executable(bfs_inc_rpc src/all_inc_rpc.cpp)
add_executable(wcc_inc_rpc src/all_inc_rpc.cpp)
add_executable(sssp_inc_rpc src/all_inc_rpc.cpp)
add_executable(sswp_inc_rpc src/all_inc_rpc.cpp)
target_compile_definitions(bfs_inc_rpc PRIVATE -DBFS)
target_compile_definitions(wcc_inc_rpc PRIVATE -DWCC)
target_compile_definitions(sssp_inc_rpc PRIVATE -DSSSP)
target_compile_definitions(sswp_inc_rpc PRIVATE -DSSWP)

add_executable(bfs_inc_rpc src/all_inc_rpc.cpp)
add_executable(wcc_inc_rpc src/all_inc_rpc.cpp)
add_executable(sssp_inc_rpc src/all_inc_rpc.cpp)
add_executable(sswp_inc_rpc src/all_inc_rpc.cpp)
target_compile_definitions(bfs_inc_rpc PRIVATE -DBFS)
target_compile_definitions(wcc_inc_rpc PRIVATE -DWCC)
target_compile_definitions(sssp_inc_rpc PRIVATE -DSSSP)
target_compile_definitions(sswp_inc_rpc PRIVATE -DSSWP)
add_executable(rpc_client src/rpc_client.cpp)
add_executable(rpc_kvs src/rpc_kvs.cpp)

add_executable(rpc_client src/rpc_client.cpp)
add_executable(rpc_kvs src/rpc_kvs.cpp)
add_executable(bfs_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(wcc_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(sssp_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(sswp_inc_rpc_txn src/all_inc_rpc_txn.cpp)
target_compile_definitions(bfs_inc_rpc_txn PRIVATE -DBFS)
target_compile_definitions(wcc_inc_rpc_txn PRIVATE -DWCC)
target_compile_definitions(sssp_inc_rpc_txn PRIVATE -DSSSP)
target_compile_definitions(sswp_inc_rpc_txn PRIVATE -DSSWP)

add_executable(bfs_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(wcc_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(sssp_inc_rpc_txn src/all_inc_rpc_txn.cpp)
add_executable(sswp_inc_rpc_txn src/all_inc_rpc_txn.cpp)
target_compile_definitions(bfs_inc_rpc_txn PRIVATE -DBFS)
target_compile_definitions(wcc_inc_rpc_txn PRIVATE -DWCC)
target_compile_definitions(sssp_inc_rpc_txn PRIVATE -DSSSP)
target_compile_definitions(sswp_inc_rpc_txn PRIVATE -DSSWP)
add_executable(rpc_txn_client src/rpc_txn_client.cpp)

add_executable(rpc_txn_client src/rpc_txn_client.cpp)
add_executable(btree_bench src/btree_bench.cpp)
add_executable(ucx_stream_bench src/ucx_stream_bench.cpp)

add_executable(btree_bench src/btree_bench.cpp)
add_executable(ucx_stream_bench src/ucx_stream_bench.cpp)
add_executable(snbq14_inc_rpc src/snbq14_inc_rpc.cpp)
add_executable(rpc_client_snbq14 src/rpc_client_snbq14.cpp)

endif()

add_executable(bfs_inc_rate src/all_inc.cpp)
add_executable(wcc_inc_rate src/all_inc.cpp)
Expand All @@ -146,6 +157,3 @@ target_compile_definitions(bfs_inc_rate PRIVATE -DBFS)
target_compile_definitions(wcc_inc_rate PRIVATE -DWCC)
target_compile_definitions(sssp_inc_rate PRIVATE -DSSSP)
target_compile_definitions(sswp_inc_rate PRIVATE -DSSWP)

add_executable(snbq14_inc_rpc src/snbq14_inc_rpc.cpp)
add_executable(rpc_client_snbq14 src/rpc_client_snbq14.cpp)
152 changes: 150 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,156 @@ RisGraph: A Real-Time Streaming System for Evolving Graphs to Support Sub-millis
Guanyu Feng, Zixuan Ma, Daixuan Li, Shengqi Chen, Xiaowei Zhu, Wentao Han, Wenguang Chen.
SIGMOD/PODS '21: Proceedings of the 2021 International Conference on Management of Data. https://doi.org/10.1145/3448016.3457263

## Dependency
## System Dependency
- [CMake](https://gitlab.kitware.com/cmake/cmake)
- [TBB](https://github.com/oneapi-src/oneTBB)
- [UCX](https://github.com/openucx/ucx)
- OpenMP and C++17
- [Optional: UCX >= 1.8](https://github.com/openucx/ucx)

## Compilation
```bash
git clone https://github.com/thu-pacman/RisGraph.git --recursive
cd RisGraph
mkdir -p build
cd build
cmake .. -DCMAKE_BUILD_TYPE=Release
make -j
```

## Preprocessing

The graph format used by RisGraph is a binary edge list with 64-bit vertex IDs (in host byte order).
RisGraph provides some tools that convert edge lists in text format (such as [SNAP](https://snap.stanford.edu/data) datasets) to binary format.

RisGraph simulates sliding windows by inserting and deleting edges based on the sequence of edges from the input dataset.

### Sorting edges based on timestamps

```bash
# when edges are timestamped
# each line of text_graph_path is an edge with three integers
# source_vertex_id destination_vertex_id edge_timestamp

./convert_to_binary_timestamp < text_graph_path > binary_graph_path
```

### Randomly shuffling edges

```bash
# when edges have no specific order (the case for most public datasets)
# each line of text_graph_path is an edge with two integers
# source_vertex_id destination_vertex_id

./convert_to_binary_random < text_graph_path > binary_graph_path
```

### Keeping the input ordering

```bash
# when edges are already in chronological or custom order
# each line of text_graph_path is an edge with two integers
# source_vertex_id destination_vertex_id

./convert_to_binary < text_graph_path > binary_graph_path
```

## Entire Graph Processing
These applications will process the entire graph.

### Breadth-First Search
```bash
./bfs binary_graph_path root
```

### Single Source Shortest Path
```bash
./sssp binary_graph_path root
```

### Single Source Widest Path
```bash
./sswp binary_graph_path root
```

### Weakly Connected Components
```bash
# edges are treated as undirected edges
./wcc binary_graph_path
```

## Incremental Processing
These applications will load the first `initial_edges_percent` edges from the graph, insert and delete an edge as an update (simulating sliding windows), and incrementally process the algorithm.

### Breadth-First Search
```bash
./bfs_inc binary_graph_path root initial_edges_percent
```

### Single Source Shortest Path
```bash
./sssp_inc binary_graph_path root initial_edges_percent
```

### Single Source Widest Path
```bash
./sswp_inc binary_graph_path root initial_edges_percent
```

### Weakly Connected Components
```bash
./wcc_inc binary_graph_path initial_edges_percent
```

## Incremental Processing with Safe/Unsafe Classification and Latency-aware Scheduler
These applications will load the first `initial_edges_percent` edges from the graph and simulate `num_of_clients` clients, each requesting updates that insert/delete an edge (simulating sliding windows).

RisGraph enables safe/unsafe classification for incremental processing and uses the latency-aware scheduler to try to keep `tail_latency_percent` of updates within `target_tail_latency` milliseconds. It is recommended to start with `num_of_clients` set to the number of physical cores and keep doubling it until RisGraph can no longer meet the expected tail latency.

### Breadth-First Search
```bash
./bfs_inc_rt binary_graph_path root initial_edges_percent target_tail_latency tail_latency_percent num_of_clients
```

### Single Source Shortest Path
```bash
./sssp_inc_rt binary_graph_path root initial_edges_percent target_tail_latency tail_latency_percent num_of_clients
```

### Single Source Widest Path
```bash
./sswp_inc_rt binary_graph_path root initial_edges_percent target_tail_latency tail_latency_percent num_of_clients
```

### Weakly Connected Components
```bash
./wcc_inc_rt binary_graph_path initial_edges_percent target_tail_latency tail_latency_percent num_of_clients
```


## Incremental Processing with Batched Updates
These applications will load the first `initial_edges_percent` edges from the graph, insert and delete `batch_size` edges as a batched update (simulating sliding windows), and incrementally process the algorithm.

### Breadth-First Search
```bash
./bfs_inc_batch binary_graph_path root initial_edges_percent batch_size
```

### Single Source Shortest Path
```bash
./sssp_inc_batch binary_graph_path root initial_edges_percent batch_size
```

### Single Source Widest Path
```bash
./sswp_inc_batch binary_graph_path root initial_edges_percent batch_size
```

### Weakly Connected Components
```bash
./wcc_inc_batch binary_graph_path initial_edges_percent batch_size
```


## Performance Tuning
* Binding threads to CPU cores: `export OMP_PROC_BIND=true`
* Trying different NUMA policies: In our experiments, `numactl --preferred=0` achieves relatively good performance in a wide range of cases.
20 changes: 10 additions & 10 deletions core/graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1164,12 +1164,12 @@ class Graph
ContinueReduceFunc continue_reduce_func,
UpdateFunc update_func,
ActiveResultFunc active_result_func,
std::vector<VertexTree<DataType>> &labels, const std::vector<edge_type> &edges, bool directed = true)
std::vector<VertexTree<DataType>> &labels, const std::vector<edge_type> &edges, const uint64_t &length, bool directed = true)
{
R total_result = 0;
if(edges.size() == 1)
if(length == 1)
{
for(uint64_t i=0;i<edges.size();i++)
for(uint64_t i=0;i<length;i++)
{
auto edge = edges[i];
total_result = update_tree_add<R>(continue_reduce_func, update_func, active_result_func, labels, edge, directed);
Expand All @@ -1178,8 +1178,8 @@ class Graph
}

active_in.clear();
THRESHOLD_OPENMP("omp parallel for", edges.size(),
for(uint64_t i=0;i<edges.size();i++)
THRESHOLD_OPENMP("omp parallel for", length,
for(uint64_t i=0;i<length;i++)
{
auto edge = edges[i];
if(update_func(edge.src, edge.dst, labels[edge.src].data, labels[edge.dst].data, edge).first)
Expand Down Expand Up @@ -1662,12 +1662,12 @@ class Graph
UpdateFunc update_func,
ActiveResultFunc active_result_func,
EqualFunc equal_func,
std::vector<VertexTree<DataType>> &labels, const std::vector<edge_type> &edges, bool directed = true)
std::vector<VertexTree<DataType>> &labels, const std::vector<edge_type> &edges, const uint64_t &length, bool directed = true)
{
R total_result = 0;
if(edges.size() == 1)
if(length == 1)
{
for(uint64_t i=0;i<edges.size();i++)
for(uint64_t i=0;i<length;i++)
{
auto edge = edges[i];
total_result = update_tree_del<R>(init_label_func, continue_reduce_func, update_func, active_result_func, equal_func, labels, edge, directed);
Expand All @@ -1678,8 +1678,8 @@ class Graph
active_in.clear();
active_tree.clear();
invalidated_idx++;
THRESHOLD_OPENMP("omp parallel for", edges.size(),
for(uint64_t i=0;i<edges.size();i++)
THRESHOLD_OPENMP("omp parallel for", length,
for(uint64_t i=0;i<length;i++)
{
auto edge = edges[i];
{
Expand Down
2 changes: 1 addition & 1 deletion deps/abseil-cpp
65 changes: 5 additions & 60 deletions src/bfs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,75 +90,20 @@ int main(int argc, char** argv)
return {vid==root?0:MAXL, vid==root};
};
{
auto start = std::chrono::system_clock::now();

graph.build_tree<uint64_t, uint64_t>(
init_label_func,
continue_reduce_print_func,
update_func,
active_result_func,
labels
);
}

//for(uint64_t i=imported_edges;i<raw_edges_len;i++)
//{
// if((i-imported_edges)%10000 == 0)
// {
// graph.get_dense_active_in().clear();
// graph.get_dense_active_in().set_bit(root);

// graph.stream_vertices<uint64_t>(
// [&](uint64_t vid)
// {
// labels[vid].data = init_label_func(vid);
// return 1;
// },
// graph.get_dense_active_all()
// );
// labels[root].data = 0;

// graph.build_tree<uint64_t, uint64_t>(
// continue_reduce_func,
// update_func,
// active_result_func,
// labels
// );
auto end = std::chrono::system_clock::now();
fprintf(stderr, "exec: %.6lfs\n", 1e-6*(uint64_t)std::chrono::duration_cast<std::chrono::microseconds>(end-start).count());
}

// std::vector<std::atomic_uint64_t> layer_counts(MAXL);
// for(auto &a : layer_counts) a = 0;
// graph.stream_vertices<uint64_t>(
// [&](uint64_t vid)
// {
// if(labels[vid].data != MAXL)
// {
// layer_counts[labels[vid].data]++;
// return 1;
// }
// return 0;
// },
// graph.get_dense_active_all()
// );
// for(uint64_t i=0;i<layer_counts.size();i++)
// {
// if(layer_counts[i] > 0)
// {
// printf("%lu ", layer_counts[i].load());
// }
// else
// {
// printf("\n");
// break;
// }
// }
// }
// {
// const auto &e = raw_edges[i];
// graph.add_edge({e.first, e.second}, true);
// }
// {
// const auto &e = raw_edges[i-imported_edges];
// graph.del_edge({e.first, e.second}, true);
// }
//}
{
std::vector<std::atomic_uint64_t> layer_counts(MAXL);
for(auto &a : layer_counts) a = 0;
Expand Down
Loading

0 comments on commit 4e77f77

Please sign in to comment.