Skip to content

Commit

Permalink
Implement an actor responsible for filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Apr 12, 2021
1 parent ae56a49 commit 0ab7742
Show file tree
Hide file tree
Showing 19 changed files with 530 additions and 58 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/bin/event_listener/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ publish = false # We don't want to publish our binaries.
[dependencies]
anyhow = "1.0"
tokio = { version = "0.2", features = ["full"] }
futures = "0.3"

zksync_storage = { path = "../../lib/storage", version = "1.0" }
zksync_config = { path = "../../lib/config", version = "1.0" }
vlog = { path = "../../lib/vlog", version = "1.0" }
37 changes: 37 additions & 0 deletions core/bin/event_listener/src/handler/filters/account.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use std::collections::HashSet;
use zksync_storage::event::types::{account::*, ZkSyncEvent, EventData};

#[derive(Debug, Clone)]
pub struct AccountFilter {
pub account_ids: Option<HashSet<i64>>,
pub token_ids: Option<HashSet<i32>>,
pub status: Option<AccountStateChangeStatus>,
}

impl AccountFilter {
pub fn matches(&self, event: &ZkSyncEvent) -> bool {
let account_event = match &event.data {
EventData::Account(account_event) => account_event,
_ => return false,
};
if let Some(status) = &self.status {
if account_event.status != *status {
return false;
}
}
if let Some(token_ids) = &self.token_ids {
if let Some(token_id) = account_event.account_update_details.token_id {
if !token_ids.contains(&token_id) {
return false;
}
}
}
if let Some(account_ids) = &self.account_ids {
let account_id = account_event.account_update_details.account_id;
if !account_ids.contains(&account_id) {
return false;
}
}
return true;
}
}
21 changes: 21 additions & 0 deletions core/bin/event_listener/src/handler/filters/block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use zksync_storage::event::types::{block::*, ZkSyncEvent, EventData};

#[derive(Debug, Clone)]
pub struct BlockFilter {
pub block_status: Option<BlockStatus>,
}

impl BlockFilter {
pub fn matches(&self, event: &ZkSyncEvent) -> bool {
let block_event = match &event.data {
EventData::Block(block_event) => block_event,
_ => return false,
};
if let Some(block_status) = &self.block_status {
if block_event.status != *block_status {
return false;
}
}
return true;
}
}
29 changes: 29 additions & 0 deletions core/bin/event_listener/src/handler/filters/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use zksync_storage::event::types::ZkSyncEvent;

mod account;
mod block;
mod transaction;

pub use account::AccountFilter;
pub use block::BlockFilter;
pub use transaction::TransactionFilter;

#[derive(Debug, Clone)]
pub enum EventFilter {
Account(AccountFilter),
Block(BlockFilter),
_Transaction(TransactionFilter),
}

impl EventFilter {
// Should parse json: will be implemented with the transport component. (TODO)
// pub fn new(...) -> Self {}

pub fn matches(&self, event: &ZkSyncEvent) -> bool {
match self {
EventFilter::Account(account_filter) => account_filter.matches(event),
EventFilter::Block(block_filter) => block_filter.matches(event),
EventFilter::_Transaction(tx_filter) => tx_filter.matches(event),
}
}
}
43 changes: 43 additions & 0 deletions core/bin/event_listener/src/handler/filters/transaction.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use std::collections::HashSet;
use zksync_storage::event::types::{transaction::*, ZkSyncEvent, EventData};

#[derive(Debug, Clone)]
pub struct TransactionFilter {
pub tx_types: Option<HashSet<TransactionType>>,
pub account_ids: Option<HashSet<i64>>,
pub token_ids: Option<HashSet<i32>>,
pub status: Option<TransactionStatus>,
}

impl TransactionFilter {
pub fn matches(&self, event: &ZkSyncEvent) -> bool {
let tx_event = match &event.data {
EventData::Transaction(tx_event) => tx_event,
_ => return false,
};
if let Some(status) = &self.status {
if tx_event.status != *status {
return false;
}
}
if let Some(tx_types) = &self.tx_types {
let tx_type = tx_event.tx_type();
if !tx_types.contains(&tx_type) {
return false;
}
}
if let Some(token_ids) = &self.token_ids {
let token_id = tx_event.token_id;
if !token_ids.contains(&token_id) {
return false;
}
}
if let Some(account_ids) = &self.account_ids {
let account_id = tx_event.account_id;
if !account_ids.contains(&account_id) {
return false;
}
}
return true;
}
}
79 changes: 79 additions & 0 deletions core/bin/event_listener/src/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use futures::channel::mpsc;
use futures::StreamExt;
use std::collections::{HashMap, HashSet};
use subscribers::Subscriber;
use zksync_storage::event::{records::EventType, types::ZkSyncEvent};

use filters::*;
use zksync_storage::event::types::block::*;

mod filters;
mod subscribers;

pub struct EventHandler {
rx_for_events: mpsc::Receiver<Vec<ZkSyncEvent>>,
subs: HashSet<Subscriber>,
// TODO: sender/reciever to communicate with transport component.
}

impl EventHandler {
pub fn new(receiver: mpsc::Receiver<Vec<ZkSyncEvent>>) -> Self {
Self {
rx_for_events: receiver,
subs: HashSet::new(),
}
}

pub async fn run(&mut self) -> anyhow::Result<()> {
/* Testing filters */
let block_filter = EventFilter::Block(BlockFilter {
block_status: Some(BlockStatus::Finalized),
});
let account_filter = EventFilter::Account(AccountFilter {
account_ids: Some([1, 10].iter().cloned().collect::<HashSet<i64>>()),
token_ids: None,
status: None,
});

self.subs.insert(Subscriber {
id: 0,
filters: {
let mut filters = HashMap::new();
filters.insert(EventType::Block, block_filter);
filters
},
});

self.subs.insert(Subscriber {
id: 1,
filters: {
let mut filters = HashMap::new();
filters.insert(EventType::Account, account_filter);
filters
},
});
/* Testing filters */

while let Some(events) = self.rx_for_events.next().await {
for event in &events {
for sub in self.subs.iter() {
if sub.matches(event) {
eprintln!("Sub id: {}\nEvent: {:?}", sub.id, event);
}
}
}
}

Ok(())
}
}

#[must_use]
pub fn run_event_handler(
receiver: mpsc::Receiver<Vec<ZkSyncEvent>>,
) -> tokio::task::JoinHandle<()> {
let mut handler = EventHandler::new(receiver);
tokio::spawn(async move {
handler.run().await.unwrap();
})
}
34 changes: 34 additions & 0 deletions core/bin/event_listener/src/handler/subscribers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use zksync_storage::event::types::{EventType, ZkSyncEvent};
use super::filters::EventFilter;

#[derive(Debug, Clone)]
pub struct Subscriber {
pub id: i64,
pub filters: HashMap<EventType, EventFilter>,
}

impl Subscriber {
pub fn matches(&self, event: &ZkSyncEvent) -> bool {
let event_type = event.get_type();
match self.filters.get(&event_type) {
Some(filter) => filter.matches(event),
None => self.filters.is_empty(),
}
}
}

impl PartialEq for Subscriber {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}

impl Eq for Subscriber {}

impl Hash for Subscriber {
fn hash<H: Hasher>(&self, state: &mut H) {
self.id.hash(state);
}
}
39 changes: 2 additions & 37 deletions core/bin/event_listener/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,2 @@
// use zksync_config::ZkSyncConfig;
use zksync_storage::{listener::StorageListener, StorageProcessor};

pub struct EventListener<'a> {
_storage: StorageProcessor<'a>,
listener: StorageListener,
channel: String,
}

impl<'a> EventListener<'a> {
// pub async fn new<'b>(config: ZkSyncConfig) -> anyhow::Result<EventListener<'b>> {
pub async fn new<'b>() -> anyhow::Result<EventListener<'b>> {
let _storage = StorageProcessor::establish_connection().await?;
let listener = StorageListener::connect().await?;
// let channel = config.db.listen_channel_name;
let channel = "event_channel".into();
Ok(EventListener {
_storage,
listener,
channel,
})
}

pub async fn run(&mut self) -> anyhow::Result<()> {
self.listener.listen(&self.channel).await?;

loop {
while let Some(notification) = self.listener.try_recv().await? {
println!(
"Received notification, payload:\n{}",
notification.payload()
);
}
// Connection aborted, handle it here.
}
}
}
pub mod listener;
pub mod handler;
Loading

0 comments on commit 0ab7742

Please sign in to comment.