Skip to content

Commit

Permalink
refactor(state-keeper): Propagate I/O errors in state keeper (matter-…
Browse files Browse the repository at this point in the history
…labs#1080)

## What ❔

- Propagates I/O errors in the state keeper using `anyhow::Result`,
adding context to calls where appropriate.
- Propagates DB errors in `FactoryDepsDal`.

## Why ❔

Propagating errors makes it easier to attach additional context to them;
thus, it improves DevEx and code maintainability.

## Checklist

- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [x] Tests for the changes have been added / updated.
- [x] Documentation comments have been added / updated.
- [x] Code has been formatted via `zk fmt` and `zk lint`.
- [x] Spellcheck has been run via `zk spellcheck`.
- [x] Linkcheck has been run via `zk linkcheck`.
  • Loading branch information
slowli authored Feb 22, 2024
1 parent 3aa12e8 commit ecb4118
Show file tree
Hide file tree
Showing 13 changed files with 312 additions and 266 deletions.
33 changes: 19 additions & 14 deletions core/lib/dal/src/factory_deps_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::{HashMap, HashSet};

use anyhow::Context as _;
use zksync_contracts::{BaseSystemContracts, SystemContractCode};
use zksync_types::{MiniblockNumber, H256, U256};
use zksync_utils::{bytes_to_be_words, bytes_to_chunks};
Expand All @@ -22,7 +23,7 @@ impl FactoryDepsDal<'_, '_> {
) -> sqlx::Result<()> {
let (bytecode_hashes, bytecodes): (Vec<_>, Vec<_>) = factory_deps
.iter()
.map(|dep| (dep.0.as_bytes(), dep.1.as_slice()))
.map(|(hash, bytecode)| (hash.as_bytes(), bytecode.as_slice()))
.unzip();

// Copy from stdin can't be used here because of `ON CONFLICT`.
Expand Down Expand Up @@ -51,8 +52,8 @@ impl FactoryDepsDal<'_, '_> {
}

/// Returns bytecode for a factory dependency with the specified bytecode `hash`.
pub async fn get_factory_dep(&mut self, hash: H256) -> Option<Vec<u8>> {
sqlx::query!(
pub async fn get_factory_dep(&mut self, hash: H256) -> sqlx::Result<Option<Vec<u8>>> {
Ok(sqlx::query!(
r#"
SELECT
bytecode
Expand All @@ -64,20 +65,20 @@ impl FactoryDepsDal<'_, '_> {
hash.as_bytes(),
)
.fetch_optional(self.storage.conn())
.await
.unwrap()
.map(|row| row.bytecode)
.await?
.map(|row| row.bytecode))
}

pub async fn get_base_system_contracts(
&mut self,
bootloader_hash: H256,
default_aa_hash: H256,
) -> BaseSystemContracts {
) -> anyhow::Result<BaseSystemContracts> {
let bootloader_bytecode = self
.get_factory_dep(bootloader_hash)
.await
.expect("Bootloader code should be present in the database");
.context("failed loading bootloader code")?
.with_context(|| format!("bootloader code with hash {bootloader_hash:?} should be present in the database"))?;
let bootloader_code = SystemContractCode {
code: bytes_to_be_words(bootloader_bytecode),
hash: bootloader_hash,
Expand All @@ -86,16 +87,17 @@ impl FactoryDepsDal<'_, '_> {
let default_aa_bytecode = self
.get_factory_dep(default_aa_hash)
.await
.expect("Default account code should be present in the database");
.context("failed loading default account code")?
.with_context(|| format!("default account code with hash {default_aa_hash:?} should be present in the database"))?;

let default_aa_code = SystemContractCode {
code: bytes_to_be_words(default_aa_bytecode),
hash: default_aa_hash,
};
BaseSystemContracts {
Ok(BaseSystemContracts {
bootloader: bootloader_code,
default_aa: default_aa_code,
}
})
}

/// Returns bytecodes for factory deps with the specified `hashes`.
Expand Down Expand Up @@ -155,7 +157,10 @@ impl FactoryDepsDal<'_, '_> {
}

/// Removes all factory deps with a miniblock number strictly greater than the specified `block_number`.
pub async fn rollback_factory_deps(&mut self, block_number: MiniblockNumber) {
pub async fn rollback_factory_deps(
&mut self,
block_number: MiniblockNumber,
) -> sqlx::Result<()> {
sqlx::query!(
r#"
DELETE FROM factory_deps
Expand All @@ -165,7 +170,7 @@ impl FactoryDepsDal<'_, '_> {
block_number.0 as i64
)
.execute(self.storage.conn())
.await
.unwrap();
.await?;
Ok(())
}
}
40 changes: 23 additions & 17 deletions core/lib/dal/src/protocol_versions_dal.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::convert::TryInto;

use anyhow::Context as _;
use zksync_contracts::{BaseSystemContracts, BaseSystemContractsHashes};
use zksync_types::{
protocol_version::{L1VerifierConfig, ProtocolUpgradeTx, ProtocolVersion, VerifierParams},
Expand Down Expand Up @@ -142,7 +143,7 @@ impl ProtocolVersionsDal<'_, '_> {
pub async fn base_system_contracts_by_timestamp(
&mut self,
current_timestamp: u64,
) -> (BaseSystemContracts, ProtocolVersionId) {
) -> anyhow::Result<(BaseSystemContracts, ProtocolVersionId)> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -162,22 +163,26 @@ impl ProtocolVersionsDal<'_, '_> {
)
.fetch_one(self.storage.conn())
.await
.unwrap();
.context("cannot fetch system contract hashes")?;

let protocol_version = (row.id as u16)
.try_into()
.context("bogus protocol version ID")?;
let contracts = self
.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await;
(contracts, (row.id as u16).try_into().unwrap())
.await?;
Ok((contracts, protocol_version))
}

pub async fn load_base_system_contracts_by_version_id(
&mut self,
version_id: u16,
) -> Option<BaseSystemContracts> {
) -> anyhow::Result<Option<BaseSystemContracts>> {
let row = sqlx::query!(
r#"
SELECT
Expand All @@ -192,20 +197,21 @@ impl ProtocolVersionsDal<'_, '_> {
)
.fetch_optional(self.storage.conn())
.await
.unwrap();
if let Some(row) = row {
Some(
self.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await,
)
.context("cannot fetch system contract hashes")?;

Ok(if let Some(row) = row {
let contracts = self
.storage
.factory_deps_dal()
.get_base_system_contracts(
H256::from_slice(&row.bootloader_code_hash),
H256::from_slice(&row.default_account_code_hash),
)
.await?;
Some(contracts)
} else {
None
}
})
}

pub async fn load_previous_version(
Expand Down
3 changes: 2 additions & 1 deletion core/lib/vm_utils/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ impl L1BatchParamsProvider {
let base_system_contracts = storage
.factory_deps_dal()
.get_base_system_contracts(contract_hashes.bootloader, contract_hashes.default_aa)
.await;
.await
.context("failed getting base system contracts")?;

Ok(l1_batch_params(
first_miniblock_in_batch.l1_batch_number,
Expand Down
7 changes: 5 additions & 2 deletions core/lib/zksync_core/src/api_server/web3/namespaces/zks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,8 +525,11 @@ impl ZksNamespace {

let method_latency = API_METRICS.start_call(METHOD_NAME);
let mut storage = self.access_storage(METHOD_NAME).await?;
let bytecode = storage.factory_deps_dal().get_factory_dep(hash).await;

let bytecode = storage
.factory_deps_dal()
.get_factory_dep(hash)
.await
.map_err(|err| internal_error(METHOD_NAME, err))?;
method_latency.observe();
Ok(bytecode)
}
Expand Down
5 changes: 2 additions & 3 deletions core/lib/zksync_core/src/block_reverter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,15 @@ impl BlockReverter {
transaction
.factory_deps_dal()
.rollback_factory_deps(last_miniblock_to_keep)
.await;

.await
.expect("Failed rolling back factory dependencies");
tracing::info!("rolling back storage...");
#[allow(deprecated)]
transaction
.storage_logs_dal()
.rollback_storage(last_miniblock_to_keep)
.await
.expect("failed rolling back storage");

tracing::info!("rolling back storage logs...");
transaction
.storage_logs_dal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use zksync_dal::{ConnectionPool, StorageProcessor};
use zksync_types::MiniblockNumber;

/// Runs the migration for pending miniblocks.
pub(crate) async fn migrate_pending_miniblocks(storage: &mut StorageProcessor<'_>) {
pub(crate) async fn migrate_pending_miniblocks(
storage: &mut StorageProcessor<'_>,
) -> anyhow::Result<()> {
let started_at = Instant::now();
tracing::info!("Started migrating `fee_account_address` for pending miniblocks");

Expand All @@ -19,20 +21,21 @@ pub(crate) async fn migrate_pending_miniblocks(storage: &mut StorageProcessor<'_
.blocks_dal()
.check_l1_batches_have_fee_account_address()
.await
.expect("Failed getting metadata for l1_batches table");
.context("failed getting metadata for l1_batches table")?;
if !l1_batches_have_fee_account_address {
tracing::info!("`l1_batches.fee_account_address` column is removed; assuming that the migration is complete");
return;
return Ok(());
}

#[allow(deprecated)]
let rows_affected = storage
.blocks_dal()
.copy_fee_account_address_for_pending_miniblocks()
.await
.expect("Failed migrating `fee_account_address` for pending miniblocks");
.context("failed migrating `fee_account_address` for pending miniblocks")?;
let elapsed = started_at.elapsed();
tracing::info!("Migrated `fee_account_address` for {rows_affected} miniblocks in {elapsed:?}");
Ok(())
}

/// Runs the migration for non-pending miniblocks. Should be run as a background task.
Expand Down
Loading

0 comments on commit ecb4118

Please sign in to comment.