Merge #2143
2143: Remove tx_filters with rejected txs r=Deniallugo a=Deniallugo

Signed-off-by: deniallugo <[email protected]>

Co-authored-by: deniallugo <[email protected]>
bors-matterlabs-dev[bot] and Deniallugo authored Jan 26, 2022
2 parents 7e8723a + 40b9e16 commit 5d02d65
Showing 8 changed files with 282 additions and 28 deletions.
15 changes: 15 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -8,7 +8,7 @@ members = [
"core/bin/parse_pub_data",
"core/bin/block_revert",
"core/bin/remove_proofs",

"core/bin/remove_outstanding_tx_filters",
# Server micro-services
"core/bin/zksync_api",
"core/bin/zksync_core",
26 changes: 26 additions & 0 deletions core/bin/remove_outstanding_tx_filters/Cargo.toml
@@ -0,0 +1,26 @@
[package]
name = "remove_outstanding_tx_filters"
version = "0.1.0"
edition = "2021"
authors = ["The Matter Labs Team <[email protected]>"]
homepage = "https://zksync.io/"
repository = "https://github.com/matter-labs/zksync"
license = "Apache-2.0"
keywords = ["blockchain", "zksync"]
categories = ["cryptography"]
publish = false # We don't want to publish our binaries.

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

# TODO remove it ZKS-931
[dependencies]
zksync_types = { path = "../../lib/types", version = "1.0" }
zksync_storage = { path = "../../lib/storage", version = "1.0" }
zksync_eth_client = { path = "../../lib/eth_client", version = "1.0" }
zksync_config = { path = "../../lib/config", version = "1.0" }

tokio = { version = "1", features = ["full"] }
ethabi = "14.0.0"
anyhow = "1.0"
web3 = "0.16.0"
structopt = "0.3.20"
12 changes: 12 additions & 0 deletions core/bin/remove_outstanding_tx_filters/src/main.rs
@@ -0,0 +1,12 @@
use zksync_storage::StorageProcessor;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut storage = StorageProcessor::establish_connection().await?;
storage
.chain()
.operations_schema()
.remove_outstanding_tx_filters()
.await?;
Ok(())
}
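
The new binary is a one-off cleanup tool (note the TODO remove it ZKS-931 marker in its manifest): it opens a storage connection, calls remove_outstanding_tx_filters, and exits. Presumably it is run manually against a configured database, for example with cargo run --bin remove_outstanding_tx_filters --release; the exact invocation is an assumption based on the package name and is not stated in this diff.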
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/database.rs
@@ -32,7 +32,7 @@ impl DBConfig {
}

pub fn rejected_transactions_cleaner_interval(&self) -> time::Duration {
time::Duration::from_secs(self.rejected_transactions_cleaner_interval * Self::SECS_PER_HOUR)
time::Duration::from_secs(self.rejected_transactions_cleaner_interval)
}
}

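The updated getter treats rejected_transactions_cleaner_interval as a value already expressed in seconds instead of multiplying it by SECS_PER_HOUR. A minimal, self-contained sketch of the behavioural difference, using a hypothetical configured value of 2 that is not taken from this diff:

use std::time::Duration;

const SECS_PER_HOUR: u64 = 3600;

fn main() {
    // Hypothetical configured value; in zkSync it comes from DBConfig.
    let rejected_transactions_cleaner_interval: u64 = 2;

    // Old interpretation: the value was treated as hours.
    let old = Duration::from_secs(rejected_transactions_cleaner_interval * SECS_PER_HOUR);
    // New interpretation: the value is already in seconds.
    let new = Duration::from_secs(rejected_transactions_cleaner_interval);

    assert_eq!(old, Duration::from_secs(7200));
    assert_eq!(new, Duration::from_secs(2));
}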
92 changes: 80 additions & 12 deletions core/lib/storage/sqlx-data.json
@@ -489,6 +489,24 @@
"nullable": []
}
},
"11decdd07f890f291a8526e42e110ed277ff894b1b5b52f319001c9d8311e954": {
"query": "\n WITH transactions AS (\n SELECT tx_hash\n FROM executed_transactions\n ), priority_ops AS (\n SELECT tx_hash\n FROM executed_priority_operations\n ), everything AS (\n SELECT * FROM transactions\n UNION ALL\n SELECT * FROM priority_ops\n )\n SELECT\n tx_hash as \"tx_hash!\"\n FROM everything",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash!",
"type_info": "Bytea"
}
],
"parameters": {
"Left": []
},
"nullable": [
null
]
}
},
"1401ea10d9e110da48aac1ebfa7aeb855c273adf34f6ee92b0fdaaf7de603049": {
"query": "\n SELECT tx_hash, created_at\n FROM mempool_txs\n INNER JOIN txs_batches_hashes\n ON txs_batches_hashes.batch_id = mempool_txs.batch_id\n WHERE batch_hash = $1\n ORDER BY id ASC\n ",
"describe": {
@@ -2854,6 +2872,18 @@
"nullable": []
}
},
"5e5becde03270ceb82f605ea94c70dac192e9a0f7dd2c918d8dc26d1902d2067": {
"query": "DELETE FROM tx_filters WHERE tx_hash = ANY ($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray"
]
},
"nullable": []
}
},
"5fac3f8e9ad91897751e7f14c56723f24d1c85ed146679296525e667b55b3947": {
"query": "\n SELECT id, address, decimals, kind as \"kind: _\", symbol FROM tokens\n WHERE id >= $1 AND kind = 'ERC20'::token_kind\n ORDER BY id ASC\n LIMIT $2\n ",
"describe": {
@@ -3778,6 +3808,24 @@
]
}
},
"7fe39c62da8c820d95f98238a0f206970328e97a9eed6f532e97d2346074941e": {
"query": "SELECT DISTINCT tx_hash FROM tx_filters",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": []
},
"nullable": [
false
]
}
},
"7ff98a4fddc441ea83f72a4a75a7caf53b9661c37f26a90984a349bfa5aeab70": {
"query": "INSERT INTO eth_aggregated_ops_binding (op_id, eth_op_id) VALUES ($1, $2)",
"describe": {
@@ -4823,18 +4871,6 @@
"nullable": []
}
},
"a545c7251d381afbb2bdcbaef0cc5f1bc322db47ee0db555b83b9e19119bc756": {
"query": "DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": []
}
},
"a665923ec57382f357f6bb65f6e35876fbfedbf1661b3ce34f2458b63eebc68e": {
"query": "\n INSERT INTO subsidies ( tx_hash, usd_amount_scale6, full_cost_usd_scale6, token_id, token_amount, full_cost_token, subsidy_type )\n VALUES ( $1, $2, $3, $4, $5, $6, $7 )\n ",
"describe": {
@@ -5707,6 +5743,18 @@
]
}
},
"c31936ecaa097fc0711fa24e79ee415bfc3da855f29b2138ecbaced1341d5e7f": {
"query": "DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray"
]
},
"nullable": []
}
},
"c3632674ee6614b83e258c75447dc986507481a56fbdd5e05dedd0775f21fb79": {
"query": "\n SELECT\n token_id as \"token_id!\", creator_account_id as \"creator_account_id!\",\n creator_address as \"creator_address!\", serial_id as \"serial_id!\",\n nft.address as \"address!\", content_hash as \"content_hash!\",\n tokens.symbol as \"symbol!\"\n FROM nft\n INNER JOIN tokens\n ON tokens.id = nft.token_id\n ",
"describe": {
@@ -7655,6 +7703,26 @@
]
}
},
"f57d1b54785c52a96afa9a95c4f1a08808732c9aaa850c32ed7d1158fdf541c4": {
"query": "\n SELECT tx_hash FROM executed_transactions \n WHERE success = false AND created_at < $1 LIMIT 1000",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": [
false
]
}
},
"f5a24f01f525ede5d8e61b97e452a82d372c2bececacf693ab654eef0e453d94": {
"query": "SELECT max(to_block) from aggregate_operations where action_type = $1",
"describe": {
105 changes: 101 additions & 4 deletions core/lib/storage/src/chain/operations/mod.rs
@@ -1,3 +1,4 @@
use std::collections::HashSet;
// Built-in deps
use std::time::Instant;
// External imports
@@ -240,7 +241,6 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
tokens.push(*token);
}
}

sqlx::query!(
"
INSERT INTO tx_filters (address, token, tx_hash)
@@ -270,21 +270,118 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
pub async fn remove_rejected_transactions(&mut self, max_age: Duration) -> QueryResult<()> {
let start = Instant::now();

let mut transaction = self.0.start_transaction().await?;
let offset = Utc::now() - max_age;
sqlx::query!(
"DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)",
let tx_hashes: Vec<Vec<u8>> = sqlx::query!(
r#"
SELECT tx_hash FROM executed_transactions
WHERE success = false AND created_at < $1 LIMIT 1000"#,
offset
)
.execute(self.0.conn())
.fetch_all(transaction.conn())
.await?
.into_iter()
.map(|value| value.tx_hash)
.collect();

sqlx::query!(
"DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)",
&tx_hashes
)
.execute(transaction.conn())
.await?;
sqlx::query!(
"DELETE FROM tx_filters WHERE tx_hash = ANY ($1)",
&tx_hashes
)
.execute(transaction.conn())
.await?;

transaction.commit().await?;

metrics::histogram!(
"sql.chain.operations.remove_rejected_transactions",
start.elapsed()
);
Ok(())
}

// TODO remove it ZKS-931
pub async fn remove_outstanding_tx_filters(&mut self) -> QueryResult<()> {
// We can do something like this, but this query will block tx_filter table for a long long time.
// So I have to rewrite this logic to rust
// sqlx::query!(
// r#"DELETE FROM tx_filters WHERE tx_hash NOT IN (
// WITH transactions AS (
// SELECT tx_hash
// FROM executed_transactions
// ), priority_ops AS (
// SELECT tx_hash
// FROM executed_priority_operations
// ), everything AS (
// SELECT * FROM transactions
// UNION ALL
// SELECT * FROM priority_ops
// )
// SELECT
// tx_hash as "tx_hash!"
// FROM everything
// )"#
// )
// .execute(self.0.conn())
// .await?;

let mut transaction = self.0.start_transaction().await?;
let tx_hashes: HashSet<Vec<u8>> = sqlx::query!(
r#"
WITH transactions AS (
SELECT tx_hash
FROM executed_transactions
), priority_ops AS (
SELECT tx_hash
FROM executed_priority_operations
), everything AS (
SELECT * FROM transactions
UNION ALL
SELECT * FROM priority_ops
)
SELECT
tx_hash as "tx_hash!"
FROM everything"#
)
.fetch_all(transaction.conn())
.await?
.into_iter()
.map(|value| value.tx_hash)
.collect();

println!("Txs len {:?}", tx_hashes.len());
let tx_filter_hashes: HashSet<Vec<u8>> =
sqlx::query!("SELECT DISTINCT tx_hash FROM tx_filters")
.fetch_all(transaction.conn())
.await?
.into_iter()
.map(|value| value.tx_hash)
.collect();
println!("Filters len {:?}", tx_filter_hashes.len());

let difference: Vec<Vec<u8>> = tx_filter_hashes
.difference(&tx_hashes)
.into_iter()
.cloned()
.collect();

println!("Difference len {:?}", difference.len());
for chunk in difference.chunks(100) {
sqlx::query!("DELETE FROM tx_filters WHERE tx_hash = ANY ($1)", chunk)
.execute(transaction.conn())
.await?;
}

transaction.commit().await?;

Ok(())
}
/// Stores executed priority operation in database.
///
/// This method is made public to fill the database for tests, do not use it for
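
For context, the remove_rejected_transactions method above is intended to be called periodically with a maximum age for rejected transactions. The following is a minimal sketch of such a driver loop, assuming only the API shown in this diff; the function name run_rejected_tx_cleaner, the scheduling, and the retention period are assumptions and are not part of this change:

use std::time::Duration as StdDuration;

use chrono::Duration;
use zksync_storage::StorageProcessor;

// Hypothetical periodic cleaner loop; the real scheduling lives elsewhere in the repository.
async fn run_rejected_tx_cleaner(
    cleaner_interval: StdDuration, // e.g. DBConfig::rejected_transactions_cleaner_interval()
    max_age: Duration,             // how old a rejected (success = false) tx must be before removal
) -> anyhow::Result<()> {
    loop {
        let mut storage = StorageProcessor::establish_connection().await?;
        storage
            .chain()
            .operations_schema()
            .remove_rejected_transactions(max_age)
            .await?;
        tokio::time::sleep(cleaner_interval).await;
    }
}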