From 156c507b763699c31ff6921a0adff05cf6872d17 Mon Sep 17 00:00:00 2001 From: deniallugo Date: Thu, 2 Dec 2021 16:02:40 +0100 Subject: [PATCH] Refactor running server Signed-off-by: deniallugo --- core/bin/server/src/main.rs | 201 +++++++++--------- .../src/tests/prover_server.rs | 6 +- core/lib/config/src/configs/chain.rs | 6 + 3 files changed, 112 insertions(+), 101 deletions(-) diff --git a/core/bin/server/src/main.rs b/core/bin/server/src/main.rs index e686088b59..d536140797 100644 --- a/core/bin/server/src/main.rs +++ b/core/bin/server/src/main.rs @@ -1,21 +1,19 @@ -use futures::{channel::mpsc, SinkExt, StreamExt}; +use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt}; +use std::cell::RefCell; use std::str::FromStr; use structopt::StructOpt; use serde::{Deserialize, Serialize}; -use zksync_api::fee_ticker::run_ticker_task; +use zksync_api::fee_ticker::{run_ticker_task, TickerRequest}; use zksync_core::{genesis_init, run_core, wait_for_tasks}; use zksync_eth_client::EthereumGateway; -use zksync_eth_sender::run_eth_sender; use zksync_forced_exit_requests::run_forced_exit_requests_actors; use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed; use zksync_witness_generator::run_prover_server; -use futures::executor::block_on; -use std::cell::RefCell; - +use tokio::task::JoinHandle; use zksync_config::{ configs::api::{ CommonApiConfig, JsonRpcConfig, PrivateApiConfig, ProverApiConfig, RestApiConfig, @@ -35,18 +33,20 @@ pub enum ServerCommand { #[derive(Serialize, Deserialize, Debug, Eq, PartialEq)] pub enum Component { + // Api components RestApi, Web3Api, RpcApi, RpcWebSocketApi, + // Core components EthSender, Core, - WitnessGenerator, ForcedExit, - Prometheus, + // Additional components + Prometheus, RejectedTaskCleaner, } @@ -72,6 +72,7 @@ impl FromStr for Component { #[derive(Debug)] struct ComponentsToRun(Vec); + impl Default for ComponentsToRun { fn default() -> Self { Self(vec![ @@ -115,10 +116,6 @@ struct Opt { components: ComponentsToRun, } -trait Runner { - fn run(&self, stop_signal_sender: mpsc::Sender); -} - #[tokio::main] async fn main() -> anyhow::Result<()> { let opt = Opt::from_args(); @@ -148,14 +145,15 @@ async fn main() -> anyhow::Result<()> { async fn run_server(components: &ComponentsToRun) { let connection_pool = ConnectionPool::new(None); let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256); + let channel_size = 32768; let mut tasks = vec![]; if components.0.contains(&Component::Web3Api) { - let config = Web3Config::from_env(); + // Run web3 api tasks.push(zksync_api::api_server::web3::start_rpc_server( connection_pool.clone(), - &config, + &Web3Config::from_env(), )); } @@ -165,63 +163,42 @@ async fn run_server(components: &ComponentsToRun) { Component::RpcWebSocketApi | Component::RpcApi | Component::RestApi ) }) { - let eth_client_config = ETHClientConfig::from_env(); - let eth_sender_config = ETHSenderConfig::from_env(); - let eth_watch_config = ETHWatchConfig::from_env(); - let contracts = ContractsConfig::from_env(); - let eth_gateway = EthereumGateway::from_config( - ð_client_config, - ð_sender_config, - contracts.contract_addr, - ); + // Create gateway + let eth_gateway = create_eth_gateway(); + let eth_watch_config = ETHWatchConfig::from_env(); let gateway_watcher_config = GatewayWatcherConfig::from_env(); + // Run eth multiplexer if let Some(task) = run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &gateway_watcher_config) { tasks.push(task); } - let channel_size = 32768; - - let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size); - let chain_config = ChainConfig::from_env(); - - let max_blocks_to_aggregate = std::cmp::max( - chain_config.state_keeper.max_aggregated_blocks_to_commit, - chain_config.state_keeper.max_aggregated_blocks_to_execute, - ) as u32; - let ticker_config = TickerConfig::from_env(); + // Run ticker + let (task, ticker_request_sender) = run_ticker(connection_pool.clone(), channel_size); + tasks.push(task); - let ticker_task = run_ticker_task( - connection_pool.clone(), - ticker_request_receiver, - &ticker_config, - max_blocks_to_aggregate, - ); - - tasks.push(ticker_task); + // Run signer let (sign_check_sender, sign_check_receiver) = mpsc::channel(channel_size); - tasks.push(zksync_api::signature_checker::start_sign_checker( eth_gateway, sign_check_receiver, )); let private_config = PrivateApiConfig::from_env(); - let contracts_config = ContractsConfig::from_env(); let common_config = CommonApiConfig::from_env(); + let chain_config = ChainConfig::from_env(); if components.0.contains(&Component::RpcWebSocketApi) { - let config = JsonRpcConfig::from_env(); tasks.push(zksync_api::api_server::rpc_subscriptions::start_ws_server( connection_pool.clone(), sign_check_sender.clone(), ticker_request_sender.clone(), &common_config, - &config, + &JsonRpcConfig::from_env(), chain_config.state_keeper.miniblock_iteration_interval(), private_config.url.clone(), eth_watch_config.confirmations_for_eth_event, @@ -229,12 +206,11 @@ async fn run_server(components: &ComponentsToRun) { } if components.0.contains(&Component::RpcApi) { - let config = JsonRpcConfig::from_env(); tasks.push(zksync_api::api_server::rpc_server::start_rpc_server( connection_pool.clone(), sign_check_sender.clone(), ticker_request_sender.clone(), - &config, + &JsonRpcConfig::from_env(), &common_config, private_config.url.clone(), eth_watch_config.confirmations_for_eth_event, @@ -242,10 +218,9 @@ async fn run_server(components: &ComponentsToRun) { } if components.0.contains(&Component::RestApi) { - let config = RestApiConfig::from_env(); zksync_api::api_server::rest::start_server_thread_detached( connection_pool.clone(), - config.bind_addr(), + RestApiConfig::from_env().bind_addr(), contracts_config.contract_addr, ticker_request_sender, sign_check_sender, @@ -255,67 +230,29 @@ async fn run_server(components: &ComponentsToRun) { } if components.0.contains(&Component::EthSender) { - // Run Ethereum sender actors. - vlog::info!("Starting the Ethereum sender actors"); - let eth_client_config = ETHClientConfig::from_env(); - let eth_sender_config = ETHSenderConfig::from_env(); - let contracts = ContractsConfig::from_env(); - let eth_gateway = EthereumGateway::from_config( - ð_client_config, - ð_sender_config, - contracts.contract_addr, - ); - - tasks.push(run_eth_sender( - connection_pool.clone(), - eth_gateway, - eth_sender_config, - )); + tasks.push(run_eth_sender(connection_pool.clone())) } if components.0.contains(&Component::Core) { - let all_config = ZkSyncConfig::from_env(); - let eth_gateway = EthereumGateway::from_config( - &all_config.eth_client, - &all_config.eth_sender, - all_config.contracts.contract_addr, - ); + let eth_gateway = create_eth_gateway(); tasks.append( - &mut run_core(connection_pool.clone(), &all_config, eth_gateway.clone()) - .await - .unwrap(), + &mut run_core( + connection_pool.clone(), + &ZkSyncConfig::from_env(), + eth_gateway.clone(), + ) + .await + .unwrap(), ); } if components.0.contains(&Component::WitnessGenerator) { - vlog::info!("Starting the Prover server actors"); - let prover_api_config = ProverApiConfig::from_env(); - let prover_config = ProverConfig::from_env(); - let database = zksync_witness_generator::database::Database::new(connection_pool.clone()); - tasks.push(run_prover_server( - database, - prover_api_config, - prover_config, - )); + tasks.push(run_witness_generator(connection_pool.clone())) } if components.0.contains(&Component::ForcedExit) { - vlog::info!("Starting the ForcedExitRequests actors"); - let config = ForcedExitRequestsConfig::from_env(); - let common_config = CommonApiConfig::from_env(); - let private_api_config = PrivateApiConfig::from_env(); - let contract_config = ContractsConfig::from_env(); - let eth_client_config = ETHClientConfig::from_env(); - - tasks.push(run_forced_exit_requests_actors( - connection_pool.clone(), - private_api_config.url, - config, - common_config, - contract_config, - eth_client_config.web3_url(), - )) + tasks.push(run_forced_exit(connection_pool.clone())); } if components.0.contains(&Component::RejectedTaskCleaner) { @@ -341,3 +278,71 @@ async fn run_server(components: &ComponentsToRun) { } }; } + +pub fn run_forced_exit(connection_pool: ConnectionPool) -> JoinHandle<()> { + vlog::info!("Starting the ForcedExitRequests actors"); + let config = ForcedExitRequestsConfig::from_env(); + let common_config = CommonApiConfig::from_env(); + let private_api_config = PrivateApiConfig::from_env(); + let contract_config = ContractsConfig::from_env(); + let eth_client_config = ETHClientConfig::from_env(); + + run_forced_exit_requests_actors( + connection_pool, + private_api_config.url, + config, + common_config, + contract_config, + eth_client_config.web3_url(), + ) +} + +pub fn run_witness_generator(connection_pool: ConnectionPool) -> JoinHandle<()> { + vlog::info!("Starting the Prover server actors"); + let prover_api_config = ProverApiConfig::from_env(); + let prover_config = ProverConfig::from_env(); + let database = zksync_witness_generator::database::Database::new(connection_pool); + run_prover_server(database, prover_api_config, prover_config) +} + +pub fn run_eth_sender(connection_pool: ConnectionPool) -> JoinHandle<()> { + vlog::info!("Starting the Ethereum sender actors"); + let eth_client_config = ETHClientConfig::from_env(); + let eth_sender_config = ETHSenderConfig::from_env(); + let contracts = ContractsConfig::from_env(); + let eth_gateway = EthereumGateway::from_config( + ð_client_config, + ð_sender_config, + contracts.contract_addr, + ); + + zksync_eth_sender::run_eth_sender(connection_pool, eth_gateway, eth_sender_config) +} + +pub fn run_ticker( + connection_pool: ConnectionPool, + channel_size: usize, +) -> (JoinHandle<()>, mpsc::Sender) { + vlog::info!("Starting Ticker actors"); + let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size); + let chain_config = ChainConfig::from_env(); + let ticker_config = TickerConfig::from_env(); + let task = run_ticker_task( + connection_pool, + ticker_request_receiver, + &ticker_config, + chain_config.max_blocks_to_aggregate(), + ); + (task, ticker_request_sender) +} + +pub fn create_eth_gateway() -> EthereumGateway { + let eth_client_config = ETHClientConfig::from_env(); + let eth_sender_config = ETHSenderConfig::from_env(); + let contracts = ContractsConfig::from_env(); + EthereumGateway::from_config( + ð_client_config, + ð_sender_config, + contracts.contract_addr, + ) +} diff --git a/core/bin/zksync_witness_generator/src/tests/prover_server.rs b/core/bin/zksync_witness_generator/src/tests/prover_server.rs index 930b846fee..1035673fd5 100644 --- a/core/bin/zksync_witness_generator/src/tests/prover_server.rs +++ b/core/bin/zksync_witness_generator/src/tests/prover_server.rs @@ -1,5 +1,5 @@ // Built-in deps -use std::{thread, time::Duration}; +use std::time::Duration; // External deps use num::BigUint; @@ -40,12 +40,12 @@ impl Default for MockProverOptions { async fn spawn_server(database: MockDatabase) { let prover_options = MockProverOptions::default(); - thread::spawn(move || { + tokio::spawn({ run_prover_server( database, prover_options.0.api.prover, prover_options.0.prover, - ); + ) }); } diff --git a/core/lib/config/src/configs/chain.rs b/core/lib/config/src/configs/chain.rs index c05e030c18..c7e638109b 100644 --- a/core/lib/config/src/configs/chain.rs +++ b/core/lib/config/src/configs/chain.rs @@ -26,6 +26,12 @@ impl ChainConfig { state_keeper: envy_load!("state_keeper", "CHAIN_STATE_KEEPER_"), } } + pub fn max_blocks_to_aggregate(&self) -> u32 { + std::cmp::max( + self.state_keeper.max_aggregated_blocks_to_commit, + self.state_keeper.max_aggregated_blocks_to_execute, + ) as u32 + } } #[derive(Debug, Deserialize, Clone, PartialEq)]