From e212a34114ade77c2da130e3d77280afd01cf2f0 Mon Sep 17 00:00:00 2001 From: Shivam Kapoor <4599890+iamsmkr@users.noreply.github.com> Date: Wed, 3 May 2023 15:21:29 +0100 Subject: [PATCH] Featuers/taint algo (#848) * init taint algo * add vertex in read_vec_partitions * impl generic taint convergence * add test * more tests * more tests * init taint notebook * init taint notebook * impl py test * impl generic taint notebook * impl load stablecoin rust * add taint tracking * fix lotr and modify generic taint apis * impl loader for stablecoins * revive this if need be * simplify stable coin impl * impl generic taint generic over string and u64 * impl stablecoin fetch and tests * create default dir * create default dir * accept data dir as args * create dirs always * add err msg --- examples/py/crypto/stable_coin_analysis.ipynb | 84 ++++ examples/rust/Cargo.toml | 3 + examples/rust/src/bin/crypto/main.rs | 54 ++ examples/rust/src/bin/lotr/main.rs | 183 ++----- python/src/algorithms.rs | 31 ++ python/src/graph.rs | 52 +- python/src/graph_loader.rs | 9 + python/src/lib.rs | 5 +- python/src/utils.rs | 49 ++ python/tests/test_graphdb.py | 114 +++-- .../src/algorithms/connected_components.rs | 16 +- raphtory/src/algorithms/generic_taint.rs | 465 ++++++++++++++++++ raphtory/src/algorithms/mod.rs | 26 + raphtory/src/algorithms/pagerank.rs | 14 +- raphtory/src/core/state.rs | 68 +-- raphtory/src/db/path.rs | 1 + raphtory/src/db/program.rs | 62 ++- raphtory/src/graph_loader/example/mod.rs | 1 + .../src/graph_loader/example/stable_coins.rs | 107 ++++ .../src/graph_loader/source/csv_loader.rs | 2 +- 20 files changed, 1062 insertions(+), 284 deletions(-) create mode 100644 examples/py/crypto/stable_coin_analysis.ipynb create mode 100644 examples/rust/src/bin/crypto/main.rs create mode 100644 raphtory/src/algorithms/generic_taint.rs create mode 100644 raphtory/src/graph_loader/example/stable_coins.rs diff --git a/examples/py/crypto/stable_coin_analysis.ipynb b/examples/py/crypto/stable_coin_analysis.ipynb new file mode 100644 index 0000000000..2202eac907 --- /dev/null +++ b/examples/py/crypto/stable_coin_analysis.ipynb @@ -0,0 +1,84 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "id": "5e76e492-a0b9-4199-8c46-bb10d1b15db2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'5': [(6, 13, '2')], '1': [(0, 11, '1')], '2': [(2, 12, '1'), (2, 11, '1'), (0, 11, '2')], '4': [(5, 12, '2')]}Time taken to complete step 1 = 0\n", + "\n", + "Time taken to check convergence = 0\n", + "Completed 1 steps in 0 secs\n", + "Time taken to complete step 1 = 0\n", + "Time taken to check convergence = 0\n" + ] + } + ], + "source": [ + "from raphtory import Graph\n", + "from raphtory import algorithms\n", + "\n", + "# actual data\n", + "g = Graph(1)\n", + "g.add_edge(10, 1, 3, {})\n", + "g.add_edge(11, 1, 2, {})\n", + "g.add_edge(12, 1, 2, {})\n", + "g.add_edge(9, 1, 2, {})\n", + "g.add_edge(12, 2, 4, {})\n", + "g.add_edge(13, 2, 5, {})\n", + "g.add_edge(14, 5, 5, {})\n", + "g.add_edge(14, 5, 4, {})\n", + "g.add_edge(5, 4, 6, {})\n", + "g.add_edge(15, 4, 7, {})\n", + "g.add_edge(10, 4, 7, {})\n", + "g.add_edge(10, 5, 8, {})\n", + "\n", + "actual = algorithms.generic_taint(g, 20, 11, [1, 2], [4, 5])\n", + "expected = {\n", + " '1': [(0, 11, '1')],\n", + " '2': [(2, 12, '1'), (2, 11, '1'), (0, 11, '2')],\n", + " '4': [(5, 12, '2')],\n", + " '5': [(6, 13, '2')],\n", + "}\n", + "\n", + "assert (actual == expected)\n", + "\n", + "print(actual)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "719e89be-da1f-4662-8055-1c1ee658116a", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python [conda env:pyraphtory] *", + "language": "python", + "name": "conda-env-pyraphtory-py" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.9" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/examples/rust/Cargo.toml b/examples/rust/Cargo.toml index 4fed73ccff..e323545a7c 100644 --- a/examples/rust/Cargo.toml +++ b/examples/rust/Cargo.toml @@ -27,6 +27,9 @@ name = "healthcheck" [[bin]] name = "hulongbay" +[[bin]] +name = "crypto" + [target.x86_64-unknown-linux-gnu] linker = "/usr/bin/clang" rustflags = ["-Clink-arg=-fuse-ld=lld", "-Clink-arg=-Wl,--no-rosegment"] diff --git a/examples/rust/src/bin/crypto/main.rs b/examples/rust/src/bin/crypto/main.rs new file mode 100644 index 0000000000..49e791d5a3 --- /dev/null +++ b/examples/rust/src/bin/crypto/main.rs @@ -0,0 +1,54 @@ +use std::env; +use std::path::Path; +use raphtory::algorithms::generic_taint::generic_taint; +use raphtory::db::view_api::*; +use raphtory::graph_loader::example::stable_coins::stable_coin_graph; +use serde::Deserialize; +use std::time::Instant; +use raphtory::algorithms::pagerank::unweighted_page_rank; + +#[derive(Deserialize, std::fmt::Debug)] +pub struct StableCoin { + block_number: String, + transaction_index: u32, + from_address: String, + to_address: String, + time_stamp: i64, + contract_address: String, + value: f64, +} + +fn main() { + let args: Vec = env::args().collect(); + + let data_dir = if args.len() < 2 { + None + } else { + Some(args.get(1).unwrap().to_string()) + }; + + let g = stable_coin_graph(data_dir, 1); + + assert_eq!(g.num_vertices(), 1523333); + assert_eq!(g.num_edges(), 2871269); + + println!("Pagerank"); + let now = Instant::now(); + let _ = unweighted_page_rank(&g, i64::MIN..i64::MAX, 20); + println!("Time taken: {} secs", now.elapsed().as_secs()); + + let now = Instant::now(); + let _ = unweighted_page_rank(&g.layer("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), i64::MIN..i64::MAX, 20); + println!("Time taken: {} secs", now.elapsed().as_secs()); + + println!("Generic taint"); + let now = Instant::now(); + let _ = generic_taint( + &g.layer("0xdac17f958d2ee523a2206206994597c13d831ec7").unwrap(), + 20, + 1651105815, + vec!["0xd30b438df65f4f788563b2b3611bd6059bff4ad9"], + vec![], + ); + println!("Time taken: {}", now.elapsed().as_secs()); +} diff --git a/examples/rust/src/bin/lotr/main.rs b/examples/rust/src/bin/lotr/main.rs index 5b607223f3..64ae3b89ba 100644 --- a/examples/rust/src/bin/lotr/main.rs +++ b/examples/rust/src/bin/lotr/main.rs @@ -8,6 +8,10 @@ use raphtory::graph_loader::source::csv_loader::CsvLoader; use serde::Deserialize; use std::path::PathBuf; use std::{env, path::Path, time::Instant}; +use itertools::Itertools; +use raphtory::algorithms::generic_taint::generic_taint; +use raphtory::db::view_api::internal::GraphViewInternalOps; +use raphtory::graph_loader::example::lotr_graph::lotr_graph; #[derive(Deserialize, std::fmt::Debug)] pub struct Lotr { @@ -55,30 +59,32 @@ fn main() { CsvLoader::new(data_dir) .load_into_graph(&g, |lotr: Lotr, g: &Graph| { - let src_id = utils::calculate_hash(&lotr.src_id); - let dst_id = utils::calculate_hash(&lotr.dst_id); - let time = lotr.time; - g.add_vertex( - time, - src_id, - &vec![("name".to_string(), Prop::Str("Character".to_string()))], - ); + lotr.time, + lotr.src_id.clone(), + &vec![ + ("type".to_string(), Prop::Str("Character".to_string())) + ], + ).expect("Failed to add vertex"); + g.add_vertex( - time, - src_id, - &vec![("name".to_string(), Prop::Str("Character".to_string()))], - ); + lotr.time, + lotr.dst_id.clone(), + &vec![ + ("type".to_string(), Prop::Str("Character".to_string())) + ], + ).expect("Failed to add vertex"); + g.add_edge( - time, - src_id, - dst_id, + lotr.time, + lotr.src_id.clone(), + lotr.dst_id.clone(), &vec![( - "name".to_string(), + "type".to_string(), Prop::Str("Character Co-occurrence".to_string()), )], None, - ); + ).expect("Failed to add edge"); }) .expect("Failed to load graph from CSV data files"); @@ -90,8 +96,8 @@ fn main() { now.elapsed().as_secs() ); - // g.save_to_file(encoded_data_dir) - // .expect("Failed to save graph"); + g.save_to_file(encoded_data_dir) + .expect("Failed to save graph"); g }; @@ -101,141 +107,10 @@ fn main() { let gandalf = utils::calculate_hash(&"Gandalf"); - assert_eq!(gandalf, 8703678510860200260); + assert_eq!(gandalf, 2760374808085341115); assert!(graph.has_vertex(gandalf)); + assert_eq!(graph.vertex(gandalf).unwrap().name(), "Gandalf"); - let program_s1 = TriangleCountS1 {}; - let program_s2 = TriangleCountS2 {}; - let agg = state::def::sum::(1); - - let mut gs = GlobalEvalState::new(graph.clone(), false); - - program_s1.run_step(&graph, &mut gs); - - program_s2.run_step(&graph, &mut gs); - - let actual_tri_count = gs.read_global_state(&agg); - - println!("Actual triangle count: {:?}", actual_tri_count); - - let program = TriangleCountSlowS2 {}; - let agg = state::def::sum::(0); - - let mut gs = GlobalEvalState::new(graph.clone(), false); - - program.run_step(&graph, &mut gs); - - let actual_tri_count = gs.read_global_state(&agg).map(|v| v / 3); - - println!("Actual triangle count: {:?}", actual_tri_count); - - // assert_eq!(v.in_degree().unwrap(), 24); - // assert_eq!(v.out_degree().unwrap(), 35); - // assert_eq!(v.degree().unwrap(), 49); - // - // let windowed_graph = graph.window(0, i64::MAX); - // let v = windowed_graph.vertex(gandalf).unwrap().unwrap(); - // - // assert_eq!(v.in_degree().unwrap(), 24); - // assert_eq!(v.out_degree().unwrap(), 35); - // assert_eq!(v.degree().unwrap(), 49); - // - // let windowed_graph = graph.window(100, 9000); - // let v = windowed_graph.vertex(gandalf).unwrap().unwrap(); - - // let windowed_graph = graph.window(100, 9000); - // let v = windowed_graph.vertex(gandalf).unwrap(); - - // let actual = v - // .out_edges() - // .map(|e| (e.src().id(), e.dst().id())) - // .collect::>(); - - // let expected = vec![ - // (13840129630991083248, 6768237561757024290), - // (13840129630991083248, 2582862946330553552), - // (13840129630991083248, 13415634039873497660), - // (13840129630991083248, 357812470600089148), - // (13840129630991083248, 17764752901005380738), - // (13840129630991083248, 6484040860173734298), - // (0, 2914346725110218071), - // (0, 5956895584314169235), - // (0, 12936471037316398897), - // (0, 13050559475682228465), - // (0, 13789593425373656861), - // (0, 14223985880962197705), - // ]; - - // let windowed_graph = graph.window(i64::MIN, i64::MAX); - // let v = windowed_graph.vertex(gandalf).unwrap().unwrap(); - // let actual = v - // .out_edges() - // .take(10) - // .map(|e| (e.src().id(), e.dst().id())) - // .collect::>(); - - // let windowed_graph = graph.window(i64::MIN, i64::MAX); - // let v = windowed_graph.vertex(gandalf).unwrap(); - // let actual = v - // .out_edges() - // .take(10) - // .map(|e| (e.src().id(), e.dst().id())) - // .collect::>(); - - // let expected: Vec<(u64, u64)> = vec![ - // (13840129630991083248, 12772980705568717046), - // (13840129630991083248, 6768237561757024290), - // (13840129630991083248, 11214194356141027632), - // (13840129630991083248, 2582862946330553552), - // (13840129630991083248, 13415634039873497660), - // (13840129630991083248, 6514938325906662882), - // (13840129630991083248, 13854913496482509346), - // (13840129630991083248, 357812470600089148), - // (13840129630991083248, 17764752901005380738), - // (13840129630991083248, 15044750458947305290), - // ]; - - // assert_eq!(actual, expected); - - // let windowed_graph = graph.window(i64::MIN, i64::MAX); - // let actual = windowed_graph - // .vertices() - // .take(10) - // .map(|tv| tv.id()) - // .collect::>(); - - // let expected: Vec = vec![ - // 13840129630991083248, - // 12772980705568717046, - // 8366058037510783370, - // 11638942476191275730, - // 6768237561757024290, - // 13652678879212650868, - // 10620258110842154986, - // 12687378031997996522, - // 11214194356141027632, - // 2582862946330553552, - // ]; - - // assert_eq!(actual, expected); - - // let windowed_graph = graph.window(0, 300); - // let actual = windowed_graph - // .vertices() - // .map(|v| v.id()) - // .collect::>(); - - // let expected = vec![ - // 13840129630991083248, - // 12772980705568717046, - // 8366058037510783370, - // 11638942476191275730, - // 12936471037316398897, - // 5956895584314169235, - // 5402476312775412883, - // 7320164159843417887, - // ]; - // assert_eq!(actual, expected); - - // triangle count global + let r = generic_taint(&graph, 20, 31930, vec!["Gandalf"], vec![]); + assert_eq!(r.keys().sorted().collect_vec(), vec!["Gandalf", "Saruman", "Wormtongue"]) } diff --git a/python/src/algorithms.rs b/python/src/algorithms.rs index 1b891dc589..1c7666d28b 100644 --- a/python/src/algorithms.rs +++ b/python/src/algorithms.rs @@ -3,9 +3,12 @@ /// To run an algorithm simply import the module and call the function with the graph as the argument /// use crate::graph_view::PyGraphView; +use itertools::Itertools; +use pyo3::exceptions::PyTypeError; use std::collections::HashMap; use crate::utils; +use crate::utils::{extract_input_vertex, InputVertexBox}; use pyo3::prelude::*; use raphtory::algorithms::degree::{ average_degree as average_degree_rs, max_in_degree as max_in_degree_rs, @@ -13,11 +16,13 @@ use raphtory::algorithms::degree::{ min_out_degree as min_out_degree_rs, }; use raphtory::algorithms::directed_graph_density::directed_graph_density as directed_graph_density_rs; +use raphtory::algorithms::generic_taint::generic_taint as generic_taint_rs; use raphtory::algorithms::local_clustering_coefficient::local_clustering_coefficient as local_clustering_coefficient_rs; use raphtory::algorithms::local_triangle_count::local_triangle_count as local_triangle_count_rs; use raphtory::algorithms::reciprocity::{ all_local_reciprocity as all_local_reciprocity_rs, global_reciprocity as global_reciprocity_rs, }; +use raphtory::core::vertex::InputVertex; /// Local triangle count - calculates the number of triangles (a cycle of length 3) for a node. /// It measures the local clustering of a graph. @@ -41,6 +46,32 @@ pub(crate) fn local_triangle_count(g: &PyGraphView, v: &PyAny) -> PyResult, + stop_nodes: Vec<&PyAny>, +) -> PyResult>> { + let infected_nodes: PyResult> = infected_nodes + .into_iter() + .map(|v| extract_input_vertex(v)) + .collect(); + let stop_nodes: PyResult> = stop_nodes + .into_iter() + .map(|v| extract_input_vertex(v)) + .collect(); + + Ok(generic_taint_rs( + &g.graph, + iter_count, + start_time, + infected_nodes?, + stop_nodes?, + )) +} + /// Local Clustering coefficient - measures the degree to which nodes in a graph tend to cluster together. /// /// It is calculated by dividing the number of triangles (sets of three nodes that are all diff --git a/python/src/graph.rs b/python/src/graph.rs index a89b76446d..c65f691530 100644 --- a/python/src/graph.rs +++ b/python/src/graph.rs @@ -5,15 +5,14 @@ //! It is a wrapper around a set of shards, which are the actual graph data structures. //! In Python, this class wraps around the rust graph. -use crate::dynamic::{DynamicGraph, IntoDynamic}; +use crate::dynamic::IntoDynamic; use crate::graph_view::PyGraphView; -use crate::utils::adapt_result; +use crate::utils::{adapt_result, extract_input_vertex, InputVertexBox}; use crate::wrappers::prop::Prop; use itertools::Itertools; -use pyo3::exceptions::{PyException, PyTypeError}; +use pyo3::exceptions::PyException; use pyo3::prelude::*; use raphtory::core as dbc; -use raphtory::core::vertex::InputVertex; use raphtory::db::graph::Graph; use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -215,49 +214,6 @@ impl PyGraph { /// Arguments: /// id (str or int): The id of the vertex. pub(crate) fn extract_id(id: &PyAny) -> PyResult { - match id.extract::() { - Ok(string) => Ok(InputVertexBox::new(string)), - Err(_) => { - let msg = "IDs need to be strings or an unsigned integers"; - let number = id.extract::().map_err(|_| PyTypeError::new_err(msg))?; - Ok(InputVertexBox::new(number)) - } - } - } -} - -/// A trait for vertices that can be used as input for the graph. -/// This allows us to add vertices with different types of ids, either strings or ints. -#[derive(Clone)] -pub struct InputVertexBox { - id: u64, - name_prop: Option, -} - -/// Implementation for vertices that can be used as input for the graph. -/// This allows us to add vertices with different types of ids, either strings or ints. -impl InputVertexBox { - pub(crate) fn new(vertex: T) -> InputVertexBox - where - T: InputVertex, - { - InputVertexBox { - id: vertex.id(), - name_prop: vertex.name_prop(), - } - } -} - -/// Implementation for vertices that can be used as input for the graph. -/// This allows us to add vertices with different types of ids, either strings or ints. -impl InputVertex for InputVertexBox { - /// Returns the id of the vertex. - fn id(&self) -> u64 { - self.id - } - - /// Returns the name property of the vertex. - fn name_prop(&self) -> Option { - self.name_prop.clone() + extract_input_vertex(id) } } diff --git a/python/src/graph_loader.rs b/python/src/graph_loader.rs index 3bffa7978b..55a42111ff 100644 --- a/python/src/graph_loader.rs +++ b/python/src/graph_loader.rs @@ -73,6 +73,15 @@ pub(crate) fn reddit_hyperlink_graph(shards: usize, timeout_seconds: u64) -> PyR raphtory::graph_loader::example::reddit_hyperlinks::reddit_graph(shards, timeout_seconds), ) } + +#[pyfunction] +#[pyo3(signature = (path=None,shards=1))] +pub(crate) fn stable_coin_graph(path: Option, shards: usize) -> PyResult> { + PyGraph::py_from_db_graph( + raphtory::graph_loader::example::stable_coins::stable_coin_graph(path, shards), + ) +} + #[pyfunction] #[pyo3(signature = (uri,username,password,database="neo4j".to_string(),shards=1))] pub(crate) fn neo4j_movie_graph( diff --git a/python/src/lib.rs b/python/src/lib.rs index 567a1b4930..297aa35993 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -16,9 +16,6 @@ pub mod vertex; pub mod wrappers; use crate::algorithms::*; -use crate::algorithms::{ - all_local_reciprocity, global_clustering_coefficient, global_reciprocity, triplet_count, -}; use crate::graph::PyGraph; use crate::graph_gen::*; use crate::graph_loader::*; @@ -38,6 +35,7 @@ fn raphtory(py: Python<'_>, m: &PyModule) -> PyResult<()> { algorithm_module )?)?; algorithm_module.add_function(wrap_pyfunction!(local_triangle_count, algorithm_module)?)?; + algorithm_module.add_function(wrap_pyfunction!(generic_taint, algorithm_module)?)?; algorithm_module.add_function(wrap_pyfunction!( local_clustering_coefficient, algorithm_module @@ -57,6 +55,7 @@ fn raphtory(py: Python<'_>, m: &PyModule) -> PyResult<()> { graph_loader_module )?)?; graph_loader_module.add_function(wrap_pyfunction!(neo4j_movie_graph, graph_loader_module)?)?; + graph_loader_module.add_function(wrap_pyfunction!(stable_coin_graph, graph_loader_module)?)?; m.add_submodule(graph_loader_module)?; let graph_gen_module = PyModule::new(py, "graph_gen")?; diff --git a/python/src/utils.rs b/python/src/utils.rs index b91d94434c..cfa049ce19 100644 --- a/python/src/utils.rs +++ b/python/src/utils.rs @@ -11,6 +11,8 @@ use raphtory::core::time::Interval; use raphtory::db::view_api::time::WindowSet; use raphtory::db::view_api::TimeOps; use std::error::Error; +use raphtory::core::vertex::InputVertex; +use raphtory::core as dbc; /// Extract a `VertexRef` from a Python object. /// The object can be a `str`, `u64` or `PyVertex`. @@ -114,3 +116,50 @@ impl TryFrom for Interval { value.interval } } + +/// A trait for vertices that can be used as input for the graph. +/// This allows us to add vertices with different types of ids, either strings or ints. +#[derive(Clone, Debug)] +pub struct InputVertexBox { + id: u64, + name_prop: Option, +} + +/// Implementation for vertices that can be used as input for the graph. +/// This allows us to add vertices with different types of ids, either strings or ints. +impl InputVertexBox { + pub(crate) fn new(vertex: T) -> InputVertexBox + where + T: InputVertex, + { + InputVertexBox { + id: vertex.id(), + name_prop: vertex.name_prop(), + } + } +} + +/// Implementation for vertices that can be used as input for the graph. +/// This allows us to add vertices with different types of ids, either strings or ints. +impl InputVertex for InputVertexBox { + /// Returns the id of the vertex. + fn id(&self) -> u64 { + self.id + } + + /// Returns the name property of the vertex. + fn name_prop(&self) -> Option { + self.name_prop.clone() + } +} + +pub(crate) fn extract_input_vertex(id: &PyAny) -> PyResult { + match id.extract::() { + Ok(string) => Ok(InputVertexBox::new(string)), + Err(_) => { + let msg = "IDs need to be strings or an unsigned integers"; + let number = id.extract::().map_err(|_| PyTypeError::new_err(msg))?; + Ok(InputVertexBox::new(number)) + } + } +} diff --git a/python/tests/test_graphdb.py b/python/tests/test_graphdb.py index 8d74eed1cb..c7fe619e55 100644 --- a/python/tests/test_graphdb.py +++ b/python/tests/test_graphdb.py @@ -331,34 +331,39 @@ def no_static_property_test(key, value): assert g.vertex(1).properties(include_static=False) == {'prop 2': 0.9, 'prop 3': 'hello', 'prop 1': 2, 'prop 4': True} assert g.vertices.properties(include_static=False).collect() == [{'prop 2': 0.9, 'prop 3': 'hello', 'prop 1': 2, - 'prop 4': True}] - assert g.vertices.out_neighbours().properties(include_static=False).collect() == [[{'prop 2': 0.9, 'prop 3': 'hello', 'prop 1': 2, - 'prop 4': True}]] + 'prop 4': True}] + assert g.vertices.out_neighbours().properties(include_static=False).collect() == [ + [{'prop 2': 0.9, 'prop 3': 'hello', 'prop 1': 2, + 'prop 4': True}]] assert g.at(2).vertex(1).properties() == {'prop 1': 2, 'prop 4': False, 'prop 2': 0.6, 'static prop': 123, 'prop 3': 'hi'} assert g.at(2).vertices.properties().collect() == [{'prop 1': 2, 'prop 4': False, 'prop 2': 0.6, 'static prop': 123, - 'prop 3': 'hi'}] - assert g.at(2).vertices.out_neighbours().properties().collect() == [[{'prop 1': 2, 'prop 4': False, 'prop 2': 0.6, 'static prop': 123, - 'prop 3': 'hi'}]] + 'prop 3': 'hi'}] + assert g.at(2).vertices.out_neighbours().properties().collect() == [ + [{'prop 1': 2, 'prop 4': False, 'prop 2': 0.6, 'static prop': 123, + 'prop 3': 'hi'}]] # testing property histories assert g.vertex(1).property_histories() == {'prop 3': [(1, 'hi'), (3, 'hello')], 'prop 1': [(1, 1), (2, 2)], 'prop 4': [(1, True), (2, False), (3, True)], 'prop 2': [(2, 0.6), (3, 0.9)]} - assert g.vertices.property_histories().collect() == [{'prop 3': [(1, 'hi'), (3, 'hello')], 'prop 1': [(1, 1), (2, 2)], - 'prop 4': [(1, True), (2, False), (3, True)], - 'prop 2': [(2, 0.6), (3, 0.9)]}] - assert g.vertices.out_neighbours().property_histories().collect() == [[{'prop 3': [(1, 'hi'), (3, 'hello')], 'prop 1': [(1, 1), (2, 2)], - 'prop 4': [(1, True), (2, False), (3, True)], - 'prop 2': [(2, 0.6), (3, 0.9)]}]] + assert g.vertices.property_histories().collect() == [ + {'prop 3': [(1, 'hi'), (3, 'hello')], 'prop 1': [(1, 1), (2, 2)], + 'prop 4': [(1, True), (2, False), (3, True)], + 'prop 2': [(2, 0.6), (3, 0.9)]}] + assert g.vertices.out_neighbours().property_histories().collect() == [ + [{'prop 3': [(1, 'hi'), (3, 'hello')], 'prop 1': [(1, 1), (2, 2)], + 'prop 4': [(1, True), (2, False), (3, True)], + 'prop 2': [(2, 0.6), (3, 0.9)]}]] assert g.at(2).vertex(1).property_histories() == {'prop 2': [(2, 0.6)], 'prop 4': [(1, True), (2, False)], 'prop 1': [(1, 1), (2, 2)], 'prop 3': [(1, 'hi')]} assert g.at(2).vertices.property_histories().collect() == [{'prop 2': [(2, 0.6)], 'prop 4': [(1, True), (2, False)], - 'prop 1': [(1, 1), (2, 2)], 'prop 3': [(1, 'hi')]}] - assert g.at(2).vertices.out_neighbours().property_histories().collect() == [[{'prop 2': [(2, 0.6)], 'prop 4': [(1, True), (2, False)], - 'prop 1': [(1, 1), (2, 2)], 'prop 3': [(1, 'hi')]}]] + 'prop 1': [(1, 1), (2, 2)], 'prop 3': [(1, 'hi')]}] + assert g.at(2).vertices.out_neighbours().property_histories().collect() == [ + [{'prop 2': [(2, 0.6)], 'prop 4': [(1, True), (2, False)], + 'prop 1': [(1, 1), (2, 2)], 'prop 3': [(1, 'hi')]}]] # testing property names expected_names = sorted(['prop 4', 'prop 1', 'prop 2', 'prop 3', 'static prop']) @@ -498,12 +503,13 @@ def test_edge_properties(): def test_exploded_edge_time(): g = graph_loader.lotr_graph() - e = g.edge("Frodo","Gandalf") + e = g.edge("Frodo", "Gandalf") his = e.history() exploded_his = [] for ee in e.explode(): exploded_his.append(ee.time()) - assert(his,exploded_his) + assert (his, exploded_his) + # assert g.vertex(1).property_history("prop 3") == [(1, 3), (3, 'hello')] @@ -533,12 +539,13 @@ def test_algorithms(): assert min_out_degree == 1 assert min_in_degree == 1 assert clustering_coefficient == 1.0 - + lotr_clustering_coefficient = algorithms.local_clustering_coefficient(lotr_graph, 'Frodo') lotr_local_triangle_count = algorithms.local_triangle_count(lotr_graph, 'Frodo') assert lotr_clustering_coefficient == 0.1984313726425171 assert lotr_local_triangle_count == 253 + def test_graph_time_api(): g = create_graph(1) @@ -772,7 +779,6 @@ def test_edge_earliest_latest_time(): g.add_edge(1, 1, 3, {}) g.add_edge(2, 1, 3, {}) - assert g.edge(1, 2).earliest_time() == 0 assert g.edge(1, 2).latest_time() == 2 @@ -795,14 +801,15 @@ def test_vertex_history(): g.add_vertex(6, "Lord Farquaad", {}) g.add_vertex(7, "Lord Farquaad", {}) g.add_vertex(8, "Lord Farquaad", {}) - - assert(g.vertex(1).history() == [1, 2, 3, 4, 8]) - assert(g.vertex("Lord Farquaad").history() == [4, 6, 7, 8]) + + assert (g.vertex(1).history() == [1, 2, 3, 4, 8]) + assert (g.vertex("Lord Farquaad").history() == [4, 6, 7, 8]) view = g.window(1, 8) - assert(view.vertex(1).history() == [1, 2 ,3 ,4]) - assert(view.vertex("Lord Farquaad").history() == [4, 6, 7]) + assert (view.vertex(1).history() == [1, 2, 3, 4]) + assert (view.vertex("Lord Farquaad").history() == [4, 6, 7]) + def test_edge_history(): g = Graph(1) @@ -814,16 +821,61 @@ def test_edge_history(): view = g.window(1, 5) - assert(g.edge(1,2).history() == [1,3]) + assert (g.edge(1, 2).history() == [1, 3]) # also needs to be fixed in Pedros PR # assert(view.edge(1, 4).history() == [4]) + def test_lotr_edge_history(): g = graph_loader.lotr_graph() - - assert(g.edge('Frodo','Gandalf').history() == [329, 555, 861, 1056, 1130, 1160, 1234, 1241, 1390, 1417, 1656, 1741, 1783, 1785, 1792, 1804, 1809, 1999, 2056, 2254, 2925, 2999, 3703, 3914, 4910, 5620, 5775, 6381, 6531, 6578, 6661, 6757, 7041, 7356, 8183, 8190, 8276, 8459, 8598, 8871, 9098, 9343, 9903, 11189, 11192, 11279, 11365, 14364, 21551, 21706, 23212, 26958, 27060, 29024, 30173, 30737, 30744, 31023, 31052, 31054, 31103, 31445, 32656]) - assert(g.at(1000).edge('Frodo','Gandalf').history() == [329, 555, 861]) - assert(g.edge('Frodo','Gandalf').at(1000).history() == [329, 555, 861]) - assert(g.window(100,1000).edge('Frodo','Gandalf').history() == [329, 555, 861]) - assert(g.edge('Frodo','Gandalf').window(100,1000).history() == [329, 555, 861]) \ No newline at end of file + + assert (g.edge('Frodo', 'Gandalf').history() == [329, 555, 861, 1056, 1130, 1160, 1234, 1241, 1390, 1417, 1656, + 1741, 1783, 1785, 1792, 1804, 1809, 1999, 2056, 2254, 2925, 2999, + 3703, 3914, 4910, 5620, 5775, 6381, 6531, 6578, 6661, 6757, 7041, + 7356, 8183, 8190, 8276, 8459, 8598, 8871, 9098, 9343, 9903, 11189, + 11192, 11279, 11365, 14364, 21551, 21706, 23212, 26958, 27060, + 29024, 30173, 30737, 30744, 31023, 31052, 31054, 31103, 31445, + 32656]) + assert (g.at(1000).edge('Frodo', 'Gandalf').history() == [329, 555, 861]) + assert (g.edge('Frodo', 'Gandalf').at(1000).history() == [329, 555, 861]) + assert (g.window(100, 1000).edge('Frodo', 'Gandalf').history() == [329, 555, 861]) + assert (g.edge('Frodo', 'Gandalf').window(100, 1000).history() == [329, 555, 861]) + + +def test_generic_taint(): + g = Graph(1) + g.add_edge(10, 1, 3, {}) + g.add_edge(11, 1, 2, {}) + g.add_edge(12, 1, 2, {}) + g.add_edge(9, 1, 2, {}) + g.add_edge(12, 2, 4, {}) + g.add_edge(13, 2, 5, {}) + g.add_edge(14, 5, 5, {}) + g.add_edge(14, 5, 4, {}) + g.add_edge(5, 4, 6, {}) + g.add_edge(15, 4, 7, {}) + g.add_edge(10, 4, 7, {}) + g.add_edge(10, 5, 8, {}) + + actual = algorithms.generic_taint(g, 20, 11, [1, 2], [4, 5]) + expected = { + '1': [(0, 11, '1')], + '2': [(2, 12, '1'), (2, 11, '1'), (0, 11, '2')], + '4': [(5, 12, '2')], + '5': [(6, 13, '2')], + } + + assert (actual == expected) + + +def test_generic_taint_loader(): + g = graph_loader.stable_coin_graph("/tmp/stablecoin", 1) + + actual = algorithms.generic_taint(g, 20, 1651105815, ["0xd30b438df65f4f788563b2b3611bd6059bff4ad9"], []) + expected = { + '0xd30b438df65f4f788563b2b3611bd6059bff4ad9': [(0, 1651105815, '0xd30b438df65f4f788563b2b3611bd6059bff4ad9')], + '0xda816e2122a8a39b0926bfa84edd3d42477e9efd': [(1, 1651105815, '0xd30b438df65f4f788563b2b3611bd6059bff4ad9')], + } + + assert (actual == expected) diff --git a/raphtory/src/algorithms/connected_components.rs b/raphtory/src/algorithms/connected_components.rs index 6123325992..38f0aa26d6 100644 --- a/raphtory/src/algorithms/connected_components.rs +++ b/raphtory/src/algorithms/connected_components.rs @@ -111,8 +111,8 @@ mod cc_test { let expected = // output from the eval running on the first shard vec![ - vec![7, 1, 3, 3], // shard 0 (2, 4, 6, 8) - vec![3, 7, 1, 2], // shard 1 (1, 3, 5, 7) + vec![(8, 7), (2, 1), (4, 3), (6, 3)], // shard 0 (2, 4, 6, 8) + vec![(5, 3), (7, 7), (1, 1), (3, 2)], // shard 1 (1, 3, 5, 7) ]; let actual_part1 = &gs.read_vec_partitions(&agg)[0]; @@ -127,8 +127,8 @@ mod cc_test { let expected = // output from the eval running on the first shard vec![ - vec![7, 1, 2, 3], // shard 0 (2, 4, 6, 8) - vec![2, 7, 1, 1], // shard 1 (1, 3, 5, 7) + vec![(8, 7), (2, 1), (4, 2), (6, 3)], // shard 0 (2, 4, 6, 8) + vec![(5, 2), (7, 7), (1, 1), (3, 1)], // shard 1 (1, 3, 5, 7) ]; let actual_part1 = &gs.read_vec_partitions(&agg)[0]; @@ -143,8 +143,8 @@ mod cc_test { let expected = // output from the eval running on the first shard vec![ - vec![7, 1, 1, 2], // shard 0 (2, 4, 6, 8) - vec![1, 7, 1, 1], // shard 1 (1, 3, 5, 7) + vec![(8, 7), (2, 1), (4, 1), (6, 2)], // shard 0 (2, 4, 6, 8) + vec![(5, 1), (7, 7), (1, 1), (3, 1)], // shard 1 (1, 3, 5, 7) ]; let actual_part1 = &gs.read_vec_partitions(&agg)[0]; @@ -159,8 +159,8 @@ mod cc_test { let expected = // output from the eval running on the first shard vec![ - vec![7, 1, 1, 1], // shard 0 (2, 4, 6, 8) - vec![1, 7, 1, 1], // shard 1 (1, 3, 5, 7) + vec![(8, 7), (2, 1), (4, 1), (6, 1)], // shard 0 (2, 4, 6, 8) + vec![(5, 1), (7, 7), (1, 1), (3, 1)], // shard 1 (1, 3, 5, 7) ]; let actual_part1 = &gs.read_vec_partitions(&agg)[0]; diff --git a/raphtory/src/algorithms/generic_taint.rs b/raphtory/src/algorithms/generic_taint.rs new file mode 100644 index 0000000000..bd294f05f0 --- /dev/null +++ b/raphtory/src/algorithms/generic_taint.rs @@ -0,0 +1,465 @@ +use crate::algorithms::*; +use crate::core::agg::set::Set; +use crate::core::agg::*; +use crate::core::state::def::*; +use crate::core::state::*; +use crate::db::program::*; +use crate::db::view_api::{GraphViewOps, VertexViewOps}; +use itertools::Itertools; +use rustc_hash::FxHashSet; +use std::collections::{HashMap, HashSet}; +use tokio::time::Instant; +use crate::core::utils::calculate_hash; +use crate::core::vertex::InputVertex; + +#[derive(Eq, Hash, PartialEq, Clone, Debug, Default)] +pub struct TaintMessage { + pub edge_id: usize, + pub event_time: i64, + pub src_vertex: String, +} + +impl Add for TaintMessage { + type Output = TaintMessage; + + fn add(self, rhs: Self) -> Self::Output { + rhs + } +} + +impl Zero for TaintMessage { + fn zero() -> Self { + TaintMessage { + edge_id: 0, + event_time: -1, + src_vertex: "".to_string(), + } + } + + fn set_zero(&mut self) { + *self = Zero::zero(); + } + + fn is_zero(&self) -> bool { + *self + == TaintMessage { + edge_id: 0, + event_time: -1, + src_vertex: "".to_string(), + } + } +} + +struct GenericTaintS0 { + start_time: i64, + infected_nodes: Vec, + taint_status: AccId>, + taint_history: + AccId, TaintMessage, FxHashSet, Set>, + recv_tainted_msgs: + AccId, TaintMessage, FxHashSet, Set>, +} + +impl GenericTaintS0 { + fn new(start_time: i64, infected_nodes: Vec) -> Self { + Self { + start_time, + infected_nodes, + taint_status: val(0), + taint_history: hash_set(1), + recv_tainted_msgs: hash_set(2), + } + } +} + +impl Program for GenericTaintS0 { + type Out = (); + + fn local_eval(&self, c: &LocalState) { + let taint_status = c.agg(self.taint_status); + let taint_history = c.agg(self.taint_history.clone()); + let recv_tainted_msgs = c.agg(self.recv_tainted_msgs.clone()); + + c.step(|evv| { + if self.infected_nodes.contains(&evv.global_id()) { + evv.update(&taint_status, Bool(true)); + evv.update( + &taint_history, + TaintMessage { + edge_id: 0, + event_time: self.start_time, + src_vertex: evv.name(), + }, + ); + evv.out_edges(self.start_time).for_each(|eev| { + let dst = eev.dst(); + eev.history().into_iter().for_each(|t| { + dst.update( + &recv_tainted_msgs, + TaintMessage { + edge_id: eev.id(), + event_time: t, + src_vertex: evv.name(), + }, + ) + }); + }); + } + }); + } + + fn post_eval(&self, c: &mut GlobalEvalState) { + let _ = c.agg(self.taint_history.clone()); + let _ = c.agg(self.recv_tainted_msgs.clone()); + c.step(|_| true) + } + + fn produce_output(&self, g: &G, gs: &GlobalEvalState) -> Self::Out + where + Self: Sync, + { + } +} + +struct GenericTaintS1 { + stop_nodes: Vec, + taint_status: AccId>, + taint_history: + AccId, TaintMessage, FxHashSet, Set>, + recv_tainted_msgs: + AccId, TaintMessage, FxHashSet, Set>, +} + +impl GenericTaintS1 { + fn new(stop_nodes: Vec) -> Self { + Self { + stop_nodes, + taint_status: val(0), + taint_history: hash_set(1), + recv_tainted_msgs: hash_set(2), + } + } +} + +impl Program for GenericTaintS1 { + type Out = (); + + fn local_eval(&self, c: &LocalState) { + let taint_status = c.agg(self.taint_status); + let taint_history = c.agg(self.taint_history.clone()); + let recv_tainted_msgs = c.agg(self.recv_tainted_msgs.clone()); + + // Check if vertices have received tainted message + // If yes, if the vertex was not already tainted, update the accumulators else, update tainted history + // Spread the taint messages if no stop_nodes provided + + c.step(|evv| { + let msgs = evv.read(&recv_tainted_msgs); + if !msgs.is_empty() { + if !evv.read(&taint_status).0 { + evv.update(&taint_status, Bool(true)); + } + msgs.iter().for_each(|msg| { + evv.update(&taint_history, msg.clone()); + }); + + let earliest_taint_time = msgs.into_iter().map(|msg| msg.event_time).min().unwrap(); + + if self.stop_nodes.is_empty() || !self.stop_nodes.contains(&evv.global_id()) { + evv.out_edges(earliest_taint_time).for_each(|eev| { + let dst = eev.dst(); + eev.history().into_iter().for_each(|t| { + dst.update( + &recv_tainted_msgs, + TaintMessage { + edge_id: eev.id(), + event_time: t, + src_vertex: evv.name(), + }, + ) + }); + }); + } + } + }); + } + + fn post_eval(&self, c: &mut GlobalEvalState) { + let _ = c.agg(self.taint_history.clone()); + let _ = c.agg(self.recv_tainted_msgs.clone()); + c.step(|_| true) + } + + fn produce_output(&self, g: &G, gs: &GlobalEvalState) -> Self::Out + where + Self: Sync, + { + } +} + +pub fn generic_taint( + g: &G, + iter_count: usize, + start_time: i64, + infected_nodes: Vec, + stop_nodes: Vec, +) -> HashMap> { + let mut c = GlobalEvalState::new(g.clone(), true); + let gtaint_s0 = GenericTaintS0::new(start_time, infected_nodes.into_iter().map(|n| n.id()).collect_vec()); + let gtaint_s1 = GenericTaintS1::new(stop_nodes.into_iter().map(|n| n.id()).collect_vec()); + + gtaint_s0.run_step(g, &mut c); + + // println!( + // "step0, taint_status = {:?}", + // c.read_vec_partitions(&val::(0)) + // ); + // println!( + // "step0, taint_history = {:?}", + // c.read_vec_partitions(&hash_set::(1)) + // ); + // println!( + // "step0, recv_tainted_msgs = {:?}", + // c.read_vec_partitions(&hash_set::(2)) + // ); + // println!(); + + let mut last_taint_list = HashSet::::new(); + let mut i = 0; + loop { + gtaint_s1.run_step(g, &mut c); + + let r = c.read_vec_partitions(&val::(0)); + let taint_list: HashSet<_> = r + .into_iter() + .flat_map(|v| v.into_iter().flat_map(|c| c.into_iter().map(|(a, b)| a))) + .collect(); + + // println!( + // "step{}, taint_status = {:?}", + // i + 1, + // c.read_vec_partitions(&val::(0)) + // ); + // println!("step{}, taint_list = {:?}", i + 1, taint_list); + // println!( + // "step{}, taint_history = {:?}", + // i + 1, + // c.read_vec_partitions(&hash_set::(1)) + // ); + // println!( + // "step{}, recv_tainted_msgs = {:?}", + // i + 1, + // c.read_vec_partitions(&hash_set::(2)) + // ); + + let difference: Vec<_> = taint_list + .iter() + .filter(|item| !last_taint_list.contains(*item)) + .collect(); + let converged = difference.is_empty(); + + // println!("taint_list diff = {:?}", difference); + // println!(); + + if converged || i > iter_count { + break; + } + + last_taint_list = taint_list; + + if c.keep_past_state { + c.ss += 1; + } + + i += 1; + } + + println!("Completed {} steps", i); + + let mut results: HashMap> = HashMap::default(); + + (0..g.num_shards()) + .into_iter() + .fold(&mut results, |res, part_id| { + c.fold_state( + &hash_set::(1), + part_id, + res, + |res, v_id, sc| { + res.insert( + g.vertex(*v_id).unwrap().name(), + // *v_id, + sc.into_iter() + .map(|msg| (msg.edge_id, msg.event_time, msg.src_vertex)) + .collect_vec(), + ); + res + }, + ) + }); + + results +} + +#[cfg(test)] +mod generic_taint_tests { + use crate::db::graph::Graph; + use super::*; + + fn load_graph(n_shards: usize, edges: Vec<(i64, u64, u64)>) -> Graph { + let graph = Graph::new(n_shards); + + for (t, src, dst) in edges { + graph.add_edge(t, src, dst, &vec![], None).unwrap(); + } + graph + } + + fn test_generic_taint( + graph: Graph, + iter_count: usize, + start_time: i64, + infected_nodes: Vec, + stop_nodes: Vec, + ) -> HashMap> { + let results: HashMap> = + generic_taint(&graph, iter_count, start_time, infected_nodes, stop_nodes) + .into_iter() + .collect(); + results + } + + #[test] + fn test_generic_taint_1() { + let graph = load_graph( + 1, + vec![ + (10, 1, 3), + (11, 1, 2), + (12, 2, 4), + (13, 2, 5), + (14, 5, 5), + (14, 5, 4), + (5, 4, 6), + (15, 4, 7), + (10, 4, 7), + (10, 5, 8), + ], + ); + + let results = test_generic_taint(graph, 20, 11, vec![2], vec![]); + + assert_eq!( + results, + HashMap::from([ + ("5".to_string(), vec![(4, 13, "2".to_string()), (5, 14, "5".to_string())]), + ("2".to_string(), vec![(0, 11, "2".to_string())]), + ("7".to_string(), vec![(8, 15, "4".to_string())]), + ("4".to_string(), vec![(3, 12, "2".to_string()), (6, 14, "5".to_string())]) + ]) + ); + } + + #[test] + fn test_generic_taint_1_multiple_start() { + let graph = load_graph( + 1, + vec![ + (10, 1, 3), + (11, 1, 2), + (12, 2, 4), + (13, 2, 5), + (14, 5, 5), + (14, 5, 4), + (5, 4, 6), + (15, 4, 7), + (10, 4, 7), + (10, 5, 8), + ], + ); + + let results = test_generic_taint(graph, 20, 11, vec![1, 2], vec![]); + + assert_eq!( + results, + HashMap::from([ + ("4".to_string(), vec![(3, 12, "2".to_string()), (6, 14, "5".to_string())]), + ("1".to_string(), vec![(0, 11, "1".to_string())]), + ("5".to_string(), vec![(4, 13, "2".to_string()), (5, 14, "5".to_string())]), + ("7".to_string(), vec![(8, 15, "4".to_string())]), + ("2".to_string(), vec![(2, 11, "1".to_string()), (0, 11, "2".to_string())]), + ]) + ); + } + + #[test] + fn test_generic_taint_1_stop_nodes() { + let graph = load_graph( + 1, + vec![ + (10, 1, 3), + (11, 1, 2), + (12, 2, 4), + (13, 2, 5), + (14, 5, 5), + (14, 5, 4), + (5, 4, 6), + (15, 4, 7), + (10, 4, 7), + (10, 5, 8), + ], + ); + + let results = test_generic_taint(graph, 20, 11, vec![1, 2], vec![4, 5]); + + assert_eq!( + results, + HashMap::from([ + ("5".to_string(), vec![(4, 13, "2".to_string())]), + ("2".to_string(), vec![(2, 11, "1".to_string()), (0, 11, "2".to_string())]), + ("1".to_string(), vec![(0, 11, "1".to_string())]), + ("4".to_string(), vec![(3, 12, "2".to_string())]), + ]) + ); + } + + #[test] + fn test_generic_taint_1_multiple_history_points() { + let graph = load_graph( + 1, + vec![ + (10, 1, 3), + (11, 1, 2), + (12, 1, 2), + (9, 1, 2), + (12, 2, 4), + (13, 2, 5), + (14, 5, 5), + (14, 5, 4), + (5, 4, 6), + (15, 4, 7), + (10, 4, 7), + (10, 5, 8), + ], + ); + + let results = test_generic_taint(graph, 20, 11, vec![1, 2], vec![4, 5]); + + assert_eq!( + results, + HashMap::from([ + ("1".to_string(), vec![(0, 11, "1".to_string())]), + ("5".to_string(), vec![(6, 13, "2".to_string())]), + ( + "2".to_string(), + vec![ + (2, 12, "1".to_string()), + (2, 11, "1".to_string()), + (0, 11, "2".to_string()) + ] + ), + ("4".to_string(), vec![(5, 12, "2".to_string())]), + ]) + ); + } +} diff --git a/raphtory/src/algorithms/mod.rs b/raphtory/src/algorithms/mod.rs index 5d4db95947..150c61fe18 100644 --- a/raphtory/src/algorithms/mod.rs +++ b/raphtory/src/algorithms/mod.rs @@ -37,6 +37,7 @@ pub mod pagerank; pub mod reciprocity; pub mod triangle_count; pub mod triplet_count; +pub mod generic_taint; use num_traits::{abs, Bounded, Zero}; use std::ops::{Add, AddAssign, Div, Mul, Range, Sub}; @@ -172,3 +173,28 @@ impl Bounded for SumF32 { SumF32(f32::MAX) } } + +#[derive(PartialEq, PartialOrd, Copy, Clone, Debug)] +struct Bool(bool); + +impl Zero for Bool { + fn zero() -> Self { + Bool(false) + } + + fn set_zero(&mut self) { + *self = Zero::zero(); + } + + fn is_zero(&self) -> bool { + *self == Bool(false) + } +} + +impl Add for Bool { + type Output = Bool; + + fn add(self, rhs: Self) -> Self::Output { + rhs + } +} diff --git a/raphtory/src/algorithms/pagerank.rs b/raphtory/src/algorithms/pagerank.rs index 54f742dd20..701dc05924 100644 --- a/raphtory/src/algorithms/pagerank.rs +++ b/raphtory/src/algorithms/pagerank.rs @@ -147,8 +147,8 @@ impl Program for UnweightedPageRankS2 { } #[allow(unused_variables)] -pub fn unweighted_page_rank( - g: &Graph, +pub fn unweighted_page_rank( + g: &G, window: Range, iter_count: usize, ) -> FxHashMap { @@ -164,12 +164,12 @@ pub fn unweighted_page_rank( loop { pg_s1.run_step(g, &mut c); - println!("vec parts: {:?}", c.read_vec_partitions(&val::(0))); + // println!("vec parts: {:?}", c.read_vec_partitions(&val::(0))); pg_s2.run_step(g, &mut c); let r = c.read_global_state(&max::(2)).unwrap(); - println!("max_diff = {:?}", r); + // println!("max_diff = {:?}", r); if r <= max_diff || i > iter_count { break; @@ -181,9 +181,11 @@ pub fn unweighted_page_rank( i += 1; } + println!("Completed {} steps", i); + let mut results: FxHashMap = FxHashMap::default(); - (0..g.nr_shards) + (0..g.num_shards()) .into_iter() .fold(&mut results, |res, part_id| { c.fold_state(&val::(0), part_id, res, |res, v_id, sc| { @@ -305,7 +307,7 @@ mod page_rank_tests { acc_id: AccId, c_g1: &GlobalEvalState, c_g2: &GlobalEvalState, - ) -> (Vec, Vec>>) { + ) -> (Vec<(u64, OUT)>, Vec>>) { let actual_g1 = c_g1.read_vec_partitions(&acc_id); assert!(actual_g1.len() == 1); let actual_g1_part0 = &actual_g1[0][0]; diff --git a/raphtory/src/core/state.rs b/raphtory/src/core/state.rs index 210a71fc7b..f875a7cc87 100644 --- a/raphtory/src/core/state.rs +++ b/raphtory/src/core/state.rs @@ -278,7 +278,8 @@ pub trait ComputeState: Debug + Clone { ) where A: StateType; - fn finalize>(&self, ss: usize) -> Vec + fn finalize>(&self, ss: usize) -> Vec<(u64, OUT)> + // fn finalize>(&self, ss: usize) -> Vec<(u64, OUT)> where OUT: StateType, A: 'static; @@ -426,8 +427,8 @@ impl ComputeState for ComputeStateMap { }); } - fn finalize>(&self, ss: usize) -> Vec - // fn finalize>(&self, ss: usize) -> Vec<(u64, OUT)> + // fn finalize>(&self, ss: usize) -> Vec + fn finalize>(&self, ss: usize) -> Vec<(u64, OUT)> where OUT: StateType, A: 'static, @@ -438,15 +439,15 @@ impl ComputeState for ComputeStateMap { .downcast_ref::>() .unwrap(); - // current.map.iter().map(|(c,v)| { - // (*c, ACC::finish(&v[ss % 2])) - // }).collect::>() + current.map.iter().map(|(c,v)| { + (*c, ACC::finish(&v[ss % 2])) + }).collect::>() - current - .map - .values() - .map(|v| ACC::finish(&v[ss % 2])) - .collect() + // current + // .map + // .values() + // .map(|v| ACC::finish(&v[ss % 2])) + // .collect() } fn fold, F, B>(&self, ss: usize, b: B, f: F) -> B @@ -514,7 +515,8 @@ impl ShardComputeState { &self, ss: usize, agg_ref: &AccId, - ) -> Option> + ) -> Option> + // ) -> Option> where OUT: StateType, A: 'static, @@ -623,7 +625,8 @@ impl ShardComputeState { &mut self, ss: usize, agg_ref: &AccId, - ) -> Option> + ) -> Option> + // ) -> Option> where OUT: StateType, A: 'static, @@ -813,7 +816,8 @@ impl ShuffleComputeState { &self, ss: usize, agg_def: &AccId, - ) -> Vec> + ) -> Vec> + // ) -> Vec> where OUT: StateType, A: 'static, @@ -831,7 +835,8 @@ impl ShuffleComputeState { &mut self, ss: usize, agg_def: &AccId, - ) -> Vec>> + ) -> Vec>> + // ) -> Vec>> where OUT: StateType, A: 'static, @@ -851,6 +856,7 @@ mod state_test { use super::*; use rand::Rng; + use rand_distr::weighted_alias::AliasableWeight; #[test] fn min_aggregates_for_3_keys() { @@ -875,7 +881,7 @@ mod state_test { } let actual = state_map.finalize(0, &min); - assert_eq!(actual, Some(vec![actual_min, actual_min, actual_min])); + assert_eq!(actual, Some(vec![(0, actual_min), (1,actual_min), (2,actual_min)])); } #[test] @@ -902,7 +908,7 @@ mod state_test { let actual_avg = sum / 100; let actual = state_map.finalize(0, &avg); - assert_eq!(actual, Some(vec![actual_avg, actual_avg, actual_avg])); + assert_eq!(actual, Some(vec![(0,actual_avg), (1,actual_avg), (2,actual_avg)])); } #[test] @@ -921,7 +927,7 @@ mod state_test { let actual = state_map.finalize(0, &avg); assert_eq!( actual, - Some(vec![expected.clone(), expected.clone(), expected.clone()]) + Some(vec![(0,expected.clone()), (1, expected.clone()), (2,expected.clone())]) ); } @@ -949,7 +955,7 @@ mod state_test { let actual = state.finalize(0, &sum); - assert_eq!(actual, Some(vec![actual_sum, actual_sum, actual_sum])); + assert_eq!(actual, Some(vec![(0, actual_sum), (1,actual_sum), (2,actual_sum)])); } #[test] @@ -997,12 +1003,12 @@ mod state_test { assert_eq!( actual, - vec![Some(vec![actual_sum_1]), Some(vec![actual_sum_1])] + vec![Some(vec![(2,actual_sum_1)]), Some(vec![(1,actual_sum_1)])] ); let actual = part2_state.finalize(0, &sum); - assert_eq!(actual, vec![None, Some(vec![actual_sum_2, actual_sum_2])]); + assert_eq!(actual, vec![None, Some(vec![(1,actual_sum_2), (3,actual_sum_2)])]); ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, &sum, 0); let actual = part1_state.finalize(0, &sum); @@ -1010,8 +1016,8 @@ mod state_test { assert_eq!( actual, vec![ - Some(vec![(actual_sum_1)]), - Some(vec![(actual_sum_1 + actual_sum_2), (actual_sum_2)]), + Some(vec![(2, actual_sum_1)]), + Some(vec![(1,(actual_sum_1 + actual_sum_2)), (3, actual_sum_2)]), ] ); } @@ -1066,20 +1072,20 @@ mod state_test { let actual = part1_state.finalize(0, &sum); assert_eq!( actual, - vec![Some(vec![actual_sum_1]), Some(vec![actual_sum_1])] + vec![Some(vec![(2,actual_sum_1)]), Some(vec![(1, actual_sum_1)])] ); let actual = part1_state.finalize(0, &min); assert_eq!( actual, - vec![Some(vec![actual_min_1]), Some(vec![actual_min_1])] + vec![Some(vec![(2, actual_min_1)]), Some(vec![(1,actual_min_1)])] ); let actual = part2_state.finalize(0, &sum); - assert_eq!(actual, vec![None, Some(vec![actual_sum_2, actual_sum_2])]); + assert_eq!(actual, vec![None, Some(vec![(1, actual_sum_2), (3,actual_sum_2)])]); let actual = part2_state.finalize(0, &min); - assert_eq!(actual, vec![None, Some(vec![actual_min_2, actual_min_2])]); + assert_eq!(actual, vec![None, Some(vec![(1,actual_min_2), (3,actual_min_2)])]); ShuffleComputeState::merge_mut(&mut part1_state, &part2_state, &sum, 0); let actual = part1_state.finalize(0, &sum); @@ -1087,8 +1093,8 @@ mod state_test { assert_eq!( actual, vec![ - Some(vec![(actual_sum_1)]), - Some(vec![(actual_sum_1 + actual_sum_2), (actual_sum_2)]), + Some(vec![((2,actual_sum_1))]), + Some(vec![(1,(actual_sum_1 + actual_sum_2)), (3,actual_sum_2)]), ] ); @@ -1097,8 +1103,8 @@ mod state_test { assert_eq!( actual, vec![ - Some(vec![(actual_min_1)]), - Some(vec![(actual_min_1.min(actual_min_2)), (actual_min_2)]), + Some(vec![(2,actual_min_1)]), + Some(vec![(1, actual_min_1.min(actual_min_2)), (3, actual_min_2)]), ] ); } diff --git a/raphtory/src/db/path.rs b/raphtory/src/db/path.rs index bd65646151..b2d667f722 100644 --- a/raphtory/src/db/path.rs +++ b/raphtory/src/db/path.rs @@ -277,6 +277,7 @@ impl PathFromVertex { } } } + impl VertexViewOps for PathFromVertex { type Graph = G; type ValueType = BoxedIter; diff --git a/raphtory/src/db/program.rs b/raphtory/src/db/program.rs index 3e18e652ab..b1fd0e0989 100644 --- a/raphtory/src/db/program.rs +++ b/raphtory/src/db/program.rs @@ -13,11 +13,14 @@ use crate::core::{ state::{AccId, ShuffleComputeState}, state::{ComputeStateMap, StateType}, }; +use crate::db::edge::EdgeView; use crate::db::vertex::VertexView; -use crate::db::view_api::{GraphViewOps, VertexViewOps}; +use crate::db::view_api::{GraphViewOps, TimeOps, VertexViewOps}; use itertools::Itertools; +use rand_distr::weighted_alias::AliasableWeight; use rayon::prelude::*; use rustc_hash::FxHashSet; +use crate::db::graph_window::WindowedGraph; type CS = ComputeStateMap; @@ -227,7 +230,8 @@ impl GlobalEvalState { pub fn read_vec_partitions>( &self, agg: &AccId, - ) -> Vec>> + ) -> Vec>> + // ) -> Vec>> where OUT: StateType, A: 'static, @@ -791,6 +795,10 @@ impl EvalVertexView { Self { ss, vv, state } } + pub fn name(&self) -> String { + self.vv.name() + } + /// Obtain the global id of the vertex. pub fn global_id(&self) -> u64 { self.vv.id() @@ -832,6 +840,56 @@ impl EvalVertexView { .iter() .map(move |vv| EvalVertexView::new(self.ss, vv, self.state.clone())) } + + pub fn out_edges(&self, after: i64) -> impl Iterator>> + '_ { + self.vv + .window(after, i64::MAX) + .out_edges() + .map(move |ev| { + // let et = ev.history().into_iter().filter(|t| t >= &after).min().unwrap(); + // ev.dst(); + EvalEdgeView::new(self.ss, ev, self.state.clone()) + }) + } +} + +/// `EvalEdgeView` represents a view of a edge in a computation graph. +/// +/// The view contains the evaluation step, the `WindowedEdge` representing the edge. +pub struct EvalEdgeView { + ss: usize, + ev: EdgeView, + state: Rc>>, +} + +/// `EvalEdgeView` represents a view of a edge in a computation graph. +impl EvalEdgeView { + /// Create a new `EvalEdgeView` from the given super-step counter, `WindowedEdge` + /// + /// # Arguments + /// + /// * `ss` - super-step counter + /// * `ev` - The `WindowedEdge` representing the edge. + /// + /// # Returns + /// + /// A new `EvalVertexView`. + pub fn new(ss: usize, ev: EdgeView, state: Rc>>) -> Self { + Self { ss, ev, state } + } + + pub fn id(&self) -> usize { + self.ev.id() + } + + pub fn dst(&self) -> EvalVertexView { + self.ev.dst(); + EvalVertexView::new(self.ss, self.ev.dst(), self.state.clone()) + } + + pub fn history(&self) -> Vec { + self.ev.history() + } } /// Represents a program that can be executed on a graph. We use this to run algorithms on graphs. diff --git a/raphtory/src/graph_loader/example/mod.rs b/raphtory/src/graph_loader/example/mod.rs index 552390430c..0bd934134b 100644 --- a/raphtory/src/graph_loader/example/mod.rs +++ b/raphtory/src/graph_loader/example/mod.rs @@ -2,3 +2,4 @@ pub mod lotr_graph; pub mod neo4j_examples; pub mod reddit_hyperlinks; pub mod sx_superuser_graph; +pub mod stable_coins; diff --git a/raphtory/src/graph_loader/example/stable_coins.rs b/raphtory/src/graph_loader/example/stable_coins.rs new file mode 100644 index 0000000000..cbdbe4ff35 --- /dev/null +++ b/raphtory/src/graph_loader/example/stable_coins.rs @@ -0,0 +1,107 @@ +use crate::core::utils; +use crate::db::graph::Graph; +use crate::db::view_api::internal::GraphViewInternalOps; +use crate::db::view_api::GraphViewOps; +use crate::graph_loader::source::csv_loader::CsvLoader; +use serde::Deserialize; +use std::fs::File; +use std::io::{copy, Cursor}; +use std::path::PathBuf; +use std::time::Duration; +use std::{env, fs, io, path::Path, time::Instant}; + +#[derive(Deserialize, std::fmt::Debug)] +pub struct StableCoin { + block_number: String, + transaction_index: u32, + from_address: String, + to_address: String, + time_stamp: i64, + contract_address: String, + value: f64, +} + +fn fetch_file(file_path: PathBuf, timeout: u64) -> Result<(), Box> { + let client = reqwest::blocking::Client::builder() + .timeout(Duration::from_secs(timeout)) + .build()?; + let response = client + .get("https://raw.githubusercontent.com/Raphtory/Data/main/token_transfers.csv") + .send()?; + let mut content = Cursor::new(response.bytes()?); + let mut file = File::create(&file_path)?; + copy(&mut content, &mut file)?; + + Ok(()) +} + +pub fn stable_coin_graph(path: Option, num_shards: usize) -> Graph { + let default_data_dir: PathBuf = PathBuf::from("/tmp/stablecoin"); + + let data_dir = match path { + Some(path) => PathBuf::from(path), + None => { + default_data_dir + }, + }; + + let dir_str = data_dir.to_str().unwrap(); + fs::create_dir_all(dir_str).expect(&format!("Failed to create directory {}", dir_str)); + + if !data_dir.join("token_transfers.csv").exists() { + fetch_file(data_dir.join("token_transfers.csv"), 10).expect("Failed to fetch stable coin data: https://raw.githubusercontent.com/Raphtory/Data/main/token_transfers.csv"); + } + + let encoded_data_dir = data_dir.join("graphdb.bincode"); + + let g = if encoded_data_dir.exists() { + let now = Instant::now(); + let g = Graph::load_from_file(encoded_data_dir.as_path()) + .expect("Failed to load graph from encoded data files"); + + println!( + "Loaded graph with {} shards from encoded data files {} with {} vertices, {} edges which took {} seconds", + g.num_shards(), + encoded_data_dir.to_str().unwrap(), + g.num_vertices(), + g.num_edges(), + now.elapsed().as_secs() + ); + + g + } else { + let g = Graph::new(num_shards); + let now = Instant::now(); + + CsvLoader::new(data_dir) + .set_header(true) + .set_delimiter(",") + .load_into_graph(&g, |stablecoin: StableCoin, g: &Graph| { + g.add_edge( + stablecoin.time_stamp, + stablecoin.from_address, + stablecoin.to_address, + &vec![], + Some(&stablecoin.contract_address), + ) + .expect("Failed to add edge"); + }) + .expect("Failed to load graph from CSV data files"); + + println!( + "Loaded graph with {} shards from CSV data files {} with {} vertices, {} edges which took {} seconds", + g.num_shards(), + encoded_data_dir.to_str().unwrap(), + g.num_vertices(), + g.num_edges(), + now.elapsed().as_secs() + ); + + g.save_to_file(encoded_data_dir) + .expect("Failed to save graph"); + + g + }; + + g +} diff --git a/raphtory/src/graph_loader/source/csv_loader.rs b/raphtory/src/graph_loader/source/csv_loader.rs index b3ba1f1ad6..f3c7f6d059 100644 --- a/raphtory/src/graph_loader/source/csv_loader.rs +++ b/raphtory/src/graph_loader/source/csv_loader.rs @@ -350,7 +350,7 @@ impl CsvLoader { //TODO this needs better error handling for files without perfect data for rec in records_iter { - let record = rec?; + let record = rec.expect("Failed to deserialize"); loader(record, g) }