Skip to content

Commit

Permalink
Move calculating block
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Mar 3, 2022
1 parent 343070a commit 3a6a5ff
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 30 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion core/bin/zksync_core/src/committer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async fn seal_incomplete_block(
transaction
.chain()
.block_schema()
.save_incomplete_block(block)
.save_incomplete_block(&block)
.await
.expect("committer must commit the op into db");

Expand All @@ -218,6 +218,10 @@ async fn seal_incomplete_block(
.await
.expect("Unable to commit DB transaction");

// We do this outside of a transaction,
// because we want the incomplete block data to be available as soon as possible.
// If something happened to the metric count, it won't affect the block data
zksync_prometheus_exporter::calculate_volume_for_block(&mut storage, &block).await;
metrics::histogram!("committer.seal_incomplete_block", start.elapsed());
}

Expand Down
1 change: 1 addition & 0 deletions core/lib/prometheus_exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { version = "1", features = ["full"] }
futures = "0.3"
anyhow = "1.0"

num = { version = "0.3.1", features = ["serde"] }
metrics = "0.17"
metrics-exporter-prometheus = "0.6"
metrics-macros = "0.4"
Expand Down
44 changes: 43 additions & 1 deletion core/lib/prometheus_exporter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
//! This module handles metric export to the Prometheus server
use metrics_exporter_prometheus::PrometheusBuilder;
use num::rational::Ratio;
use num::{BigUint, ToPrimitive};
use std::collections::HashMap;
use std::ops::Add;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use zksync_storage::{ConnectionPool, QueryResult};
use zksync_storage::{ConnectionPool, QueryResult, StorageProcessor};
use zksync_types::aggregated_operations::AggregatedActionType::*;
use zksync_types::block::IncompleteBlock;
use zksync_types::TokenId;

const QUERY_INTERVAL: Duration = Duration::from_secs(30);

Expand Down Expand Up @@ -55,6 +61,42 @@ async fn prometheus_exporter_iteration(connection_pool: ConnectionPool) -> Query
Ok(())
}

pub async fn calculate_volume_for_block(
storage: &mut StorageProcessor<'_>,
block: &IncompleteBlock,
) {
let mut volumes: HashMap<TokenId, BigUint> = HashMap::new();
for executed_op in &block.block_transactions {
if let Some(tx) = executed_op.get_executed_op() {
if executed_op.is_successful() {
if let Some(data) = tx.get_amount_info() {
for (token, amount) in data {
let full_amount;
if let Some(volume) = volumes.get(&token) {
full_amount = volume.add(amount);
} else {
full_amount = amount;
}
volumes.insert(token, full_amount);
}
}
}
}
}

for (token, amount) in volumes.into_iter() {
if let Ok(Some(price)) = storage
.tokens_schema()
.get_historical_ticker_price(token)
.await
{
let labels = vec![("token", format!("{}", token.0))];
let usd_amount = Ratio::from(amount) * price.usd_price;
metrics::gauge!("txs_volume", usd_amount.to_f64().unwrap(), &labels);
}
}
}

pub fn run_prometheus_exporter(port: u16) -> JoinHandle<()> {
let addr = ([0, 0, 0, 0], port);
let (recorder, exporter) = PrometheusBuilder::new()
Expand Down
28 changes: 2 additions & 26 deletions core/lib/storage/src/chain/block/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
// Built-in deps
use num::rational::Ratio;
use num::ToPrimitive;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
// External imports
// Workspace imports
Expand Down Expand Up @@ -45,22 +43,6 @@ pub mod records;
#[derive(Debug)]
pub struct BlockSchema<'a, 'c>(pub &'a mut StorageProcessor<'c>);

pub async fn txs_volume_metrics(storage: &mut StorageProcessor<'_>, op: &ZkSyncOp) {
if let Some(data) = op.get_amount_info() {
for (token, amount) in data {
if let Ok(Some(price)) = storage
.tokens_schema()
.get_historical_ticker_price(token)
.await
{
let labels = vec![("token", format!("{}", token.0))];
let usd_amount = Ratio::from(amount) * price.usd_price;
metrics::gauge!("txs_volume", usd_amount.to_f64().unwrap(), &labels);
}
}
}
}

impl<'a, 'c> BlockSchema<'a, 'c> {
/// Given a block, stores its transactions in the database.
pub async fn save_block_transactions(
Expand Down Expand Up @@ -106,10 +88,6 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
.set_account_type(tx.account_id, new_account_type)
.await?;
}
// Store the executed operation in the corresponding schema.
if let Some(op) = &tx.op {
txs_volume_metrics(&mut transaction, op).await;
}

let new_tx = NewExecutedTransaction::prepare_stored_tx(
*tx,
Expand All @@ -125,8 +103,6 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
}
ExecutedOperations::PriorityOp(prior_op) => {
// Store the executed operation in the corresponding schema.
txs_volume_metrics(&mut transaction, &prior_op.op).await;

// For priority operation we should only store it in the Operations schema.
let new_priority_op = NewExecutedPriorityOperation::prepare_stored_priority_op(
*prior_op,
Expand Down Expand Up @@ -964,7 +940,7 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
///
/// This method **does not** save block transactions.
/// They are expected to be saved prior, during processing of previous pending blocks.
pub async fn save_incomplete_block(&mut self, block: IncompleteBlock) -> QueryResult<()> {
pub async fn save_incomplete_block(&mut self, block: &IncompleteBlock) -> QueryResult<()> {
let start = Instant::now();
let mut transaction = self.0.start_transaction().await?;

Expand Down Expand Up @@ -1044,7 +1020,7 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
)
.await?;
BlockSchema(&mut transaction)
.save_incomplete_block(incomplete_block)
.save_incomplete_block(&incomplete_block)
.await?;
BlockSchema(&mut transaction)
.finish_incomplete_block(full_block)
Expand Down
4 changes: 2 additions & 2 deletions core/lib/storage/src/tests/chain/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1066,7 +1066,7 @@ async fn test_remove_blocks(mut storage: StorageProcessor<'_>) -> QueryResult<()
}
// Insert 1 incomplete block.
BlockSchema(&mut storage)
.save_incomplete_block(gen_sample_incomplete_block(
.save_incomplete_block(&gen_sample_incomplete_block(
BlockNumber(6),
BLOCK_SIZE_CHUNKS,
Default::default(),
Expand Down Expand Up @@ -1402,7 +1402,7 @@ async fn test_incomplete_block_logic(mut storage: StorageProcessor<'_>) -> Query
"Pending block should be saved"
);

schema.save_incomplete_block(incomplete_block).await?;
schema.save_incomplete_block(&incomplete_block).await?;

// Pending block should be removed now.
assert!(
Expand Down

0 comments on commit 3a6a5ff

Please sign in to comment.