Skip to content

Commit

Permalink
Save data to tx_filters
Browse files Browse the repository at this point in the history
  • Loading branch information
perekopskiy committed Oct 14, 2021
1 parent 2da4e9a commit aeb7cb9
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 19 deletions.
4 changes: 4 additions & 0 deletions core/bin/zksync_api/src/api_server/rest/v02/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,8 @@ impl TestServerConfig {
tx_hash: dummy_ethereum_tx_hash(VERIFIED_OP_SERIAL_ID as i64)
.as_bytes()
.to_vec(),
affected_accounts: vec![Default::default()],
token: 0,
},
// Committed priority operation.
NewExecutedPriorityOperation {
Expand All @@ -679,6 +681,8 @@ impl TestServerConfig {
tx_hash: dummy_ethereum_tx_hash(COMMITTED_OP_SERIAL_ID as i64)
.as_bytes()
.to_vec(),
affected_accounts: vec![Default::default()],
token: 0,
},
];

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
CREATE TABLE tx_filters
(
id BIGSERIAL PRIMARY KEY,
address bytea NOT NULL,
token INTEGER NOT NULL,
tx_hash bytea NOT NULL
tx_hash bytea NOT NULL,
PRIMARY KEY (address, token, tx_hash)
);
14 changes: 14 additions & 0 deletions core/lib/storage/sqlx-data.json
Original file line number Diff line number Diff line change
Expand Up @@ -2971,6 +2971,20 @@
]
}
},
"786c418fd011ff4f7be35ac33e8b495f18ecb34adeb4585bfd3beebdf1939ccf": {
"query": "\n INSERT INTO tx_filters (address, token, tx_hash)\n SELECT u.address, u.token, $3\n FROM UNNEST ($1::bytea[], $2::integer[])\n AS u(address, token)\n ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING\n ",
"describe": {
"columns": [],
"parameters": {
"Left": [
"ByteaArray",
"Int4Array",
"Bytea"
]
},
"nullable": []
}
},
"790d46519ceaa7fbd152f1edf29b85c97ab491488b7302d8df3f57e5fc3eff55": {
"query": "\n SELECT account_id FROM account_creates\n WHERE address = $1 AND is_create = $2\n ORDER BY block_number desc\n LIMIT 1\n ",
"describe": {
Expand Down
39 changes: 25 additions & 14 deletions core/lib/storage/src/chain/block/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,15 @@ impl NewExecutedPriorityOperation {
),
};

let affected_accounts = exec_prior_op
.priority_op
.data
.affected_accounts()
.into_iter()
.map(|address| address.as_bytes().to_vec())
.collect();
let token = exec_prior_op.priority_op.data.token_id().0 as i32;

Self {
block_number: i64::from(*block),
block_index: exec_prior_op.block_index as i32,
Expand All @@ -136,6 +145,8 @@ impl NewExecutedPriorityOperation {
.eth_block_index
.map(|index| index as i64),
tx_hash,
affected_accounts,
token,
}
}
}
Expand Down Expand Up @@ -195,18 +206,18 @@ impl NewExecutedTransaction {
serde_json::to_value(sign_data).expect("Failed to encode EthSignData")
});

// let affected_accounts = affected_accounts(&exec_tx.signed_tx.tx, storage)
// .await?
// .into_iter()
// .map(|address| address.as_bytes().to_vec())
// .collect();
// let used_tokens = exec_tx
// .signed_tx
// .tx
// .tokens()
// .into_iter()
// .map(|id| id.0 as i32)
// .collect();
let affected_accounts = affected_accounts(&exec_tx.signed_tx.tx, storage)
.await?
.into_iter()
.map(|address| address.as_bytes().to_vec())
.collect();
let used_tokens = exec_tx
.signed_tx
.tx
.tokens()
.into_iter()
.map(|id| id.0 as i32)
.collect();
Ok(Self {
block_number: i64::from(*block),
tx_hash: exec_tx.signed_tx.hash().as_ref().to_vec(),
Expand All @@ -222,8 +233,8 @@ impl NewExecutedTransaction {
created_at: exec_tx.created_at,
eth_sign_data,
batch_id: exec_tx.batch_id,
// affected_accounts,
// used_tokens,
affected_accounts,
used_tokens,
})
}
}
Expand Down
47 changes: 46 additions & 1 deletion core/lib/storage/src/chain/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,30 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
.await?;
};

let mut addresses = Vec::new();
let mut tokens = Vec::new();
for address in operation.affected_accounts {
for token in operation.used_tokens.iter() {
addresses.push(address.clone());
tokens.push(*token);
}
}

sqlx::query!(
"
INSERT INTO tx_filters (address, token, tx_hash)
SELECT u.address, u.token, $3
FROM UNNEST ($1::bytea[], $2::integer[])
AS u(address, token)
ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING
",
&addresses,
&tokens,
&operation.tx_hash
)
.execute(transaction.conn())
.await?;

transaction.commit().await?;
metrics::histogram!("sql.chain.operations.store_executed_tx", start.elapsed());
Ok(())
Expand Down Expand Up @@ -267,6 +291,7 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
operation: NewExecutedPriorityOperation,
) -> QueryResult<()> {
let start = Instant::now();
let mut transaction = self.0.start_transaction().await?;

sqlx::query!(
"INSERT INTO executed_priority_operations (block_number, block_index, operation, from_account, to_account,
Expand All @@ -287,8 +312,28 @@ impl<'a, 'c> OperationsSchema<'a, 'c> {
operation.eth_block_index,
operation.tx_hash,
)
.execute(self.0.conn())
.execute(transaction.conn())
.await?;

let mut tokens = Vec::new();
tokens.resize(operation.affected_accounts.len(), operation.token);

sqlx::query!(
"
INSERT INTO tx_filters (address, token, tx_hash)
SELECT u.address, u.token, $3
FROM UNNEST ($1::bytea[], $2::integer[])
AS u(address, token)
ON CONFLICT ON CONSTRAINT tx_filters_pkey DO NOTHING
",
&operation.affected_accounts,
&tokens,
&operation.tx_hash
)
.execute(transaction.conn())
.await?;

transaction.commit().await?;
metrics::histogram!(
"sql.chain.operations.store_executed_priority_op",
start.elapsed()
Expand Down
6 changes: 4 additions & 2 deletions core/lib/storage/src/chain/operations/records.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct NewExecutedPriorityOperation {
/// This field must be optional because of backward compatibility.
pub eth_block_index: Option<i64>,
pub tx_hash: Vec<u8>,
pub affected_accounts: Vec<Vec<u8>>,
pub token: i32,
}

#[derive(Debug, Clone)]
Expand All @@ -89,8 +91,8 @@ pub struct NewExecutedTransaction {
pub created_at: DateTime<Utc>,
pub eth_sign_data: Option<serde_json::Value>,
pub batch_id: Option<i64>,
// pub affected_accounts: Vec<Vec<u8>>,
// pub used_tokens: Vec<i32>,
pub affected_accounts: Vec<Vec<u8>>,
pub used_tokens: Vec<i32>,
}

#[derive(Debug, Clone)]
Expand Down
4 changes: 4 additions & 0 deletions core/lib/storage/src/tests/chain/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ async fn collect_garbage(mut storage: StorageProcessor<'_>) -> QueryResult<()> {
created_at: chrono::Utc::now(),
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};
OperationsSchema(&mut storage)
.store_executed_tx(executed_tx)
Expand Down Expand Up @@ -455,6 +457,8 @@ async fn test_return_executed_txs_to_mempool(mut storage: StorageProcessor<'_>)
created_at: chrono::Utc::now(),
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};

OperationsSchema(&mut storage)
Expand Down
16 changes: 16 additions & 0 deletions core/lib/storage/src/tests/chain/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ async fn executed_operations(mut storage: StorageProcessor<'_>) -> QueryResult<(
created_at: chrono::Utc::now(),
eth_sign_data: None,
batch_id: Some(10),
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};

OperationsSchema(&mut storage)
Expand Down Expand Up @@ -106,6 +108,8 @@ async fn executed_priority_operations(mut storage: StorageProcessor<'_>) -> Quer
created_at: chrono::Utc::now(),
tx_hash: Default::default(),
eth_block_index: Some(1),
affected_accounts: Default::default(),
token: Default::default(),
};
OperationsSchema(&mut storage)
.store_executed_priority_op(executed_tx.clone())
Expand Down Expand Up @@ -151,6 +155,8 @@ async fn duplicated_operations(mut storage: StorageProcessor<'_>) -> QueryResult
created_at: chrono::Utc::now(),
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};

let executed_priority_op = NewExecutedPriorityOperation {
Expand All @@ -166,6 +172,8 @@ async fn duplicated_operations(mut storage: StorageProcessor<'_>) -> QueryResult
created_at: chrono::Utc::now(),
tx_hash: Default::default(),
eth_block_index: Some(1),
affected_accounts: Default::default(),
token: Default::default(),
};

// Save the same operations twice.
Expand Down Expand Up @@ -222,6 +230,8 @@ async fn transaction_resent(mut storage: StorageProcessor<'_>) -> QueryResult<()
created_at: chrono::Utc::now(),
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};

// Save the failed operation.
Expand Down Expand Up @@ -300,6 +310,8 @@ async fn remove_rejected_transactions(mut storage: StorageProcessor<'_>) -> Quer
created_at: timestamp_1,
eth_sign_data: None,
batch_id: None,
affected_accounts: Vec::new(),
used_tokens: Vec::new(),
};
let timestamp_2 = timestamp_1 - Duration::weeks(1);
let mut executed_tx_2 = executed_tx_1.clone();
Expand Down Expand Up @@ -391,6 +403,8 @@ async fn priority_ops_hashes(mut storage: StorageProcessor<'_>) -> QueryResult<(
created_at: chrono::Utc::now(),
tx_hash: vec![0xBB, 0xBB, 0xBB, 0xBB],
eth_block_index: Some(1),
affected_accounts: Default::default(),
token: Default::default(),
};
// Store executed priority op and try to get it by `eth_hash`.
storage
Expand Down Expand Up @@ -439,6 +453,8 @@ async fn test_remove_executed_priority_operations(
created_at: chrono::Utc::now(),
eth_block_index: Some(1),
tx_hash: Default::default(),
affected_accounts: Default::default(),
token: Default::default(),
};
OperationsSchema(&mut storage)
.store_executed_priority_op(executed_priority_op)
Expand Down

0 comments on commit aeb7cb9

Please sign in to comment.