Skip to content

Commit

Permalink
Restore state for tx queue after restart
Browse files Browse the repository at this point in the history
  • Loading branch information
vladbochok committed Feb 4, 2021
1 parent 27a4f34 commit 3f65ed9
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 0 deletions.
19 changes: 19 additions & 0 deletions core/bin/zksync_eth_sender/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,13 @@ pub(super) trait DatabaseInterface {
connection: &mut StorageProcessor<'_>,
) -> anyhow::Result<VecDeque<ETHOperation>>;

/// Load all aggregated operation that have no confirmation yet and have not yet been sent to Ethereum.
/// Use only after server restart.
async fn restore_unprocessed_operations(
&self,
connection: &mut StorageProcessor<'_>,
) -> anyhow::Result<()>;

/// Loads the unprocessed operations from the database.
/// Unprocessed operations are zkSync operations that were not started at all.
async fn load_new_operations(
Expand Down Expand Up @@ -133,6 +140,18 @@ impl DatabaseInterface for Database {
Ok(unconfirmed_ops)
}

async fn restore_unprocessed_operations(
&self,
connection: &mut StorageProcessor<'_>,
) -> anyhow::Result<()> {
connection
.ethereum_schema()
.restore_unprocessed_operations()
.await?;

Ok(())
}

async fn load_new_operations(
&self,
connection: &mut StorageProcessor<'_>,
Expand Down
4 changes: 4 additions & 0 deletions core/bin/zksync_eth_sender/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl<ETH: EthereumInterface, DB: DatabaseInterface> ETHSender<ETH, DB> {
.await
.expect("Unable create database transaction");

db.restore_unprocessed_operations(&mut transaction)
.await
.expect("Can't restore unprocessed operations");

let ongoing_ops = db
.load_unconfirmed_operations(&mut transaction)
.await
Expand Down
7 changes: 7 additions & 0 deletions core/bin/zksync_eth_sender/src/tests/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,13 @@ impl DatabaseInterface for MockDatabase {
Ok(())
}

async fn restore_unprocessed_operations(
&self,
_connection: &mut StorageProcessor<'_>,
) -> anyhow::Result<()> {
Ok(())
}

async fn load_unconfirmed_operations(
&self,
_connection: &mut StorageProcessor<'_>,
Expand Down
13 changes: 13 additions & 0 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,19 @@
"nullable": []
}
},
"18923147a9a9f03dae77d31f106ac53ca69321df1194c921baef8f48ff963c12": {
"query": "WITH aggregate_ops AS (\n SELECT aggregate_operations.id FROM aggregate_operations\n WHERE confirmed = $1 and action_type != $2 and aggregate_operations.id != ANY(SELECT id from eth_aggregated_ops_binding)\n ORDER BY aggregate_operations.id ASC\n )\n INSERT INTO eth_unprocessed_aggregated_ops (op_id)\n SELECT id from aggregate_ops\n ON CONFLICT (op_id)\n DO NOTHING",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Bool",
"Text"
]
},
"nullable": []
}
},
"18b31bb04ceac3422973692fdac7119b42f8d48fa98af34cfc644a30bd72d7b9": {
"query": "\n WITH block_details AS (\n WITH eth_ops AS (\n SELECT DISTINCT ON (to_block, action_type)\n aggregate_operations.from_block,\n aggregate_operations.to_block,\n eth_tx_hashes.tx_hash,\n aggregate_operations.action_type,\n aggregate_operations.created_at,\n aggregate_operations.confirmed\n FROM aggregate_operations\n left join eth_aggregated_ops_binding on eth_aggregated_ops_binding.op_id = aggregate_operations.id\n left join eth_tx_hashes on eth_tx_hashes.eth_op_id = eth_aggregated_ops_binding.eth_op_id\n ORDER BY to_block DESC, action_type, confirmed\n )\n SELECT\n blocks.number AS details_block_number,\n committed.tx_hash AS commit_tx_hash,\n verified.tx_hash AS verify_tx_hash\n FROM blocks\n INNER JOIN eth_ops committed ON\n committed.from_block <= blocks.number AND committed.to_block >= blocks.number AND committed.action_type = 'CommitBlocks' AND committed.confirmed = true\n LEFT JOIN eth_ops verified ON\n verified.from_block <= blocks.number AND verified.to_block >= blocks.number AND verified.action_type = 'ExecuteBlocks' AND verified.confirmed = true\n )\n SELECT\n block_number, \n block_index as \"block_index?\",\n tx_hash,\n success,\n fail_reason as \"fail_reason?\",\n details.commit_tx_hash as \"commit_tx_hash?\",\n details.verify_tx_hash as \"verify_tx_hash?\"\n FROM executed_transactions\n LEFT JOIN block_details details ON details.details_block_number = executed_transactions.block_number\n WHERE (\n (primary_account_address = $1 OR from_account = $1 OR to_account = $1)\n AND (\n block_number = $2 AND (\n COALESCE(block_index, -1) >= $3\n ) OR (\n block_number > $2\n )\n )\n )\n ORDER BY block_number ASC, COALESCE(block_index, -1) ASC\n LIMIT $4\n ",
"describe": {
Expand Down
29 changes: 29 additions & 0 deletions core/lib/storage/src/ethereum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,35 @@ impl<'a, 'c> EthereumSchema<'a, 'c> {
Ok(ops)
}

/// Load all aggregated operation that have no confirmation yet and have not yet been sent to Ethereum.
/// Use only after server restart.
pub async fn restore_unprocessed_operations(&mut self) -> QueryResult<()> {
let start = Instant::now();

sqlx::query!(
"WITH aggregate_ops AS (
SELECT aggregate_operations.id FROM aggregate_operations
WHERE confirmed = $1 and action_type != $2 and aggregate_operations.id != ANY(SELECT id from eth_aggregated_ops_binding)
ORDER BY aggregate_operations.id ASC
)
INSERT INTO eth_unprocessed_aggregated_ops (op_id)
SELECT id from aggregate_ops
ON CONFLICT (op_id)
DO NOTHING",
false,
AggregatedActionType::CreateProofBlocks.to_string()
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.ethereum.restore_unprocessed_operations",
start.elapsed()
);

Ok(())
}

/// Loads the operations which were stored in `aggregate_operations` table,
/// and are in `eth_unprocessed_aggregated_ops`.
pub async fn load_unprocessed_operations(
Expand Down
1 change: 1 addition & 0 deletions infrastructure/grafana/dashboards/sql_ethereum.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ local metrics = [
"sql.ethereum.load_gas_price_limit",
"sql.ethereum.load_stats",
"sql.ethereum.load_unconfirmed_operations",
"sql.ethereum.restore_unprocessed_operations",
"sql.ethereum.load_unprocessed_operations",
"sql.ethereum.report_created_operation",
"sql.ethereum.save_new_eth_tx",
Expand Down

0 comments on commit 3f65ed9

Please sign in to comment.