From ce94ac57203c2abe58ce461479808b9a3ccba9f5 Mon Sep 17 00:00:00 2001 From: deniallugo Date: Tue, 25 Jan 2022 19:08:27 +0300 Subject: [PATCH 1/4] Remove tx_filters with rejected txs Signed-off-by: deniallugo --- core/lib/config/src/configs/database.rs | 2 +- core/lib/storage/sqlx-data.json | 56 +++++++++++++++---- core/lib/storage/src/chain/operations/mod.rs | 23 ++++++-- .../lib/storage/src/tests/chain/operations.rs | 56 +++++++++++++++---- 4 files changed, 110 insertions(+), 27 deletions(-) diff --git a/core/lib/config/src/configs/database.rs b/core/lib/config/src/configs/database.rs index 7785e51ecc..b40191f3a2 100644 --- a/core/lib/config/src/configs/database.rs +++ b/core/lib/config/src/configs/database.rs @@ -32,7 +32,7 @@ impl DBConfig { } pub fn rejected_transactions_cleaner_interval(&self) -> time::Duration { - time::Duration::from_secs(self.rejected_transactions_cleaner_interval * Self::SECS_PER_HOUR) + time::Duration::from_secs(self.rejected_transactions_cleaner_interval) } } diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index 2390124ba3..d2aaa26083 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -2854,6 +2854,18 @@ "nullable": [] } }, + "5e5becde03270ceb82f605ea94c70dac192e9a0f7dd2c918d8dc26d1902d2067": { + "query": "DELETE FROM tx_filters WHERE tx_hash = ANY ($1)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + } + }, "5fac3f8e9ad91897751e7f14c56723f24d1c85ed146679296525e667b55b3947": { "query": "\n SELECT id, address, decimals, kind as \"kind: _\", symbol FROM tokens\n WHERE id >= $1 AND kind = 'ERC20'::token_kind\n ORDER BY id ASC\n LIMIT $2\n ", "describe": { @@ -4823,18 +4835,6 @@ "nullable": [] } }, - "a545c7251d381afbb2bdcbaef0cc5f1bc322db47ee0db555b83b9e19119bc756": { - "query": "DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamptz" - ] - }, - "nullable": [] - } - }, "a665923ec57382f357f6bb65f6e35876fbfedbf1661b3ce34f2458b63eebc68e": { "query": "\n INSERT INTO subsidies ( tx_hash, usd_amount_scale6, full_cost_usd_scale6, token_id, token_amount, full_cost_token, subsidy_type )\n VALUES ( $1, $2, $3, $4, $5, $6, $7 )\n ", "describe": { @@ -5707,6 +5707,18 @@ ] } }, + "c31936ecaa097fc0711fa24e79ee415bfc3da855f29b2138ecbaced1341d5e7f": { + "query": "DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "ByteaArray" + ] + }, + "nullable": [] + } + }, "c3632674ee6614b83e258c75447dc986507481a56fbdd5e05dedd0775f21fb79": { "query": "\n SELECT\n token_id as \"token_id!\", creator_account_id as \"creator_account_id!\",\n creator_address as \"creator_address!\", serial_id as \"serial_id!\",\n nft.address as \"address!\", content_hash as \"content_hash!\",\n tokens.symbol as \"symbol!\"\n FROM nft\n INNER JOIN tokens\n ON tokens.id = nft.token_id\n ", "describe": { @@ -7004,6 +7016,26 @@ ] } }, + "e8c69e16fe0e447d578dee61807b418ee4e8d5aebb60034cbc640eafceb593e2": { + "query": "SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [ + false + ] + } + }, "e99d990d2d9b1c6068efb623634d6d6cf49a3c7ec33a5a916b7ddaa745e24c9b": { "query": "\n SELECT * FROM prover_job_queue\n WHERE job_status = $1\n ORDER BY (job_priority, id, first_block)\n LIMIT 1\n ", "describe": { diff --git a/core/lib/storage/src/chain/operations/mod.rs b/core/lib/storage/src/chain/operations/mod.rs index 6d52442e55..acbaed8d01 100644 --- a/core/lib/storage/src/chain/operations/mod.rs +++ b/core/lib/storage/src/chain/operations/mod.rs @@ -240,7 +240,6 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { tokens.push(*token); } } - sqlx::query!( " INSERT INTO tx_filters (address, token, tx_hash) @@ -270,13 +269,29 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { pub async fn remove_rejected_transactions(&mut self, max_age: Duration) -> QueryResult<()> { let start = Instant::now(); + let mut transaction = self.0.start_transaction().await?; let offset = Utc::now() - max_age; + let tx_hashes: Vec> = sqlx::query!("SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000", offset).fetch_all(transaction.conn()).await? + .into_iter() + .map(|value| { + value.tx_hash + }) + .collect(); + sqlx::query!( - "DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)", - offset + "DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)", + &tx_hashes ) - .execute(self.0.conn()) + .execute(transaction.conn()) .await?; + sqlx::query!( + "DELETE FROM tx_filters WHERE tx_hash = ANY ($1)", + &tx_hashes + ) + .execute(transaction.conn()) + .await?; + + transaction.commit().await?; metrics::histogram!( "sql.chain.operations.remove_rejected_transactions", diff --git a/core/lib/storage/src/tests/chain/operations.rs b/core/lib/storage/src/tests/chain/operations.rs index dc71bf4e27..51668273f9 100644 --- a/core/lib/storage/src/tests/chain/operations.rs +++ b/core/lib/storage/src/tests/chain/operations.rs @@ -1,7 +1,7 @@ // External imports use chrono::{Duration, Utc}; // Workspace imports -use zksync_types::{aggregated_operations::AggregatedActionType, BlockNumber}; +use zksync_types::{aggregated_operations::AggregatedActionType, Address, BlockNumber}; // Local imports use crate::{ chain::{ @@ -297,7 +297,7 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer let timestamp_1 = Utc::now() - Duration::weeks(1); let executed_tx_1 = NewExecutedTransaction { block_number: BLOCK_NUMBER, - tx_hash: vec![0x12, 0xAD, 0xBE, 0xEF], + tx_hash: vec![1, 2, 3, 4], tx: Default::default(), operation: Default::default(), from_account: Default::default(), @@ -310,19 +310,24 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer created_at: timestamp_1, eth_sign_data: None, batch_id: None, - affected_accounts: Vec::new(), - used_tokens: Vec::new(), + affected_accounts: vec![Address::zero().as_bytes().to_vec()], + used_tokens: vec![0], }; let timestamp_2 = timestamp_1 - Duration::weeks(1); let mut executed_tx_2 = executed_tx_1.clone(); // Set new timestamp and different tx_hash since it's a PK. executed_tx_2.created_at = timestamp_2; executed_tx_2.tx_hash = vec![0, 11, 21, 5]; - // Successful one. + let mut executed_tx_3 = executed_tx_1.clone(); - executed_tx_3.success = true; - executed_tx_3.tx_hash = vec![1, 1, 2, 30]; - executed_tx_3.created_at = timestamp_2 - Duration::weeks(1); + executed_tx_3.success = false; + executed_tx_3.tx_hash = vec![10, 2, 4, 30]; + executed_tx_3.created_at = timestamp_2; + // Successful one. + let mut executed_tx_4 = executed_tx_1.clone(); + executed_tx_4.success = true; + executed_tx_4.tx_hash = vec![1, 1, 2, 30]; + executed_tx_4.created_at = timestamp_2 - Duration::weeks(1); // Store them. storage .chain() @@ -339,6 +344,11 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer .operations_schema() .store_executed_tx(executed_tx_3) .await?; + storage + .chain() + .operations_schema() + .store_executed_tx(executed_tx_4) + .await?; // First check, no transactions removed. storage .chain() @@ -351,8 +361,16 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer .stats_schema() .count_outstanding_proofs(block_number) .await?; - assert_eq!(count, 3); - // Second transaction should be removed. + + let count_tx_filters = storage + .chain() + .operations_ext_schema() + .get_account_transactions_count(Default::default(), None, None) + .await?; + assert_eq!(count, 4); + assert_eq!(count_tx_filters, 4); + + // Second and third transaction should be removed. storage .chain() .operations_schema() @@ -363,7 +381,13 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer .stats_schema() .count_outstanding_proofs(block_number) .await?; + let count_tx_filters = storage + .chain() + .operations_ext_schema() + .get_account_transactions_count(Default::default(), None, None) + .await?; assert_eq!(count, 2); + assert_eq!(count_tx_filters, 2); // Finally, no rejected transactions remain. storage .chain() @@ -375,14 +399,26 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer .stats_schema() .count_outstanding_proofs(block_number) .await?; + let count_tx_filters = storage + .chain() + .operations_ext_schema() + .get_account_transactions_count(Default::default(), None, None) + .await?; assert_eq!(count, 1); + assert_eq!(count_tx_filters, 1); // The last one is indeed succesful. let count = storage .chain() .stats_schema() .count_total_transactions() .await?; + let count_tx_filters = storage + .chain() + .operations_ext_schema() + .get_account_transactions_count(Default::default(), None, None) + .await?; assert_eq!(count, 1); + assert_eq!(count_tx_filters, 1); Ok(()) } From 51148680caf2081e5b2114a5870e8e3d150a91ba Mon Sep 17 00:00:00 2001 From: deniallugo Date: Tue, 25 Jan 2022 20:00:51 +0300 Subject: [PATCH 2/4] Migration for remove outstanding tx_filters Signed-off-by: deniallugo --- .../remove_outstanding_tx_filters/Cargo.toml | 26 +++++++ .../remove_outstanding_tx_filters/src/main.rs | 12 ++++ core/lib/storage/sqlx-data.json | 36 ++++++++++ core/lib/storage/src/chain/operations/mod.rs | 71 +++++++++++++++++++ 4 files changed, 145 insertions(+) create mode 100644 core/bin/remove_outstanding_tx_filters/Cargo.toml create mode 100644 core/bin/remove_outstanding_tx_filters/src/main.rs diff --git a/core/bin/remove_outstanding_tx_filters/Cargo.toml b/core/bin/remove_outstanding_tx_filters/Cargo.toml new file mode 100644 index 0000000000..c1fcbd9e03 --- /dev/null +++ b/core/bin/remove_outstanding_tx_filters/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "remove_outstanding_tx_filters" +version = "0.1.0" +edition = "2021" +authors = ["The Matter Labs Team "] +homepage = "https://zksync.io/" +repository = "https://github.com/matter-labs/zksync" +license = "Apache-2.0" +keywords = ["blockchain", "zksync"] +categories = ["cryptography"] +publish = false # We don't want to publish our binaries. + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +# TODO remove it ZKS-931 +[dependencies] +zksync_types = { path = "../../lib/types", version = "1.0" } +zksync_storage = { path = "../../lib/storage", version = "1.0" } +zksync_eth_client = { path = "../../lib/eth_client", version = "1.0" } +zksync_config = { path = "../../lib/config", version = "1.0" } + +tokio = { version = "1", features = ["full"] } +ethabi = "14.0.0" +anyhow = "1.0" +web3 = "0.16.0" +structopt = "0.3.20" diff --git a/core/bin/remove_outstanding_tx_filters/src/main.rs b/core/bin/remove_outstanding_tx_filters/src/main.rs new file mode 100644 index 0000000000..9dc33b6449 --- /dev/null +++ b/core/bin/remove_outstanding_tx_filters/src/main.rs @@ -0,0 +1,12 @@ +use zksync_storage::StorageProcessor; + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let mut storage = StorageProcessor::establish_connection().await?; + storage + .chain() + .operations_schema() + .remove_outstanding_tx_filters() + .await?; + Ok(()) +} diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index d2aaa26083..79e996c65c 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -489,6 +489,24 @@ "nullable": [] } }, + "11decdd07f890f291a8526e42e110ed277ff894b1b5b52f319001c9d8311e954": { + "query": "\n WITH transactions AS (\n SELECT tx_hash\n FROM executed_transactions\n ), priority_ops AS (\n SELECT tx_hash\n FROM executed_priority_operations\n ), everything AS (\n SELECT * FROM transactions\n UNION ALL\n SELECT * FROM priority_ops\n )\n SELECT\n tx_hash as \"tx_hash!\"\n FROM everything", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash!", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + null + ] + } + }, "1401ea10d9e110da48aac1ebfa7aeb855c273adf34f6ee92b0fdaaf7de603049": { "query": "\n SELECT tx_hash, created_at\n FROM mempool_txs\n INNER JOIN txs_batches_hashes\n ON txs_batches_hashes.batch_id = mempool_txs.batch_id\n WHERE batch_hash = $1\n ORDER BY id ASC\n ", "describe": { @@ -3790,6 +3808,24 @@ ] } }, + "7fe39c62da8c820d95f98238a0f206970328e97a9eed6f532e97d2346074941e": { + "query": "SELECT DISTINCT tx_hash FROM tx_filters", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [ + false + ] + } + }, "7ff98a4fddc441ea83f72a4a75a7caf53b9661c37f26a90984a349bfa5aeab70": { "query": "INSERT INTO eth_aggregated_ops_binding (op_id, eth_op_id) VALUES ($1, $2)", "describe": { diff --git a/core/lib/storage/src/chain/operations/mod.rs b/core/lib/storage/src/chain/operations/mod.rs index acbaed8d01..9a5d568e86 100644 --- a/core/lib/storage/src/chain/operations/mod.rs +++ b/core/lib/storage/src/chain/operations/mod.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; // Built-in deps use std::time::Instant; // External imports @@ -299,7 +300,77 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { ); Ok(()) } + // TODO remove it ZKS-931 + pub async fn remove_outstanding_tx_filters(&mut self) -> QueryResult<()> { + // We can do something like this, but this query will block tx_filter table for a long long time. + // So I have to rewrite this logic to rust + // sqlx::query!( + // r#"DELETE FROM tx_filters WHERE tx_hash NOT IN ( + // WITH transactions AS ( + // SELECT tx_hash + // FROM executed_transactions + // ), priority_ops AS ( + // SELECT tx_hash + // FROM executed_priority_operations + // ), everything AS ( + // SELECT * FROM transactions + // UNION ALL + // SELECT * FROM priority_ops + // ) + // SELECT + // tx_hash as "tx_hash!" + // FROM everything + // )"# + // ) + // .execute(self.0.conn()) + // .await?; + let mut transaction = self.0.start_transaction().await?; + let tx_hashes: HashSet> = sqlx::query!( + r#" + WITH transactions AS ( + SELECT tx_hash + FROM executed_transactions + ), priority_ops AS ( + SELECT tx_hash + FROM executed_priority_operations + ), everything AS ( + SELECT * FROM transactions + UNION ALL + SELECT * FROM priority_ops + ) + SELECT + tx_hash as "tx_hash!" + FROM everything"# + ) + .fetch_all(transaction.conn()) + .await? + .into_iter() + .map(|value| value.tx_hash) + .collect(); + + let tx_filter_hashes: HashSet> = + sqlx::query!("SELECT DISTINCT tx_hash FROM tx_filters") + .fetch_all(transaction.conn()) + .await? + .into_iter() + .map(|value| value.tx_hash) + .collect(); + + let difference: Vec> = tx_filter_hashes + .difference(&tx_hashes) + .into_iter() + .cloned() + .collect(); + + for chunk in difference.chunks(100) { + sqlx::query!("DELETE FROM tx_filters WHERE tx_hash = ANY ($1)", chunk) + .execute(transaction.conn()) + .await?; + } + + Ok(()) + } /// Stores executed priority operation in database. /// /// This method is made public to fill the database for tests, do not use it for From 7b5a6d74039176c552bc927853582a1079e8e3fa Mon Sep 17 00:00:00 2001 From: deniallugo Date: Tue, 25 Jan 2022 20:24:23 +0300 Subject: [PATCH 3/4] Review fixes Signed-off-by: deniallugo --- Cargo.lock | 15 ++++++++ Cargo.toml | 2 +- core/lib/storage/sqlx-data.json | 40 ++++++++++---------- core/lib/storage/src/chain/operations/mod.rs | 14 +++++-- 4 files changed, 46 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ef804e2440..2ad124d85d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3860,6 +3860,21 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "remove_outstanding_tx_filters" +version = "0.1.0" +dependencies = [ + "anyhow", + "ethabi", + "structopt", + "tokio", + "web3", + "zksync_config", + "zksync_eth_client", + "zksync_storage", + "zksync_types", +] + [[package]] name = "remove_proofs" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 312cf2179b..f6c0fbe8f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "core/bin/parse_pub_data", "core/bin/block_revert", "core/bin/remove_proofs", - + "core/bin/remove_outstanding_tx_filters", # Server micro-services "core/bin/zksync_api", "core/bin/zksync_core", diff --git a/core/lib/storage/sqlx-data.json b/core/lib/storage/sqlx-data.json index 79e996c65c..e4c776c7bf 100644 --- a/core/lib/storage/sqlx-data.json +++ b/core/lib/storage/sqlx-data.json @@ -7052,26 +7052,6 @@ ] } }, - "e8c69e16fe0e447d578dee61807b418ee4e8d5aebb60034cbc640eafceb593e2": { - "query": "SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "tx_hash", - "type_info": "Bytea" - } - ], - "parameters": { - "Left": [ - "Timestamptz" - ] - }, - "nullable": [ - false - ] - } - }, "e99d990d2d9b1c6068efb623634d6d6cf49a3c7ec33a5a916b7ddaa745e24c9b": { "query": "\n SELECT * FROM prover_job_queue\n WHERE job_status = $1\n ORDER BY (job_priority, id, first_block)\n LIMIT 1\n ", "describe": { @@ -7723,6 +7703,26 @@ ] } }, + "f57d1b54785c52a96afa9a95c4f1a08808732c9aaa850c32ed7d1158fdf541c4": { + "query": "\n SELECT tx_hash FROM executed_transactions \n WHERE success = false AND created_at < $1 LIMIT 1000", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_hash", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Timestamptz" + ] + }, + "nullable": [ + false + ] + } + }, "f5a24f01f525ede5d8e61b97e452a82d372c2bececacf693ab654eef0e453d94": { "query": "SELECT max(to_block) from aggregate_operations where action_type = $1", "describe": { diff --git a/core/lib/storage/src/chain/operations/mod.rs b/core/lib/storage/src/chain/operations/mod.rs index 9a5d568e86..2a7d18cc66 100644 --- a/core/lib/storage/src/chain/operations/mod.rs +++ b/core/lib/storage/src/chain/operations/mod.rs @@ -272,11 +272,16 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { let mut transaction = self.0.start_transaction().await?; let offset = Utc::now() - max_age; - let tx_hashes: Vec> = sqlx::query!("SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000", offset).fetch_all(transaction.conn()).await? + let tx_hashes: Vec> = sqlx::query!( + r#" + SELECT tx_hash FROM executed_transactions + WHERE success = false AND created_at < $1 LIMIT 1000"#, + offset + ) + .fetch_all(transaction.conn()) + .await? .into_iter() - .map(|value| { - value.tx_hash - }) + .map(|value| value.tx_hash) .collect(); sqlx::query!( @@ -300,6 +305,7 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { ); Ok(()) } + // TODO remove it ZKS-931 pub async fn remove_outstanding_tx_filters(&mut self) -> QueryResult<()> { // We can do something like this, but this query will block tx_filter table for a long long time. From 40b9e16a2af62ebe05a537767e18642fc05aa2ad Mon Sep 17 00:00:00 2001 From: deniallugo Date: Wed, 26 Jan 2022 12:47:19 +0300 Subject: [PATCH 4/4] Commit transaction Signed-off-by: deniallugo --- core/lib/storage/src/chain/operations/mod.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/lib/storage/src/chain/operations/mod.rs b/core/lib/storage/src/chain/operations/mod.rs index 2a7d18cc66..367cbea677 100644 --- a/core/lib/storage/src/chain/operations/mod.rs +++ b/core/lib/storage/src/chain/operations/mod.rs @@ -355,6 +355,7 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { .map(|value| value.tx_hash) .collect(); + println!("Txs len {:?}", tx_hashes.len()); let tx_filter_hashes: HashSet> = sqlx::query!("SELECT DISTINCT tx_hash FROM tx_filters") .fetch_all(transaction.conn()) @@ -362,6 +363,7 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { .into_iter() .map(|value| value.tx_hash) .collect(); + println!("Filters len {:?}", tx_filter_hashes.len()); let difference: Vec> = tx_filter_hashes .difference(&tx_hashes) @@ -369,12 +371,15 @@ impl<'a, 'c> OperationsSchema<'a, 'c> { .cloned() .collect(); + println!("Difference len {:?}", difference.len()); for chunk in difference.chunks(100) { sqlx::query!("DELETE FROM tx_filters WHERE tx_hash = ANY ($1)", chunk) .execute(transaction.conn()) .await?; } + transaction.commit().await?; + Ok(()) } /// Stores executed priority operation in database.