Skip to content

Commit

Permalink
eth_watch: rework priority ops gap handling
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Oct 25, 2021
1 parent f829830 commit fcde9a4
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 27 deletions.
28 changes: 28 additions & 0 deletions core/bin/zksync_core/src/eth_watch/eth_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@ use super::received_ops::ReceivedPriorityOp;
pub struct ETHState {
/// The last block of the Ethereum network known to the Ethereum watcher.
last_ethereum_block: u64,
/// The previous Ethereum block successfully processed by the watcher.
/// Keeping track of it is required to be able to poll the node for
/// the same range multiple times, e.g. in case it didn't return all
/// priority operations received by the contract.
last_ethereum_block_backup: u64,
/// Serial id of the next priority operation Ethereum watcher should process.
next_priority_op_id: SerialId,
/// Queue of priority operations that are accepted by Ethereum network,
/// but not yet have enough confirmations to be processed by zkSync.
///
Expand All @@ -37,13 +44,21 @@ pub struct ETHState {
impl ETHState {
pub fn new(
last_ethereum_block: u64,
last_ethereum_block_backup: u64,
unconfirmed_queue: Vec<PriorityOp>,
priority_queue: HashMap<SerialId, ReceivedPriorityOp>,
new_tokens: Vec<NewTokenEvent>,
register_nft_factory_events: Vec<RegisterNFTFactoryEvent>,
) -> Self {
let next_priority_op_id = priority_queue
.keys()
.max()
.map(|serial_id| *serial_id + 1)
.unwrap_or(0);
Self {
last_ethereum_block,
last_ethereum_block_backup,
next_priority_op_id,
unconfirmed_queue,
priority_queue,
new_tokens,
Expand All @@ -70,4 +85,17 @@ impl ETHState {
/// Returns the `NewTokenEvent`s stored in this state.
pub fn new_tokens(&self) -> &[NewTokenEvent] {
    &self.new_tokens
}

/// Serial id of the next priority operation the watcher expects to process.
/// Derived in `new` as `max(priority_queue keys) + 1` (0 for an empty queue).
pub fn next_priority_op_id(&self) -> SerialId {
    self.next_priority_op_id
}

/// Rolls `last_ethereum_block` back to its backup value so the watcher will
/// re-poll the previous block range (used after a priority op serial id gap
/// is detected).
pub fn reset_last_ethereum_block(&mut self) {
    self.last_ethereum_block = self.last_ethereum_block_backup;
}

/// Test-only accessor for the backed-up Ethereum block number that
/// `reset_last_ethereum_block` rolls back to.
#[cfg(test)]
pub(crate) fn last_ethereum_block_backup(&self) -> u64 {
    self.last_ethereum_block_backup
}
}
65 changes: 47 additions & 18 deletions core/bin/zksync_core/src/eth_watch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use futures::{
channel::{mpsc, oneshot},
SinkExt, StreamExt,
};
use thiserror::Error;

pub use client::{get_web3_block_number, EthHttpClient};
use itertools::Itertools;
Expand All @@ -32,7 +33,7 @@ use zksync_config::ZkSyncConfig;
use zksync_crypto::params::PRIORITY_EXPIRATION;
use zksync_eth_client::ethereum_gateway::EthereumGateway;
use zksync_types::{
tx::TxHash, NewTokenEvent, Nonce, PriorityOp, PubKeyHash, RegisterNFTFactoryEvent,
tx::TxHash, NewTokenEvent, Nonce, PriorityOp, PubKeyHash, RegisterNFTFactoryEvent, SerialId,
ZkSyncPriorityOp, H256,
};

Expand Down Expand Up @@ -117,6 +118,14 @@ pub enum EthWatchRequest {
},
}

#[derive(Debug, Error)]
#[error("A priority op log is missing: last processed id is {0}, next is {1}")]
struct MissingPriorityOpError(SerialId, SerialId);

fn is_missing_priority_op_error(error: &anyhow::Error) -> bool {
error.is::<MissingPriorityOpError>()
}

pub struct EthWatch<W: EthClient> {
client: W,
eth_state: ETHState,
Expand Down Expand Up @@ -168,31 +177,44 @@ impl<W: EthClient> EthWatch<W> {
// Note that we don't have to add `number_of_confirmations_for_event` here, because the check function takes
// care of it on its own. Here we calculate "how many blocks should we watch", and the offsets with respect
// to the `number_of_confirmations_for_event` are calculated by `update_eth_state`.
let block_difference =
last_ethereum_block.saturating_sub(self.eth_state.last_ethereum_block());
let mut next_priority_op_id = self.eth_state.next_priority_op_id();
let previous_ethereum_block = self.eth_state.last_ethereum_block();
let block_difference = last_ethereum_block.saturating_sub(previous_ethereum_block);

let updated_state = self
.update_eth_state(last_ethereum_block, block_difference)
.await?;

// It's assumed that for the current state all priority operations have consecutive ids,
// i.e. there're no gaps. Thus, there're two opportunities for missing them: either
// we're missing last operations from the previous ethereum blocks range or there's a gap
// in the updated state.

// Extend the existing priority operations with the new ones.
let mut priority_queue = sift_outdated_ops(self.eth_state.priority_queue());

for (serial_id, op) in updated_state.priority_queue() {
priority_queue.insert(*serial_id, op.clone());
}

// Check for gaps in priority queue. If some event is missing we skip this `ETHState` update.
let mut priority_op_ids: Vec<_> = priority_queue.keys().cloned().collect();
priority_op_ids.sort_unstable();
for i in 0..priority_op_ids.len().saturating_sub(1) {
let gap = priority_op_ids[i + 1] - priority_op_ids[i];
anyhow::ensure!(
gap == 1,
"Gap in priority op queue: gap={}, priority_op_before_gap={}",
gap,
priority_op_ids[i]
);
// Iterate through new priority operations sorted by their serial id.
for (serial_id, op) in updated_state
.priority_queue()
.iter()
.sorted_by_key(|(id, _)| **id)
{
if *serial_id > next_priority_op_id {
// Either the previous block range contains missing operations
// or the new one has a gap in the beginning.
// We have to revert the block range back. This will only move the watcher
// backwards for a single time since `last_ethereum_block` and its backup will
// be equal.
self.eth_state.reset_last_ethereum_block();
return Err(anyhow::Error::from(MissingPriorityOpError(
next_priority_op_id,
*serial_id,
)));
} else {
// Next serial id matched the expected one.
priority_queue.insert(*serial_id, op.clone());
next_priority_op_id = next_priority_op_id.max(*serial_id + 1);
}
}

// Extend the existing token events with the new ones.
Expand All @@ -215,6 +237,7 @@ impl<W: EthClient> EthWatch<W> {

let new_state = ETHState::new(
last_ethereum_block,
previous_ethereum_block,
updated_state.unconfirmed_queue().to_vec(),
priority_queue,
new_tokens,
Expand Down Expand Up @@ -289,7 +312,9 @@ impl<W: EthClient> EthWatch<W> {
new_priority_op_ids
);

// The backup block number is not used.
let state = ETHState::new(
current_ethereum_block,
current_ethereum_block,
unconfirmed_queue,
priority_queue,
Expand Down Expand Up @@ -570,6 +595,10 @@ impl<W: EthClient> EthWatch<W> {
Entering the backoff mode"
);
self.enter_backoff_mode();
} else if is_missing_priority_op_error(&error) {
vlog::warn!("{}", error);
// Wait for some time and try to fetch new logs again.
self.enter_backoff_mode();
} else {
// Some unexpected kind of error, we won't shutdown the node because of it,
// but rather expect node administrators to handle the situation.
Expand Down
108 changes: 99 additions & 9 deletions core/bin/zksync_core/src/eth_watch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use zksync_types::{
TokenId, ZkSyncPriorityOp, H256,
};

use super::is_missing_priority_op_error;
use crate::eth_watch::{client::EthClient, EthWatch};
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -63,6 +64,11 @@ impl FakeEthClient {
BlockNumber::Number(number) => number.as_u64(),
}
}

/// Overrides the fake client's reported chain head so tests can advance
/// (or rewind) the observable Ethereum block number.
async fn set_last_block_number(&mut self, block_number: u64) {
    self.inner.write().await.last_block_number = block_number;
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -143,11 +149,11 @@ async fn test_operation_queues() {
from: from_addr,
token: TokenId(0),
amount: Default::default(),
to: to_addr,
to: Default::default(),
}),
deadline_block: 0,
eth_hash: [2; 32].into(),
eth_block: 4,
eth_block: 3,
eth_block_index: Some(1),
},
PriorityOp {
Expand All @@ -156,11 +162,11 @@ async fn test_operation_queues() {
from: Default::default(),
token: TokenId(0),
amount: Default::default(),
to: Default::default(),
to: to_addr,
}),
deadline_block: 0,
eth_hash: [3; 32].into(),
eth_block: 3,
eth_block: 4,
eth_block_index: Some(1),
},
PriorityOp {
Expand Down Expand Up @@ -189,15 +195,15 @@ async fn test_operation_queues() {
assert_eq!(priority_queues.len(), 1);
assert_eq!(
priority_queues.values().next().unwrap().as_ref().serial_id,
1
0
);
assert_eq!(unconfirmed_queue.len(), 2);
assert_eq!(unconfirmed_queue[0].serial_id, 0);
assert_eq!(unconfirmed_queue[0].serial_id, 1);
assert_eq!(unconfirmed_queue[1].serial_id, 2);

priority_queues.get(&1).unwrap();
priority_queues.get(&0).unwrap();
watcher
.find_ongoing_op_by_eth_hash(H256::from_slice(&[2u8; 32]))
.find_ongoing_op_by_eth_hash(H256::from_slice(&[3u8; 32]))
.unwrap();

// Make sure that the old behavior of the pending deposits getter has not changed.
Expand All @@ -213,7 +219,7 @@ async fn test_operation_queues() {
limit: 2,
direction: PaginationDirection::Newer,
});
assert_eq!(ops.list[0].tx_hash, priority_ops[0].tx_hash());
assert_eq!(ops.list[0].tx_hash, priority_ops[1].tx_hash());
assert_eq!(ops.list[1].tx_hash, priority_ops[2].tx_hash());
assert!(watcher
.get_ongoing_ops_for(PaginationQuery {
Expand Down Expand Up @@ -372,6 +378,7 @@ async fn test_restore_and_poll() {
])
.await;
watcher.poll_eth_node().await.unwrap();
dbg!(&watcher.eth_state);
assert_eq!(watcher.eth_state.last_ethereum_block(), 5);
let priority_queues = watcher.eth_state.priority_queue();
let unconfirmed_queue = watcher.eth_state.unconfirmed_queue();
Expand Down Expand Up @@ -429,3 +436,86 @@ async fn test_restore_and_poll_time_lag() {
priority_queues.get(&0).unwrap();
priority_queues.get(&1).unwrap();
}

/// Checks the watcher's reaction to a gap in priority op serial ids:
/// polling must fail with `MissingPriorityOpError`, the block range must be
/// rolled back to the backup, and a subsequent poll that finally sees the
/// missing op must succeed and advance the state past the whole range.
#[tokio::test]
async fn test_serial_id_gaps() {
    let deposit = ZkSyncPriorityOp::Deposit(Deposit {
        from: Default::default(),
        token: TokenId(0),
        amount: Default::default(),
        to: [2u8; 20].into(),
    });

    let mut client = FakeEthClient::new();
    // Two consecutive ops (ids 0 and 1) in block 1 — no gap yet.
    client
        .add_operations(&[
            PriorityOp {
                serial_id: 0,
                data: deposit.clone(),
                deadline_block: 0,
                eth_hash: [2; 32].into(),
                eth_block: 1,
                eth_block_index: Some(1),
            },
            PriorityOp {
                serial_id: 1,
                data: deposit.clone(),
                deadline_block: 0,
                eth_hash: [3; 32].into(),
                eth_block: 1,
                eth_block_index: Some(2),
            },
        ])
        .await;

    let mut watcher = create_watcher(client.clone());
    // Restore the valid (empty) state.
    watcher.restore_state_from_eth(0).await.unwrap();
    assert_eq!(watcher.eth_state.last_ethereum_block(), 0);
    assert!(watcher.eth_state.priority_queue().is_empty());
    assert_eq!(watcher.eth_state.next_priority_op_id(), 0);

    // Advance the block number and poll the valid block range.
    client.set_last_block_number(2).await;
    watcher.poll_eth_node().await.unwrap();
    assert_eq!(watcher.eth_state.next_priority_op_id(), 2);
    assert_eq!(watcher.eth_state.last_ethereum_block_backup(), 0);
    assert_eq!(watcher.eth_state.last_ethereum_block(), 2);

    // Introduce a gap: id 2 is absent, id 3 appears in block 2.
    client
        .add_operations(&[PriorityOp {
            serial_id: 3, // The next expected id is 2, so this creates a gap.
            data: deposit.clone(),
            deadline_block: 0,
            eth_hash: [2; 32].into(),
            eth_block: 2,
            eth_block_index: Some(1),
        }])
        .await;
    client.set_last_block_number(3).await;
    // Polling should detect the gap and report it.
    let err = watcher.poll_eth_node().await.unwrap_err();
    assert!(is_missing_priority_op_error(&err));

    // Progress was not made: the expected id is unchanged.
    assert_eq!(watcher.eth_state.next_priority_op_id(), 2);
    // The block range got reset to the backup.
    assert_eq!(watcher.eth_state.last_ethereum_block_backup(), 0);
    assert_eq!(watcher.eth_state.last_ethereum_block(), 0);

    // Add the missing operation (id 2) to the already-processed range.
    client
        .add_operations(&[PriorityOp {
            serial_id: 2,
            data: deposit.clone(),
            deadline_block: 0,
            eth_hash: [2; 32].into(),
            eth_block: 1,
            eth_block_index: Some(1),
        }])
        .await;
    // Re-polling now covers a contiguous id range and must succeed.
    watcher.poll_eth_node().await.unwrap();
    assert_eq!(watcher.eth_state.next_priority_op_id(), 4);
    assert_eq!(watcher.eth_state.last_ethereum_block_backup(), 0);
    assert_eq!(watcher.eth_state.last_ethereum_block(), 3);
}

0 comments on commit fcde9a4

Please sign in to comment.