Skip to content

Commit

Permalink
Create two separate schemas for tree caches
Browse files Browse the repository at this point in the history
  • Loading branch information
popzxc committed Mar 28, 2022
1 parent f153fac commit cdb1195
Show file tree
Hide file tree
Showing 6 changed files with 460 additions and 164 deletions.
8 changes: 8 additions & 0 deletions core/lib/storage/src/chain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ impl<'a, 'c> ChainIntermediator<'a, 'c> {
pub fn mempool_schema(self) -> mempool::MempoolSchema<'a, 'c> {
mempool::MempoolSchema(self.0)
}

pub fn tree_cache_schema_json(self) -> tree_cache::TreeCacheSchemaJSON<'a, 'c> {
tree_cache::TreeCacheSchemaJSON(self.0)
}

pub fn tree_cache_schema_bincode(self) -> tree_cache::TreeCacheSchemaBincode<'a, 'c> {
tree_cache::TreeCacheSchemaBincode(self.0)
}
}
171 changes: 171 additions & 0 deletions core/lib/storage/src/chain/tree_cache/bincode_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Built-in deps
use std::time::Instant;
// External imports
// Workspace imports
use zksync_types::BlockNumber;
// Local imports
use super::records::AccountTreeCache;
use crate::{QueryResult, StorageProcessor};

/// Tree cache schema contains methods to store/load Merkle tree cache.
#[derive(Debug)]
pub struct TreeCacheSchemaBincode<'a, 'c>(pub &'a mut StorageProcessor<'c>);

impl<'a, 'c> TreeCacheSchemaBincode<'a, 'c> {
/// Stores account tree cache for a block.
/// Expects `tree_cache` to be a byte sequence encoded according to the `bincode` protocol.
pub async fn store_account_tree_cache(
&mut self,
block: BlockNumber,
tree_cache: Vec<u8>,
) -> QueryResult<()> {
let start = Instant::now();
if *block == 0 {
return Ok(());
}

sqlx::query!(
"
INSERT INTO account_tree_cache (block, tree_cache_binary)
VALUES ($1, $2)
ON CONFLICT (block)
DO NOTHING
",
*block as i64,
tree_cache,
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.bincode.store_account_tree_cache",
start.elapsed()
);
Ok(())
}

/// Gets the number of the latest block that has a stored cache.
/// Returns `None` if there are no caches in the database.
pub async fn get_last_block_with_account_tree_cache(
&mut self,
) -> QueryResult<Option<BlockNumber>> {
let start = Instant::now();

let last_block_with_cache = sqlx::query!("SELECT MAX(block) FROM account_tree_cache")
.fetch_one(self.0.conn())
.await?
.max;

metrics::histogram!(
"sql.chain.tree_cache.bincode.get_last_block_with_account_tree_cache",
start.elapsed()
);
Ok(last_block_with_cache.map(|block| BlockNumber(block as u32)))
}

/// Gets the latest stored account tree cache.
/// Returns `None` if there are no caches in the database.
/// Returns the block number and associated cache otherwise.
pub async fn get_account_tree_cache(&mut self) -> QueryResult<Option<(BlockNumber, Vec<u8>)>> {
let start = Instant::now();
let account_tree_cache = sqlx::query_as!(
AccountTreeCache,
"
SELECT * FROM account_tree_cache
ORDER BY block DESC
LIMIT 1
",
)
.fetch_optional(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.bincode.get_account_tree_cache",
start.elapsed()
);
Ok(account_tree_cache.map(|w| {
assert!(
w.tree_cache_binary.is_some(),
"Binary/bincode schema was used to fetch from table without binary data. Entry: {:?}",
w
);
(
BlockNumber(w.block as u32),
w.tree_cache_binary.unwrap(),
)
}))
}

/// Gets stored account tree cache for a certain block.
/// Returns `None` if there is no cache for requested block.
pub async fn get_account_tree_cache_block(
&mut self,
block: BlockNumber,
) -> QueryResult<Option<Vec<u8>>> {
let start = Instant::now();
let account_tree_cache = sqlx::query_as!(
AccountTreeCache,
"
SELECT * FROM account_tree_cache
WHERE block = $1
",
*block as i64
)
.fetch_optional(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.bincode.get_account_tree_cache_block",
start.elapsed()
);
Ok(account_tree_cache.map(|w| {
assert!(
w.tree_cache_binary.is_some(),
"Binary/bincode schema was used to fetch from table without binary data. Entry: {:?}",
w
);

w.tree_cache_binary.unwrap()
}))
}

// Removes account tree cache for blocks with number greater than `last_block`
pub async fn remove_new_account_tree_cache(
&mut self,
last_block: BlockNumber,
) -> QueryResult<()> {
let start = Instant::now();
sqlx::query!(
"DELETE FROM account_tree_cache WHERE block > $1",
*last_block as i64
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.bincode.remove_new_account_tree_cache",
start.elapsed()
);
Ok(())
}

// Removes account tree cache for blocks with number less than `last_block`
pub async fn remove_old_account_tree_cache(
&mut self,
last_block: BlockNumber,
) -> QueryResult<()> {
let start = Instant::now();
sqlx::query!(
"DELETE FROM account_tree_cache WHERE block < $1",
*last_block as i64
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.bincode.remove_old_account_tree_cache",
start.elapsed()
);
Ok(())
}
}
176 changes: 176 additions & 0 deletions core/lib/storage/src/chain/tree_cache/json_schema.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
// Built-in deps
use std::time::Instant;
// External imports
// Workspace imports
use zksync_types::BlockNumber;
// Local imports
use super::records::AccountTreeCache;
use crate::{QueryResult, StorageProcessor};

/// Tree cache schema contains methods to store/load Merkle tree cache.
#[derive(Debug)]
pub struct TreeCacheSchemaJSON<'a, 'c>(pub &'a mut StorageProcessor<'c>);

impl<'a, 'c> TreeCacheSchemaJSON<'a, 'c> {
/// Stores account tree cache for a block.
/// Expects `tree_cache` to be a valid encoded JSON.
pub async fn store_account_tree_cache(
&mut self,
block: BlockNumber,
tree_cache: String,
) -> QueryResult<()> {
let start = Instant::now();
if *block == 0 {
return Ok(());
}

sqlx::query!(
"
INSERT INTO account_tree_cache (block, tree_cache)
VALUES ($1, $2)
ON CONFLICT (block)
DO NOTHING
",
*block as i64,
tree_cache,
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.json.store_account_tree_cache",
start.elapsed()
);
Ok(())
}

/// Gets the number of the latest block that has a stored cache.
/// Returns `None` if there are no caches in the database.
pub async fn get_last_block_with_account_tree_cache(
&mut self,
) -> QueryResult<Option<BlockNumber>> {
let start = Instant::now();

let last_block_with_cache = sqlx::query!("SELECT MAX(block) FROM account_tree_cache")
.fetch_one(self.0.conn())
.await?
.max;

metrics::histogram!(
"sql.chain.tree_cache.json.get_last_block_with_account_tree_cache",
start.elapsed()
);
Ok(last_block_with_cache.map(|block| BlockNumber(block as u32)))
}

/// Gets the latest stored account tree cache.
/// Returns `None` if there are no caches in the database.
/// Returns the block number and associated cache otherwise.
pub async fn get_account_tree_cache(
&mut self,
) -> QueryResult<Option<(BlockNumber, serde_json::Value)>> {
let start = Instant::now();
let account_tree_cache = sqlx::query_as!(
AccountTreeCache,
"
SELECT * FROM account_tree_cache
ORDER BY block DESC
LIMIT 1
",
)
.fetch_optional(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.json.get_account_tree_cache",
start.elapsed()
);
Ok(account_tree_cache.map(|w| {
assert!(
w.tree_cache.is_some(),
"JSON schema was used to fetch from table without JSON data. Entry: {:?}",
w
);

(
BlockNumber(w.block as u32),
serde_json::from_str(&w.tree_cache.unwrap())
.expect("Failed to deserialize Account Tree Cache"),
)
}))
}

/// Gets stored account tree cache for a certain block.
/// Returns `None` if there is no cache for requested block.
pub async fn get_account_tree_cache_block(
&mut self,
block: BlockNumber,
) -> QueryResult<Option<serde_json::Value>> {
let start = Instant::now();
let account_tree_cache = sqlx::query_as!(
AccountTreeCache,
"
SELECT * FROM account_tree_cache
WHERE block = $1
",
*block as i64
)
.fetch_optional(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.json.get_account_tree_cache_block",
start.elapsed()
);
Ok(account_tree_cache.map(|w| {
assert!(
w.tree_cache.is_some(),
"JSON schema was used to fetch from table without JSON data. Entry: {:?}",
w
);

serde_json::from_str(&w.tree_cache.unwrap())
.expect("Failed to deserialize Account Tree Cache")
}))
}

// Removes account tree cache for blocks with number greater than `last_block`
pub async fn remove_new_account_tree_cache(
&mut self,
last_block: BlockNumber,
) -> QueryResult<()> {
let start = Instant::now();
sqlx::query!(
"DELETE FROM account_tree_cache WHERE block > $1",
*last_block as i64
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.json.remove_new_account_tree_cache",
start.elapsed()
);
Ok(())
}

// Removes account tree cache for blocks with number less than `last_block`
pub async fn remove_old_account_tree_cache(
&mut self,
last_block: BlockNumber,
) -> QueryResult<()> {
let start = Instant::now();
sqlx::query!(
"DELETE FROM account_tree_cache WHERE block < $1",
*last_block as i64
)
.execute(self.0.conn())
.await?;

metrics::histogram!(
"sql.chain.tree_cache.json.remove_old_account_tree_cache",
start.elapsed()
);
Ok(())
}
}
Loading

0 comments on commit cdb1195

Please sign in to comment.