Skip to content

Commit

Permalink
data restore: fetch Eth logs for priority operations
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Nov 17, 2021
1 parent 5135a37 commit c4f1fcd
Show file tree
Hide file tree
Showing 12 changed files with 420 additions and 36 deletions.
18 changes: 15 additions & 3 deletions core/bin/data_restore/src/data_restore_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl<T: Transport> DataRestoreDriver<T> {
let mut transaction = interactor.start_transaction().await;

transaction
.save_events_state(&[], &[], genesis_eth_block_number)
.save_events_state(&[], &[], &[], genesis_eth_block_number)
.await;

let genesis_fee_account =
Expand Down Expand Up @@ -401,7 +401,7 @@ impl<T: Transport> DataRestoreDriver<T> {
/// Updates events state, saves new blocks, tokens events and the last watched eth block number in storage
/// Returns bool flag, true if there are new block events
async fn update_events_state(&mut self, interactor: &mut StorageInteractor<'_>) -> bool {
let (block_events, token_events, last_watched_eth_block_number) = self
let (block_events, token_events, priority_op_data, last_watched_eth_block_number) = self
.events_state
.update_events_state(
&self.web3,
Expand All @@ -417,7 +417,8 @@ impl<T: Transport> DataRestoreDriver<T> {
interactor
.save_events_state(
&block_events,
token_events.as_slice(),
&token_events,
&priority_op_data,
last_watched_eth_block_number,
)
.await;
Expand Down Expand Up @@ -465,6 +466,17 @@ impl<T: Transport> DataRestoreDriver<T> {
.update_tree_state(blocks[i].clone(), updates[i].clone())
.await;
}

let priority_op_data = self.events_state.priority_op_data.values();
let serial_ids = transaction.apply_priority_op_data(priority_op_data).await;
if !serial_ids.is_empty() {
vlog::debug!(
"Serial ids of operations with no corresponding blocks in storage: {:?}",
serial_ids
);
}
self.events_state.sift_priority_ops(&serial_ids);

transaction.commit().await;

vlog::debug!("Updated state");
Expand Down
30 changes: 28 additions & 2 deletions core/bin/data_restore/src/database_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use zksync_storage::{
};
use zksync_types::{
aggregated_operations::{BlocksCommitOperation, BlocksExecuteOperation},
AccountId, BlockNumber, NewTokenEvent, SerialId, Token, TokenId, TokenInfo, TokenKind,
AccountId, BlockNumber, NewTokenEvent, PriorityOp, SerialId, Token, TokenId, TokenInfo,
TokenKind,
{block::Block, AccountUpdate, AccountUpdates},
};

Expand Down Expand Up @@ -129,6 +130,17 @@ impl<'a> DatabaseStorageInteractor<'a> {
.expect("Unable to commit DB transaction");
}

pub async fn apply_priority_op_data(
&mut self,
priority_op_data: impl Iterator<Item = &PriorityOp>,
) -> Vec<SerialId> {
self.storage
.data_restore_schema()
.update_executed_priority_operations(priority_op_data)
.await
.expect("Failed to update executed priority operations")
}

pub async fn store_token(&mut self, token: TokenInfo, token_id: TokenId) {
self.storage
.tokens_schema()
Expand All @@ -147,6 +159,7 @@ impl<'a> DatabaseStorageInteractor<'a> {
&mut self,
block_events: &[BlockEvent],
tokens: &[NewTokenEvent],
priority_op_data: &[PriorityOp],
last_watched_eth_block_number: u64,
) {
let mut new_events: Vec<NewBlockEvent> = vec![];
Expand All @@ -167,7 +180,12 @@ impl<'a> DatabaseStorageInteractor<'a> {
.collect();
self.storage
.data_restore_schema()
.save_events_state(new_events.as_slice(), &tokens, &block_number)
.save_events_state(
new_events.as_slice(),
&tokens,
priority_op_data,
&block_number,
)
.await
.expect("Cant update events state");
}
Expand Down Expand Up @@ -230,10 +248,18 @@ impl<'a> DatabaseStorageInteractor<'a> {
verified_events.push(block_event);
}

let priority_op_data = self
.storage
.data_restore_schema()
.get_priority_op_data()
.await
.expect("Failed to load priority operations data");

EventsState {
committed_events,
verified_events,
last_watched_eth_block_number,
priority_op_data,
}
}

Expand Down
146 changes: 139 additions & 7 deletions core/bin/data_restore/src/events_state.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
// Built-in deps
use std::collections::HashMap;
use std::convert::TryFrom;
// External deps
use anyhow::format_err;
use std::convert::TryFrom;
use web3::contract::Contract;
use web3::types::{BlockNumber as Web3BlockNumber, FilterBuilder, Log, Transaction, H256, U256};
use web3::types::{
BlockNumber as Web3BlockNumber, FilterBuilder, Log, Transaction, H256, U256, U64,
};
use web3::{Transport, Web3};
// Workspace deps
use zksync_contracts::upgrade_gatekeeper;
use zksync_types::{Address, BlockNumber, NewTokenEvent};
use zksync_types::{Address, BlockNumber, NewTokenEvent, PriorityOp, SerialId};
// Local deps
use crate::contract::{ZkSyncContractVersion, ZkSyncDeployedContract};
use crate::eth_tx_helpers::get_block_number_from_ethereum_transaction;
Expand All @@ -21,6 +25,11 @@ pub struct EventsState {
pub verified_events: Vec<BlockEvent>,
/// Last watched ethereum block number
pub last_watched_eth_block_number: u64,
/// Priority operations data parsed from logs
/// emitted by the zkSync contract. Required for
/// fetching fields which are not present in public data
/// such as Ethereum transaction hash.
pub priority_op_data: HashMap<SerialId, PriorityOp>,
}

impl std::default::Default for EventsState {
Expand All @@ -30,6 +39,7 @@ impl std::default::Default for EventsState {
committed_events: Vec::new(),
verified_events: Vec::new(),
last_watched_eth_block_number: 0,
priority_op_data: HashMap::new(),
}
}
}
Expand All @@ -52,6 +62,21 @@ impl EventsState {
Ok(genesis_block_number)
}

/// Remove successfully stored priority operations from the queue.
///
/// # Arguments
///
/// * `serial_ids` - serial ids of operations that don't have a block in storage yet.
pub fn sift_priority_ops(&mut self, serial_ids: &[SerialId]) {
let mut priority_op_data = HashMap::with_capacity(self.priority_op_data.len());
for serial_id in serial_ids {
if let Some(priority_op) = self.priority_op_data.remove(&serial_id) {
priority_op_data.insert(*serial_id, priority_op);
}
}
self.priority_op_data = priority_op_data;
}

/// Update past events state from last watched ethereum block with delta between last eth block and last watched block.
/// Returns new verified committed blocks evens, added tokens events and the last watched eth block number
///
Expand All @@ -75,10 +100,10 @@ impl EventsState {
eth_blocks_step: u64,
end_eth_blocks_offset: u64,
init_contract_version: u32,
) -> Result<(Vec<BlockEvent>, Vec<NewTokenEvent>, u64), anyhow::Error> {
) -> Result<(Vec<BlockEvent>, Vec<NewTokenEvent>, Vec<PriorityOp>, u64), anyhow::Error> {
self.remove_verified_events();

let (events, token_events, to_block_number) =
let (events, token_events, priority_op_data, to_block_number) =
EventsState::get_new_events_and_last_watched_block(
web3,
zksync_contract,
Expand All @@ -104,12 +129,19 @@ impl EventsState {
);
}

for priority_op in priority_op_data {
self.priority_op_data
.insert(priority_op.serial_id, priority_op);
}
let mut events_to_return = self.committed_events.clone();
events_to_return.extend(self.verified_events.clone());

let priority_op_data = self.priority_op_data.values().cloned().collect();

Ok((
events_to_return,
token_events,
priority_op_data,
self.last_watched_eth_block_number,
))
}
Expand Down Expand Up @@ -146,13 +178,15 @@ impl EventsState {
) -> anyhow::Result<(
Vec<(&'a ZkSyncDeployedContract<T>, Vec<Log>)>,
Vec<NewTokenEvent>,
Vec<PriorityOp>,
u64,
)> {
let latest_eth_block_minus_delta =
EventsState::get_last_block_number(web3).await? - end_eth_blocks_offset;

if latest_eth_block_minus_delta == last_watched_block_number {
return Ok((vec![], vec![], last_watched_block_number)); // No new eth blocks
return Ok((vec![], vec![], vec![], last_watched_block_number));
// No new eth blocks
}

let from_block_number_u64 = last_watched_block_number + 1;
Expand Down Expand Up @@ -182,7 +216,15 @@ impl EventsState {
.await?;
logs.push((zksync_contract, block_logs));

Ok((logs, token_logs, to_block_number_u64))
let priority_op_data = EventsState::get_priority_operations_logs(
web3,
zksync_contract,
from_block_number_u64.into(),
to_block_number_u64.into(),
)
.await?;

Ok((logs, token_logs, priority_op_data, to_block_number_u64))
}

/// Returns logs about complete contract upgrades.
Expand Down Expand Up @@ -217,6 +259,96 @@ impl EventsState {
Ok(result)
}

async fn get_priority_operations_logs_inner<T: Transport>(
web3: &Web3<T>,
contract: &ZkSyncDeployedContract<T>,
from: U64,
to: U64,
) -> anyhow::Result<Vec<PriorityOp>> {
let priority_op_topic = contract
.abi
.event("NewPriorityRequest")
.expect("main contract abi error")
.signature();
let filter = FilterBuilder::default()
.address(vec![contract.web3_contract.address()])
.from_block(Web3BlockNumber::Number(from))
.to_block(Web3BlockNumber::Number(to))
.topics(Some(vec![priority_op_topic]), None, None, None)
.build();

let logs = web3.eth().logs(filter).await?;
logs.into_iter()
.map(|event| {
PriorityOp::try_from(event)
.map_err(|e| format_err!("Failed to parse event log from ETH: {:?}", e))
})
.collect()
}

/// Returns priority operations logs emitted by the zkSync contract.
///
/// # Arguments
///
/// * `web3` - Web3 provider.
/// * `contract` - zkSync contract.
/// * `start` - start of the block range
/// * `end` - end of the block range (inclusive).
///
async fn get_priority_operations_logs<T: Transport>(
web3: &Web3<T>,
contract: &ZkSyncDeployedContract<T>,
start: U64,
end: U64,
) -> Result<Vec<PriorityOp>, anyhow::Error> {
const LIMIT_ERR: &str = "query returned more than";
let mut from_number = start;
let mut to_number = end;

let mut priority_operations = Vec::new();

loop {
if from_number > end {
return Ok(priority_operations);
}

let result = EventsState::get_priority_operations_logs_inner(
web3,
contract,
from_number,
to_number,
)
.await;
let range_diff = to_number - from_number;

match result {
Ok(mut operations) => {
priority_operations.append(&mut operations);

from_number = to_number + 1;
to_number = (from_number + range_diff).min(end);

continue;
}
Err(err) => {
if err.to_string().contains(LIMIT_ERR) {
if to_number <= from_number || to_number - from_number == 1.into() {
panic!("Ethereum node failed to return logs for a single block")
}

// Shorten the block range.
to_number = from_number + (range_diff / 2u64);

continue;
} else {
// Non-recoverable error.
return Err(err);
}
}
}
}
}

/// Returns new added token logs
///
/// # Arguments
Expand Down
11 changes: 10 additions & 1 deletion core/bin/data_restore/src/inmemory_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use web3::types::Address;
use zksync_types::block::Block;
use zksync_types::{
Account, AccountId, AccountMap, AccountUpdate, AccountUpdates, Action, BlockNumber,
NewTokenEvent, Operation, SerialId, Token, TokenId, TokenInfo, TokenKind,
NewTokenEvent, Operation, PriorityOp, SerialId, Token, TokenId, TokenInfo, TokenKind,
};

use crate::{
Expand Down Expand Up @@ -211,6 +211,13 @@ impl InMemoryStorageInteractor {
// TODO save operations
}

pub async fn apply_priority_op_data(
&mut self,
_priority_op_data: impl Iterator<Item = &PriorityOp>,
) -> Vec<SerialId> {
Vec::new()
}

pub async fn store_token(&mut self, token: TokenInfo, token_id: TokenId) {
let mut inner = self.inner.borrow_mut();
let token = Token::new(
Expand All @@ -227,6 +234,7 @@ impl InMemoryStorageInteractor {
&mut self,
block_events: &[BlockEvent],
tokens: &[NewTokenEvent],
_priority_op_data: &[PriorityOp],
last_watched_eth_block_number: u64,
) {
let mut inner = self.inner.borrow_mut();
Expand Down Expand Up @@ -277,6 +285,7 @@ impl InMemoryStorageInteractor {
committed_events,
verified_events,
last_watched_eth_block_number: inner.last_watched_block,
priority_op_data: Default::default(),
}
}

Expand Down
Loading

0 comments on commit c4f1fcd

Please sign in to comment.