Skip to content

Commit

Permalink
Refactor running server
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Dec 2, 2021
1 parent 3c0d025 commit 156c507
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 101 deletions.
201 changes: 103 additions & 98 deletions core/bin/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use futures::{channel::mpsc, SinkExt, StreamExt};
use futures::{channel::mpsc, executor::block_on, SinkExt, StreamExt};
use std::cell::RefCell;
use std::str::FromStr;

use structopt::StructOpt;

use serde::{Deserialize, Serialize};

use zksync_api::fee_ticker::run_ticker_task;
use zksync_api::fee_ticker::{run_ticker_task, TickerRequest};
use zksync_core::{genesis_init, run_core, wait_for_tasks};
use zksync_eth_client::EthereumGateway;
use zksync_eth_sender::run_eth_sender;
use zksync_forced_exit_requests::run_forced_exit_requests_actors;
use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed;
use zksync_witness_generator::run_prover_server;

use futures::executor::block_on;
use std::cell::RefCell;

use tokio::task::JoinHandle;
use zksync_config::{
configs::api::{
CommonApiConfig, JsonRpcConfig, PrivateApiConfig, ProverApiConfig, RestApiConfig,
Expand All @@ -35,18 +33,20 @@ pub enum ServerCommand {

#[derive(Serialize, Deserialize, Debug, Eq, PartialEq)]
pub enum Component {
// Api components
RestApi,
Web3Api,
RpcApi,
RpcWebSocketApi,

// Core components
EthSender,
Core,

WitnessGenerator,
ForcedExit,
Prometheus,

// Additional components
Prometheus,
RejectedTaskCleaner,
}

Expand All @@ -72,6 +72,7 @@ impl FromStr for Component {

#[derive(Debug)]
struct ComponentsToRun(Vec<Component>);

impl Default for ComponentsToRun {
fn default() -> Self {
Self(vec![
Expand Down Expand Up @@ -115,10 +116,6 @@ struct Opt {
components: ComponentsToRun,
}

trait Runner {
fn run(&self, stop_signal_sender: mpsc::Sender<bool>);
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let opt = Opt::from_args();
Expand Down Expand Up @@ -148,14 +145,15 @@ async fn main() -> anyhow::Result<()> {
async fn run_server(components: &ComponentsToRun) {
let connection_pool = ConnectionPool::new(None);
let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256);
let channel_size = 32768;

let mut tasks = vec![];

if components.0.contains(&Component::Web3Api) {
let config = Web3Config::from_env();
// Run web3 api
tasks.push(zksync_api::api_server::web3::start_rpc_server(
connection_pool.clone(),
&config,
&Web3Config::from_env(),
));
}

Expand All @@ -165,87 +163,64 @@ async fn run_server(components: &ComponentsToRun) {
Component::RpcWebSocketApi | Component::RpcApi | Component::RestApi
)
}) {
let eth_client_config = ETHClientConfig::from_env();
let eth_sender_config = ETHSenderConfig::from_env();
let eth_watch_config = ETHWatchConfig::from_env();
let contracts = ContractsConfig::from_env();
let eth_gateway = EthereumGateway::from_config(
&eth_client_config,
&eth_sender_config,
contracts.contract_addr,
);
// Create gateway
let eth_gateway = create_eth_gateway();

let eth_watch_config = ETHWatchConfig::from_env();
let gateway_watcher_config = GatewayWatcherConfig::from_env();

// Run eth multiplexer
if let Some(task) =
run_gateway_watcher_if_multiplexed(eth_gateway.clone(), &gateway_watcher_config)
{
tasks.push(task);
}

let channel_size = 32768;

let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size);
let chain_config = ChainConfig::from_env();

let max_blocks_to_aggregate = std::cmp::max(
chain_config.state_keeper.max_aggregated_blocks_to_commit,
chain_config.state_keeper.max_aggregated_blocks_to_execute,
) as u32;
let ticker_config = TickerConfig::from_env();
// Run ticker
let (task, ticker_request_sender) = run_ticker(connection_pool.clone(), channel_size);
tasks.push(task);

let ticker_task = run_ticker_task(
connection_pool.clone(),
ticker_request_receiver,
&ticker_config,
max_blocks_to_aggregate,
);

tasks.push(ticker_task);
// Run signer
let (sign_check_sender, sign_check_receiver) = mpsc::channel(channel_size);

tasks.push(zksync_api::signature_checker::start_sign_checker(
eth_gateway,
sign_check_receiver,
));

let private_config = PrivateApiConfig::from_env();

let contracts_config = ContractsConfig::from_env();
let common_config = CommonApiConfig::from_env();
let chain_config = ChainConfig::from_env();

if components.0.contains(&Component::RpcWebSocketApi) {
let config = JsonRpcConfig::from_env();
tasks.push(zksync_api::api_server::rpc_subscriptions::start_ws_server(
connection_pool.clone(),
sign_check_sender.clone(),
ticker_request_sender.clone(),
&common_config,
&config,
&JsonRpcConfig::from_env(),
chain_config.state_keeper.miniblock_iteration_interval(),
private_config.url.clone(),
eth_watch_config.confirmations_for_eth_event,
));
}

if components.0.contains(&Component::RpcApi) {
let config = JsonRpcConfig::from_env();
tasks.push(zksync_api::api_server::rpc_server::start_rpc_server(
connection_pool.clone(),
sign_check_sender.clone(),
ticker_request_sender.clone(),
&config,
&JsonRpcConfig::from_env(),
&common_config,
private_config.url.clone(),
eth_watch_config.confirmations_for_eth_event,
));
}

if components.0.contains(&Component::RestApi) {
let config = RestApiConfig::from_env();
zksync_api::api_server::rest::start_server_thread_detached(
connection_pool.clone(),
config.bind_addr(),
RestApiConfig::from_env().bind_addr(),
contracts_config.contract_addr,
ticker_request_sender,
sign_check_sender,
Expand All @@ -255,67 +230,29 @@ async fn run_server(components: &ComponentsToRun) {
}

if components.0.contains(&Component::EthSender) {
// Run Ethereum sender actors.
vlog::info!("Starting the Ethereum sender actors");
let eth_client_config = ETHClientConfig::from_env();
let eth_sender_config = ETHSenderConfig::from_env();
let contracts = ContractsConfig::from_env();
let eth_gateway = EthereumGateway::from_config(
&eth_client_config,
&eth_sender_config,
contracts.contract_addr,
);

tasks.push(run_eth_sender(
connection_pool.clone(),
eth_gateway,
eth_sender_config,
));
tasks.push(run_eth_sender(connection_pool.clone()))
}

if components.0.contains(&Component::Core) {
let all_config = ZkSyncConfig::from_env();
let eth_gateway = EthereumGateway::from_config(
&all_config.eth_client,
&all_config.eth_sender,
all_config.contracts.contract_addr,
);
let eth_gateway = create_eth_gateway();

tasks.append(
&mut run_core(connection_pool.clone(), &all_config, eth_gateway.clone())
.await
.unwrap(),
&mut run_core(
connection_pool.clone(),
&ZkSyncConfig::from_env(),
eth_gateway.clone(),
)
.await
.unwrap(),
);
}

if components.0.contains(&Component::WitnessGenerator) {
vlog::info!("Starting the Prover server actors");
let prover_api_config = ProverApiConfig::from_env();
let prover_config = ProverConfig::from_env();
let database = zksync_witness_generator::database::Database::new(connection_pool.clone());
tasks.push(run_prover_server(
database,
prover_api_config,
prover_config,
));
tasks.push(run_witness_generator(connection_pool.clone()))
}

if components.0.contains(&Component::ForcedExit) {
vlog::info!("Starting the ForcedExitRequests actors");
let config = ForcedExitRequestsConfig::from_env();
let common_config = CommonApiConfig::from_env();
let private_api_config = PrivateApiConfig::from_env();
let contract_config = ContractsConfig::from_env();
let eth_client_config = ETHClientConfig::from_env();

tasks.push(run_forced_exit_requests_actors(
connection_pool.clone(),
private_api_config.url,
config,
common_config,
contract_config,
eth_client_config.web3_url(),
))
tasks.push(run_forced_exit(connection_pool.clone()));
}

if components.0.contains(&Component::RejectedTaskCleaner) {
Expand All @@ -341,3 +278,71 @@ async fn run_server(components: &ComponentsToRun) {
}
};
}

pub fn run_forced_exit(connection_pool: ConnectionPool) -> JoinHandle<()> {
vlog::info!("Starting the ForcedExitRequests actors");
let config = ForcedExitRequestsConfig::from_env();
let common_config = CommonApiConfig::from_env();
let private_api_config = PrivateApiConfig::from_env();
let contract_config = ContractsConfig::from_env();
let eth_client_config = ETHClientConfig::from_env();

run_forced_exit_requests_actors(
connection_pool,
private_api_config.url,
config,
common_config,
contract_config,
eth_client_config.web3_url(),
)
}

pub fn run_witness_generator(connection_pool: ConnectionPool) -> JoinHandle<()> {
vlog::info!("Starting the Prover server actors");
let prover_api_config = ProverApiConfig::from_env();
let prover_config = ProverConfig::from_env();
let database = zksync_witness_generator::database::Database::new(connection_pool);
run_prover_server(database, prover_api_config, prover_config)
}

pub fn run_eth_sender(connection_pool: ConnectionPool) -> JoinHandle<()> {
vlog::info!("Starting the Ethereum sender actors");
let eth_client_config = ETHClientConfig::from_env();
let eth_sender_config = ETHSenderConfig::from_env();
let contracts = ContractsConfig::from_env();
let eth_gateway = EthereumGateway::from_config(
&eth_client_config,
&eth_sender_config,
contracts.contract_addr,
);

zksync_eth_sender::run_eth_sender(connection_pool, eth_gateway, eth_sender_config)
}

pub fn run_ticker(
connection_pool: ConnectionPool,
channel_size: usize,
) -> (JoinHandle<()>, mpsc::Sender<TickerRequest>) {
vlog::info!("Starting Ticker actors");
let (ticker_request_sender, ticker_request_receiver) = mpsc::channel(channel_size);
let chain_config = ChainConfig::from_env();
let ticker_config = TickerConfig::from_env();
let task = run_ticker_task(
connection_pool,
ticker_request_receiver,
&ticker_config,
chain_config.max_blocks_to_aggregate(),
);
(task, ticker_request_sender)
}

pub fn create_eth_gateway() -> EthereumGateway {
let eth_client_config = ETHClientConfig::from_env();
let eth_sender_config = ETHSenderConfig::from_env();
let contracts = ContractsConfig::from_env();
EthereumGateway::from_config(
&eth_client_config,
&eth_sender_config,
contracts.contract_addr,
)
}
6 changes: 3 additions & 3 deletions core/bin/zksync_witness_generator/src/tests/prover_server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Built-in deps
use std::{thread, time::Duration};
use std::time::Duration;
// External deps

use num::BigUint;
Expand Down Expand Up @@ -40,12 +40,12 @@ impl Default for MockProverOptions {
async fn spawn_server(database: MockDatabase) {
let prover_options = MockProverOptions::default();

thread::spawn(move || {
tokio::spawn({
run_prover_server(
database,
prover_options.0.api.prover,
prover_options.0.prover,
);
)
});
}

Expand Down
6 changes: 6 additions & 0 deletions core/lib/config/src/configs/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ impl ChainConfig {
state_keeper: envy_load!("state_keeper", "CHAIN_STATE_KEEPER_"),
}
}
pub fn max_blocks_to_aggregate(&self) -> u32 {
std::cmp::max(
self.state_keeper.max_aggregated_blocks_to_commit,
self.state_keeper.max_aggregated_blocks_to_execute,
) as u32
}
}

#[derive(Debug, Deserialize, Clone, PartialEq)]
Expand Down

0 comments on commit 156c507

Please sign in to comment.