Skip to content

Commit

Permalink
Merge branch 'dev' into popzxc-zks-549-check-all-folders-in-zksync-re…
Browse files Browse the repository at this point in the history
…po-and
  • Loading branch information
popzxc authored Mar 4, 2022
2 parents 999f1a3 + 5a11ba5 commit 200442e
Show file tree
Hide file tree
Showing 24 changed files with 766 additions and 369 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 49 additions & 2 deletions core/bin/zksync_api/src/api_server/rest/v02/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use actix_web::{web, Scope};
use zksync_api_types::v02::{
block::{BlockInfo, BlockStatus},
pagination::{parse_query, ApiEither, BlockAndTxHash, Paginated, PaginationQuery},
transaction::{Transaction, TxHashSerializeWrapper},
transaction::{Transaction, TxData, TxHashSerializeWrapper},
};
use zksync_crypto::{convert::FeConvert, Fr};
use zksync_storage::{chain::block::records::StorageBlockDetails, ConnectionPool, QueryResult};
Expand Down Expand Up @@ -126,6 +126,20 @@ impl ApiBlockData {
storage.paginate_checked(&new_query).await
}

async fn tx_data(
&self,
block_number: BlockNumber,
block_index: u64,
) -> Result<Option<TxData>, Error> {
let mut storage = self.pool.access_storage().await.map_err(Error::storage)?;
Ok(storage
.chain()
.operations_ext_schema()
.tx_data_by_block_and_index_api_v02(block_number, block_index)
.await
.map_err(Error::storage)?)
}

async fn get_last_committed_block_number(&self) -> QueryResult<BlockNumber> {
let mut storage = self.pool.access_storage().await?;
storage
Expand Down Expand Up @@ -175,6 +189,15 @@ async fn block_transactions(
data.transaction_page(block_number, query).await.into()
}

async fn transaction_in_block(
data: web::Data<ApiBlockData>,
path: web::Path<(BlockNumber, u64)>,
) -> ApiResult<Option<TxData>> {
let (block_number, block_index) = *path;
let res = api_try!(data.tx_data(block_number, block_index).await);
ApiResult::Ok(res)
}

pub fn api_scope(pool: ConnectionPool, cache: BlockDetailsCache) -> Scope {
let data = ApiBlockData::new(pool, cache);

Expand All @@ -186,6 +209,10 @@ pub fn api_scope(pool: ConnectionPool, cache: BlockDetailsCache) -> Scope {
"{block_position}/transactions",
web::get().to(block_transactions),
)
.route(
"{block_position}/transactions/{block_index}",
web::get().to(transaction_in_block),
)
}

#[cfg(test)]
Expand Down Expand Up @@ -267,7 +294,7 @@ mod tests {
assert_eq!(paginated.pagination.direction, PaginationDirection::Older);
assert_eq!(paginated.pagination.from, tx_hash);

for (tx, expected_tx) in paginated.list.into_iter().zip(expected_txs) {
for (tx, expected_tx) in paginated.list.into_iter().zip(expected_txs.clone()) {
assert_eq!(
tx.tx_hash.to_string().replace("sync-tx:", "0x"),
expected_tx.tx_hash
Expand All @@ -280,6 +307,26 @@ mod tests {
}
}

for expected_tx in expected_txs {
if !expected_tx.success {
continue;
}
let response = client
.transaction_in_block(
expected_tx.block_number as u32,
expected_tx.block_index.unwrap() as u32,
)
.await?;
let tx: Option<TxData> = deserialize_response_result(response)?;
let tx = tx.unwrap().tx;
assert_eq!(tx.created_at, Some(expected_tx.created_at));
assert_eq!(*tx.block_number.unwrap(), expected_tx.block_number as u32);
assert_eq!(tx.fail_reason, expected_tx.fail_reason);
if matches!(tx.op, TransactionData::L2(_)) {
assert_eq!(serde_json::to_value(tx.op).unwrap(), expected_tx.op);
}
}

server.stop().await;
Ok(())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ impl Paginate<PendingOpsRequest> for StorageProcessor<'_> {
);
Transaction {
tx_hash,
block_index: None,
block_number: None,
op: TransactionData::L1(tx),
status: TxInBlockStatus::Queued,
Expand Down
1 change: 1 addition & 0 deletions core/bin/zksync_api/src/api_server/rest/v02/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl ApiTransactionData {
let tx_hash = op.tx_hash();
let tx = Transaction {
tx_hash,
block_index: None,
block_number: None,
op: TransactionData::L1(L1Transaction::from_pending_op(
op.data,
Expand Down
6 changes: 5 additions & 1 deletion core/bin/zksync_core/src/committer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ async fn seal_incomplete_block(
transaction
.chain()
.block_schema()
.save_incomplete_block(block)
.save_incomplete_block(&block)
.await
.expect("committer must commit the op into db");

Expand All @@ -218,6 +218,10 @@ async fn seal_incomplete_block(
.await
.expect("Unable to commit DB transaction");

// We do this outside of a transaction,
// because we want the incomplete block data to be available as soon as possible.
// If something happened to the metric count, it won't affect the block data
zksync_prometheus_exporter::calculate_volume_for_block(&mut storage, &block).await;
metrics::histogram!("committer.seal_incomplete_block", start.elapsed());
}

Expand Down
9 changes: 9 additions & 0 deletions core/lib/api_client/src/rest/v02/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,15 @@ impl Client {
.await
}

pub async fn transaction_in_block(&self, block_number: u32, tx_index: u32) -> Result<Response> {
self.get_with_scope(
super::API_V02_SCOPE,
&format!("blocks/{}/transactions/{}", block_number, tx_index),
)
.send()
.await
}

pub async fn block_transactions(
&self,
pagination_query: &PaginationQuery<ApiEither<TxHash>>,
Expand Down
1 change: 1 addition & 0 deletions core/lib/api_types/src/v02/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ pub enum Receipt {
pub struct Transaction {
#[serde(serialize_with = "ZeroPrefixHexSerde::serialize")]
pub tx_hash: TxHash,
pub block_index: Option<u32>,
pub block_number: Option<BlockNumber>,
pub op: TransactionData,
pub status: TxInBlockStatus,
Expand Down
1 change: 1 addition & 0 deletions core/lib/prometheus_exporter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ tokio = { version = "1", features = ["full"] }
futures = "0.3"
anyhow = "1.0"

num = { version = "0.3.1", features = ["serde"] }
metrics = "0.17"
metrics-exporter-prometheus = "0.6"
metrics-macros = "0.4"
Expand Down
126 changes: 82 additions & 44 deletions core/lib/prometheus_exporter/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,67 +1,105 @@
//! This module handles metric export to the Prometheus server
use metrics_exporter_prometheus::PrometheusBuilder;
use num::rational::Ratio;
use num::{BigUint, ToPrimitive};
use std::collections::HashMap;
use std::ops::Add;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use zksync_storage::ConnectionPool;
use zksync_storage::{ConnectionPool, QueryResult, StorageProcessor};
use zksync_types::aggregated_operations::AggregatedActionType::*;
use zksync_types::block::IncompleteBlock;
use zksync_types::TokenId;

const QUERY_INTERVAL: Duration = Duration::from_secs(30);

pub fn run_operation_counter(connection_pool: ConnectionPool) -> JoinHandle<()> {
tokio::spawn(async move {
let mut storage = connection_pool
.access_storage()
.await
.expect("unable to access storage");

loop {
let mut transaction = storage
.start_transaction()
.await
.expect("unable to start db transaction");
let mut block_schema = transaction.chain().block_schema();

for &action in &[CommitBlocks, ExecuteBlocks] {
for &is_confirmed in &[false, true] {
let result = block_schema
.count_aggregated_operations(action, is_confirmed)
.await;
if let Ok(result) = result {
metrics::gauge!(
"count_operations",
result as f64,
"action" => action.to_string(),
"confirmed" => is_confirmed.to_string()
);
}
}
if let Err(e) = prometheus_exporter_iteration(connection_pool.clone()).await {
vlog::error!("Prometheus error: {}", e);
}
sleep(QUERY_INTERVAL).await;
}
})
}

let rejected_txs = block_schema.count_rejected_txs().await;
async fn prometheus_exporter_iteration(connection_pool: ConnectionPool) -> QueryResult<()> {
let mut storage = connection_pool.access_storage().await?;
let mut transaction = storage.start_transaction().await?;

if let Ok(result) = rejected_txs {
metrics::gauge!("stored_rejected_txs", result as f64);
}
let mut block_schema = transaction.chain().block_schema();

let mempool_size = transaction
.chain()
.mempool_schema()
.get_mempool_size()
.await;
if let Ok(result) = mempool_size {
metrics::gauge!("mempool_size", result as f64);
}
for &action in &[CommitBlocks, ExecuteBlocks] {
for &is_confirmed in &[false, true] {
let result = block_schema
.count_aggregated_operations(action, is_confirmed)
.await?;
metrics::gauge!(
"count_operations",
result as f64,
"action" => action.to_string(),
"confirmed" => is_confirmed.to_string()
);
}
}

transaction
.commit()
.await
.expect("unable to commit db transaction");
let rejected_txs = block_schema.count_rejected_txs().await?;

sleep(QUERY_INTERVAL).await;
metrics::gauge!("stored_rejected_txs", rejected_txs as f64);

let mempool_size = transaction
.chain()
.mempool_schema()
.get_mempool_size()
.await?;
metrics::gauge!("mempool_size", mempool_size as f64);

transaction.commit().await?;
Ok(())
}

/// Extract volumes from block
fn get_volumes(block: &IncompleteBlock) -> HashMap<TokenId, BigUint> {
let mut volumes: HashMap<TokenId, BigUint> = HashMap::new();

// Iterator over tx amounts in the block.
let amounts_iter = block
.block_transactions
.iter()
.filter(|executed_op| executed_op.is_successful()) // Only process successful operations.
.filter_map(|executed_op| executed_op.get_executed_op()) // Obtain transaction.
.filter_map(|tx| tx.get_amount_info()) // Process transactions with amounts.
.flatten(); // Each transaction can have multiple amounts, process one by one.

for (token, amount) in amounts_iter {
volumes
.entry(token)
.and_modify(|volume| *volume = volume.clone().add(amount.clone()))
.or_insert(amount);
}
volumes
}

/// Send volume of all transactions in block in usd to prometheus
pub async fn calculate_volume_for_block(
storage: &mut StorageProcessor<'_>,
block: &IncompleteBlock,
) {
let volumes = get_volumes(block);
for (token, amount) in volumes.into_iter() {
if let Ok(Some(price)) = storage
.tokens_schema()
.get_historical_ticker_price(token)
.await
{
let labels = vec![("token", format!("{}", token.0))];
let usd_amount = Ratio::from(amount) * price.usd_price;
metrics::gauge!("txs_volume", usd_amount.to_f64().unwrap(), &labels);
}
})
}
}

pub fn run_prometheus_exporter(port: u16) -> JoinHandle<()> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP PROCEDURE unique_users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
create or replace function unique_users(after timestamp, before timestamp)
returns table
(
total bigint
)
language plpgsql
as
$$
begin
return query (
select count(distinct address)
from tx_filters
where tx_hash in (
select tx_hash
from executed_transactions
where success = true
and created_at BETWEEN after AND before
union
select tx_hash
from executed_priority_operations
where created_at BETWEEN after AND before
));
end;
$$;
Loading

0 comments on commit 200442e

Please sign in to comment.