Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/vb-zks-375-update-logic-in-stora…
Browse files Browse the repository at this point in the history
…ge' into dvush-new-sc
  • Loading branch information
dvush committed Jan 29, 2021
2 parents 274caff + d128f32 commit 6a8c988
Show file tree
Hide file tree
Showing 26 changed files with 1,351 additions and 1,565 deletions.
37 changes: 22 additions & 15 deletions core/bin/zksync_api/src/api_server/event_notify/event_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use futures::{channel::mpsc, SinkExt};
use std::time::{Duration, Instant};
use zksync_storage::ConnectionPool;
use zksync_types::{
block::ExecutedOperations, block::PendingBlock, ActionType, BlockNumber, Operation,
aggregated_operations::{AggregatedActionType, AggregatedOperation},
block::ExecutedOperations,
block::PendingBlock,
BlockNumber,
};

/// Simple awaiter for the database futures, which will add a log entry upon DB failure
Expand Down Expand Up @@ -34,15 +37,15 @@ pub struct EventFetcher {
last_verified_block: BlockNumber,
pending_block: Option<PendingBlock>,

operations_sender: mpsc::Sender<Operation>,
operations_sender: mpsc::Sender<AggregatedOperation>,
txs_sender: mpsc::Sender<ExecutedOps>,
}

impl EventFetcher {
pub async fn new(
db_pool: ConnectionPool,
miniblock_interval: Duration,
operations_sender: mpsc::Sender<Operation>,
operations_sender: mpsc::Sender<AggregatedOperation>,
txs_sender: mpsc::Sender<ExecutedOps>,
) -> anyhow::Result<Self> {
let mut fetcher = EventFetcher {
Expand Down Expand Up @@ -85,7 +88,7 @@ impl EventFetcher {
self.send_operations(
self.last_verified_block,
last_verified_block,
ActionType::VERIFY,
AggregatedActionType::ExecuteBlocks,
)
.await;
self.last_verified_block = last_verified_block;
Expand All @@ -97,7 +100,7 @@ impl EventFetcher {
self.send_operations(
self.last_committed_block,
last_committed_block,
ActionType::COMMIT,
AggregatedActionType::CommitBlocks,
)
.await;
self.last_committed_block = last_committed_block;
Expand Down Expand Up @@ -168,14 +171,17 @@ impl EventFetcher {
&mut self,
current_last_block: BlockNumber,
new_last_operation: BlockNumber,
action: ActionType,
aggregated_action: AggregatedActionType,
) {
let start = Instant::now();
// There may be more than one block in the gap.
for block_idx in (current_last_block + 1)..=new_last_operation {
let operation = await_db!(self.load_operation(block_idx, action), continue);
let aggregated_operation = await_db!(
self.load_aggregated_operation(block_idx, aggregated_action),
continue
);
self.operations_sender
.send(operation)
.send(aggregated_operation)
.await
.unwrap_or_default();
}
Expand Down Expand Up @@ -231,26 +237,27 @@ impl EventFetcher {
Ok(last_block)
}

async fn load_operation(
async fn load_aggregated_operation(
&mut self,
block_number: BlockNumber,
action_type: ActionType,
) -> anyhow::Result<Operation> {
aggregated_action_type: AggregatedActionType,
) -> anyhow::Result<AggregatedOperation> {
let start = Instant::now();
let mut storage = self
.db_pool
.access_storage()
.await
.expect("Can't get access to the storage");

let op = storage
let aggregated_operation = storage
.chain()
.operations_schema()
.get_operation(block_number, action_type)
.await
.get_aggregated_op_that_affects_block(aggregated_action_type, block_number)
.await?
.map(|(_, operation)| operation)
.expect("Operation must exist");

metrics::histogram!("api.event_fetcher.load_operation", start.elapsed());
op.into_op(&mut storage).await
Ok(aggregated_operation)
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use super::{
state::NotifierState, sub_store::SubStorage, EventNotifierRequest, EventSubscribeRequest,
ExecutedOps,
};
use crate::api_server::rpc_server::types::{
BlockInfo, ETHOpInfoResp, ResponseAccountState, TransactionInfoResp,
};
use jsonrpc_pubsub::{typed::Subscriber, SubscriptionId};
use std::time::Instant;
use zksync_storage::ConnectionPool;
use zksync_types::aggregated_operations::AggregatedOperation;
use zksync_types::tx::TxHash;
use zksync_types::BlockNumber;
use zksync_types::{block::ExecutedOperations, AccountId, ActionType, Address, Operation};

use super::{
state::NotifierState, sub_store::SubStorage, EventNotifierRequest, EventSubscribeRequest,
ExecutedOps,
};

pub struct OperationNotifier {
state: NotifierState,

Expand Down Expand Up @@ -68,38 +68,48 @@ impl OperationNotifier {
}

/// Processes new block action (commit or verify), notifying the subscribers.
pub async fn handle_new_block(&mut self, op: Operation) -> Result<(), anyhow::Error> {
pub async fn handle_new_block(
&mut self,
aggregation_operation: AggregatedOperation,
) -> anyhow::Result<()> {
let start = Instant::now();
let action = op.action.get_type();

self.handle_executed_operations(
op.block.block_transactions.clone(),
action,
op.block.block_number,
)?;
let (action, blocks) = match aggregation_operation {
AggregatedOperation::CommitBlocks(operation) => (ActionType::COMMIT, operation.blocks),
AggregatedOperation::ExecuteBlocks(operation) => (ActionType::VERIFY, operation.blocks),
_ => return Ok(()),
};

let updated_accounts: Vec<AccountId> = op
.block
.block_transactions
.iter()
.flat_map(|exec_op| exec_op.get_updated_account_ids())
.collect();
for block in blocks {
self.handle_executed_operations(
block.block_transactions.clone(),
action,
block.block_number,
)?;

for id in updated_accounts {
if self.account_subs.subscriber_exists(id, action) {
let account_state = match self.state.get_account_state(id, action).await? {
Some(account_state) => account_state,
None => {
log::warn!(
"Account is updated but not stored in DB, id: {}, block: {:#?}",
id,
op.block
);
continue;
}
};
let updated_accounts: Vec<AccountId> = block
.block_transactions
.iter()
.map(|exec_op| exec_op.get_updated_account_ids())
.flatten()
.collect();

self.account_subs.notify(id, action, account_state);
for id in updated_accounts {
if self.account_subs.subscriber_exists(id, action) {
let account_state = match self.state.get_account_state(id, action).await? {
Some(account_state) => account_state,
None => {
log::warn!(
"Account is updated but not stored in DB, id: {}, block: {:#?}",
id,
block
);
continue;
}
};

self.account_subs.notify(id, action, account_state);
}
}
}

Expand Down
15 changes: 8 additions & 7 deletions core/bin/zksync_api/src/api_server/event_notify/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Instant;
use zksync_storage::chain::operations::records::StoredExecutedPriorityOperation;
use zksync_storage::chain::operations_ext::records::TxReceiptResponse;
use zksync_storage::ConnectionPool;
use zksync_types::aggregated_operations::AggregatedActionType;
use zksync_types::tx::TxHash;
use zksync_types::BlockNumber;
use zksync_types::{AccountId, ActionType, Address};
Expand Down Expand Up @@ -78,16 +79,16 @@ impl NotifierState {
.get_block(block_number)
.await?
{
let verified = if let Some(block_verify) = transaction
let verified = transaction
.chain()
.operations_schema()
.get_operation(block_number, ActionType::VERIFY)
.get_stored_aggregated_operation(
block_number,
AggregatedActionType::ExecuteBlocks,
)
.await
{
block_verify.confirmed
} else {
false
};
.map(|operation| operation.confirmed)
.unwrap_or_default();

BlockInfo {
block_number: i64::from(block_with_op.block_number),
Expand Down
23 changes: 10 additions & 13 deletions core/bin/zksync_core/src/committer/aggregated_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,25 +196,20 @@ const AVAILABLE_AGGREGATE_PROOFS: &[usize] = &[1, 5];
async fn create_aggregated_commits_storage(
storage: &mut StorageProcessor<'_>,
) -> anyhow::Result<bool> {
let last_committed_block = BlockSchema(storage).get_last_committed_block().await?;
let last_aggregate_committed_block = OperationsSchema(storage)
.get_last_affected_block_by_aggregated_action(AggregatedActionType::CommitBlocks)
.await?;
if last_committed_block <= last_aggregate_committed_block {
return Ok(false);
}

let old_committed_block = BlockSchema(storage)
.get_block(last_aggregate_committed_block)
.await?
.expect("Failed to get last committed block from db");

let mut new_blocks = Vec::new();
for block_number in last_aggregate_committed_block + 1..=last_committed_block {
let block = BlockSchema(storage)
.get_block(block_number)
.await?
.expect("Failed to get committed block");
let mut block_number = last_aggregate_committed_block + 1;

while let Some(block) = BlockSchema(storage).get_block(block_number).await? {
new_blocks.push(block);
block_number += 1;
}

let commit_operation = create_new_commit_operation(
Expand All @@ -241,16 +236,18 @@ async fn create_aggregated_commits_storage(
async fn create_aggregated_prover_task_storage(
storage: &mut StorageProcessor<'_>,
) -> anyhow::Result<bool> {
let last_committed_block = BlockSchema(storage).get_last_committed_block().await?;
let last_aggregate_committed_block = OperationsSchema(storage)
.get_last_affected_block_by_aggregated_action(AggregatedActionType::CommitBlocks)
.await?;
let last_aggregate_create_proof_block = OperationsSchema(storage)
.get_last_affected_block_by_aggregated_action(AggregatedActionType::CreateProofBlocks)
.await?;
if last_committed_block <= last_aggregate_create_proof_block {
if last_aggregate_committed_block <= last_aggregate_create_proof_block {
return Ok(false);
}

let mut blocks_with_proofs = Vec::new();
for block_number in last_aggregate_create_proof_block + 1..=last_committed_block {
for block_number in last_aggregate_create_proof_block + 1..=last_aggregate_committed_block {
let proof_exists = ProverSchema(storage)
.load_proof(block_number)
.await?
Expand Down
10 changes: 3 additions & 7 deletions core/bin/zksync_core/src/committer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,12 @@ async fn commit_block(
.await
.expect("committer must commit the pending block into db");

let op = Operation {
action: Action::Commit,
block,
id: None,
};
log::info!("commit block #{}", op.block.block_number);
log::info!("commit block #{}", block.block_number);

transaction
.chain()
.block_schema()
.execute_operation(op.clone())
.save_block(block)
.await
.expect("committer must commit the op into db");

Expand Down
26 changes: 15 additions & 11 deletions core/bin/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use zksync_crypto::ff::{PrimeField, PrimeFieldRepr};
use zksync_state::state::{CollectedFee, OpSuccess, ZkSyncState};
use zksync_storage::ConnectionPool;
use zksync_types::{
aggregated_operations::AggregatedActionType,
block::{
Block, ExecutedOperations, ExecutedPriorityOp, ExecutedTx,
PendingBlock as SendablePendingBlock,
Expand Down Expand Up @@ -328,19 +329,22 @@ impl ZkSyncStateInitParams {
storage: &mut zksync_storage::StorageProcessor<'_>,
block_number: BlockNumber,
) -> Result<u64, anyhow::Error> {
let storage_op = storage
let is_operation_exists = storage
.chain()
.operations_schema()
.get_operation(block_number, ActionType::COMMIT)
.await;
if let Some(storage_op) = storage_op {
Ok(storage_op
.into_op(storage)
.await
.map_err(|e| anyhow::format_err!("could not convert storage_op: {}", e))?
.block
.processed_priority_ops
.1)
.get_stored_aggregated_operation(block_number, AggregatedActionType::CommitBlocks)
.await
.is_some();

if is_operation_exists {
let block = storage
.chain()
.block_schema()
.get_block(block_number)
.await?
.expect("Block should exist");

Ok(block.processed_priority_ops.1)
} else {
Ok(0)
}
Expand Down
23 changes: 10 additions & 13 deletions core/bin/zksync_eth_sender/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ impl DatabaseInterface for Database {
transaction
.chain()
.operations_schema()
.confirm_operations(first_block, last_block, ActionType::COMMIT)
.confirm_aggregated_operations(
first_block,
last_block,
AggregatedActionType::CommitBlocks,
)
.await?;
}
Some((_, AggregatedOperation::ExecuteBlocks(op))) => {
Expand All @@ -238,23 +242,16 @@ impl DatabaseInterface for Database {
.state_schema()
.apply_state_update(block.block_number)
.await?;
transaction
.chain()
.block_schema()
.execute_operation(Operation {
id: None,
action: Action::Verify {
proof: Default::default(),
},
block: block.clone(),
})
.await?;
}

transaction
.chain()
.operations_schema()
.confirm_operations(first_block, last_block, ActionType::VERIFY)
.confirm_aggregated_operations(
first_block,
last_block,
AggregatedActionType::ExecuteBlocks,
)
.await?;
}
_ => {}
Expand Down
Loading

0 comments on commit 6a8c988

Please sign in to comment.