Skip to content

Commit

Permalink
Return priority operations to mempool
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Jan 11, 2022
1 parent 893b179 commit bc79ef7
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 36 deletions.
2 changes: 1 addition & 1 deletion core/bin/block_revert/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn revert_blocks_in_storage(
transaction
.chain()
.operations_schema()
.remove_executed_priority_operations(last_block)
.return_executed_priority_operations_to_mempool(last_block)
.await?;
println!("`executed_priority_operations` table is cleaned");
transaction
Expand Down
106 changes: 86 additions & 20 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2024,6 +2024,92 @@
]
}
},
"38a95c4e1356fb51dfb58fc880aea90b6ffb514520150e2c9b7bfe38fdeb0d80": {
"query": "SELECT * FROM executed_priority_operations WHERE block_number > $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "block_number",
"type_info": "Int8"
},
{
"ordinal": 1,
"name": "block_index",
"type_info": "Int4"
},
{
"ordinal": 2,
"name": "operation",
"type_info": "Jsonb"
},
{
"ordinal": 3,
"name": "from_account",
"type_info": "Bytea"
},
{
"ordinal": 4,
"name": "to_account",
"type_info": "Bytea"
},
{
"ordinal": 5,
"name": "priority_op_serialid",
"type_info": "Int8"
},
{
"ordinal": 6,
"name": "deadline_block",
"type_info": "Int8"
},
{
"ordinal": 7,
"name": "eth_hash",
"type_info": "Bytea"
},
{
"ordinal": 8,
"name": "eth_block",
"type_info": "Int8"
},
{
"ordinal": 9,
"name": "created_at",
"type_info": "Timestamptz"
},
{
"ordinal": 10,
"name": "eth_block_index",
"type_info": "Int8"
},
{
"ordinal": 11,
"name": "tx_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false,
false,
false,
false,
false,
false,
false,
false,
false,
false,
true,
false
]
}
},
"38e7464ba17d495fe87cf1412ffe10af8dc4b44f99dc2df1ec580f20f609b650": {
"query": "SELECT max(number) FROM incomplete_blocks",
"describe": {
Expand Down Expand Up @@ -2087,26 +2173,6 @@
"nullable": []
}
},
"399d6d7196f0353420ebc7546baa916d5f963bbe97aa5b35543582f0b873d773": {
"query": "SELECT tx_hash FROM executed_priority_operations WHERE block_number > $1",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Int8"
]
},
"nullable": [
false
]
}
},
"3a61f335dc699e6126346c77cea44995e48efb57d39624c63c55d342ca2ea1b1": {
"query": "DELETE FROM tx_filters\n WHERE tx_hash = $1",
"describe": {
Expand Down
26 changes: 18 additions & 8 deletions core/lib/storage/src/chain/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chrono::{Duration, Utc};
use zksync_types::{
aggregated_operations::{AggregatedActionType, AggregatedOperation},
tx::TxHash,
BlockNumber, SerialId, H256,
BlockNumber, PriorityOp, SerialId, H256,
};
// Local imports
use self::records::{
Expand Down Expand Up @@ -590,31 +590,41 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
}

// Removes executed priority operations for blocks with number greater than `last_block`
pub async fn remove_executed_priority_operations(
pub async fn return_executed_priority_operations_to_mempool(
&mut self,
last_block: BlockNumber,
) -> QueryResult<()> {
let start = Instant::now();
let mut transaction = self.0.start_transaction().await?;

let records = sqlx::query!(
"SELECT tx_hash FROM executed_priority_operations WHERE block_number > $1",
let records = sqlx::query_as!(
StoredExecutedPriorityOperation,
"SELECT * FROM executed_priority_operations WHERE block_number > $1",
*last_block as i64
)
.fetch_all(self.0.conn())
.fetch_all(transaction.conn())
.await?;
let hashes: Vec<Vec<u8>> = records.into_iter().map(|r| r.tx_hash).collect();

let hashes: Vec<Vec<u8>> = records.iter().map(|r| r.tx_hash.clone()).collect();
{
let ops: Vec<PriorityOp> = records.into_iter().map(|rec| rec.into()).collect();
MempoolSchema(&mut transaction)
.insert_priority_ops(&ops, true)
.await?;
}

sqlx::query!("DELETE FROM tx_filters WHERE tx_hash = ANY($1)", &hashes)
.execute(self.0.conn())
.execute(transaction.conn())
.await?;

sqlx::query!(
"DELETE FROM executed_priority_operations WHERE block_number > $1",
*last_block as i64
)
.execute(self.0.conn())
.execute(transaction.conn())
.await?;

transaction.commit().await?;
metrics::histogram!(
"sql.chain.operations.remove_executed_priority_operations",
start.elapsed()
Expand Down
14 changes: 14 additions & 0 deletions core/lib/storage/src/chain/operations/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use chrono::prelude::*;
use serde_json::value::Value;
use sqlx::FromRow;
use zksync_types::{PriorityOp, H256};
// Workspace imports
// Local imports
use crate::StorageActionType;
Expand Down Expand Up @@ -32,6 +33,19 @@ pub struct StoredExecutedPriorityOperation {
pub tx_hash: Vec<u8>,
}

impl From<StoredExecutedPriorityOperation> for PriorityOp {
fn from(value: StoredExecutedPriorityOperation) -> Self {
Self {
serial_id: value.priority_op_serialid as u64,
data: serde_json::from_value(value.operation).expect("Should be correct stored"),
deadline_block: value.deadline_block as u64,
eth_hash: H256::from_slice(&value.eth_hash),
eth_block: value.eth_block as u64,
eth_block_index: Some(value.block_index as u64),
}
}
}

#[derive(Debug, Clone, FromRow)]
pub struct StoredExecutedTransaction {
pub block_number: i64,
Expand Down
28 changes: 21 additions & 7 deletions core/lib/storage/src/tests/chain/operations.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
// External imports
use chrono::{Duration, Utc};
// Workspace imports
use zksync_types::{aggregated_operations::AggregatedActionType, BlockNumber};
use zksync_types::{
aggregated_operations::AggregatedActionType, Address, BlockNumber, Deposit, ZkSyncPriorityOp,
H256,
};
// Local imports
use crate::chain::mempool::MempoolSchema;
use crate::{
chain::{
block::BlockSchema,
Expand Down Expand Up @@ -443,16 +447,22 @@ async fn test_remove_executed_priority_operations(
let executed_priority_op = NewExecutedPriorityOperation {
block_number,
block_index: 1,
operation: Default::default(),
from_account: Default::default(),
to_account: Default::default(),
operation: serde_json::to_value(ZkSyncPriorityOp::Deposit(Deposit {
from: Address::zero(),
token: Default::default(),
amount: Default::default(),
to: Address::zero(),
}))
.unwrap(),
from_account: Address::zero().as_bytes().to_vec(),
to_account: Address::zero().as_bytes().to_vec(),
priority_op_serialid: block_number,
deadline_block: 100,
eth_hash: vec![0xDE, 0xAD, 0xBE, 0xEF],
eth_hash: H256::zero().as_bytes().to_vec(),
eth_block: 10,
created_at: chrono::Utc::now(),
eth_block_index: Some(1),
tx_hash: Default::default(),
tx_hash: H256::zero().as_bytes().to_vec(),
affected_accounts: Default::default(),
token: Default::default(),
};
Expand All @@ -463,7 +473,7 @@ async fn test_remove_executed_priority_operations(

// Remove priority operation with block numbers greater than 3.
OperationsSchema(&mut storage)
.remove_executed_priority_operations(BlockNumber(3))
.return_executed_priority_operations_to_mempool(BlockNumber(3))
.await?;

// Check that priority operation from the 3rd block is present and from the 4th is not.
Expand All @@ -477,6 +487,10 @@ async fn test_remove_executed_priority_operations(
.await?;
assert!(block4_txs.is_empty());

let mempool_txs = MempoolSchema(&mut storage)
.get_confirmed_priority_ops()
.await?;
assert_eq!(mempool_txs.len(), 2);
Ok(())
}

Expand Down

0 comments on commit bc79ef7

Please sign in to comment.