Merge #2182
2182: Define way of gathering product metrics r=Deniallugo a=Deniallugo



Co-authored-by: deniallugo <[email protected]>
bors-matterlabs-dev[bot] and Deniallugo authored Mar 4, 2022
2 parents 39c2391 + 7485003 commit 5a11ba5
Showing 11 changed files with 153 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

6 changes: 5 additions & 1 deletion core/bin/zksync_core/src/committer/mod.rs
@@ -196,7 +196,7 @@ async fn seal_incomplete_block(
transaction
.chain()
.block_schema()
.save_incomplete_block(block)
.save_incomplete_block(&block)
.await
.expect("committer must commit the op into db");

@@ -218,6 +218,10 @@ async fn seal_incomplete_block(
.await
.expect("Unable to commit DB transaction");

// We do this outside of the transaction,
// because we want the incomplete block data to be available as soon as possible.
// If anything goes wrong with the metric calculation, it won't affect the block data.
zksync_prometheus_exporter::calculate_volume_for_block(&mut storage, &block).await;
metrics::histogram!("committer.seal_incomplete_block", start.elapsed());
}

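The ordering in this hunk is the point of the change: the database transaction commits first, and the volume metric runs strictly afterwards, so a metrics failure can at worst drop a sample, never the block. A minimal sketch of the pattern, with a hypothetical Db/Tx pair standing in for the committer's real storage types:

struct Db;
struct Tx;

impl Db {
    async fn start_transaction(&mut self) -> Tx { Tx }
}
impl Tx {
    async fn save_block(&mut self) {}
    async fn commit(self) {}
}

async fn seal_block(db: &mut Db) {
    let start = std::time::Instant::now();
    let mut tx = db.start_transaction().await;
    tx.save_block().await;
    tx.commit().await; // the block data is durable from here on
    // Metrics run outside the transaction: a failure here can lose a sample,
    // but can never roll back or delay the committed block.
    metrics::histogram!("committer.seal_incomplete_block", start.elapsed());
}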
1 change: 1 addition & 0 deletions core/lib/prometheus_exporter/Cargo.toml
@@ -21,6 +21,7 @@ tokio = { version = "1", features = ["full"] }
futures = "0.3"
anyhow = "1.0"

num = { version = "0.3.1", features = ["serde"] }
metrics = "0.17"
metrics-exporter-prometheus = "0.6"
metrics-macros = "0.4"
126 changes: 82 additions & 44 deletions core/lib/prometheus_exporter/src/lib.rs
@@ -1,67 +1,105 @@
//! This module handles metric export to the Prometheus server
use metrics_exporter_prometheus::PrometheusBuilder;
use num::rational::Ratio;
use num::{BigUint, ToPrimitive};
use std::collections::HashMap;
use std::ops::Add;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use zksync_storage::ConnectionPool;
use zksync_storage::{ConnectionPool, QueryResult, StorageProcessor};
use zksync_types::aggregated_operations::AggregatedActionType::*;
use zksync_types::block::IncompleteBlock;
use zksync_types::TokenId;

const QUERY_INTERVAL: Duration = Duration::from_secs(30);

pub fn run_operation_counter(connection_pool: ConnectionPool) -> JoinHandle<()> {
tokio::spawn(async move {
let mut storage = connection_pool
.access_storage()
.await
.expect("unable to access storage");

loop {
let mut transaction = storage
.start_transaction()
.await
.expect("unable to start db transaction");
let mut block_schema = transaction.chain().block_schema();

for &action in &[CommitBlocks, ExecuteBlocks] {
for &is_confirmed in &[false, true] {
let result = block_schema
.count_aggregated_operations(action, is_confirmed)
.await;
if let Ok(result) = result {
metrics::gauge!(
"count_operations",
result as f64,
"action" => action.to_string(),
"confirmed" => is_confirmed.to_string()
);
}
}
if let Err(e) = prometheus_exporter_iteration(connection_pool.clone()).await {
vlog::error!("Prometheus error: {}", e);
}
sleep(QUERY_INTERVAL).await;
}
})
}

let rejected_txs = block_schema.count_rejected_txs().await;
async fn prometheus_exporter_iteration(connection_pool: ConnectionPool) -> QueryResult<()> {
let mut storage = connection_pool.access_storage().await?;
let mut transaction = storage.start_transaction().await?;

if let Ok(result) = rejected_txs {
metrics::gauge!("stored_rejected_txs", result as f64);
}
let mut block_schema = transaction.chain().block_schema();

let mempool_size = transaction
.chain()
.mempool_schema()
.get_mempool_size()
.await;
if let Ok(result) = mempool_size {
metrics::gauge!("mempool_size", result as f64);
}
for &action in &[CommitBlocks, ExecuteBlocks] {
for &is_confirmed in &[false, true] {
let result = block_schema
.count_aggregated_operations(action, is_confirmed)
.await?;
metrics::gauge!(
"count_operations",
result as f64,
"action" => action.to_string(),
"confirmed" => is_confirmed.to_string()
);
}
}

transaction
.commit()
.await
.expect("unable to commit db transaction");
let rejected_txs = block_schema.count_rejected_txs().await?;

sleep(QUERY_INTERVAL).await;
metrics::gauge!("stored_rejected_txs", rejected_txs as f64);

let mempool_size = transaction
.chain()
.mempool_schema()
.get_mempool_size()
.await?;
metrics::gauge!("mempool_size", mempool_size as f64);

transaction.commit().await?;
Ok(())
}
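Factoring the loop body into prometheus_exporter_iteration changes the failure mode: the old code expect-ed on every query and would kill the whole metrics task on a single DB hiccup, while the new code logs the error and retries on the next tick. A self-contained sketch of the same pattern (tokio and anyhow assumed as dependencies; eprintln! stands in for vlog::error!):

use std::time::Duration;
use tokio::{task::JoinHandle, time::sleep};

async fn iteration() -> anyhow::Result<()> {
    // ... query the database and publish gauges; any `?` failure is contained here ...
    Ok(())
}

fn run_loop() -> JoinHandle<()> {
    tokio::spawn(async move {
        loop {
            if let Err(e) = iteration().await {
                eprintln!("Prometheus error: {}", e);
            }
            sleep(Duration::from_secs(30)).await;
        }
    })
}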

/// Extract per-token transaction volumes from the block.
fn get_volumes(block: &IncompleteBlock) -> HashMap<TokenId, BigUint> {
let mut volumes: HashMap<TokenId, BigUint> = HashMap::new();

// Iterator over tx amounts in the block.
let amounts_iter = block
.block_transactions
.iter()
.filter(|executed_op| executed_op.is_successful()) // Only process successful operations.
.filter_map(|executed_op| executed_op.get_executed_op()) // Obtain transaction.
.filter_map(|tx| tx.get_amount_info()) // Process transactions with amounts.
.flatten(); // Each transaction can have multiple amounts, process one by one.

for (token, amount) in amounts_iter {
volumes
.entry(token)
.and_modify(|volume| *volume = volume.clone().add(amount.clone()))
.or_insert(amount);
}
volumes
}
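The accumulation in get_volumes sums repeated amounts for the same token and keeps distinct tokens separate. A self-contained sketch of that behaviour, with plain u32 keys standing in for TokenId:

use num::BigUint;
use std::collections::HashMap;

fn accumulate(amounts: &[(u32, u64)]) -> HashMap<u32, BigUint> {
    let mut volumes: HashMap<u32, BigUint> = HashMap::new();
    for &(token, amount) in amounts {
        let amount = BigUint::from(amount);
        volumes
            .entry(token)
            .and_modify(|v| *v += amount.clone()) // existing entry: add to the running sum
            .or_insert(amount); // first sighting of this token
    }
    volumes
}

// accumulate(&[(0, 100), (0, 50), (1, 7)]) yields {0: 150, 1: 7}.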

/// Report the USD volume of all transactions in the block to Prometheus.
pub async fn calculate_volume_for_block(
storage: &mut StorageProcessor<'_>,
block: &IncompleteBlock,
) {
let volumes = get_volumes(block);
for (token, amount) in volumes.into_iter() {
if let Ok(Some(price)) = storage
.tokens_schema()
.get_historical_ticker_price(token)
.await
{
let labels = vec![("token", format!("{}", token.0))];
let usd_amount = Ratio::from(amount) * price.usd_price;
metrics::gauge!("txs_volume", usd_amount.to_f64().unwrap(), &labels);
}
})
}
}
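The conversion stays exact for as long as possible: the BigUint volume is multiplied by the historical price as a rational number, and only the final result is (lossily) converted to f64 for the gauge. A minimal sketch, assuming the stored price is a Ratio<BigUint> as in TokenPrice:

use num::{rational::Ratio, BigUint, ToPrimitive};

// Exact rational arithmetic until the very end; `to_f64` returns None only
// when no conversion exists, which the production code above unwraps.
fn usd_volume(amount: BigUint, usd_price: Ratio<BigUint>) -> Option<f64> {
    (Ratio::from(amount) * usd_price).to_f64()
}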

pub fn run_prometheus_exporter(port: u16) -> JoinHandle<()> {
Expand Down
@@ -0,0 +1 @@
DROP FUNCTION unique_users(timestamp, timestamp);
@@ -0,0 +1,24 @@
create or replace function unique_users(after timestamp, before timestamp)
returns table
(
total bigint
)
language plpgsql
as
$$
begin
return query (
select count(distinct address)
from tx_filters
where tx_hash in (
select tx_hash
from executed_transactions
where success = true
and created_at BETWEEN after AND before
union
select tx_hash
from executed_priority_operations
where created_at BETWEEN after AND before
));
end;
$$;
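Together, these two migrations add (and roll back) a unique_users helper that counts distinct active addresses between two timestamps, across both executed L2 transactions and priority operations. A hedged sketch of calling it from Rust with sqlx (the wrapper function is illustrative, not part of this PR; binding timestamps assumes sqlx's chrono feature):

use sqlx::PgConnection;

async fn count_unique_users(
    conn: &mut PgConnection,
    after: chrono::NaiveDateTime,
    before: chrono::NaiveDateTime,
) -> Result<i64, sqlx::Error> {
    // `unique_users` returns a single-row table with one bigint column.
    sqlx::query_scalar("SELECT total FROM unique_users($1, $2)")
        .bind(after)
        .bind(before)
        .fetch_one(conn)
        .await
}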
9 changes: 6 additions & 3 deletions core/lib/storage/src/chain/block/mod.rs
@@ -88,7 +88,7 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
.set_account_type(tx.account_id, new_account_type)
.await?;
}
// Store the executed operation in the corresponding schema.

let new_tx = NewExecutedTransaction::prepare_stored_tx(
*tx,
block_number,
@@ -102,11 +102,14 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
.await?;
}
ExecutedOperations::PriorityOp(prior_op) => {
// Store the executed operation in the corresponding schema.
// For a priority operation, we only store it in the Operations schema.
let new_priority_op = NewExecutedPriorityOperation::prepare_stored_priority_op(
*prior_op,
block_number,
);

// Store the executed operation in the corresponding schema.
transaction
.chain()
.operations_schema()
@@ -940,7 +943,7 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
///
/// This method **does not** save block transactions.
/// They are expected to be saved prior, during processing of previous pending blocks.
pub async fn save_incomplete_block(&mut self, block: IncompleteBlock) -> QueryResult<()> {
pub async fn save_incomplete_block(&mut self, block: &IncompleteBlock) -> QueryResult<()> {
let start = Instant::now();
let mut transaction = self.0.start_transaction().await?;

@@ -1020,7 +1023,7 @@ impl<'a, 'c> BlockSchema<'a, 'c> {
)
.await?;
BlockSchema(&mut transaction)
.save_incomplete_block(incomplete_block)
.save_incomplete_block(&incomplete_block)
.await?;
BlockSchema(&mut transaction)
.finish_incomplete_block(full_block)
1 change: 1 addition & 0 deletions core/lib/storage/src/chain/state/mod.rs
@@ -246,6 +246,7 @@ impl<'a, 'c> StateSchema<'a, 'c> {
)
.execute(self.0.conn())
.await?;
metrics::increment_counter!("new_accounts");
}
StorageAccountDiff::Delete(upd) => {
sqlx::query!(
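The account-creation metric added here is a counter rather than a gauge: it is only ever incremented, and Prometheus derives rates from it on the query side, whereas the exporter's gauges are overwritten with a fresh sample every interval. A minimal illustration using the same metrics macros as this PR:

// Incremented once per created account; PromQL `rate(new_accounts[5m])`
// then gives accounts created per second.
fn on_account_created() {
    metrics::increment_counter!("new_accounts");
}

// Overwritten with the latest sampled value on every reporting tick.
fn on_mempool_sampled(size: usize) {
    metrics::gauge!("mempool_size", size as f64);
}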
4 changes: 2 additions & 2 deletions core/lib/storage/src/tests/chain/block.rs
@@ -1066,7 +1066,7 @@ async fn test_remove_blocks(mut storage: StorageProcessor<'_>) -> QueryResult<()
}
// Insert 1 incomplete block.
BlockSchema(&mut storage)
.save_incomplete_block(gen_sample_incomplete_block(
.save_incomplete_block(&gen_sample_incomplete_block(
BlockNumber(6),
BLOCK_SIZE_CHUNKS,
Default::default(),
@@ -1402,7 +1402,7 @@ async fn test_incomplete_block_logic(mut storage: StorageProcessor<'_>) -> Query
"Pending block should be saved"
);

schema.save_incomplete_block(incomplete_block).await?;
schema.save_incomplete_block(&incomplete_block).await?;

// Pending block should be removed now.
assert!(
4 changes: 4 additions & 0 deletions core/lib/types/src/operations/full_exit_op.rs
@@ -158,4 +158,8 @@ impl FullExitOp {
pub fn get_updated_account_ids(&self) -> Vec<AccountId> {
vec![self.priority_op.account_id]
}

pub fn withdraw_amount(&self) -> Option<BigUint> {
self.withdraw_amount.as_ref().map(|a| a.0.clone())
}
}
27 changes: 26 additions & 1 deletion core/lib/types/src/operations/mod.rs
@@ -2,8 +2,9 @@
use super::ZkSyncTx;
use crate::ZkSyncPriorityOp;
use num::BigUint;
use serde::{Deserialize, Serialize};
use zksync_basic_types::AccountId;
use zksync_basic_types::{AccountId, TokenId};
use zksync_crypto::params::{CHUNK_BYTES, LEGACY_CHUNK_BYTES};

mod change_pubkey_op;
@@ -71,6 +72,30 @@ impl ZkSyncOp {
ZkSyncOp::WithdrawNFT(_) => WithdrawNFTOp::CHUNKS,
}
}
/// Get information about the token amounts involved in the operation.
pub fn get_amount_info(&self) -> Option<Vec<(TokenId, BigUint)>> {
match self {
ZkSyncOp::Transfer(tx) => Some(vec![(tx.tx.token, tx.tx.amount.clone())]),
ZkSyncOp::Withdraw(tx) => Some(vec![(tx.tx.token, tx.tx.amount.clone())]),
ZkSyncOp::Close(_) => None,
ZkSyncOp::ChangePubKeyOffchain(_) => None,
ZkSyncOp::ForcedExit(_) => None,
ZkSyncOp::MintNFTOp(_) => None,
ZkSyncOp::Swap(tx) => Some(vec![
(tx.tx.orders.0.token_buy, tx.tx.amounts.0.clone()),
(tx.tx.orders.1.token_buy, tx.tx.amounts.1.clone()),
]),
ZkSyncOp::Deposit(tx) => {
Some(vec![(tx.priority_op.token, tx.priority_op.amount.clone())])
}
ZkSyncOp::TransferToNew(tx) => Some(vec![(tx.tx.token, tx.tx.amount.clone())]),
ZkSyncOp::WithdrawNFT(_) => None,
ZkSyncOp::FullExit(tx) => tx
.withdraw_amount()
.map(|amount| vec![(tx.priority_op.token, amount)]),
ZkSyncOp::Noop(_) => None,
}
}

/// Returns the public data required for the Ethereum smart contract to commit the operation.
pub fn public_data(&self) -> Vec<u8> {
