Skip to content

Commit

Permalink
Merge #2200
Browse files Browse the repository at this point in the history
2200: Reorganize core private api r=Deniallugo a=Deniallugo



Co-authored-by: deniallugo <[email protected]>
Co-authored-by: Danil <[email protected]>
  • Loading branch information
bors-matterlabs-dev[bot] and Deniallugo authored Mar 22, 2022
2 parents cbd242c + 207df37 commit 6701628
Show file tree
Hide file tree
Showing 78 changed files with 1,878 additions and 1,939 deletions.
23 changes: 23 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
# Libraries
"core/lib/token_db_cache",
"core/lib/circuit",
"core/lib/mempool",
"core/lib/eth_client",
"core/lib/eth_signer",
"core/lib/gateway_watcher",
Expand Down
6 changes: 6 additions & 0 deletions core/bin/block_revert/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ async fn revert_blocks_in_storage(
.return_executed_txs_to_mempool(last_block)
.await?;
println!("`mempool_txs`, `executed_transactions` tables are updated");
transaction
.chain()
.state_schema()
.clear_current_nonce_table(last_block)
.await?;
println!("`committed_nonce` table is updated");
transaction
.chain()
.block_schema()
Expand Down
2 changes: 2 additions & 0 deletions core/bin/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ zksync_forced_exit_requests = { path = "../zksync_forced_exit_requests", version

zksync_prometheus_exporter = { path = "../../lib/prometheus_exporter", version = "1.0" }
zksync_config = { path = "../../lib/config", version = "1.0" }

zksync_mempool = { path = "../../lib/mempool", version = "1.0" }
zksync_storage = { path = "../../lib/storage", version = "1.0" }
zksync_gateway_watcher = { path = "../../lib/gateway_watcher", version = "1.0" }
zksync_utils = { path = "../../lib/utils", version = "1.0" }
Expand Down
78 changes: 55 additions & 23 deletions core/bin/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,19 @@ use zksync_gateway_watcher::run_gateway_watcher_if_multiplexed;
use zksync_witness_generator::run_prover_server;

use tokio::task::JoinHandle;
use zksync_config::configs::api::PrometheusConfig;
use zksync_config::configs::api::{PrivateApiConfig, PrometheusConfig};
use zksync_config::{
configs::api::{
CommonApiConfig, JsonRpcConfig, PrivateApiConfig, ProverApiConfig, RestApiConfig,
Web3Config,
},
configs::api::{CommonApiConfig, JsonRpcConfig, ProverApiConfig, RestApiConfig, Web3Config},
ChainConfig, ContractsConfig, DBConfig, ETHClientConfig, ETHSenderConfig, ETHWatchConfig,
ForcedExitRequestsConfig, GatewayWatcherConfig, ProverConfig, TickerConfig, ZkSyncConfig,
};
use zksync_core::rejected_tx_cleaner::run_rejected_tx_cleaner;
use zksync_mempool::run_mempool_tx_handler;
use zksync_prometheus_exporter::{run_operation_counter, run_prometheus_exporter};
use zksync_storage::ConnectionPool;

const DEFAULT_CHANNEL_CAPACITY: usize = 32_768;

#[derive(Debug, Clone, Copy)]
pub enum ServerCommand {
Genesis,
Expand Down Expand Up @@ -152,8 +152,8 @@ async fn main() -> anyhow::Result<()> {

async fn run_server(components: &ComponentsToRun) {
let connection_pool = ConnectionPool::new(None);
let read_only_connection_pool = ConnectionPool::new_readonly_pool(None);
let (stop_signal_sender, mut stop_signal_receiver) = mpsc::channel(256);
let channel_size = 32768;

let mut tasks = vec![];

Expand Down Expand Up @@ -192,58 +192,81 @@ async fn run_server(components: &ComponentsToRun) {
}

// Run signer
let (sign_check_sender, sign_check_receiver) = mpsc::channel(channel_size);
let (sign_check_sender, sign_check_receiver) = mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
tasks.push(zksync_api::signature_checker::start_sign_checker(
eth_gateway,
sign_check_receiver,
));

let private_config = PrivateApiConfig::from_env();
let contracts_config = ContractsConfig::from_env();
let common_config = CommonApiConfig::from_env();
let chain_config = ChainConfig::from_env();
let fee_ticker_config = TickerConfig::from_env();
let ticker_info = Box::new(TickerInfo::new(connection_pool.clone()));
let ticker_info = Box::new(TickerInfo::new(read_only_connection_pool.clone()));

let ticker = FeeTicker::new_with_default_validator(
ticker_info,
fee_ticker_config,
chain_config.max_blocks_to_aggregate(),
connection_pool.clone(),
read_only_connection_pool.clone(),
);

if components.0.contains(&Component::RpcWebSocketApi) {
tasks.push(zksync_api::api_server::rpc_subscriptions::start_ws_server(
let (mempool_tx_request_sender, mempool_tx_request_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
tasks.push(run_mempool_tx_handler(
connection_pool.clone(),
mempool_tx_request_receiver,
chain_config.state_keeper.block_chunk_sizes.clone(),
));
tasks.push(zksync_api::api_server::rpc_subscriptions::start_ws_server(
read_only_connection_pool.clone(),
sign_check_sender.clone(),
ticker.clone(),
&common_config,
&JsonRpcConfig::from_env(),
chain_config.state_keeper.miniblock_iteration_interval(),
private_config.url.clone(),
mempool_tx_request_sender,
eth_watch_config.confirmations_for_eth_event,
));
}

if components.0.contains(&Component::RpcApi) {
tasks.push(zksync_api::api_server::rpc_server::start_rpc_server(
let (mempool_tx_request_sender, mempool_tx_request_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
tasks.push(run_mempool_tx_handler(
connection_pool.clone(),
mempool_tx_request_receiver,
chain_config.state_keeper.block_chunk_sizes.clone(),
));
tasks.push(zksync_api::api_server::rpc_server::start_rpc_server(
read_only_connection_pool.clone(),
sign_check_sender.clone(),
ticker.clone(),
&JsonRpcConfig::from_env(),
&common_config,
private_config.url,
mempool_tx_request_sender,
eth_watch_config.confirmations_for_eth_event,
));
}

if components.0.contains(&Component::RestApi) {
zksync_api::api_server::rest::start_server_thread_detached(
let (mempool_tx_request_sender, mempool_tx_request_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
tasks.push(run_mempool_tx_handler(
connection_pool.clone(),
mempool_tx_request_receiver,
chain_config.state_keeper.block_chunk_sizes,
));
let private_config = PrivateApiConfig::from_env();
zksync_api::api_server::rest::start_server_thread_detached(
read_only_connection_pool.clone(),
RestApiConfig::from_env().bind_addr(),
contracts_config.contract_addr,
ticker,
sign_check_sender,
mempool_tx_request_sender,
private_config.url,
);
}
}
Expand All @@ -258,6 +281,7 @@ async fn run_server(components: &ComponentsToRun) {
tasks.append(
&mut run_core(
connection_pool.clone(),
read_only_connection_pool.clone(),
&ZkSyncConfig::from_env(),
eth_gateway.clone(),
)
Expand All @@ -277,13 +301,13 @@ async fn run_server(components: &ComponentsToRun) {
tasks.push(prometheus_task_handle);
// We can run them only with active prometheus
if components.0.contains(&Component::PrometheusPeriodicMetrics) {
let counter_task_handle = run_operation_counter(connection_pool.clone());
let counter_task_handle = run_operation_counter(read_only_connection_pool.clone());
tasks.push(counter_task_handle);
}
}

if components.0.contains(&Component::ForcedExit) {
tasks.push(run_forced_exit(connection_pool.clone()));
tasks.append(&mut run_forced_exit(connection_pool.clone()));
}

if components.0.contains(&Component::RejectedTaskCleaner) {
Expand All @@ -310,22 +334,30 @@ async fn run_server(components: &ComponentsToRun) {
};
}

pub fn run_forced_exit(connection_pool: ConnectionPool) -> JoinHandle<()> {
pub fn run_forced_exit(connection_pool: ConnectionPool) -> Vec<JoinHandle<()>> {
vlog::info!("Starting the ForcedExitRequests actors");
let config = ForcedExitRequestsConfig::from_env();
let common_config = CommonApiConfig::from_env();
let private_api_config = PrivateApiConfig::from_env();
let contract_config = ContractsConfig::from_env();
let eth_client_config = ETHClientConfig::from_env();

run_forced_exit_requests_actors(
let chain_config = ChainConfig::from_env();

let (mempool_tx_request_sender, mempool_tx_request_receiver) =
mpsc::channel(DEFAULT_CHANNEL_CAPACITY);
let mempool_task = run_mempool_tx_handler(
connection_pool.clone(),
mempool_tx_request_receiver,
chain_config.state_keeper.block_chunk_sizes,
);
let forced_exit_task = run_forced_exit_requests_actors(
connection_pool,
private_api_config.url,
mempool_tx_request_sender,
config,
common_config,
contract_config,
eth_client_config.web3_url(),
)
);
vec![mempool_task, forced_exit_task]
}

pub fn run_witness_generator(connection_pool: ConnectionPool) -> JoinHandle<()> {
Expand Down
1 change: 1 addition & 0 deletions core/bin/zksync_api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ zksync_storage = { path = "../../lib/storage", version = "1.0" }
zksync_token_db_cache = { path = "../../lib/token_db_cache", version = "1.0" }

zksync_crypto = { path = "../../lib/crypto", version = "1.0" }
zksync_mempool = { path = "../../lib/mempool", version = "1.0" }
zksync_config = { path = "../../lib/config", version = "1.0" }
zksync_utils = { path = "../../lib/utils", version = "1.0" }
zksync_contracts = { path = "../../lib/contracts", version = "1.0" }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl SumbitErrorCode {
SubmitError::IncorrectTx(_) => Self::IncorrectTx,
SubmitError::TxAdd(_) => Self::TxAdd,
SubmitError::InappropriateFeeToken => Self::InappropriateFeeToken,
SubmitError::CommunicationCoreServer(_) => Self::CommunicationCoreServer,
SubmitError::MempoolCommunication(_) => Self::CommunicationCoreServer,
SubmitError::Internal(_) => Self::Internal,
SubmitError::Other(_) => Self::Other,
SubmitError::Toggle2FA(_) => Self::Other,
Expand Down
20 changes: 17 additions & 3 deletions core/bin/zksync_api/src/api_server/rest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ use crate::signature_checker::VerifySignatureRequest;

use super::tx_sender::TxSender;

use crate::api_server::rest::network_status::SharedNetworkStatus;
use crate::fee_ticker::FeeTicker;
use tokio::task::JoinHandle;
use zksync_config::ZkSyncConfig;
use zksync_mempool::MempoolTransactionRequest;

mod forced_exit_requests;
mod helpers;
Expand All @@ -27,6 +29,7 @@ async fn start_server(
fee_ticker: FeeTicker,
sign_verifier: mpsc::Sender<VerifySignatureRequest>,
bind_to: SocketAddr,
mempool_tx_sender: mpsc::Sender<MempoolTransactionRequest>,
) {
HttpServer::new(move || {
let api_v01 = api_v01.clone();
Expand All @@ -48,7 +51,7 @@ async fn start_server(
sign_verifier.clone(),
fee_ticker.clone(),
&api_v01.config.api.common,
api_v01.config.api.private.url.clone(),
mempool_tx_sender.clone(),
);
v02::api_scope(tx_sender, &api_v01.config, api_v01.network_status.clone())
};
Expand Down Expand Up @@ -87,6 +90,8 @@ pub fn start_server_thread_detached(
contract_address: H160,
fee_ticker: FeeTicker,
sign_verifier: mpsc::Sender<VerifySignatureRequest>,
mempool_tx_sender: mpsc::Sender<MempoolTransactionRequest>,
core_address: String,
) -> JoinHandle<()> {
let (handler, panic_sender) = spawn_panic_handler();

Expand All @@ -99,10 +104,19 @@ pub fn start_server_thread_detached(
// TODO remove this config ZKS-815
let config = ZkSyncConfig::from_env();

let api_v01 = ApiV01::new(connection_pool, contract_address, config);
let network_status = SharedNetworkStatus::new(core_address);
let api_v01 =
ApiV01::new(connection_pool, contract_address, config, network_status);
api_v01.spawn_network_status_updater(panic_sender);

start_server(api_v01, fee_ticker, sign_verifier, listen_addr).await;
start_server(
api_v01,
fee_ticker,
sign_verifier,
listen_addr,
mempool_tx_sender.clone(),
)
.await;
});
})
.expect("Api server thread");
Expand Down
Loading

0 comments on commit 6701628

Please sign in to comment.