From dae45ac640996d1bd3946188992384ff3f888107 Mon Sep 17 00:00:00 2001 From: Danil Lugovskoy Date: Thu, 11 Feb 2021 14:35:58 +0300 Subject: [PATCH] Fix data restore for new contracts --- core/bin/data_restore/src/contract/mod.rs | 55 +------ core/bin/data_restore/src/contract/v4.rs | 52 ++++--- .../data_restore/src/data_restore_driver.rs | 138 ++++-------------- .../src/database_storage_interactor.rs | 28 ++-- core/bin/data_restore/src/events_state.rs | 72 ++------- core/bin/data_restore/src/main.rs | 14 +- core/bin/data_restore/src/rollup_ops.rs | 12 +- core/bin/data_restore/src/tests/mod.rs | 67 +++------ core/bin/data_restore/src/tree_state.rs | 14 +- core/lib/storage/sqlx-data.json | 40 ++--- core/lib/storage/src/data_restore/mod.rs | 77 ++++++---- core/lib/types/src/block.rs | 38 +++++ .../testkit/src/bin/revert_blocks_test.rs | 1 - core/tests/testkit/src/data_restore.rs | 20 +-- core/tests/testkit/src/scenarios.rs | 1 - 15 files changed, 243 insertions(+), 386 deletions(-) diff --git a/core/bin/data_restore/src/contract/mod.rs b/core/bin/data_restore/src/contract/mod.rs index 5be3244b71..982728cf0d 100644 --- a/core/bin/data_restore/src/contract/mod.rs +++ b/core/bin/data_restore/src/contract/mod.rs @@ -1,6 +1,6 @@ use web3::api::Eth; use web3::contract::Options; -use web3::types::{Address, BlockId, BlockNumber, U256}; +use web3::types::{Address, BlockId, U256}; use web3::Transport; use zksync_contracts::{ @@ -20,8 +20,6 @@ pub struct ZkSyncDeployedContract { pub web3_contract: web3::contract::Contract, pub abi: ethabi::Contract, pub version: ZkSyncContractVersion, - pub from: BlockNumber, - pub to: BlockNumber, } impl ZkSyncDeployedContract { @@ -33,11 +31,7 @@ impl ZkSyncDeployedContract { /// * `zksync_contract` - Rollup contract /// pub async fn get_total_verified_blocks(&self) -> u32 { - use ZkSyncContractVersion::*; - let func = match self.version { - V0 | V1 | V2 | V3 => "totalBlocksVerified", - V4 => "totalBlocksExecuted", - }; + let func = "totalBlocksExecuted"; self.web3_contract .query::, Option, ()>( func, @@ -51,79 +45,44 @@ impl ZkSyncDeployedContract { .as_u32() } - pub fn version0( - eth: Eth, - address: Address, - from: BlockNumber, - to: BlockNumber, - ) -> ZkSyncDeployedContract { + pub fn version0(eth: Eth, address: Address) -> ZkSyncDeployedContract { let abi = zksync_contract_v0(); ZkSyncDeployedContract { web3_contract: web3::contract::Contract::new(eth, address, abi.clone()), abi, version: ZkSyncContractVersion::V0, - from, - to, } } - pub fn version1( - eth: Eth, - address: Address, - from: BlockNumber, - to: BlockNumber, - ) -> ZkSyncDeployedContract { + pub fn version1(eth: Eth, address: Address) -> ZkSyncDeployedContract { let abi = zksync_contract_v1(); ZkSyncDeployedContract { web3_contract: web3::contract::Contract::new(eth, address, abi.clone()), abi, version: ZkSyncContractVersion::V1, - from, - to, } } - pub fn version2( - eth: Eth, - address: Address, - from: BlockNumber, - to: BlockNumber, - ) -> ZkSyncDeployedContract { + pub fn version2(eth: Eth, address: Address) -> ZkSyncDeployedContract { let abi = zksync_contract_v2(); ZkSyncDeployedContract { web3_contract: web3::contract::Contract::new(eth, address, abi.clone()), abi, version: ZkSyncContractVersion::V2, - from, - to, } } - pub fn version3( - eth: Eth, - address: Address, - from: BlockNumber, - to: BlockNumber, - ) -> ZkSyncDeployedContract { + pub fn version3(eth: Eth, address: Address) -> ZkSyncDeployedContract { let abi = zksync_contract_v3(); ZkSyncDeployedContract { web3_contract: web3::contract::Contract::new(eth, address, abi.clone()), abi, version: ZkSyncContractVersion::V3, - from, - to, } } - pub fn version4( - eth: Eth, - address: Address, - from: BlockNumber, - to: BlockNumber, - ) -> ZkSyncDeployedContract { + pub fn version4(eth: Eth, address: Address) -> ZkSyncDeployedContract { let abi = zksync_contract(); ZkSyncDeployedContract { web3_contract: web3::contract::Contract::new(eth, address, abi.clone()), abi, version: ZkSyncContractVersion::V4, - from, - to, } } } diff --git a/core/bin/data_restore/src/contract/v4.rs b/core/bin/data_restore/src/contract/v4.rs index c09ab0528a..99387e2c91 100644 --- a/core/bin/data_restore/src/contract/v4.rs +++ b/core/bin/data_restore/src/contract/v4.rs @@ -37,44 +37,42 @@ fn decode_commitment_parameters(input_data: Vec) -> anyhow::Result) -> anyhow::Result> { let fee_account_argument_id = 5; + let op_block_number_argument_id = 4; let public_data_argument_id = 1; let decoded_commitment_parameters = decode_commitment_parameters(data)?; assert_eq!(decoded_commitment_parameters.len(), 2); - if let (ethabi::Token::Tuple(block), ethabi::Token::Array(operations)) = ( + if let (ethabi::Token::Tuple(_block), ethabi::Token::Array(operations)) = ( &decoded_commitment_parameters[0], &decoded_commitment_parameters[1], ) { let mut blocks = vec![]; - if let ethabi::Token::Uint(block_num) = block[0] { - for operation in operations { - if let ethabi::Token::Tuple(operation) = operation { - if let (ethabi::Token::Uint(fee_acc), ethabi::Token::Bytes(public_data)) = ( - &operation[fee_account_argument_id], - &operation[public_data_argument_id], - ) { - let ops = get_rollup_ops_from_data(public_data.as_slice())?; - blocks.push(RollupOpsBlock { - block_num: BlockNumber(block_num.as_u32()), - ops, - fee_account: AccountId(fee_acc.as_u32()), - }) - } else { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "can't parse operation parameters", - ) - .into()); - } + for operation in operations { + if let ethabi::Token::Tuple(operation) = operation { + if let ( + ethabi::Token::Uint(fee_acc), + ethabi::Token::Bytes(public_data), + ethabi::Token::Uint(block_number), + ) = ( + &operation[fee_account_argument_id], + &operation[public_data_argument_id], + &operation[op_block_number_argument_id], + ) { + let ops = get_rollup_ops_from_data(public_data.as_slice())?; + blocks.push(RollupOpsBlock { + block_num: BlockNumber(block_number.as_u32()), + ops, + fee_account: AccountId(fee_acc.as_u32()), + }) + } else { + return Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "can't parse operation parameters", + ) + .into()); } } - } else { - return Err(std::io::Error::new( - std::io::ErrorKind::NotFound, - "can't parse block parameters", - ) - .into()); } Ok(blocks) } else { diff --git a/core/bin/data_restore/src/data_restore_driver.rs b/core/bin/data_restore/src/data_restore_driver.rs index 0b55a3e227..777cc6fdc5 100644 --- a/core/bin/data_restore/src/data_restore_driver.rs +++ b/core/bin/data_restore/src/data_restore_driver.rs @@ -1,7 +1,7 @@ // External deps use web3::{ contract::Contract, - types::{BlockNumber as Web3BlockNumber, FilterBuilder, Log, H160, H256, U256}, + types::{BlockNumber as Web3BlockNumber, FilterBuilder, Log, H160, H256}, Transport, Web3, }; // Workspace deps @@ -10,14 +10,16 @@ use zksync_crypto::Fr; use zksync_types::{AccountId, AccountMap, AccountUpdate, BlockNumber}; // Local deps -use crate::contract::{ZkSyncContractVersion, ZkSyncDeployedContract}; -use crate::storage_interactor::StorageInteractor; use crate::{ - contract::get_genesis_account, eth_tx_helpers::get_ethereum_transaction, - events_state::EventsState, rollup_ops::RollupOpsBlock, tree_state::TreeState, + contract::{get_genesis_account, ZkSyncDeployedContract}, + eth_tx_helpers::get_ethereum_transaction, + events_state::EventsState, + rollup_ops::RollupOpsBlock, + storage_interactor::StorageInteractor, + tree_state::TreeState, }; use ethabi::Address; -use std::convert::TryFrom; + use std::marker::PhantomData; /// Storage state update: @@ -51,7 +53,7 @@ pub struct DataRestoreDriver { /// Provides Ethereum Governance contract unterface pub governance_contract: (ethabi::Contract, Contract), /// Provides Ethereum Rollup contract unterface - pub zksync_contracts: Vec>, + pub zksync_contract: ZkSyncDeployedContract, /// Rollup contract events state pub events_state: EventsState, /// Rollup accounts state @@ -60,8 +62,6 @@ pub struct DataRestoreDriver { pub eth_blocks_step: u64, /// The distance to the last ethereum block pub end_eth_blocks_offset: u64, - /// Available block chunk sizes - pub available_block_chunk_sizes: Vec, /// Finite mode flag. In finite mode, driver will only work until /// amount of restored blocks will become equal to amount of known /// verified blocks. After that, it will stop. @@ -89,16 +89,14 @@ where /// #[allow(clippy::too_many_arguments)] pub fn new( - web3_transport: T, + web3: Web3, governance_contract_eth_addr: H160, eth_blocks_step: u64, end_eth_blocks_offset: u64, - available_block_chunk_sizes: Vec, finite_mode: bool, final_hash: Option, + zksync_contract: ZkSyncDeployedContract, ) -> Self { - let web3 = Web3::new(web3_transport); - let governance_contract = { let abi = governance_contract(); ( @@ -109,16 +107,15 @@ where let events_state = EventsState::default(); - let tree_state = TreeState::new(available_block_chunk_sizes.clone()); + let tree_state = TreeState::new(); Self { web3, governance_contract, - zksync_contracts: vec![], + zksync_contract, events_state, tree_state, eth_blocks_step, end_eth_blocks_offset, - available_block_chunk_sizes, finite_mode, final_hash, phantom_data: Default::default(), @@ -150,82 +147,6 @@ where .map_err(|e| anyhow::format_err!("No new logs: {}", e))?; Ok(result) } - pub async fn init_contracts( - &mut self, - upgrade_gatekeeper_contract_addr: Address, - zksync_contract_addr: Address, - ) -> anyhow::Result<()> { - let logs = self - .get_gatekeeper_logs(upgrade_gatekeeper_contract_addr) - .await?; - - let mut last_updated_block = Web3BlockNumber::Earliest; - let mut contract_version = vec![]; - let mut previous_version: Option = None; - // Find starts and ends for contracts - for log in logs { - let block_number = log.block_number.expect("Block number should exist"); - let version = U256::from(log.topics[1].as_bytes()).as_u32(); - let version = ZkSyncContractVersion::try_from(version)?; - let current_block = Web3BlockNumber::Number(block_number); - if let Some(previous_version) = previous_version { - contract_version.push((last_updated_block, current_block, previous_version)); - } - previous_version = Some(version); - last_updated_block = current_block; - } - contract_version.push(( - last_updated_block, - Web3BlockNumber::Latest, - previous_version.expect("At least one contract should exist"), - )); - - for (from, to, version) in contract_version { - match version { - ZkSyncContractVersion::V0 => { - self.zksync_contracts.push(ZkSyncDeployedContract::version0( - self.web3.eth(), - zksync_contract_addr, - from, - to, - )) - } - ZkSyncContractVersion::V1 => { - self.zksync_contracts.push(ZkSyncDeployedContract::version1( - self.web3.eth(), - zksync_contract_addr, - from, - to, - )) - } - ZkSyncContractVersion::V2 => { - self.zksync_contracts.push(ZkSyncDeployedContract::version2( - self.web3.eth(), - zksync_contract_addr, - from, - to, - )) - } - ZkSyncContractVersion::V3 => { - self.zksync_contracts.push(ZkSyncDeployedContract::version3( - self.web3.eth(), - zksync_contract_addr, - from, - to, - )) - } - ZkSyncContractVersion::V4 => { - self.zksync_contracts.push(ZkSyncDeployedContract::version4( - self.web3.eth(), - zksync_contract_addr, - from, - to, - )) - } - } - } - Ok(()) - } /// Sets the 'genesis' state. /// Tree with inserted genesis account will be created. @@ -276,7 +197,6 @@ where account_map, current_unprocessed_priority_op, AccountId(fee_acc_num), - self.available_block_chunk_sizes.clone(), ); vlog::info!("Genesis tree root hash: {:?}", tree_state.root_hash()); @@ -289,11 +209,6 @@ where self.tree_state = tree_state; } - fn actual_zksync_contract(&self) -> &ZkSyncDeployedContract { - self.zksync_contracts - .last() - .expect("At least one should exist") - } /// Stops states from storage pub async fn load_state_from_storage(&mut self, interactor: &mut I) -> bool { vlog::info!("Loading state from storage"); @@ -305,7 +220,6 @@ where tree_state.account_map, // account map tree_state.unprocessed_prior_ops, // unprocessed priority op tree_state.fee_acc_id, // fee account - self.available_block_chunk_sizes.clone(), ); match state { StorageUpdateState::Events => { @@ -322,10 +236,7 @@ where } StorageUpdateState::None => {} } - let total_verified_blocks = self - .actual_zksync_contract() - .get_total_verified_blocks() - .await; + let total_verified_blocks = self.zksync_contract.get_total_verified_blocks().await; let last_verified_block = self.tree_state.state.block_number; vlog::info!( @@ -353,10 +264,8 @@ where // Update tree self.update_tree_state(interactor, new_ops_blocks).await; - let total_verified_blocks = self - .actual_zksync_contract() - .get_total_verified_blocks() - .await; + let total_verified_blocks = + self.zksync_contract.get_total_verified_blocks().await; let last_verified_block = self.tree_state.state.block_number; @@ -378,7 +287,6 @@ where if let Some(root_hash) = self.final_hash { if root_hash == self.tree_state.root_hash() { final_hash_was_found = true; - vlog::info!( "Correct expected root hash was met on the block {} out of {}", *last_verified_block, @@ -415,7 +323,7 @@ where .events_state .update_events_state( &self.web3, - &self.zksync_contracts, + &self.zksync_contract, &self.governance_contract, self.eth_blocks_step, self.end_eth_blocks_offset, @@ -477,15 +385,27 @@ where pub async fn get_new_operation_blocks_from_events(&mut self) -> Vec { let mut blocks = Vec::new(); + let mut last_event_tx = None; for event in self .events_state .get_only_verified_committed_events() .iter() { + if let Some(tx) = last_event_tx { + if tx == event.transaction_hash { + continue; + } + } + + vlog::info!("New event iteration"); let block = RollupOpsBlock::get_rollup_ops_blocks(&self.web3, &event) .await .expect("Cant get new operation blocks from events"); + for b in block.iter() { + vlog::info!("new RollupBlocks {:?}", b.block_num); + } blocks.extend(block); + last_event_tx = Some(event.transaction_hash); } blocks diff --git a/core/bin/data_restore/src/database_storage_interactor.rs b/core/bin/data_restore/src/database_storage_interactor.rs index aa15bafb82..2a7e89de92 100644 --- a/core/bin/data_restore/src/database_storage_interactor.rs +++ b/core/bin/data_restore/src/database_storage_interactor.rs @@ -3,7 +3,7 @@ use std::str::FromStr; // Workspace deps use zksync_storage::{data_restore::records::NewBlockEvent, StorageProcessor}; use zksync_types::{ - AccountId, Action, BlockNumber, Operation, Token, TokenGenesisListItem, TokenId, + AccountId, BlockNumber, Token, TokenGenesisListItem, TokenId, {block::Block, AccountUpdate, AccountUpdates, ZkSyncOp}, }; @@ -19,6 +19,7 @@ use crate::{ stored_ops_block_into_ops_block, StorageInteractor, }, }; +use zksync_types::aggregated_operations::{BlocksCommitOperation, BlocksExecuteOperation}; impl From<&NewTokenEvent> for zksync_storage::data_restore::records::NewTokenEvent { fn from(event: &NewTokenEvent) -> Self { @@ -63,6 +64,9 @@ impl StorageInteractor for DatabaseStorageInteractor<'_> { let mut ops: Vec<(BlockNumber, &ZkSyncOp, AccountId)> = vec![]; for block in blocks { + if block.block_num.0 == 5702 { + vlog::info!("Block number 5702"); + } for op in &block.ops { ops.push((block.block_num, op, block.fee_account)); } @@ -82,18 +86,13 @@ impl StorageInteractor for DatabaseStorageInteractor<'_> { .await .expect("Failed initializing a DB transaction"); - let commit_op = Operation { - action: Action::Commit, - block: block.clone(), - id: None, + let commit_aggregated_operation = BlocksCommitOperation { + last_committed_block: block.clone(), + blocks: vec![block.clone()], }; - let verify_op = Operation { - action: Action::Verify { - proof: Box::new(Default::default()), - }, - block: block.clone(), - id: None, + let execute_aggregated_operation = BlocksExecuteOperation { + blocks: vec![block.clone()], }; transaction @@ -105,10 +104,15 @@ impl StorageInteractor for DatabaseStorageInteractor<'_> { transaction .data_restore_schema() - .save_block_operations(commit_op, verify_op) + .save_block_operations(commit_aggregated_operation, execute_aggregated_operation) .await .expect("Cant execute verify operation"); + let block_number = block.block_number; + if let Err(_) = transaction.chain().block_schema().save_block(block).await { + vlog::info!("Block {} was reverted", block_number) + } + transaction .commit() .await diff --git a/core/bin/data_restore/src/events_state.rs b/core/bin/data_restore/src/events_state.rs index 2378472b22..8a453c8ec9 100644 --- a/core/bin/data_restore/src/events_state.rs +++ b/core/bin/data_restore/src/events_state.rs @@ -87,7 +87,7 @@ impl EventsState { pub async fn update_events_state( &mut self, web3: &Web3, - zksync_contracts: &[ZkSyncDeployedContract], + zksync_contract: &ZkSyncDeployedContract, governance_contract: &(ethabi::Contract, Contract), eth_blocks_step: u64, end_eth_blocks_offset: u64, @@ -97,7 +97,7 @@ impl EventsState { let (events, token_events, to_block_number) = EventsState::get_new_events_and_last_watched_block( web3, - zksync_contracts, + zksync_contract, governance_contract, self.last_watched_eth_block_number, eth_blocks_step, @@ -144,7 +144,7 @@ impl EventsState { #[allow(clippy::needless_lifetimes)] // Cargo clippy gives a false positive warning on needless_lifetimes there, so can be allowed. async fn get_new_events_and_last_watched_block<'a, T: Transport>( web3: &Web3, - zksync_contracts: &'a [ZkSyncDeployedContract], + zksync_contract: &'a ZkSyncDeployedContract, governance_contract: &(ethabi::Contract, Contract), last_watched_block_number: u64, eth_blocks_step: u64, @@ -161,7 +161,7 @@ impl EventsState { return Ok((vec![], vec![], last_watched_block_number)); // No new eth blocks } - let mut from_block_number_u64 = last_watched_block_number + 1; + let from_block_number_u64 = last_watched_block_number + 1; let to_block_number_u64 = // if (latest eth block < last watched + delta) then choose it @@ -179,50 +179,14 @@ impl EventsState { ) .await?; let mut logs = vec![]; - for zksync_contract in zksync_contracts { - if from_block_number_u64 > to_block_number_u64 { - // Stop if all necessary blocks are loaded - break; - } - let from_block_number = match zksync_contract.from { - Web3BlockNumber::Latest => panic!("Impossible in from block"), - Web3BlockNumber::Earliest => Web3BlockNumber::Number(from_block_number_u64.into()), - Web3BlockNumber::Pending => unreachable!(), - Web3BlockNumber::Number(n) => { - if from_block_number_u64 > n.as_u64() { - continue; - } - Web3BlockNumber::Number(from_block_number_u64.into()) - } - }; - - let to_block_number = match zksync_contract.to { - Web3BlockNumber::Latest => { - from_block_number_u64 = to_block_number_u64; - Web3BlockNumber::Number(to_block_number_u64.into()) - } - Web3BlockNumber::Earliest => panic!("Impossible in to block"), - Web3BlockNumber::Pending => unreachable!(), - Web3BlockNumber::Number(n) => { - if to_block_number_u64 < n.as_u64() { - from_block_number_u64 = n.as_u64(); - Web3BlockNumber::Number(to_block_number_u64.into()) - } else { - from_block_number_u64 = n.as_u64(); - Web3BlockNumber::Number(n) - } - } - }; - - let block_logs = EventsState::get_block_logs( - web3, - zksync_contract, - from_block_number, - to_block_number, - ) - .await?; - logs.push((zksync_contract, block_logs)); - } + let block_logs = EventsState::get_block_logs( + web3, + zksync_contract, + Web3BlockNumber::Number(from_block_number_u64.into()), + Web3BlockNumber::Number(to_block_number_u64.into()), + ) + .await?; + logs.push((zksync_contract, block_logs)); Ok((logs, token_logs, to_block_number_u64)) } @@ -358,7 +322,7 @@ impl EventsState { const U256_SIZE: usize = 32; // Fields in `BlocksRevert` are not `indexed`, thus they're located in `data`. assert_eq!(log.data.0.len(), U256_SIZE * 2); - let total_verified = zksync_types::BlockNumber( + let total_executed = zksync_types::BlockNumber( U256::from_big_endian(&log.data.0[..U256_SIZE]).as_u32(), ); let total_committed = zksync_types::BlockNumber( @@ -368,7 +332,7 @@ impl EventsState { self.committed_events .retain(|bl| bl.block_num <= total_committed); self.verified_events - .retain(|bl| bl.block_num <= total_verified); + .retain(|bl| bl.block_num <= total_executed); continue; } @@ -420,18 +384,12 @@ mod test { use crate::contract::ZkSyncDeployedContract; use crate::tests::utils::{create_log, u32_to_32bytes, FakeTransport}; - use web3::types::BlockNumber; #[test] fn event_state() { let mut events_state = EventsState::default(); - let contract = ZkSyncDeployedContract::version4( - Eth::new(FakeTransport), - [1u8; 20].into(), - BlockNumber::Earliest, - BlockNumber::Latest, - ); + let contract = ZkSyncDeployedContract::version4(Eth::new(FakeTransport), [1u8; 20].into()); let block_verified_topic = contract .abi diff --git a/core/bin/data_restore/src/main.rs b/core/bin/data_restore/src/main.rs index bd56b07d6b..3e593ff01e 100644 --- a/core/bin/data_restore/src/main.rs +++ b/core/bin/data_restore/src/main.rs @@ -6,6 +6,8 @@ use zksync_crypto::convert::FeConvert; use zksync_storage::ConnectionPool; use zksync_types::{Address, H256}; +use web3::Web3; +use zksync_data_restore::contract::ZkSyncDeployedContract; use zksync_data_restore::{ add_tokens_to_storage, data_restore_driver::DataRestoreDriver, database_storage_interactor::DatabaseStorageInteractor, END_ETH_BLOCKS_OFFSET, ETH_BLOCKS_STEP, @@ -102,22 +104,18 @@ async fn main() { None }; let storage = connection_pool.access_storage().await.unwrap(); - + let web3 = Web3::new(transport); + let contract = ZkSyncDeployedContract::version4(web3.eth().clone(), config.contract_addr); let mut driver = DataRestoreDriver::new( - transport, + web3, config.governance_addr, ETH_BLOCKS_STEP, END_ETH_BLOCKS_OFFSET, - config.available_block_chunk_sizes.clone(), finite_mode, final_hash, + contract, ); - driver - .init_contracts(config.upgrade_gatekeeper_addr, config.contract_addr) - .await - .expect("Wrong driver initialization"); - let mut interactor = DatabaseStorageInteractor::new(storage); // If genesis is argument is present - there will be fetching contracts creation transactions to get first eth block and genesis acc address if opt.genesis { diff --git a/core/bin/data_restore/src/rollup_ops.rs b/core/bin/data_restore/src/rollup_ops.rs index ccbf3fea9f..e006989a15 100644 --- a/core/bin/data_restore/src/rollup_ops.rs +++ b/core/bin/data_restore/src/rollup_ops.rs @@ -2,6 +2,7 @@ use web3::{Transport, Web3}; use zksync_types::operations::ZkSyncOp; +use crate::contract; use crate::eth_tx_helpers::{get_ethereum_transaction, get_input_data_from_ethereum_transaction}; use crate::events::BlockEvent; use zksync_types::{AccountId, BlockNumber}; @@ -32,8 +33,13 @@ impl RollupOpsBlock { ) -> anyhow::Result> { let transaction = get_ethereum_transaction(web3, &event_data.transaction_hash).await?; let input_data = get_input_data_from_ethereum_transaction(&transaction)?; - event_data - .contract_version - .rollup_ops_blocks_from_bytes(input_data) + let blocks = if let Ok(block) = + contract::default::rollup_ops_blocks_from_bytes(input_data.clone()) + { + vec![block] + } else { + contract::v4::rollup_ops_blocks_from_bytes(input_data)? + }; + Ok(blocks) } } diff --git a/core/bin/data_restore/src/tests/mod.rs b/core/bin/data_restore/src/tests/mod.rs index 0f3d339ea4..94c0d40dcf 100644 --- a/core/bin/data_restore/src/tests/mod.rs +++ b/core/bin/data_restore/src/tests/mod.rs @@ -8,8 +8,11 @@ use futures::future; use jsonrpc_core::Params; use num::BigUint; use serde_json::{json, Value}; -use web3::types::{BlockNumber as Web3BlockNumber, Bytes}; -use web3::{contract::tokens::Tokenize, types::Transaction, RequestId, Transport}; +use web3::{ + contract::tokens::Tokenize, + types::{Bytes, Transaction}, + RequestId, Transport, Web3, +}; use db_test_macro::test as db_test; use zksync_contracts::{governance_contract, zksync_contract}; @@ -345,24 +348,17 @@ async fn test_run_state_update(mut storage: StorageProcessor<'_>) { ), ]); + let eth = Eth::new(transport.clone()); let mut driver = DataRestoreDriver::new( - transport.clone(), + Web3::new(transport.clone()), [1u8; 20].into(), ETH_BLOCKS_STEP, END_ETH_BLOCKS_OFFSET, - vec![6, 30], true, None, + ZkSyncDeployedContract::version3(eth, [1u8; 20].into()), ); - let eth = Eth::new(transport.clone()); - driver - .zksync_contracts - .push(ZkSyncDeployedContract::version3( - eth, - [1u8; 20].into(), - Web3BlockNumber::Earliest, - Web3BlockNumber::Latest, - )); + driver.run_state_update(&mut interactor).await; // Check that it's stores some account, created by deposit @@ -384,24 +380,17 @@ async fn test_run_state_update(mut storage: StorageProcessor<'_>) { assert_eq!(driver.events_state.committed_events.len(), events.len()); // Nullify the state of driver + let eth = Eth::new(transport.clone()); + let mut driver = DataRestoreDriver::new( - transport.clone(), + Web3::new(transport.clone()), [1u8; 20].into(), ETH_BLOCKS_STEP, END_ETH_BLOCKS_OFFSET, - vec![6, 30], true, None, + ZkSyncDeployedContract::version3(eth, [1u8; 20].into()), ); - let eth = Eth::new(transport.clone()); - driver - .zksync_contracts - .push(ZkSyncDeployedContract::version3( - eth, - [1u8; 20].into(), - Web3BlockNumber::Earliest, - Web3BlockNumber::Latest, - )); // Load state from db and check it assert!(driver.load_state_from_storage(&mut interactor).await); @@ -510,27 +499,19 @@ async fn test_with_inmemory_storage() { ), ), ]); + let web3 = Web3::new(transport.clone()); + let eth = Eth::new(transport.clone()); let mut driver = DataRestoreDriver::new( - transport.clone(), + web3.clone(), [1u8; 20].into(), ETH_BLOCKS_STEP, END_ETH_BLOCKS_OFFSET, - vec![6, 30], true, None, + ZkSyncDeployedContract::version3(eth, [1u8; 20].into()), ); - let eth = Eth::new(transport.clone()); - driver - .zksync_contracts - .push(ZkSyncDeployedContract::version3( - eth, - [1u8; 20].into(), - Web3BlockNumber::Earliest, - Web3BlockNumber::Latest, - )); - driver.run_state_update(&mut interactor).await; // Check that it's stores some account, created by deposit @@ -546,24 +527,16 @@ async fn test_with_inmemory_storage() { assert_eq!(driver.events_state.committed_events.len(), events.len()); // Nullify the state of driver + let eth = Eth::new(transport.clone()); let mut driver = DataRestoreDriver::new( - transport.clone(), + web3.clone(), [1u8; 20].into(), ETH_BLOCKS_STEP, END_ETH_BLOCKS_OFFSET, - vec![6, 30], true, None, + ZkSyncDeployedContract::version3(eth, [1u8; 20].into()), ); - let eth = Eth::new(transport.clone()); - driver - .zksync_contracts - .push(ZkSyncDeployedContract::version3( - eth, - [1u8; 20].into(), - Web3BlockNumber::Earliest, - Web3BlockNumber::Latest, - )); // Load state from db and check it assert!(driver.load_state_from_storage(&mut interactor).await); diff --git a/core/bin/data_restore/src/tree_state.rs b/core/bin/data_restore/src/tree_state.rs index b052e084ce..1fcf64f8d0 100644 --- a/core/bin/data_restore/src/tree_state.rs +++ b/core/bin/data_restore/src/tree_state.rs @@ -21,18 +21,15 @@ pub struct TreeState { pub current_unprocessed_priority_op: u64, /// The last fee account address pub last_fee_account_address: Address, - /// Available block chunk sizes - pub available_block_chunk_sizes: Vec, } impl TreeState { /// Returns empty self state - pub fn new(available_block_chunk_sizes: Vec) -> Self { + pub fn new() -> Self { Self { state: ZkSyncState::empty(), current_unprocessed_priority_op: 0, last_fee_account_address: Address::default(), - available_block_chunk_sizes, } } @@ -50,7 +47,6 @@ impl TreeState { accounts: AccountMap, current_unprocessed_priority_op: u64, fee_account: AccountId, - available_block_chunk_sizes: Vec, ) -> Self { let state = ZkSyncState::from_acc_map(accounts, current_block); let last_fee_account_address = state @@ -61,7 +57,6 @@ impl TreeState { state, current_unprocessed_priority_op, last_fee_account_address, - available_block_chunk_sizes, } } @@ -298,7 +293,7 @@ impl TreeState { // As we restoring an already executed block, this value isn't important. let gas_limit = 0.into(); - let block = Block::new_from_available_block_sizes( + let block = Block::new_with_current_chunk_size( ops_block.block_num, self.state.root_hash(), ops_block.fee_account, @@ -307,7 +302,6 @@ impl TreeState { last_unprocessed_prior_op, self.current_unprocessed_priority_op, ), - &self.available_block_chunk_sizes, gas_limit, gas_limit, H256::default(), @@ -619,7 +613,7 @@ mod test { // fee_account: AccountId(0), // }; // - let mut tree = TreeState::new(vec![50]); + let mut tree = TreeState::new(); tree.update_tree_states_from_ops_block(&block1) .expect("Cant update state from block 1"); let zero_acc = tree.get_account(AccountId(0)).expect("Cant get 0 account"); @@ -800,7 +794,7 @@ mod test { fee_account: AccountId(0), }; - let mut tree = TreeState::new(vec![50]); + let mut tree = TreeState::new(); tree.update_tree_states_from_ops_block(&block) .expect("Cant update state from block"); diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index 47565b688d..eb3faea945 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -1845,26 +1845,6 @@ ] } }, - "71a9539df6b4362ab57a5397be0da6fac8ef23554dce5281e22704964c6f2d29": { - "query": "SELECT COUNT(*) FROM prover_job_queue WHERE job_status = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "count", - "type_info": "Int8" - } - ], - "parameters": { - "Left": [ - "Int4" - ] - }, - "nullable": [ - null - ] - } - }, "74a5cc4affa23433b5b7834df6dfa1a7a2c5a65f23289de3de5a4f1b93f89c06": { "query": "SELECT address FROM account_creates WHERE account_id = $1", "describe": { @@ -2311,6 +2291,26 @@ "nullable": [] } }, + "92663f125319988e4b5d80d3d58286ca90a29ec2fa97d87750942c9e0615d1bc": { + "query": "SELECT COUNT(*) FROM prover_job_queue WHERE job_status != $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Int4" + ] + }, + "nullable": [ + null + ] + } + }, "93bd5b76565dfbadecfd66a394127fd5b701d09dc3d61adb30b337fd12d86f6a": { "query": "\n UPDATE aggregate_operations\n SET confirmed = $1\n WHERE id = (SELECT op_id FROM eth_aggregated_ops_binding WHERE eth_op_id = $2)", "describe": { diff --git a/core/lib/storage/src/data_restore/mod.rs b/core/lib/storage/src/data_restore/mod.rs index 0610fe0e84..af4848c020 100644 --- a/core/lib/storage/src/data_restore/mod.rs +++ b/core/lib/storage/src/data_restore/mod.rs @@ -3,14 +3,19 @@ use std::time::Instant; // External imports use itertools::Itertools; // Workspace imports -use zksync_types::{AccountId, AccountUpdate, BlockNumber, Operation, Token, ZkSyncOp}; +use zksync_types::{AccountId, AccountUpdate, BlockNumber, Token, ZkSyncOp}; // Local imports use self::records::{ NewBlockEvent, NewStorageState, NewTokenEvent, NewZkSyncOp, StoredBlockEvent, StoredLastWatchedEthBlockNumber, StoredRollupOpsBlock, StoredStorageState, StoredZkSyncOp, }; + +use crate::chain::operations::OperationsSchema; use crate::{chain::state::StateSchema, tokens::TokensSchema}; use crate::{QueryResult, StorageProcessor}; +use zksync_types::aggregated_operations::{ + AggregatedActionType, AggregatedOperation, BlocksCommitOperation, BlocksExecuteOperation, +}; pub mod records; @@ -24,36 +29,50 @@ pub struct DataRestoreSchema<'a, 'c>(pub &'a mut StorageProcessor<'c>); impl<'a, 'c> DataRestoreSchema<'a, 'c> { pub async fn save_block_operations( &mut self, - _commit_op: Operation, - _verify_op: Operation, + commit_op: BlocksCommitOperation, + execute_op: BlocksExecuteOperation, ) -> QueryResult<()> { let start = Instant::now(); - let _new_state = self.new_storage_state("None"); - let transaction = self.0.start_transaction().await?; - - // TODO: Restore code (ZKS-427) - - // let commit_op = BlockSchema(&mut transaction) - // .store_operation(commit_op) - // .await?; - // let verify_op = BlockSchema(&mut transaction) - // .store_operation(verify_op) - // .await?; - // // The state is expected to be updated, so it's necessary - // // to do it here. - // StateSchema(&mut transaction) - // .apply_state_update(commit_op.block_number as u32) - // .await?; - // OperationsSchema(&mut transaction) - // .confirm_operation(commit_op.block_number as u32, ActionType::COMMIT) - // .await?; - // OperationsSchema(&mut transaction) - // .confirm_operation(verify_op.block_number as u32, ActionType::VERIFY) - // .await?; - - // DataRestoreSchema(&mut transaction) - // .update_storage_state(new_state) - // .await?; + let new_state = self.new_storage_state("None"); + let mut transaction = self.0.start_transaction().await?; + + OperationsSchema(&mut transaction) + .store_aggregated_action(AggregatedOperation::CommitBlocks(commit_op.clone())) + .await?; + OperationsSchema(&mut transaction) + .store_aggregated_action(AggregatedOperation::ExecuteBlocks(execute_op.clone())) + .await?; + // The state is expected to be updated, so it's necessary + // to do it here. + for block in commit_op.blocks.iter() { + StateSchema(&mut transaction) + .apply_state_update(block.block_number) + .await?; + } + + if commit_op.blocks.len() != 0 { + OperationsSchema(&mut transaction) + .confirm_aggregated_operations( + commit_op.blocks.first().unwrap().block_number, + commit_op.blocks.last().unwrap().block_number, + AggregatedActionType::CommitBlocks, + ) + .await?; + } + + if execute_op.blocks.len() != 0 { + OperationsSchema(&mut transaction) + .confirm_aggregated_operations( + execute_op.blocks.first().unwrap().block_number, + execute_op.blocks.last().unwrap().block_number, + AggregatedActionType::ExecuteBlocks, + ) + .await?; + } + + DataRestoreSchema(&mut transaction) + .update_storage_state(new_state) + .await?; transaction.commit().await?; metrics::histogram!("sql.data_restore.save_block_operations", start.elapsed()); Ok(()) diff --git a/core/lib/types/src/block.rs b/core/lib/types/src/block.rs index 011a7bec5a..65f7dd77a4 100644 --- a/core/lib/types/src/block.rs +++ b/core/lib/types/src/block.rs @@ -166,6 +166,44 @@ impl Block { timestamp, } } + /// Creates a new block, choosing block chunk size + /// + #[allow(clippy::too_many_arguments)] + pub fn new_with_current_chunk_size( + block_number: BlockNumber, + new_root_hash: Fr, + fee_account: AccountId, + block_transactions: Vec, + processed_priority_ops: (u64, u64), + commit_gas_limit: U256, + verify_gas_limit: U256, + previous_block_root_hash: H256, + timestamp: u64, + ) -> Self { + let mut block = Self { + block_number, + new_root_hash, + fee_account, + block_transactions, + processed_priority_ops, + block_chunks_size: 0, + commit_gas_limit, + verify_gas_limit, + block_commitment: H256::default(), + timestamp, + }; + block.block_chunks_size = block.chunks_used(); + block.block_commitment = Block::get_commitment( + block_number, + fee_account, + previous_block_root_hash, + block.get_eth_encoded_root(), + block.timestamp, + &block.get_onchain_op_commitment(), + &block.get_eth_public_data(), + ); + block + } /// Creates a new block, choosing the smallest supported block size which will fit /// all the executed transactions. diff --git a/core/tests/testkit/src/bin/revert_blocks_test.rs b/core/tests/testkit/src/bin/revert_blocks_test.rs index eededc87ae..9848262490 100644 --- a/core/tests/testkit/src/bin/revert_blocks_test.rs +++ b/core/tests/testkit/src/bin/revert_blocks_test.rs @@ -231,7 +231,6 @@ async fn revert_blocks_test() { verify_restore( test_config.web3_url.as_str(), - test_config.available_block_chunk_sizes.clone(), &contracts, fee_account.address, balance_tree_to_account_map(&state.tree), diff --git a/core/tests/testkit/src/data_restore.rs b/core/tests/testkit/src/data_restore.rs index 0e87ed1ae0..ed75b159f3 100644 --- a/core/tests/testkit/src/data_restore.rs +++ b/core/tests/testkit/src/data_restore.rs @@ -1,4 +1,4 @@ -use web3::{transports::Http, types::Address}; +use web3::{transports::Http, types::Address, Web3}; use zksync_crypto::Fr; use zksync_data_restore::{ @@ -8,39 +8,31 @@ use zksync_data_restore::{ use zksync_types::{AccountId, AccountMap, TokenId}; use crate::external_commands::Contracts; -use web3::types::BlockNumber; + use zksync_data_restore::contract::ZkSyncDeployedContract; pub async fn verify_restore( web3_url: &str, - available_block_chunk_sizes: Vec, contracts: &Contracts, fee_account_address: Address, acc_state_from_test_setup: AccountMap, tokens: Vec, root_hash: Fr, ) { - let transport = Http::new(web3_url).expect("http transport start"); + let web3 = Web3::new(Http::new(web3_url).expect("http transport start")); let mut interactor = InMemoryStorageInteractor::new(); + let contract = ZkSyncDeployedContract::version4(web3.eth(), contracts.contract); let mut driver = DataRestoreDriver::new( - transport, + web3, contracts.governance, ETH_BLOCKS_STEP, 0, - available_block_chunk_sizes, true, Default::default(), + contract, ); - driver - .zksync_contracts - .push(ZkSyncDeployedContract::version4( - driver.web3.eth(), - contracts.contract, - BlockNumber::Earliest, - BlockNumber::Latest, - )); interactor.insert_new_account(AccountId(0), &fee_account_address); driver.load_state_from_storage(&mut interactor).await; driver.run_state_update(&mut interactor).await; diff --git a/core/tests/testkit/src/scenarios.rs b/core/tests/testkit/src/scenarios.rs index 4309dbf4fa..fc8e4a9760 100644 --- a/core/tests/testkit/src/scenarios.rs +++ b/core/tests/testkit/src/scenarios.rs @@ -111,7 +111,6 @@ pub async fn perform_basic_tests() { verify_restore( &testkit_config.web3_url, - testkit_config.available_block_chunk_sizes.clone(), &contracts, fee_account_address, test_setup.get_accounts_state().await,