Skip to content

Commit

Permalink
Optimize data_restore continue mode
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber authored and Deniallugo committed Jul 7, 2021
1 parent c8dfc58 commit cfa0ca7
Show file tree
Hide file tree
Showing 7 changed files with 253 additions and 37 deletions.
55 changes: 46 additions & 9 deletions core/bin/data_restore/src/data_restore_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,18 +232,47 @@ where
self.tree_state = tree_state;
}

async fn store_tree_cache(&mut self, interactor: &mut I) {
vlog::info!(
"Storing the tree cache, block number: {}",
self.tree_state.state.block_number
);
self.tree_state.state.root_hash();
let tree_cache = self.tree_state.state.get_balance_tree().get_internals();
interactor
.store_tree_cache(
self.tree_state.state.block_number,
serde_json::to_value(tree_cache).expect("failed to serialize tree cache"),
)
.await;
}

/// Stops states from storage
pub async fn load_state_from_storage(&mut self, interactor: &mut I) -> bool {
vlog::info!("Loading state from storage");
let state = interactor.get_storage_state().await;
self.events_state = interactor.get_block_events_state_from_storage().await;
let tree_state = interactor.get_tree_state().await;
self.tree_state = TreeState::load(
tree_state.last_block_number, // current block
tree_state.account_map, // account map
tree_state.unprocessed_prior_ops, // unprocessed priority op
tree_state.fee_acc_id, // fee account
);

let mut is_cached = false;
// Try to load tree cache from the database.
self.tree_state = if let Some(cache) = interactor.get_cached_tree_state().await {
vlog::info!("Using tree cache from the database");
is_cached = true;
TreeState::restore_from_cache(
cache.tree_cache,
cache.account_map,
cache.current_block,
cache.nfts,
)
} else {
let tree_state = interactor.get_tree_state().await;
TreeState::load(
tree_state.last_block_number,
tree_state.account_map,
tree_state.unprocessed_prior_ops,
tree_state.fee_acc_id,
)
};
match state {
StorageUpdateState::Events => {
// Update operations
Expand All @@ -268,7 +297,12 @@ where
self.tree_state.root_hash()
);

self.finite_mode && (total_verified_blocks == *last_verified_block)
let is_finished = self.finite_mode && (total_verified_blocks == *last_verified_block);
// Save tree cache if necessary.
if is_finished && !is_cached {
self.store_tree_cache(interactor).await;
}
is_finished
}

/// Activates states updates
Expand Down Expand Up @@ -324,7 +358,10 @@ where
panic!("Final hash was not met during the state restoring process");
}

// We've restored all the blocks, our job is done.
// We've restored all the blocks, our job is done. Store the tree cache for
// consequent usage.
self.store_tree_cache(interactor).await;

break;
}
}
Expand Down
56 changes: 54 additions & 2 deletions core/bin/data_restore/src/database_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use zksync_storage::{
};
use zksync_types::{
aggregated_operations::{BlocksCommitOperation, BlocksExecuteOperation},
AccountId, NewTokenEvent, Token, TokenId, TokenInfo,
AccountId, BlockNumber, NewTokenEvent, Token, TokenId, TokenInfo,
{block::Block, AccountUpdate, AccountUpdates},
};

Expand All @@ -20,7 +20,7 @@ use crate::{
rollup_ops::RollupOpsBlock,
storage_interactor::{
block_event_into_stored_block_event, stored_block_event_into_block_event,
stored_ops_block_into_ops_block, StorageInteractor,
stored_ops_block_into_ops_block, CachedTreeState, StorageInteractor,
},
};

Expand Down Expand Up @@ -289,6 +289,58 @@ impl StorageInteractor for DatabaseStorageInteractor<'_> {
.expect("Can't update the eth_stats table")
}

async fn get_cached_tree_state(&mut self) -> Option<CachedTreeState> {
let (last_block, account_map) = self
.storage
.chain()
.state_schema()
.load_verified_state()
.await
.expect("Failed to load verified state from the database");

let tree_cache = self
.storage
.chain()
.block_schema()
.get_account_tree_cache_block(last_block)
.await
.expect("Failed to query the database for the tree cache");

if let Some(tree_cache) = tree_cache {
let current_block = self
.storage
.chain()
.block_schema()
.get_block(last_block)
.await
.expect("Failed to query the database for the latest block")
.unwrap();
let nfts = self
.storage
.tokens_schema()
.load_nfts()
.await
.expect("Failed to load NFTs from the database");
Some(CachedTreeState {
tree_cache,
account_map,
current_block,
nfts,
})
} else {
None
}
}

async fn store_tree_cache(&mut self, block_number: BlockNumber, tree_cache: serde_json::Value) {
self.storage
.chain()
.block_schema()
.store_account_tree_cache(block_number, tree_cache)
.await
.expect("Failed to store the tree cache");
}

async fn get_storage_state(&mut self) -> StorageUpdateState {
let storage_state_string = self
.storage
Expand Down
15 changes: 13 additions & 2 deletions core/bin/data_restore/src/inmemory_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ use crate::{
events::{BlockEvent, EventType},
events_state::EventsState,
rollup_ops::RollupOpsBlock,
storage_interactor::StorageInteractor,
storage_interactor::StoredTreeState,
storage_interactor::{CachedTreeState, StorageInteractor, StoredTreeState},
};

pub struct InMemoryStorageInteractor {
Expand Down Expand Up @@ -148,6 +147,18 @@ impl StorageInteractor for InMemoryStorageInteractor {
async fn get_storage_state(&mut self) -> StorageUpdateState {
self.storage_state
}

async fn get_cached_tree_state(&mut self) -> Option<CachedTreeState> {
None
}

async fn store_tree_cache(
&mut self,
_block_number: BlockNumber,
_tree_cache: serde_json::Value,
) {
// Inmemory storage doesn't support caching.
}
}

impl InMemoryStorageInteractor {
Expand Down
25 changes: 23 additions & 2 deletions core/bin/data_restore/src/storage_interactor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::convert::TryFrom;
use std::{collections::HashMap, convert::TryFrom};

use web3::types::H256;

Expand All @@ -7,7 +7,7 @@ use zksync_storage::data_restore::records::{
};
use zksync_types::{
block::Block, AccountId, AccountMap, AccountUpdate, AccountUpdates, BlockNumber, NewTokenEvent,
Token, TokenId, TokenInfo,
Token, TokenId, TokenInfo, NFT,
};

use crate::{
Expand All @@ -25,6 +25,13 @@ pub struct StoredTreeState {
pub fee_acc_id: AccountId,
}

pub struct CachedTreeState {
pub tree_cache: serde_json::Value,
pub account_map: AccountMap,
pub current_block: Block,
pub nfts: HashMap<TokenId, NFT>,
}

#[async_trait::async_trait]
pub trait StorageInteractor {
/// Saves Rollup operations blocks in storage
Expand Down Expand Up @@ -98,6 +105,20 @@ pub trait StorageInteractor {

/// Returns last recovery state update step from storage
async fn get_storage_state(&mut self) -> StorageUpdateState;

/// Returns cached tree state from storage. It's expected to be valid
/// after completing `finite` restore mode and may be used to speed up the
/// `continue` mode.
async fn get_cached_tree_state(&mut self) -> Option<CachedTreeState>;

/// Saves the tree cache in the database.
///
/// # Arguments
///
/// * `block_number` - The corresponding block number
/// * `tree_cache` - Merkle tree cache
///
async fn store_tree_cache(&mut self, block_number: BlockNumber, tree_cache: serde_json::Value);
}

/// Returns Rollup contract event from its stored representation
Expand Down
64 changes: 55 additions & 9 deletions core/bin/data_restore/src/tree_state.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use crate::rollup_ops::RollupOpsBlock;
use anyhow::format_err;
use zksync_crypto::Fr;
use std::collections::HashMap;
use zksync_crypto::{params::account_tree_depth, Fr};
use zksync_state::{
handler::TxHandler,
state::{CollectedFee, OpSuccess, TransferOutcome, ZkSyncState},
};
use zksync_types::block::{Block, ExecutedOperations, ExecutedPriorityOp, ExecutedTx};
use zksync_types::operations::ZkSyncOp;
use zksync_types::priority_ops::PriorityOp;
use zksync_types::priority_ops::ZkSyncPriorityOp;
use zksync_types::tx::{
ChangePubKey, Close, ForcedExit, Swap, Transfer, Withdraw, WithdrawNFT, ZkSyncTx,
use zksync_types::{
account::Account,
block::{Block, ExecutedOperations, ExecutedPriorityOp, ExecutedTx},
operations::ZkSyncOp,
priority_ops::{PriorityOp, ZkSyncPriorityOp},
tx::{ChangePubKey, Close, ForcedExit, Swap, Transfer, Withdraw, WithdrawNFT, ZkSyncTx},
AccountId, AccountMap, AccountTree, AccountUpdates, Address, BlockNumber, MintNFT, TokenId,
H256, NFT,
};
use zksync_types::{account::Account, MintNFT};
use zksync_types::{AccountId, AccountMap, AccountUpdates, Address, BlockNumber, H256};

/// Rollup accounts states
pub struct TreeState {
Expand Down Expand Up @@ -68,6 +69,51 @@ impl TreeState {
}
}

/// Restores the tree state from the storage cache
///
/// # Arguments
///
/// * `tree_cache` - Merkle tree cache
/// * `account_map` - Account map obtained from the latest finalized state
/// * `current_block` - Latest confirmed verified block
/// * `nfts` - Finalized NFTs
///
pub fn restore_from_cache(
tree_cache: serde_json::Value,
account_map: AccountMap,
current_block: Block,
nfts: HashMap<TokenId, NFT>,
) -> Self {
let mut account_id_by_address = HashMap::with_capacity(account_map.len());
let mut balance_tree = AccountTree::new(account_tree_depth());

balance_tree.set_internals(
serde_json::from_value(tree_cache).expect("failed to deserialize tree cache"),
);

account_map.into_iter().for_each(|(account_id, account)| {
account_id_by_address.insert(account.address, account_id);
balance_tree.items.insert(*account_id as u64, account);
});

let state = ZkSyncState::new(
balance_tree,
account_id_by_address,
current_block.block_number,
nfts,
);
let last_fee_account_address = state
.get_account(current_block.fee_account)
.expect("Failed to obtain fee account address from the cached tree")
.address;
let current_unprocessed_priority_op = current_block.processed_priority_ops.1;
Self {
state,
current_unprocessed_priority_op,
last_fee_account_address,
}
}

/// Updates Rollup accounts states from Rollup operations block
/// Returns current rollup block and updated accounts
///
Expand Down
48 changes: 48 additions & 0 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -4342,6 +4342,54 @@
"nullable": []
}
},
"ad3353e54de7447ea420b981e09af0f44894a6cad621640c7f8aac08f441ede2": {
"query": "SELECT * FROM nft",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "token_id",
"type_info": "Int4"
},
{
"ordinal": 1,
"name": "creator_account_id",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "creator_address",
"type_info": "Bytea"
},
{
"ordinal": 3,
"name": "serial_id",
"type_info": "Int4"
},
{
"ordinal": 4,
"name": "address",
"type_info": "Bytea"
},
{
"ordinal": 5,
"name": "content_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": []
},
"nullable": [
false,
false,
false,
false,
false,
false
]
}
},
"b1c528c67d3c2ecea86e3ba1b2407cb4ee72149d66be0498be1c1162917c065d": {
"query": "INSERT INTO block_witness (block, witness)\n VALUES ($1, $2)\n ON CONFLICT (block)\n DO NOTHING",
"describe": {
Expand Down
Loading

0 comments on commit cfa0ca7

Please sign in to comment.