Skip to content

Commit

Permalink
Merge pull request #1725 from matter-labs/check-priority-queue-gap
Browse files Browse the repository at this point in the history
Add handling of priority queue gap in eth_watch
  • Loading branch information
dvush authored Jun 27, 2021
2 parents 872aba0 + 0697458 commit 918e28d
Showing 1 changed file with 23 additions and 3 deletions.
26 changes: 23 additions & 3 deletions core/bin/zksync_core/src/eth_watch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,24 @@ impl<W: EthClient> EthWatch<W> {

// Extend the existing priority operations with the new ones.
let mut priority_queue = sift_outdated_ops(self.eth_state.priority_queue());
let new_priority_op_ids: Vec<_> = priority_queue.keys().cloned().collect();
vlog::debug!("New priority ops added: {:?}", new_priority_op_ids);

for (serial_id, op) in received_priority_queue {
priority_queue.insert(serial_id, op);
}

// Check for gaps in priority queue. If some event is missing we skip this `ETHState` update.
let mut priority_op_ids: Vec<_> = priority_queue.keys().cloned().collect();
priority_op_ids.sort_unstable();
for i in 0..priority_op_ids.len().saturating_sub(1) {
let gap = priority_op_ids[i + 1] - priority_op_ids[i];
anyhow::ensure!(
gap == 1,
"Gap in priority op queue: gap={}, priority_op_before_gap={}",
gap,
priority_op_ids[i]
);
}

let new_state = ETHState::new(last_ethereum_block, unconfirmed_queue, priority_queue);
self.set_new_state(new_state);
Ok(())
Expand Down Expand Up @@ -200,7 +211,7 @@ impl<W: EthClient> EthWatch<W> {
new_block_with_accepted_events.saturating_sub(unprocessed_blocks_amount);

let unconfirmed_queue = self.get_unconfirmed_ops(current_ethereum_block).await?;
let priority_queue = self
let priority_queue: HashMap<u64, _> = self
.client
.get_priority_op_events(
BlockNumber::Number(previous_block_with_accepted_events.into()),
Expand All @@ -211,6 +222,15 @@ impl<W: EthClient> EthWatch<W> {
.map(|priority_op| (priority_op.serial_id, priority_op.into()))
.collect();

let mut new_priority_op_ids: Vec<_> = priority_queue.keys().cloned().collect();
new_priority_op_ids.sort_unstable();
vlog::debug!(
"Updating eth state: block_range=[{},{}], new_priority_ops={:?}",
previous_block_with_accepted_events,
new_block_with_accepted_events,
new_priority_op_ids
);

Ok((unconfirmed_queue, priority_queue))
}

Expand Down

0 comments on commit 918e28d

Please sign in to comment.