Skip to content

Commit

Permalink
Some debug comments are present and possibly updating tree cache tha…
Browse files Browse the repository at this point in the history
…t often is not optimal, but it looks like it will work soon
  • Loading branch information
StanislavBreadless committed Jan 20, 2022
1 parent 7e8723a commit 356eb66
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 17 deletions.
68 changes: 55 additions & 13 deletions core/bin/data_restore/src/data_restore_driver.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::HashMap;

// External deps
use web3::{
contract::Contract,
Expand Down Expand Up @@ -261,6 +263,22 @@ impl<T: Transport> DataRestoreDriver<T> {
.await;
}

/// Serializes the current account tree state and persists it as the
/// tree cache for the current block via the storage interactor.
async fn update_tree_cache(&mut self, interactor: &mut StorageInteractor<'_>) {
    let block_number = self.tree_state.block_number;
    vlog::info!(
        "Updating the tree cache, block number: {}",
        block_number
    );

    // Result is intentionally discarded — presumably this forces the root
    // hash (and the tree's internal caches) to be computed before we
    // serialize the internals; TODO confirm against the tree implementation.
    self.tree_state.state.root_hash();

    let internals = self.tree_state.state.get_balance_tree().get_internals();
    let serialized_cache =
        serde_json::to_value(internals).expect("failed to serialize tree cache");

    interactor
        .update_tree_cache(block_number, serialized_cache)
        .await;
}

/// Stops states from storage
pub async fn load_state_from_storage(
&mut self,
Expand All @@ -283,6 +301,8 @@ impl<T: Transport> DataRestoreDriver<T> {
cache.nfts,
)
} else {
vlog::info!("Building tree from scratch");

let tree_state = transaction.get_tree_state().await;
TreeState::load(
tree_state.last_block_number,
Expand All @@ -295,6 +315,7 @@ impl<T: Transport> DataRestoreDriver<T> {
StorageUpdateState::Events => {
// Update operations
let new_ops_blocks = self.update_operations_state(&mut transaction).await;

// Update tree
self.update_tree_state(&mut transaction, new_ops_blocks)
.await;
Expand Down Expand Up @@ -324,8 +345,9 @@ impl<T: Transport> DataRestoreDriver<T> {

let is_finished = self.finite_mode && (total_verified_blocks == *last_verified_block);
// Save tree cache if necessary.
if is_finished && !is_cached {
self.store_tree_cache(interactor).await;
if !is_cached {
vlog::info!("Saving tree cache for future re-uses");
self.update_tree_cache(interactor).await;
}
is_finished
}
Expand Down Expand Up @@ -390,7 +412,7 @@ impl<T: Transport> DataRestoreDriver<T> {

// We've restored all the blocks, our job is done. Store the tree cache for
// consequent usage.
self.store_tree_cache(interactor).await;
self.update_tree_cache(interactor).await;

break;
}
Expand All @@ -402,6 +424,7 @@ impl<T: Transport> DataRestoreDriver<T> {
std::thread::sleep(std::time::Duration::from_secs(5));
} else {
last_watched_block = self.events_state.last_watched_eth_block_number;
self.update_tree_cache(interactor).await;
}
}
}
Expand Down Expand Up @@ -506,6 +529,8 @@ impl<T: Transport> DataRestoreDriver<T> {
) -> Vec<RollupOpsBlock> {
let new_blocks = self.get_new_operation_blocks_from_events().await;

// dbg!(new_blocks.clone());

interactor.save_rollup_ops(&new_blocks).await;

vlog::debug!("Updated operations storage");
Expand All @@ -518,6 +543,10 @@ impl<T: Transport> DataRestoreDriver<T> {
let mut blocks = Vec::new();

let mut last_event_tx_hash = None;
// The HashMap from block_num to the RollupOpsBlock data for the tx represented by last_event_tx_hash.
// It is used as a cache to reuse the fetched data.
let mut last_tx_blocks = HashMap::new();

// TODO (ZKS-722): either due to Ethereum node lag or unknown
// bug in the events state, we have to additionally filter out
// already processed rollup blocks.
Expand All @@ -527,20 +556,33 @@ impl<T: Transport> DataRestoreDriver<T> {
.iter()
.filter(|bl| bl.block_num > self.tree_state.block_number)
{
dbg!(event.clone());

// We use an aggregated block in contracts, which means that several BlockEvent can include the same tx_hash,
// but for correct restore we need to generate RollupBlocks from this tx only once.
// These blocks go one after the other, and checking only the last transaction hash is safe
if let Some(tx) = last_event_tx_hash {
if tx == event.transaction_hash {
continue;
}
// These blocks go one after the other, and checking only the last transaction hash is safe.

// If the previous tx hash does not exist or it is not equal to the current one, we should
// re-fetch the blocks for the new tx hash.
if !last_event_tx_hash
.map(|tx| tx == event.transaction_hash)
.unwrap_or_default()
{
let blocks = RollupOpsBlock::get_rollup_ops_blocks(&self.web3, event)
.await
.expect("Cant get new operation blocks from events");

last_tx_blocks = blocks
.into_iter()
.map(|block| (block.block_num, block))
.collect();
last_event_tx_hash = Some(event.transaction_hash);
}

let block = RollupOpsBlock::get_rollup_ops_blocks(&self.web3, event)
.await
.expect("Cant get new operation blocks from events");
blocks.extend(block);
last_event_tx_hash = Some(event.transaction_hash);
let rollup_block = last_tx_blocks
.remove(&event.block_num)
.expect("Block not found");
blocks.push(rollup_block);
}

blocks
Expand Down
35 changes: 35 additions & 0 deletions core/bin/data_restore/src/database_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ impl<'a> DatabaseStorageInteractor<'a> {
});
}

dbg!(ops.iter().map(|b| b.block_num).collect::<Vec<_>>());

self.storage
.data_restore_schema()
.save_rollup_ops(ops.as_slice())
Expand Down Expand Up @@ -394,6 +396,39 @@ impl<'a> DatabaseStorageInteractor<'a> {
.expect("Failed to store the tree cache");
}

/// Atomically replaces the persisted account tree cache: the stale cache
/// entry is removed and the new one stored within a single DB transaction,
/// so readers never observe a state with no cache at all.
pub async fn update_tree_cache(
    &mut self,
    block_number: BlockNumber,
    tree_cache: serde_json::Value,
) {
    let mut tx = self
        .storage
        .start_transaction()
        .await
        .expect("Failed to start transaction");

    // Drop the previously stored cache first.
    tx.chain()
        .block_schema()
        .remove_old_account_tree_cache(block_number)
        .await
        .expect("Failed to remove old tree cache");

    // It is safe to store the new tree cache without additional checks,
    // since on conflict the insert does nothing.
    tx.chain()
        .block_schema()
        .store_account_tree_cache(block_number, tree_cache)
        .await
        .expect("Failed to store new tree cache");

    tx.commit().await.expect("Failed to update tree cache");
}

pub async fn get_storage_state(&mut self) -> StorageUpdateState {
let storage_state_string = self
.storage
Expand Down
8 changes: 8 additions & 0 deletions core/bin/data_restore/src/inmemory_storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,14 @@ impl InMemoryStorageInteractor {
// Inmemory storage doesn't support caching.
}

/// No-op counterpart of the database interactor's `update_tree_cache`:
/// the in-memory storage backend does not persist tree caches, so there
/// is nothing to delete or store here.
pub async fn update_tree_cache(
    &mut self,
    _block_number: BlockNumber,
    _tree_cache: serde_json::Value,
) {
    // Inmemory storage doesn't support caching.
}

pub async fn get_max_priority_op_serial_id(&mut self) -> SerialId {
let number_of_priority_ops = self
.inner
Expand Down
15 changes: 15 additions & 0 deletions core/bin/data_restore/src/storage_interactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,21 @@ impl StorageInteractor<'_> {
storage_interact!(self.store_tree_cache(block_number, tree_cache))
}

/// Deletes the latest tree cache in the database and saves the new one.
///
/// Dispatches to the concrete backend via `storage_interact!`; note that
/// the in-memory backend implements this as a no-op, since it does not
/// support caching.
///
/// # Arguments
///
/// * `block_number` - The block number the cache corresponds to
/// * `tree_cache` - JSON-serialized Merkle tree cache
///
pub async fn update_tree_cache(
    &mut self,
    block_number: BlockNumber,
    tree_cache: serde_json::Value,
) {
    storage_interact!(self.update_tree_cache(block_number, tree_cache))
}

/// Retrieves the maximum serial id of a priority requests
pub async fn get_max_priority_op_serial_id(&mut self) -> SerialId {
storage_interact!(self.get_max_priority_op_serial_id())
Expand Down
10 changes: 6 additions & 4 deletions infrastructure/zk/src/docker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ async function _build(image: string) {
}
const { stdout: imageTag } = await utils.exec('git rev-parse --short HEAD');
const latestImage = `-t matterlabs/${image}:latest`;
const taggedImage = ['nginx', 'server', 'prover'].includes(image) ? `-t matterlabs/${image}:${imageTag}` : '';
const taggedImage = ['nginx', 'server', 'prover', 'data-restore'].includes(image)
? `-t matterlabs/${image}:${imageTag}`
: '';
await utils.spawn(`DOCKER_BUILDKIT=1 docker build ${latestImage} ${taggedImage} -f ./docker/${image}/Dockerfile .`);
}

async function _push(image: string) {
await utils.spawn(`docker push matterlabs/${image}:latest`);
if (['nginx', 'server', 'prover', 'event-listener'].includes(image)) {
// await utils.spawn(`docker push matterlabs/${image}:latest`);
if (['nginx', 'server', 'prover', 'event-listener', 'data-restore'].includes(image)) {
const { stdout: imageTag } = await utils.exec('git rev-parse --short HEAD');
await utils.spawn(`docker push matterlabs/${image}:${imageTag}`);
}
Expand All @@ -62,7 +64,7 @@ export async function build(image: string) {
}

// Builds the image and then pushes it to the registry.
// NOTE(review): the build step had been commented out — an apparent debug
// leftover (the commit message admits debug changes). Restored so that
// `push` cannot publish a stale, previously-built image.
export async function push(image: string) {
    await dockerCommand('build', image);
    await dockerCommand('push', image);
}

Expand Down

0 comments on commit 356eb66

Please sign in to comment.