Skip to content

Commit

Permalink
Remove tx_filters with rejected txs
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Jan 25, 2022
1 parent 7e8723a commit ce94ac5
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 27 deletions.
2 changes: 1 addition & 1 deletion core/lib/config/src/configs/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ impl DBConfig {
}

pub fn rejected_transactions_cleaner_interval(&self) -> time::Duration {
time::Duration::from_secs(self.rejected_transactions_cleaner_interval * Self::SECS_PER_HOUR)
time::Duration::from_secs(self.rejected_transactions_cleaner_interval)
}
}

Expand Down
56 changes: 44 additions & 12 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2854,6 +2854,18 @@
"nullable": []
}
},
"5e5becde03270ceb82f605ea94c70dac192e9a0f7dd2c918d8dc26d1902d2067": {
"query": "DELETE FROM tx_filters WHERE tx_hash = ANY ($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray"
]
},
"nullable": []
}
},
"5fac3f8e9ad91897751e7f14c56723f24d1c85ed146679296525e667b55b3947": {
"query": "\n SELECT id, address, decimals, kind as \"kind: _\", symbol FROM tokens\n WHERE id >= $1 AND kind = 'ERC20'::token_kind\n ORDER BY id ASC\n LIMIT $2\n ",
"describe": {
Expand Down Expand Up @@ -4823,18 +4835,6 @@
"nullable": []
}
},
"a545c7251d381afbb2bdcbaef0cc5f1bc322db47ee0db555b83b9e19119bc756": {
"query": "DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": []
}
},
"a665923ec57382f357f6bb65f6e35876fbfedbf1661b3ce34f2458b63eebc68e": {
"query": "\n INSERT INTO subsidies ( tx_hash, usd_amount_scale6, full_cost_usd_scale6, token_id, token_amount, full_cost_token, subsidy_type )\n VALUES ( $1, $2, $3, $4, $5, $6, $7 )\n ",
"describe": {
Expand Down Expand Up @@ -5707,6 +5707,18 @@
]
}
},
"c31936ecaa097fc0711fa24e79ee415bfc3da855f29b2138ecbaced1341d5e7f": {
"query": "DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray"
]
},
"nullable": []
}
},
"c3632674ee6614b83e258c75447dc986507481a56fbdd5e05dedd0775f21fb79": {
"query": "\n SELECT\n token_id as \"token_id!\", creator_account_id as \"creator_account_id!\",\n creator_address as \"creator_address!\", serial_id as \"serial_id!\",\n nft.address as \"address!\", content_hash as \"content_hash!\",\n tokens.symbol as \"symbol!\"\n FROM nft\n INNER JOIN tokens\n ON tokens.id = nft.token_id\n ",
"describe": {
Expand Down Expand Up @@ -7004,6 +7016,26 @@
]
}
},
"e8c69e16fe0e447d578dee61807b418ee4e8d5aebb60034cbc640eafceb593e2": {
"query": "SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000",
"describe": {
"columns": [
{
"ordinal": 0,
"name": "tx_hash",
"type_info": "Bytea"
}
],
"parameters": {
"Left": [
"Timestamptz"
]
},
"nullable": [
false
]
}
},
"e99d990d2d9b1c6068efb623634d6d6cf49a3c7ec33a5a916b7ddaa745e24c9b": {
"query": "\n SELECT * FROM prover_job_queue\n WHERE job_status = $1\n ORDER BY (job_priority, id, first_block)\n LIMIT 1\n ",
"describe": {
Expand Down
23 changes: 19 additions & 4 deletions core/lib/storage/src/chain/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
tokens.push(*token);
}
}

sqlx::query!(
"
INSERT INTO tx_filters (address, token, tx_hash)
Expand Down Expand Up @@ -270,13 +269,29 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
pub async fn remove_rejected_transactions(&mut self, max_age: Duration) -> QueryResult<()> {
let start = Instant::now();

let mut transaction = self.0.start_transaction().await?;
let offset = Utc::now() - max_age;
let tx_hashes: Vec<Vec<u8>> = sqlx::query!("SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000", offset).fetch_all(transaction.conn()).await?
.into_iter()
.map(|value| {
value.tx_hash
})
.collect();

sqlx::query!(
"DELETE FROM executed_transactions WHERE tx_hash IN (SELECT tx_hash FROM executed_transactions WHERE success = false AND created_at < $1 LIMIT 1000)",
offset
"DELETE FROM executed_transactions WHERE tx_hash = ANY ($1)",
&tx_hashes
)
.execute(self.0.conn())
.execute(transaction.conn())
.await?;
sqlx::query!(
"DELETE FROM tx_filters WHERE tx_hash = ANY ($1)",
&tx_hashes
)
.execute(transaction.conn())
.await?;

transaction.commit().await?;

metrics::histogram!(
"sql.chain.operations.remove_rejected_transactions",
Expand Down
56 changes: 46 additions & 10 deletions core/lib/storage/src/tests/chain/operations.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// External imports
use chrono::{Duration, Utc};
// Workspace imports
use zksync_types::{aggregated_operations::AggregatedActionType, BlockNumber};
use zksync_types::{aggregated_operations::AggregatedActionType, Address, BlockNumber};
// Local imports
use crate::{
chain::{
Expand Down Expand Up @@ -297,7 +297,7 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
let timestamp_1 = Utc::now() - Duration::weeks(1);
let executed_tx_1 = NewExecutedTransaction {
block_number: BLOCK_NUMBER,
tx_hash: vec![0x12, 0xAD, 0xBE, 0xEF],
tx_hash: vec![1, 2, 3, 4],
tx: Default::default(),
operation: Default::default(),
from_account: Default::default(),
Expand All @@ -310,19 +310,24 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
created_at: timestamp_1,
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
affected_accounts: vec![Address::zero().as_bytes().to_vec()],
used_tokens: vec![0],
};
let timestamp_2 = timestamp_1 - Duration::weeks(1);
let mut executed_tx_2 = executed_tx_1.clone();
// Set new timestamp and different tx_hash since it's a PK.
executed_tx_2.created_at = timestamp_2;
executed_tx_2.tx_hash = vec![0, 11, 21, 5];
// Successful one.

let mut executed_tx_3 = executed_tx_1.clone();
executed_tx_3.success = true;
executed_tx_3.tx_hash = vec![1, 1, 2, 30];
executed_tx_3.created_at = timestamp_2 - Duration::weeks(1);
executed_tx_3.success = false;
executed_tx_3.tx_hash = vec![10, 2, 4, 30];
executed_tx_3.created_at = timestamp_2;
// Successful one.
let mut executed_tx_4 = executed_tx_1.clone();
executed_tx_4.success = true;
executed_tx_4.tx_hash = vec![1, 1, 2, 30];
executed_tx_4.created_at = timestamp_2 - Duration::weeks(1);
// Store them.
storage
.chain()
Expand All @@ -339,6 +344,11 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
.operations_schema()
.store_executed_tx(executed_tx_3)
.await?;
storage
.chain()
.operations_schema()
.store_executed_tx(executed_tx_4)
.await?;
// First check, no transactions removed.
storage
.chain()
Expand All @@ -351,8 +361,16 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
.stats_schema()
.count_outstanding_proofs(block_number)
.await?;
assert_eq!(count, 3);
// Second transaction should be removed.

let count_tx_filters = storage
.chain()
.operations_ext_schema()
.get_account_transactions_count(Default::default(), None, None)
.await?;
assert_eq!(count, 4);
assert_eq!(count_tx_filters, 4);

// Second and third transaction should be removed.
storage
.chain()
.operations_schema()
Expand All @@ -363,7 +381,13 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
.stats_schema()
.count_outstanding_proofs(block_number)
.await?;
let count_tx_filters = storage
.chain()
.operations_ext_schema()
.get_account_transactions_count(Default::default(), None, None)
.await?;
assert_eq!(count, 2);
assert_eq!(count_tx_filters, 2);
// Finally, no rejected transactions remain.
storage
.chain()
Expand All @@ -375,14 +399,26 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
.stats_schema()
.count_outstanding_proofs(block_number)
.await?;
let count_tx_filters = storage
.chain()
.operations_ext_schema()
.get_account_transactions_count(Default::default(), None, None)
.await?;
assert_eq!(count, 1);
assert_eq!(count_tx_filters, 1);
// The last one is indeed succesful.
let count = storage
.chain()
.stats_schema()
.count_total_transactions()
.await?;
let count_tx_filters = storage
.chain()
.operations_ext_schema()
.get_account_transactions_count(Default::default(), None, None)
.await?;
assert_eq!(count, 1);
assert_eq!(count_tx_filters, 1);

Ok(())
}
Expand Down

0 comments on commit ce94ac5

Please sign in to comment.