Skip to content

Commit

Permalink
Merge pull request #739 from matter-labs/dvush/dont-add-tokens-from-e…
Browse files Browse the repository at this point in the history
…vents

Remove new token events from eth_watch
  • Loading branch information
dvush authored Jun 17, 2020
2 parents 1ae1aa0 + 76a4b67 commit 208641d
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 134 deletions.
12 changes: 0 additions & 12 deletions core/server/src/bin/eth_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use futures::{channel::mpsc, SinkExt};
use log::*;
use server::eth_watch::{EthWatch, EthWatchRequest};
use std::time::Duration;
use storage::ConnectionPool;
use tokio::{runtime::Runtime, time};

fn main() {
Expand All @@ -11,14 +10,6 @@ fn main() {
env_logger::init();
info!("ETH watcher started");
let web3_url = std::env::var("WEB3_URL").expect("WEB3_URL env var not found");
let governance_addr = std::env::var("GOVERNANCE_ADDR").expect("GOVERNANCE_ADDR env not found")
[2..]
.parse()
.expect("Failed to parse GOVERNANCE_ADDR");
// let priority_queue_address = std::env::var("PRIORITY_QUEUE_ADDR")
// .expect("PRIORITY_QUEUE_ADDR env var not found")[2..]
// .parse()
// .expect("Failed to parse PRIORITY_QUEUE_ADDR");
let contract_address = std::env::var("CONTRACT_ADDR").expect("CONTRACT_ADDR env var not found")
[2..]
.parse()
Expand All @@ -31,9 +22,6 @@ fn main() {
let watcher = EthWatch::new(
web3,
web3_event_loop_handle,
ConnectionPool::new(Some(1)),
governance_addr,
//priority_queue_address,
contract_address,
0,
eth_req_receiver,
Expand Down
11 changes: 1 addition & 10 deletions core/server/src/eth_watch/eth_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// Built-in deps
use std::collections::HashMap;
// External uses
use web3::types::Address;
// Workspace deps
use models::node::{PriorityOp, TokenId};
use models::node::PriorityOp;
// Local deps
use super::{received_ops::ReceivedPriorityOp, EthBlockId};

Expand All @@ -19,8 +18,6 @@ use super::{received_ops::ReceivedPriorityOp, EthBlockId};
pub struct ETHState {
/// The last block of the Ethereum network known to the Ethereum watcher.
last_ethereum_block: u64,
/// Tokens known to zkSync.
tokens: HashMap<TokenId, Address>,
/// Queue of priority operations that are accepted by Ethereum network,
/// but not yet have enough confirmations to be processed by zkSync.
///
Expand All @@ -37,13 +34,11 @@ pub struct ETHState {
impl ETHState {
pub fn new(
last_ethereum_block: u64,
tokens: HashMap<TokenId, Address>,
unconfirmed_queue: Vec<(EthBlockId, PriorityOp)>,
priority_queue: HashMap<u64, ReceivedPriorityOp>,
) -> Self {
Self {
last_ethereum_block,
tokens,
unconfirmed_queue,
priority_queue,
}
Expand All @@ -53,10 +48,6 @@ impl ETHState {
self.last_ethereum_block
}

pub fn tokens(&self) -> &HashMap<TokenId, Address> {
&self.tokens
}

pub fn priority_queue(&self) -> &HashMap<u64, ReceivedPriorityOp> {
&self.priority_queue
}
Expand Down
115 changes: 4 additions & 111 deletions core/server/src/eth_watch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,13 @@ use web3::{
};
// Workspace deps
use models::{
abi::{eip1271_contract, governance_contract, zksync_contract},
abi::{eip1271_contract, zksync_contract},
config_options::ConfigurationOptions,
misc::constants::EIP1271_SUCCESS_RETURN_VALUE,
node::tx::EIP1271Signature,
node::{FranklinPriorityOp, Nonce, PriorityOp, PubKeyHash, Token, TokenId},
node::{FranklinPriorityOp, Nonce, PriorityOp, PubKeyHash},
params::PRIORITY_EXPIRATION,
NewTokenEvent,
};
use storage::ConnectionPool;
// Local deps
use self::{eth_state::ETHState, received_ops::sift_outdated_ops};

Expand Down Expand Up @@ -93,12 +91,10 @@ pub enum EthWatchRequest {
}

pub struct EthWatch<T: Transport> {
gov_contract: (ethabi::Contract, Contract<T>),
zksync_contract: (ethabi::Contract, Contract<T>),
eth_state: ETHState,
web3: Web3<T>,
_web3_event_loop_handle: EventLoopHandle,
db_pool: ConnectionPool,
/// All ethereum events are accepted after sufficient confirmations to eliminate risk of block reorg.
number_of_confirmations_for_event: u64,

Expand All @@ -111,19 +107,10 @@ impl<T: Transport> EthWatch<T> {
pub fn new(
web3: Web3<T>,
web3_event_loop_handle: EventLoopHandle,
db_pool: ConnectionPool,
governance_addr: H160,
zksync_contract_addr: H160,
number_of_confirmations_for_event: u64,
eth_watch_req: mpsc::Receiver<EthWatchRequest>,
) -> Self {
let gov_contract = {
(
governance_contract(),
Contract::new(web3.eth(), governance_addr, governance_contract()),
)
};

let zksync_contract = {
(
zksync_contract(),
Expand All @@ -132,12 +119,10 @@ impl<T: Transport> EthWatch<T> {
};

Self {
gov_contract,
zksync_contract,
eth_state: ETHState::default(),
web3,
_web3_event_loop_handle: web3_event_loop_handle,
db_pool,
eth_watch_req,

mode: WatcherMode::Working,
Expand All @@ -154,41 +139,6 @@ impl<T: Transport> EthWatch<T> {
Contract::new(self.web3.eth(), address, eip1271_contract())
}

fn get_new_token_event_filter(&self, from: BlockNumber, to: BlockNumber) -> Filter {
let new_token_event_topic = self
.gov_contract
.0
.event("NewToken")
.expect("gov contract abi error")
.signature();
FilterBuilder::default()
.address(vec![self.gov_contract.1.address()])
.from_block(from)
.to_block(to)
.topics(Some(vec![new_token_event_topic]), None, None, None)
.build()
}

async fn get_new_token_events(
&self,
from: BlockNumber,
to: BlockNumber,
) -> Result<Vec<NewTokenEvent>, failure::Error> {
let filter = self.get_new_token_event_filter(from, to);

self.web3
.eth()
.logs(filter)
.compat()
.await?
.into_iter()
.map(|event| {
NewTokenEvent::try_from(event)
.map_err(|e| format_err!("Failed to parse NewToken event log from ETH: {}", e))
})
.collect()
}

fn get_priority_op_event_filter(&self, from: BlockNumber, to: BlockNumber) -> Filter {
let priority_op_event_topic = self
.zksync_contract
Expand Down Expand Up @@ -310,25 +260,7 @@ impl<T: Transport> EthWatch<T> {
priority_queue.insert(priority_op.serial_id, priority_op.into());
}

// restore token list from governance contract
let new_tokens = self
.get_new_token_events(
BlockNumber::Earliest,
BlockNumber::Number(new_block_with_accepted_events.into()),
)
.await?;

let mut tokens = HashMap::new();
for token in new_tokens.into_iter() {
tokens.insert(token.id as TokenId, token.address);
}

let new_state = ETHState::new(
last_ethereum_block,
tokens,
unconfirmed_queue,
priority_queue,
);
let new_state = ETHState::new(last_ethereum_block, unconfirmed_queue, priority_queue);

self.set_new_state(new_state);

Expand All @@ -345,21 +277,6 @@ impl<T: Transport> EthWatch<T> {
let new_block_with_accepted_events =
last_ethereum_block.saturating_sub(self.number_of_confirmations_for_event);

// Get new tokens
let new_tokens = self
.get_new_token_events(
BlockNumber::Number(previous_block_with_accepted_events.into()),
BlockNumber::Number(new_block_with_accepted_events.into()),
)
.await?;

// Extend the existing token list with the new tokens.
let mut tokens = self.eth_state.tokens().clone();
for token in new_tokens.into_iter() {
debug!("New token added: {:?}", token);
tokens.insert(token.id as TokenId, token.address);
}

// Get new priority ops
let priority_op_events = self
.get_priority_op_events(
Expand All @@ -382,32 +299,12 @@ impl<T: Transport> EthWatch<T> {
// update the state. This is done atomically to avoid the situation when
// due to error occurred mid-update the overall `ETHWatcher` state become
// messed up.
let new_state = ETHState::new(
last_ethereum_block,
tokens,
unconfirmed_queue,
priority_queue,
);
let new_state = ETHState::new(last_ethereum_block, unconfirmed_queue, priority_queue);
self.set_new_state(new_state);

Ok(())
}

fn commit_state(&self) {
self.db_pool
.access_storage()
.map(|storage| {
for (&id, &address) in self.eth_state.tokens() {
let decimals = 18;
let token = Token::new(id, address, &format!("ERC20-{}", id), decimals);
if let Err(e) = storage.tokens_schema().store_token(token) {
warn!("Failed to add token to db: {:?}", e);
}
}
})
.unwrap_or_default();
}

fn get_priority_requests(&self, first_serial_id: u64, max_chunks: usize) -> Vec<PriorityOp> {
let mut res = Vec::new();

Expand Down Expand Up @@ -499,7 +396,6 @@ impl<T: Transport> EthWatch<T> {

if last_block_number > self.eth_state.last_ethereum_block() {
self.process_new_blocks(last_block_number).await?;
self.commit_state();
}

Ok(())
Expand Down Expand Up @@ -632,7 +528,6 @@ impl<T: Transport> EthWatch<T> {

#[must_use]
pub fn start_eth_watch(
pool: ConnectionPool,
config_options: ConfigurationOptions,
eth_req_sender: mpsc::Sender<EthWatchRequest>,
eth_req_receiver: mpsc::Receiver<EthWatchRequest>,
Expand All @@ -645,8 +540,6 @@ pub fn start_eth_watch(
let eth_watch = EthWatch::new(
web3,
web3_event_loop_handle,
pool,
config_options.governance_eth_addr,
config_options.contract_eth_addr,
config_options.confirmations_for_eth_event,
eth_req_receiver,
Expand Down
1 change: 0 additions & 1 deletion core/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ fn main() {

let (eth_watch_req_sender, eth_watch_req_receiver) = mpsc::channel(256);
let eth_watch_task = start_eth_watch(
connection_pool.clone(),
config_opts.clone(),
eth_watch_req_sender.clone(),
eth_watch_req_receiver,
Expand Down

0 comments on commit 208641d

Please sign in to comment.