From cdb11958a6dc923ccecc35c6e8e632c493938eff Mon Sep 17 00:00:00 2001 From: Igor Aleksanov Date: Mon, 28 Mar 2022 14:59:31 +0300 Subject: [PATCH] Create two separate schemas for tree caches --- core/lib/storage/src/chain/mod.rs | 8 + .../src/chain/tree_cache/bincode_schema.rs | 171 +++++++++++++++++ .../src/chain/tree_cache/json_schema.rs | 176 ++++++++++++++++++ core/lib/storage/src/chain/tree_cache/mod.rs | 155 +-------------- .../storage/src/chain/tree_cache/records.rs | 11 +- .../lib/storage/src/tests/chain/tree_cache.rs | 103 ++++++++-- 6 files changed, 460 insertions(+), 164 deletions(-) create mode 100644 core/lib/storage/src/chain/tree_cache/bincode_schema.rs create mode 100644 core/lib/storage/src/chain/tree_cache/json_schema.rs diff --git a/core/lib/storage/src/chain/mod.rs b/core/lib/storage/src/chain/mod.rs index 8d7a871722..8954d50b79 100644 --- a/core/lib/storage/src/chain/mod.rs +++ b/core/lib/storage/src/chain/mod.rs @@ -42,4 +42,12 @@ impl<'a, 'c> ChainIntermediator<'a, 'c> { pub fn mempool_schema(self) -> mempool::MempoolSchema<'a, 'c> { mempool::MempoolSchema(self.0) } + + pub fn tree_cache_schema_json(self) -> tree_cache::TreeCacheSchemaJSON<'a, 'c> { + tree_cache::TreeCacheSchemaJSON(self.0) + } + + pub fn tree_cache_schema_bincode(self) -> tree_cache::TreeCacheSchemaBincode<'a, 'c> { + tree_cache::TreeCacheSchemaBincode(self.0) + } } diff --git a/core/lib/storage/src/chain/tree_cache/bincode_schema.rs b/core/lib/storage/src/chain/tree_cache/bincode_schema.rs new file mode 100644 index 0000000000..20dbf2c8c7 --- /dev/null +++ b/core/lib/storage/src/chain/tree_cache/bincode_schema.rs @@ -0,0 +1,171 @@ +// Built-in deps +use std::time::Instant; +// External imports +// Workspace imports +use zksync_types::BlockNumber; +// Local imports +use super::records::AccountTreeCache; +use crate::{QueryResult, StorageProcessor}; + +/// Tree cache schema contains methods to store/load Merkle tree cache. +#[derive(Debug)] +pub struct TreeCacheSchemaBincode<'a, 'c>(pub &'a mut StorageProcessor<'c>); + +impl<'a, 'c> TreeCacheSchemaBincode<'a, 'c> { + /// Stores account tree cache for a block. + /// Expects `tree_cache` to be a byte sequence encoded according to the `bincode` protocol. + pub async fn store_account_tree_cache( + &mut self, + block: BlockNumber, + tree_cache: Vec, + ) -> QueryResult<()> { + let start = Instant::now(); + if *block == 0 { + return Ok(()); + } + + sqlx::query!( + " + INSERT INTO account_tree_cache (block, tree_cache_binary) + VALUES ($1, $2) + ON CONFLICT (block) + DO NOTHING + ", + *block as i64, + tree_cache, + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.store_account_tree_cache", + start.elapsed() + ); + Ok(()) + } + + /// Gets the number of the latest block that has a stored cache. + /// Returns `None` if there are no caches in the database. + pub async fn get_last_block_with_account_tree_cache( + &mut self, + ) -> QueryResult> { + let start = Instant::now(); + + let last_block_with_cache = sqlx::query!("SELECT MAX(block) FROM account_tree_cache") + .fetch_one(self.0.conn()) + .await? + .max; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.get_last_block_with_account_tree_cache", + start.elapsed() + ); + Ok(last_block_with_cache.map(|block| BlockNumber(block as u32))) + } + + /// Gets the latest stored account tree cache. + /// Returns `None` if there are no caches in the database. + /// Returns the block number and associated cache otherwise. + pub async fn get_account_tree_cache(&mut self) -> QueryResult)>> { + let start = Instant::now(); + let account_tree_cache = sqlx::query_as!( + AccountTreeCache, + " + SELECT * FROM account_tree_cache + ORDER BY block DESC + LIMIT 1 + ", + ) + .fetch_optional(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.get_account_tree_cache", + start.elapsed() + ); + Ok(account_tree_cache.map(|w| { + assert!( + w.tree_cache_binary.is_some(), + "Binary/bincode schema was used to fetch from table without binary data. Entry: {:?}", + w + ); + ( + BlockNumber(w.block as u32), + w.tree_cache_binary.unwrap(), + ) + })) + } + + /// Gets stored account tree cache for a certain block. + /// Returns `None` if there is no cache for requested block. + pub async fn get_account_tree_cache_block( + &mut self, + block: BlockNumber, + ) -> QueryResult>> { + let start = Instant::now(); + let account_tree_cache = sqlx::query_as!( + AccountTreeCache, + " + SELECT * FROM account_tree_cache + WHERE block = $1 + ", + *block as i64 + ) + .fetch_optional(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.get_account_tree_cache_block", + start.elapsed() + ); + Ok(account_tree_cache.map(|w| { + assert!( + w.tree_cache_binary.is_some(), + "Binary/bincode schema was used to fetch from table without binary data. Entry: {:?}", + w + ); + + w.tree_cache_binary.unwrap() + })) + } + + // Removes account tree cache for blocks with number greater than `last_block` + pub async fn remove_new_account_tree_cache( + &mut self, + last_block: BlockNumber, + ) -> QueryResult<()> { + let start = Instant::now(); + sqlx::query!( + "DELETE FROM account_tree_cache WHERE block > $1", + *last_block as i64 + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.remove_new_account_tree_cache", + start.elapsed() + ); + Ok(()) + } + + // Removes account tree cache for blocks with number less than `last_block` + pub async fn remove_old_account_tree_cache( + &mut self, + last_block: BlockNumber, + ) -> QueryResult<()> { + let start = Instant::now(); + sqlx::query!( + "DELETE FROM account_tree_cache WHERE block < $1", + *last_block as i64 + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.bincode.remove_old_account_tree_cache", + start.elapsed() + ); + Ok(()) + } +} diff --git a/core/lib/storage/src/chain/tree_cache/json_schema.rs b/core/lib/storage/src/chain/tree_cache/json_schema.rs new file mode 100644 index 0000000000..9150aba193 --- /dev/null +++ b/core/lib/storage/src/chain/tree_cache/json_schema.rs @@ -0,0 +1,176 @@ +// Built-in deps +use std::time::Instant; +// External imports +// Workspace imports +use zksync_types::BlockNumber; +// Local imports +use super::records::AccountTreeCache; +use crate::{QueryResult, StorageProcessor}; + +/// Tree cache schema contains methods to store/load Merkle tree cache. +#[derive(Debug)] +pub struct TreeCacheSchemaJSON<'a, 'c>(pub &'a mut StorageProcessor<'c>); + +impl<'a, 'c> TreeCacheSchemaJSON<'a, 'c> { + /// Stores account tree cache for a block. + /// Expects `tree_cache` to be a valid encoded JSON. + pub async fn store_account_tree_cache( + &mut self, + block: BlockNumber, + tree_cache: String, + ) -> QueryResult<()> { + let start = Instant::now(); + if *block == 0 { + return Ok(()); + } + + sqlx::query!( + " + INSERT INTO account_tree_cache (block, tree_cache) + VALUES ($1, $2) + ON CONFLICT (block) + DO NOTHING + ", + *block as i64, + tree_cache, + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.json.store_account_tree_cache", + start.elapsed() + ); + Ok(()) + } + + /// Gets the number of the latest block that has a stored cache. + /// Returns `None` if there are no caches in the database. + pub async fn get_last_block_with_account_tree_cache( + &mut self, + ) -> QueryResult> { + let start = Instant::now(); + + let last_block_with_cache = sqlx::query!("SELECT MAX(block) FROM account_tree_cache") + .fetch_one(self.0.conn()) + .await? + .max; + + metrics::histogram!( + "sql.chain.tree_cache.json.get_last_block_with_account_tree_cache", + start.elapsed() + ); + Ok(last_block_with_cache.map(|block| BlockNumber(block as u32))) + } + + /// Gets the latest stored account tree cache. + /// Returns `None` if there are no caches in the database. + /// Returns the block number and associated cache otherwise. + pub async fn get_account_tree_cache( + &mut self, + ) -> QueryResult> { + let start = Instant::now(); + let account_tree_cache = sqlx::query_as!( + AccountTreeCache, + " + SELECT * FROM account_tree_cache + ORDER BY block DESC + LIMIT 1 + ", + ) + .fetch_optional(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.json.get_account_tree_cache", + start.elapsed() + ); + Ok(account_tree_cache.map(|w| { + assert!( + w.tree_cache.is_some(), + "JSON schema was used to fetch from table without JSON data. Entry: {:?}", + w + ); + + ( + BlockNumber(w.block as u32), + serde_json::from_str(&w.tree_cache.unwrap()) + .expect("Failed to deserialize Account Tree Cache"), + ) + })) + } + + /// Gets stored account tree cache for a certain block. + /// Returns `None` if there is no cache for requested block. + pub async fn get_account_tree_cache_block( + &mut self, + block: BlockNumber, + ) -> QueryResult> { + let start = Instant::now(); + let account_tree_cache = sqlx::query_as!( + AccountTreeCache, + " + SELECT * FROM account_tree_cache + WHERE block = $1 + ", + *block as i64 + ) + .fetch_optional(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.json.get_account_tree_cache_block", + start.elapsed() + ); + Ok(account_tree_cache.map(|w| { + assert!( + w.tree_cache.is_some(), + "JSON schema was used to fetch from table without JSON data. Entry: {:?}", + w + ); + + serde_json::from_str(&w.tree_cache.unwrap()) + .expect("Failed to deserialize Account Tree Cache") + })) + } + + // Removes account tree cache for blocks with number greater than `last_block` + pub async fn remove_new_account_tree_cache( + &mut self, + last_block: BlockNumber, + ) -> QueryResult<()> { + let start = Instant::now(); + sqlx::query!( + "DELETE FROM account_tree_cache WHERE block > $1", + *last_block as i64 + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.json.remove_new_account_tree_cache", + start.elapsed() + ); + Ok(()) + } + + // Removes account tree cache for blocks with number less than `last_block` + pub async fn remove_old_account_tree_cache( + &mut self, + last_block: BlockNumber, + ) -> QueryResult<()> { + let start = Instant::now(); + sqlx::query!( + "DELETE FROM account_tree_cache WHERE block < $1", + *last_block as i64 + ) + .execute(self.0.conn()) + .await?; + + metrics::histogram!( + "sql.chain.tree_cache.json.remove_old_account_tree_cache", + start.elapsed() + ); + Ok(()) + } +} diff --git a/core/lib/storage/src/chain/tree_cache/mod.rs b/core/lib/storage/src/chain/tree_cache/mod.rs index 5e64b91ec1..3e4d799136 100644 --- a/core/lib/storage/src/chain/tree_cache/mod.rs +++ b/core/lib/storage/src/chain/tree_cache/mod.rs @@ -1,159 +1,10 @@ // Built-in deps -use std::time::Instant; // External imports // Workspace imports -use zksync_types::BlockNumber; // Local imports -use self::records::AccountTreeCache; -use crate::{QueryResult, StorageProcessor}; +mod bincode_schema; +mod json_schema; pub mod records; -/// Tree cache schema contains methods to store/load Merkle tree cache. -#[derive(Debug)] -pub struct TreeCacheSchema<'a, 'c>(pub &'a mut StorageProcessor<'c>); - -impl<'a, 'c> TreeCacheSchema<'a, 'c> { - /// Stores account tree cache for a block. - /// Expects `tree_cache` to be a valid encoded JSON. - pub async fn store_account_tree_cache( - &mut self, - block: BlockNumber, - tree_cache: String, - ) -> QueryResult<()> { - let start = Instant::now(); - if *block == 0 { - return Ok(()); - } - - sqlx::query!( - " - INSERT INTO account_tree_cache (block, tree_cache) - VALUES ($1, $2) - ON CONFLICT (block) - DO NOTHING - ", - *block as i64, - tree_cache, - ) - .execute(self.0.conn()) - .await?; - - metrics::histogram!("sql.chain.block.store_account_tree_cache", start.elapsed()); - Ok(()) - } - - /// Gets the number of the latest block that has a stored cache. - /// Returns `None` if there are no caches in the database. - pub async fn get_last_block_with_account_tree_cache( - &mut self, - ) -> QueryResult> { - let start = Instant::now(); - - let last_block_with_cache = sqlx::query!("SELECT MAX(block) FROM account_tree_cache") - .fetch_one(self.0.conn()) - .await? - .max; - - metrics::histogram!( - "sql.chain.block.get_last_block_with_account_tree_cache", - start.elapsed() - ); - Ok(last_block_with_cache.map(|block| BlockNumber(block as u32))) - } - - /// Gets the latest stored account tree cache. - /// Returns `None` if there are no caches in the database. - /// Returns the block number and associated cache otherwise. - pub async fn get_account_tree_cache( - &mut self, - ) -> QueryResult> { - let start = Instant::now(); - let account_tree_cache = sqlx::query_as!( - AccountTreeCache, - " - SELECT * FROM account_tree_cache - ORDER BY block DESC - LIMIT 1 - ", - ) - .fetch_optional(self.0.conn()) - .await?; - - metrics::histogram!("sql.chain.block.get_account_tree_cache", start.elapsed()); - Ok(account_tree_cache.map(|w| { - ( - BlockNumber(w.block as u32), - serde_json::from_str(&w.tree_cache) - .expect("Failed to deserialize Account Tree Cache"), - ) - })) - } - - /// Gets stored account tree cache for a certain block. - /// Returns `None` if there is no cache for requested block. - pub async fn get_account_tree_cache_block( - &mut self, - block: BlockNumber, - ) -> QueryResult> { - let start = Instant::now(); - let account_tree_cache = sqlx::query_as!( - AccountTreeCache, - " - SELECT * FROM account_tree_cache - WHERE block = $1 - ", - *block as i64 - ) - .fetch_optional(self.0.conn()) - .await?; - - metrics::histogram!( - "sql.chain.block.get_account_tree_cache_block", - start.elapsed() - ); - Ok(account_tree_cache.map(|w| { - serde_json::from_str(&w.tree_cache).expect("Failed to deserialize Account Tree Cache") - })) - } - - // Removes account tree cache for blocks with number greater than `last_block` - pub async fn remove_new_account_tree_cache( - &mut self, - last_block: BlockNumber, - ) -> QueryResult<()> { - let start = Instant::now(); - sqlx::query!( - "DELETE FROM account_tree_cache WHERE block > $1", - *last_block as i64 - ) - .execute(self.0.conn()) - .await?; - - metrics::histogram!( - "sql.chain.block.remove_new_account_tree_cache", - start.elapsed() - ); - Ok(()) - } - - // Removes account tree cache for blocks with number less than `last_block` - pub async fn remove_old_account_tree_cache( - &mut self, - last_block: BlockNumber, - ) -> QueryResult<()> { - let start = Instant::now(); - sqlx::query!( - "DELETE FROM account_tree_cache WHERE block < $1", - *last_block as i64 - ) - .execute(self.0.conn()) - .await?; - - metrics::histogram!( - "sql.chain.block.remove_old_account_tree_cache", - start.elapsed() - ); - Ok(()) - } -} +pub use self::{bincode_schema::TreeCacheSchemaBincode, json_schema::TreeCacheSchemaJSON}; diff --git a/core/lib/storage/src/chain/tree_cache/records.rs b/core/lib/storage/src/chain/tree_cache/records.rs index 999490785f..6e4e131ac7 100644 --- a/core/lib/storage/src/chain/tree_cache/records.rs +++ b/core/lib/storage/src/chain/tree_cache/records.rs @@ -3,8 +3,17 @@ use serde::{Deserialize, Serialize}; // Workspace imports // Local imports +/// Stored Merkle tree cache. +/// Can have either cache encoded as JSON (old one) or binary encoded via `bincode` protocol (new one). +/// +/// Old encoding is used in the data restore tool for backward compatibility. +/// New encoding is used in the server itself, since it's much faster and compact. #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AccountTreeCache { + /// Number of the block for this cache. pub block: i64, - pub tree_cache: String, + /// JSON encoded cache. + pub tree_cache: Option, + /// Binary (bincode) encoded cache. + pub tree_cache_binary: Option>, } diff --git a/core/lib/storage/src/tests/chain/tree_cache.rs b/core/lib/storage/src/tests/chain/tree_cache.rs index 2adb610e95..610e878812 100644 --- a/core/lib/storage/src/tests/chain/tree_cache.rs +++ b/core/lib/storage/src/tests/chain/tree_cache.rs @@ -3,7 +3,10 @@ use zksync_types::BlockNumber; // Local imports use crate::{ - chain::{block::BlockSchema, tree_cache::TreeCacheSchema}, + chain::{ + block::BlockSchema, + tree_cache::{TreeCacheSchemaBincode, TreeCacheSchemaJSON}, + }, test_data::{gen_sample_block, BLOCK_SIZE_CHUNKS}, tests::db_test, QueryResult, StorageProcessor, @@ -11,7 +14,9 @@ use crate::{ /// Check that account tree cache is removed correctly. #[db_test] -async fn test_remove_old_account_tree_cache(mut storage: StorageProcessor<'_>) -> QueryResult<()> { +async fn test_remove_old_account_tree_cache_json( + mut storage: StorageProcessor<'_>, +) -> QueryResult<()> { // Insert account tree cache for 5 blocks. for block_number in 1..=5 { BlockSchema(&mut storage) @@ -21,7 +26,7 @@ async fn test_remove_old_account_tree_cache(mut storage: StorageProcessor<'_>) - Default::default(), )) .await?; - TreeCacheSchema(&mut storage) + TreeCacheSchemaJSON(&mut storage) .store_account_tree_cache( BlockNumber(block_number), serde_json::Value::default().to_string(), @@ -30,16 +35,16 @@ async fn test_remove_old_account_tree_cache(mut storage: StorageProcessor<'_>) - } // Remove account tree cache for blocks with numbers greater than 2. - TreeCacheSchema(&mut storage) + TreeCacheSchemaJSON(&mut storage) .remove_old_account_tree_cache(BlockNumber(3)) .await?; // Check that the account tree cache for block #3 is present, and for block #1 is not. - assert!(TreeCacheSchema(&mut storage) + assert!(TreeCacheSchemaJSON(&mut storage) .get_account_tree_cache_block(BlockNumber(3)) .await? .is_some()); - assert!(TreeCacheSchema(&mut storage) + assert!(TreeCacheSchemaJSON(&mut storage) .get_account_tree_cache_block(BlockNumber(1)) .await? .is_none()); @@ -49,7 +54,9 @@ async fn test_remove_old_account_tree_cache(mut storage: StorageProcessor<'_>) - /// Check that account tree cache is removed correctly. #[db_test] -async fn test_remove_new_account_tree_cache(mut storage: StorageProcessor<'_>) -> QueryResult<()> { +async fn test_remove_new_account_tree_cache_json( + mut storage: StorageProcessor<'_>, +) -> QueryResult<()> { // Insert account tree cache for 5 blocks. for block_number in 1..=5 { BlockSchema(&mut storage) @@ -59,7 +66,7 @@ async fn test_remove_new_account_tree_cache(mut storage: StorageProcessor<'_>) - Default::default(), )) .await?; - TreeCacheSchema(&mut storage) + TreeCacheSchemaJSON(&mut storage) .store_account_tree_cache( BlockNumber(block_number), serde_json::Value::default().to_string(), @@ -68,16 +75,90 @@ async fn test_remove_new_account_tree_cache(mut storage: StorageProcessor<'_>) - } // Remove account tree cache for blocks with numbers greater than 2. - TreeCacheSchema(&mut storage) + TreeCacheSchemaJSON(&mut storage) .remove_new_account_tree_cache(BlockNumber(2)) .await?; // Check if account tree cache for the 2nd block is present, and for the 3rd is not. - assert!(TreeCacheSchema(&mut storage) + assert!(TreeCacheSchemaJSON(&mut storage) .get_account_tree_cache_block(BlockNumber(2)) .await? .is_some()); - assert!(TreeCacheSchema(&mut storage) + assert!(TreeCacheSchemaJSON(&mut storage) + .get_account_tree_cache_block(BlockNumber(3)) + .await? + .is_none()); + + Ok(()) +} + +/// Check that account tree cache is removed correctly. +#[db_test] +async fn test_remove_old_account_tree_cache_bincode( + mut storage: StorageProcessor<'_>, +) -> QueryResult<()> { + // Insert account tree cache for 5 blocks. + for block_number in 1..=5 { + BlockSchema(&mut storage) + .save_full_block(gen_sample_block( + BlockNumber(block_number), + BLOCK_SIZE_CHUNKS, + Default::default(), + )) + .await?; + TreeCacheSchemaBincode(&mut storage) + .store_account_tree_cache(BlockNumber(block_number), vec![1u8, 2, 3]) + .await?; + } + + // Remove account tree cache for blocks with numbers greater than 2. + TreeCacheSchemaBincode(&mut storage) + .remove_old_account_tree_cache(BlockNumber(3)) + .await?; + + // Check that the account tree cache for block #3 is present, and for block #1 is not. + assert!(TreeCacheSchemaBincode(&mut storage) + .get_account_tree_cache_block(BlockNumber(3)) + .await? + .is_some()); + assert!(TreeCacheSchemaBincode(&mut storage) + .get_account_tree_cache_block(BlockNumber(1)) + .await? + .is_none()); + + Ok(()) +} + +/// Check that account tree cache is removed correctly. +#[db_test] +async fn test_remove_new_account_tree_cache_bincode( + mut storage: StorageProcessor<'_>, +) -> QueryResult<()> { + // Insert account tree cache for 5 blocks. + for block_number in 1..=5 { + BlockSchema(&mut storage) + .save_full_block(gen_sample_block( + BlockNumber(block_number), + BLOCK_SIZE_CHUNKS, + Default::default(), + )) + .await?; + TreeCacheSchemaBincode(&mut storage) + .store_account_tree_cache(BlockNumber(block_number), vec![1u8, 2, 3]) + .await?; + } + + // Remove account tree cache for blocks with numbers greater than 2. + TreeCacheSchemaBincode(&mut storage) + .remove_new_account_tree_cache(BlockNumber(2)) + .await?; + + // Check if account tree cache for the 2nd block is present, and for the 3rd is not. + assert!(TreeCacheSchemaBincode(&mut storage) + .get_account_tree_cache_block(BlockNumber(2)) + .await? + .is_some()); + assert!(TreeCacheSchemaBincode(&mut storage) .get_account_tree_cache_block(BlockNumber(3)) .await? .is_none());