Skip to content

Commit

Permalink
Fix some small review issues
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Mar 22, 2022
1 parent c0f16d6 commit cb1e330
Show file tree
Hide file tree
Showing 12 changed files with 221 additions and 186 deletions.
2 changes: 1 addition & 1 deletion core/bin/block_revert/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn revert_blocks_in_storage(
transaction
.chain()
.state_schema()
.clean_current_nonce_table(last_block)
.clear_current_nonce_table(last_block)
.await?;
println!("`committed_nonce` table is updated");
transaction
Expand Down
27 changes: 22 additions & 5 deletions core/bin/zksync_core/src/private_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
//! for correctness.
use std::thread;
use std::time::{Duration, Instant};

use actix_web::{web, App, HttpResponse, HttpServer};
use futures::{channel::mpsc, StreamExt};
use tokio::sync::RwLock;
use tokio::task::JoinHandle;
use zksync_api_types::CoreStatus;

Expand All @@ -19,17 +21,30 @@ use zksync_eth_client::EthereumGateway;
use zksync_storage::ConnectionPool;
use zksync_utils::panic_notify::ThreadPanicNotify;

#[derive(Debug, Clone)]
const STATUS_INVALIDATION_PERIOD: Duration = Duration::from_secs(60);

#[derive(Debug)]
struct AppState {
connection_pool: ConnectionPool,
read_only_connection_pool: ConnectionPool,
eth_client: EthereumGateway,
status_cache: RwLock<Option<(CoreStatus, Instant)>>,
}

/// Health check.
/// The core actor should have a connection to main and replica database and have connection to eth
/// The core actor is expected to have a connection to web3 and both main/replica databases
#[actix_web::get("/status")]
async fn status(data: web::Data<AppState>) -> actix_web::Result<HttpResponse> {
if let Some((status, data)) = data.status_cache.read().await.as_ref() {
if data.elapsed() < STATUS_INVALIDATION_PERIOD {
return Ok(HttpResponse::Ok().json(status.clone()));
}
}

// We need to get a lock here so we don't abuse the database and eth node connections
// with multiple requests from other API nodes when the cache has been invalidated.

let mut status = data.status_cache.write().await;
let main_database_status = data.connection_pool.access_storage().await.is_ok();
let replica_database_status = data
.read_only_connection_pool
Expand All @@ -39,10 +54,11 @@ async fn status(data: web::Data<AppState>) -> actix_web::Result<HttpResponse> {
let eth_status = data.eth_client.block_number().await.is_ok();

let response = CoreStatus {
main_database_status,
replica_database_status,
eth_status,
main_database_available: main_database_status,
replica_database_available: replica_database_status,
web3_available: eth_status,
};
*status = Some((response.clone(), Instant::now()));

Ok(HttpResponse::Ok().json(response))
}
Expand All @@ -68,6 +84,7 @@ pub fn start_private_core_api(
connection_pool: connection_pool.clone(),
read_only_connection_pool: read_only_connection_pool.clone(),
eth_client: eth_client.clone(),
status_cache: Default::default(),
};

// By calling `register_data` instead of `data` we're avoiding double
Expand Down
27 changes: 13 additions & 14 deletions core/bin/zksync_core/src/state_keeper/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,25 +286,22 @@ impl ZkSyncStateKeeper {
self.root_hash_queue.throttle().await;
metrics::histogram!("state_keeper.throttle", start.elapsed());

self.commit_new_tx_mini_batch().await;
}
}
let block_timestamp = self.pending_block.timestamp;
let proposed_block = self.propose_new_block(block_timestamp).await;

async fn commit_new_tx_mini_batch(&mut self) {
let block_timestamp = self.pending_block.timestamp;
let proposed_block = self.propose_new_block(block_timestamp).await;

self.execute_proposed_block(proposed_block).await;
self.execute_proposed_block(proposed_block).await;
}
}

async fn propose_new_block(&mut self, block_timestamp: u64) -> ProposedBlock {
let start = Instant::now();

let (response_sender, receiver) = oneshot::channel();

// These txs will be excluded from query result as already executed.
// Using these hashes we avoid the situation, when tx is still in mempool,
// but was executed in mempool

let tx_hashes = self
// By giving these hashes to the mempool,
// we won't receive back transactions that we already executed in the current block.
let executed_txs = self
.pending_block
.failed_txs
.iter()
Expand All @@ -321,15 +318,17 @@ impl ZkSyncStateKeeper {
last_priority_op_number: self.pending_block.unprocessed_priority_op_current,
block_timestamp,
response_sender,
executed_txs: tx_hashes,
executed_txs,
});

self.tx_for_mempool
.send(mempool_req)
.await
.expect("mempool receiver dropped");

receiver.await.expect("Mempool new block request failed")
let block = receiver.await.expect("Mempool new block request failed");
metrics::histogram!("state_keeper.propose_new_block", start.elapsed());
block
}

async fn execute_incomplete_block(&mut self, block: IncompleteBlock) {
Expand Down
6 changes: 3 additions & 3 deletions core/lib/api_types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum PriorityOpLookupQuery {
/// and connection to the ethereum node
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct CoreStatus {
pub main_database_status: bool,
pub replica_database_status: bool,
pub eth_status: bool,
pub main_database_available: bool,
pub replica_database_available: bool,
pub web3_available: bool,
}
Loading

0 comments on commit cb1e330

Please sign in to comment.