Skip to content

Commit

Permalink
Merge #2139
Browse files Browse the repository at this point in the history
2139: Update data restore (ZKS-924) r=Deniallugo a=StanislavBreadless



Co-authored-by: Stanislav Bezkorovainyi <[email protected]>
Co-authored-by: Denis Baryshev <[email protected]>
Co-authored-by: Danil <[email protected]>
Co-authored-by: deniallugo <[email protected]>
  • Loading branch information
4 people authored Mar 24, 2022
2 parents 32e34f1 + ce75a6e commit aa5e788
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 27 deletions.
60 changes: 40 additions & 20 deletions core/bin/data_restore/src/data_restore_driver.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

// External deps
use web3::{
contract::Contract,
Expand Down Expand Up @@ -246,15 +248,16 @@ impl<T: Transport> DataRestoreDriver<T> {
self.tree_state = tree_state;
}

async fn store_tree_cache(&mut self, interactor: &mut StorageInteractor<'_>) {
async fn update_tree_cache(&mut self, interactor: &mut StorageInteractor<'_>) {
vlog::info!(
"Storing the tree cache, block number: {}",
"Updating the tree cache, block number: {}",
self.tree_state.block_number
);

self.tree_state.state.root_hash();
let tree_cache = self.tree_state.state.get_balance_tree().get_internals();
interactor
.store_tree_cache(
.update_tree_cache(
self.tree_state.block_number,
serde_json::to_string(&tree_cache).expect("failed to serialize tree cache"),
)
Expand Down Expand Up @@ -283,6 +286,8 @@ impl<T: Transport> DataRestoreDriver<T> {
cache.nfts,
)
} else {
vlog::info!("Building tree from scratch");

let tree_state = transaction.get_tree_state().await;
TreeState::load(
tree_state.last_block_number,
Expand All @@ -295,6 +300,7 @@ impl<T: Transport> DataRestoreDriver<T> {
StorageUpdateState::Events => {
// Update operations
let new_ops_blocks = self.update_operations_state(&mut transaction).await;

// Update tree
self.update_tree_state(&mut transaction, new_ops_blocks)
.await;
Expand Down Expand Up @@ -324,8 +330,9 @@ impl<T: Transport> DataRestoreDriver<T> {

let is_finished = self.finite_mode && (total_verified_blocks == *last_verified_block);
// Save tree cache if necessary.
if is_finished && !is_cached {
self.store_tree_cache(interactor).await;
if !is_cached {
vlog::info!("Saving tree cache for future re-uses");
self.update_tree_cache(interactor).await;
}
is_finished
}
Expand Down Expand Up @@ -358,6 +365,9 @@ impl<T: Transport> DataRestoreDriver<T> {
// to keep the `state_keeper` consistent with the `eth_sender`.
transaction.update_eth_state().await;

// We update tree cache for each load of updates to allow fast restart.
self.update_tree_cache(&mut transaction).await;

transaction.commit().await;

vlog::info!(
Expand Down Expand Up @@ -387,11 +397,6 @@ impl<T: Transport> DataRestoreDriver<T> {
if self.final_hash.is_some() && !final_hash_was_found {
panic!("Final hash was not met during the state restoring process");
}

// We've restored all the blocks, our job is done. Store the tree cache for
// consequent usage.
self.store_tree_cache(interactor).await;

break;
}
}
Expand Down Expand Up @@ -518,6 +523,10 @@ impl<T: Transport> DataRestoreDriver<T> {
let mut blocks = Vec::new();

let mut last_event_tx_hash = None;
// The HashMap from block_num to the RollupOpsBlock data for the tx represented by last_event_tx_hash.
// It is used as a cache to reuse the fetched data.
let mut last_tx_blocks = HashMap::new();

// TODO (ZKS-722): either due to Ethereum node lag or unknown
// bug in the events state, we have to additionally filter out
// already processed rollup blocks.
Expand All @@ -529,18 +538,29 @@ impl<T: Transport> DataRestoreDriver<T> {
{
// We use an aggregated block in contracts, which means that several BlockEvent can include the same tx_hash,
// but for correct restore we need to generate RollupBlocks from this tx only once.
// These blocks go one after the other, and checking only the last transaction hash is safe
if let Some(tx) = last_event_tx_hash {
if tx == event.transaction_hash {
continue;
}
// These blocks go one after the other, and checking only the last transaction hash is safe.

// If the previous tx hash does not exist or it is not equal to the current one, we should
// re-fetch the blocks for the new tx hash.
if !last_event_tx_hash
.map(|tx| tx == event.transaction_hash)
.unwrap_or_default()
{
let blocks = RollupOpsBlock::get_rollup_ops_blocks(&self.web3, event)
.await
.expect("Cant get new operation blocks from events");

last_tx_blocks = blocks
.into_iter()
.map(|block| (block.block_num, block))
.collect();
last_event_tx_hash = Some(event.transaction_hash);
}

let block = RollupOpsBlock::get_rollup_ops_blocks(&self.web3, event)
.await
.expect("Cant get new operation blocks from events");
blocks.extend(block);
last_event_tx_hash = Some(event.transaction_hash);
let rollup_block = last_tx_blocks
.remove(&event.block_num)
.expect("Block not found");
blocks.push(rollup_block);
}

blocks
Expand Down
26 changes: 23 additions & 3 deletions core/bin/data_restore/src/database_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,33 @@ impl<'a> DatabaseStorageInteractor<'a> {
}
}

pub async fn store_tree_cache(&mut self, block_number: BlockNumber, tree_cache: String) {
self.storage
pub async fn update_tree_cache(&mut self, block_number: BlockNumber, tree_cache: String) {
let mut transaction = self
.storage
.start_transaction()
.await
.expect("Failed to start transaction");

transaction
.chain()
.block_schema()
.remove_old_account_tree_cache(block_number)
.await
.expect("Failed to remove old tree cache");

// It is safe to store the new tree cache without additional checks
// since on conflict it does nothing.
transaction
.chain()
.block_schema()
.store_account_tree_cache(block_number, tree_cache)
.await
.expect("Failed to store the tree cache");
.expect("Failed to store new tree cache");

transaction
.commit()
.await
.expect("Failed to update tree cache");
}

pub async fn get_storage_state(&mut self) -> StorageUpdateState {
Expand Down
2 changes: 1 addition & 1 deletion core/bin/data_restore/src/inmemory_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ impl InMemoryStorageInteractor {
None
}

pub async fn store_tree_cache(&mut self, _block_number: BlockNumber, _tree_cache: String) {
pub async fn update_tree_cache(&mut self, _block_number: BlockNumber, _tree_cache: String) {
// Inmemory storage doesn't support caching.
}

Expand Down
6 changes: 3 additions & 3 deletions core/bin/data_restore/src/storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,15 +196,15 @@ impl StorageInteractor<'_> {
storage_interact!(self.get_cached_tree_state())
}

/// Saves the tree cache in the database.
/// Deletes the latest tree cache in the database and saves the new one.
///
/// # Arguments
///
/// * `block_number` - The corresponding block number
/// * `tree_cache` - Merkle tree cache
///
pub async fn store_tree_cache(&mut self, block_number: BlockNumber, tree_cache: String) {
storage_interact!(self.store_tree_cache(block_number, tree_cache))
pub async fn update_tree_cache(&mut self, block_number: BlockNumber, tree_cache: String) {
storage_interact!(self.update_tree_cache(block_number, tree_cache))
}

/// Retrieves the maximum serial id of a priority requests
Expand Down

0 comments on commit aa5e788

Please sign in to comment.