Skip to content

Commit

Permalink
Emit queued transaction events from the state keeper
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber committed Oct 11, 2021
1 parent d541409 commit 3fd7c3e
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 122 deletions.
11 changes: 11 additions & 0 deletions core/bin/zksync_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub mod register_factory_handler;
pub mod rejected_tx_cleaner;
pub mod state_keeper;
pub mod token_handler;
pub mod tx_event_emitter;

/// Waits for any of the tokio tasks to be finished.
/// Since the main tokio tasks are used as actors which should live as long
Expand Down Expand Up @@ -110,6 +111,9 @@ pub async fn run_core(
let (mempool_block_request_sender, mempool_block_request_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);

let (processed_tx_events_sender, processed_tx_events_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);

// Start Ethereum Watcher.
let eth_watch_task = start_eth_watch(
eth_watch_req_sender.clone(),
Expand All @@ -135,6 +139,7 @@ pub async fn run_core(
config.chain.state_keeper.block_chunk_sizes.clone(),
config.chain.state_keeper.miniblock_iterations as usize,
config.chain.state_keeper.fast_block_miniblock_iterations as usize,
processed_tx_events_sender,
);
let state_keeper_task = start_state_keeper(state_keeper, pending_block);

Expand Down Expand Up @@ -175,6 +180,11 @@ pub async fn run_core(
// Start rejected transactions cleaner task.
let rejected_tx_cleaner_task = run_rejected_tx_cleaner(&config, connection_pool.clone());

let tx_event_emitter_task = tx_event_emitter::run_tx_event_emitter_task(
connection_pool.clone(),
processed_tx_events_receiver,
);

// Start block proposer.
let proposer_task = run_block_proposer_task(
&config,
Expand All @@ -199,6 +209,7 @@ pub async fn run_core(
rejected_tx_cleaner_task,
token_handler_task,
register_factory_task,
tx_event_emitter_task,
];

if let Some(task) = gateway_watcher_task_opt {
Expand Down
15 changes: 15 additions & 0 deletions core/bin/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use zksync_types::{
use crate::{
committer::{AppliedUpdatesRequest, BlockCommitRequest, CommitRequest},
mempool::ProposedBlock,
tx_event_emitter::ProcessedOperations,
};
use zksync_state::error::{OpError, TxBatchError};

Expand Down Expand Up @@ -131,6 +132,10 @@ pub struct ZkSyncStateKeeper {
success_txs_pending_len: usize,
/// Amount of failed transactions in the pending block at the last pending block synchronization step.
failed_txs_pending_len: usize,

// Channel used for sending queued transaction events. Required since state keeper
// has no access to the database.
processed_tx_events_sender: mpsc::Sender<ProcessedOperations>,
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -401,6 +406,7 @@ impl ZkSyncStateKeeper {
available_block_chunk_sizes: Vec<usize>,
max_miniblock_iterations: usize,
fast_miniblock_iterations: usize,
processed_tx_events_sender: mpsc::Sender<ProcessedOperations>,
) -> Self {
assert!(!available_block_chunk_sizes.is_empty());

Expand Down Expand Up @@ -447,6 +453,7 @@ impl ZkSyncStateKeeper {

success_txs_pending_len: 0,
failed_txs_pending_len: 0,
processed_tx_events_sender,
};

let root = keeper.state.root_hash();
Expand Down Expand Up @@ -693,6 +700,14 @@ impl ZkSyncStateKeeper {
}
}

let _ = self
.processed_tx_events_sender
.send(ProcessedOperations {
block_number: self.state.block_number,
executed_ops,
})
.await;

if !self.pending_block.success_operations.is_empty() {
self.pending_block.pending_block_iteration += 1;
}
Expand Down
4 changes: 4 additions & 0 deletions core/bin/zksync_core/src/state_keeper/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct StateKeeperTester {
impl StateKeeperTester {
fn new(available_chunk_size: usize, max_iterations: usize, fast_iterations: usize) -> Self {
const CHANNEL_SIZE: usize = 32768;
let (events_sender, _events_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_request_tx, request_rx) = mpsc::channel(CHANNEL_SIZE);
let (response_tx, response_rx) = mpsc::channel(CHANNEL_SIZE);

Expand All @@ -37,6 +38,7 @@ impl StateKeeperTester {
vec![available_chunk_size],
max_iterations,
fast_iterations,
events_sender,
);

Self {
Expand Down Expand Up @@ -236,6 +238,7 @@ fn test_create_incorrect_state_keeper() {
const MAX_ITERATIONS: usize = 100;
const FAST_ITERATIONS: usize = 100;

let (events_sender, _events_receiver) = mpsc::channel(CHANNEL_SIZE);
let (_request_tx, request_rx) = mpsc::channel(CHANNEL_SIZE);
let (response_tx, _response_rx) = mpsc::channel(CHANNEL_SIZE);

Expand All @@ -253,6 +256,7 @@ fn test_create_incorrect_state_keeper() {
vec![1, 2, 2], // `available_block_chunk_sizes` must be strictly increasing.
MAX_ITERATIONS,
FAST_ITERATIONS,
events_sender,
);
}

Expand Down
44 changes: 44 additions & 0 deletions core/bin/zksync_core/src/tx_event_emitter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
//! Transaction event emitter is responsible for storing `Queued` events
//! in the database.
//!
//! It exists solely for isolating the state keeper from the storage since
//! it's used as the event queue backend.
// External uses
use futures::{channel::mpsc, StreamExt};
use tokio::task::JoinHandle;

// Workspace deps
use zksync_storage::ConnectionPool;
use zksync_types::{BlockNumber, ExecutedOperations};

/// Miniblock operations processed by the state keeper.
#[derive(Debug)]
pub struct ProcessedOperations {
pub block_number: BlockNumber,
pub executed_ops: Vec<ExecutedOperations>,
}

#[must_use]
pub fn run_tx_event_emitter_task(
db_pool: ConnectionPool,
mut receiever: mpsc::Receiver<ProcessedOperations>,
) -> JoinHandle<()> {
tokio::spawn(async move {
while let Some(ProcessedOperations {
block_number,
executed_ops,
}) = receiever.next().await
{
let mut storage = db_pool
.access_storage()
.await
.expect("tx event emitter failed to access the database");
storage
.event_schema()
.store_executed_transaction_event(block_number, executed_ops)
.await
.expect("tx event emitter failed to store events in the database");
}
})
}
7 changes: 0 additions & 7 deletions core/lib/storage/src/chain/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,13 +843,6 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
.save_block_transactions(block.block_number, block.block_transactions)
.await?;

// Notify about queued and rejected transactions right away without
// waiting for the block commit.
transaction
.event_schema()
.store_executed_transaction_event(block.block_number)
.await?;

let new_block = StorageBlock {
number,
root_hash,
Expand Down
140 changes: 33 additions & 107 deletions core/lib/storage/src/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::{slice, time::Instant};
// External uses
use serde_json::Value;
// Workspace uses
use zksync_basic_types::{AccountId, BlockNumber};
use zksync_basic_types::BlockNumber;
use zksync_types::block::ExecutedOperations;
use zksync_types::event::{
account::{
AccountEvent, AccountStateChangeStatus, AccountStateChangeType, AccountUpdateDetails,
Expand All @@ -12,7 +13,6 @@ use zksync_types::event::{
transaction::{TransactionEvent, TransactionStatus},
EventId,
};
use zksync_types::{block::ExecutedOperations, priority_ops::ZkSyncPriorityOp};
// Local uses
use crate::{QueryResult, StorageProcessor};
use records::StoredEvent;
Expand Down Expand Up @@ -201,32 +201,6 @@ impl<'a, 'c> EventSchema<'a, 'c> {
Ok(())
}

/// Returns the account affected by the operation.
/// In case of `Deposit` we have to query the database in order
/// to get the id, at this point the account creation should be already
/// committed in storage.
async fn account_id_from_op(
&mut self,
executed_operation: &ExecutedOperations,
) -> QueryResult<AccountId> {
let priority_op = match executed_operation {
ExecutedOperations::Tx(tx) => {
return tx.signed_tx.tx.account_id().map_err(anyhow::Error::from)
}
ExecutedOperations::PriorityOp(priority_op) => priority_op,
};
match &priority_op.priority_op.data {
ZkSyncPriorityOp::Deposit(deposit) => self
.0
.chain()
.account_schema()
.account_id_by_address(deposit.to)
.await?
.ok_or_else(|| anyhow::Error::msg("Account doesn't exist")),
ZkSyncPriorityOp::FullExit(full_exit) => Ok(full_exit.account_id),
}
}

/// Create new transaction events and store them in the database.
/// The block is expected to be either committed or finalized.
pub async fn store_confirmed_transaction_event(
Expand All @@ -243,45 +217,20 @@ impl<'a, 'c> EventSchema<'a, 'c> {
.get_block_executed_ops(block_number)
.await?;

let mut events = Vec::with_capacity(block_operations.len());
for executed_operation in block_operations {
// Rejected transactions are not included in the block, skip them.
if !executed_operation.is_successful() {
continue;
}
// Like in the case of block events, we return `Ok`
// if we didn't manage to fetch the account id for the given
// operation.
let account_id = match transaction
.event_schema()
.account_id_from_op(&executed_operation)
.await
{
Ok(account_id) => account_id,
_ => {
// Logging is currently disabled.
//
// vlog::warn!(
// "Couldn't create transaction event, no account id exists \
// in the database. Operation: {:?}",
// executed_operation
// );
continue;
}
};

let transaction_event = TransactionEvent::from_executed_operation(
executed_operation,
block_number,
account_id,
status,
);

let event_data = serde_json::to_value(transaction_event)
.expect("couldn't serialize transaction event");

events.push(event_data);
}
let events: Vec<serde_json::Value> = block_operations
.into_iter()
.filter(ExecutedOperations::is_successful) // Rejected transactions are not included into block.
.map(|executed_operation| {
let transaction_event = TransactionEvent::from_executed_operation(
executed_operation,
block_number,
status,
);

serde_json::to_value(transaction_event)
.expect("couldn't serialize transaction event")
})
.collect();

transaction
.event_schema()
Expand All @@ -294,53 +243,30 @@ impl<'a, 'c> EventSchema<'a, 'c> {
}

/// Fetch executed transactions for the given block and store corresponding
/// `Queued` or `Rejected` events in the database. This method is called when
/// the `committer` saves the block in the database.
/// `Queued` or `Rejected` events in the database. These events are created by
/// the state keeper and emitted by the special actor as soon as `block_operations`
/// are processed.
pub async fn store_executed_transaction_event(
&mut self,
block_number: BlockNumber,
block_operations: Vec<ExecutedOperations>,
) -> QueryResult<()> {
let start = Instant::now();
let mut transaction = self.0.start_transaction().await?;
// Load all operations executed in the given block.
let block_operations = transaction
.chain()
.block_schema()
.get_block_executed_ops(block_number)
.await?;

let mut events = Vec::with_capacity(block_operations.len());
for executed_tx in block_operations {
let account_id = match transaction
.event_schema()
.account_id_from_op(&executed_tx)
.await
{
Ok(account_id) => account_id,
_ => {
// Logging is currently disabled.
//
// vlog::warn!(
// "Couldn't create transaction event, no account id exists \
// in the database. Operation: {:?}",
// rejected_tx
// );
continue;
}
};

let transaction_event = TransactionEvent::from_executed_operation(
executed_tx,
block_number,
account_id,
TransactionStatus::Queued,
);

let event_data = serde_json::to_value(transaction_event)
.expect("couldn't serialize transaction event");

events.push(event_data);
}
let events: Vec<serde_json::Value> = block_operations
.into_iter()
.map(|executed_tx| {
let transaction_event = TransactionEvent::from_executed_operation(
executed_tx,
block_number,
TransactionStatus::Queued,
);

serde_json::to_value(transaction_event)
.expect("couldn't serialize transaction event")
})
.collect();

transaction
.event_schema()
Expand Down
Loading

0 comments on commit 3fd7c3e

Please sign in to comment.