From f18c86c9cfb3d50611ae96c51d2a1d6e8330c097 Mon Sep 17 00:00:00 2001 From: Niklas Date: Tue, 7 Sep 2021 13:08:17 +0200 Subject: [PATCH] ref: refactor sync, rpc and miner init into helpers --- bin/snarkos.rs | 136 +++--------------------------------------------- snarkos/init.rs | 133 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 140 insertions(+), 129 deletions(-) diff --git a/bin/snarkos.rs b/bin/snarkos.rs index 656781b406..02d1906f07 100644 --- a/bin/snarkos.rs +++ b/bin/snarkos.rs @@ -22,7 +22,7 @@ use snarkos::{ config::{Config, ConfigCli}, display::{initialize_logger, print_welcome}, errors::NodeError, - init::{init_node, init_storage}, + init::{init_miner, init_node, init_rpc, init_storage, init_sync}, }; use snarkos_consensus::{Consensus, ConsensusParameters, DeserializedLedger, DynLedger, MemoryPool, MerkleLedger}; use snarkos_network::{config::Config as NodeConfig, MinerInstance, Node, NodeType, Sync}; @@ -63,10 +63,9 @@ use tokio::runtime; /// 1. Creates new storage database or uses existing. /// 2. Creates new memory pool or uses existing from storage. /// 3. Creates sync parameters. -/// 4. Creates network server. +/// 4. Creates network server and starts the listener. /// 5. Starts rpc server thread. /// 6. Starts miner thread. -/// 7. Starts network server listener. /// async fn start_server(config: Config) -> anyhow::Result<()> { initialize_logger(&config); @@ -78,113 +77,15 @@ async fn start_server(config: Config) -> anyhow::Result<()> { None => return Ok(()), // Return if no storage was returned (usually in case of validation). }; + let sync = init_sync(&config, storage.clone()).await?; + // Construct the node instance. Note this does not start the network services. // This is done early on, so that the local address can be discovered // before any other object (miner, RPC) needs to use it. let mut node = init_node(&config, storage.clone()).await?; // Enable the sync layer. - { - let memory_pool = MemoryPool::new(); // from_storage(&storage).await?; - - debug!("Loading Aleo parameters..."); - let dpc = >>::load(!config.miner.is_miner)?; - info!("Loaded Aleo parameters"); - - // Fetch the set of valid inner circuit IDs. - let inner_snark_vk: <::InnerSNARK as SNARK>::VerifyingKey = - dpc.inner_snark_parameters.1.clone().into(); - let inner_snark_id = dpc - .system_parameters - .inner_circuit_id_crh - .hash(&to_bytes_le![inner_snark_vk]?)?; - - let authorized_inner_snark_ids = vec![to_bytes_le![inner_snark_id]?]; - - // Set the initial sync parameters. - let consensus_params = ConsensusParameters { - max_block_size: 1_000_000_000usize, - max_nonce: u32::max_value(), - target_block_time: 10i64, - network_id: Network::from_id(config.aleo.network_id), - verifier: PoswMarlin::verify_only().expect("could not instantiate PoSW verifier"), - authorized_inner_snark_ids, - }; - - let ledger_parameters = { - type Parameters = ::MerkleParameters; - let parameters: <::H as CRH>::Parameters = - FromBytes::read_le(&LedgerMerkleTreeParameters::load_bytes()?[..])?; - let crh = ::H::from(parameters); - Arc::new(Parameters::from(crh)) - }; - info!("Loading Ledger"); - let ledger_digests = storage.get_ledger_digests().await?; - let commitments = storage.get_commitments().await?; - let serial_numbers = storage.get_serial_numbers().await?; - let memos = storage.get_memos().await?; - info!("Initializing Ledger"); - let ledger = DynLedger(Box::new(MerkleLedger::new( - ledger_parameters, - &ledger_digests[..], - &commitments[..], - &serial_numbers[..], - &memos[..], - )?)); - - let genesis_block: Block = FromBytes::read_le(GenesisBlock::load_bytes().as_slice())?; - let genesis_block: SerialBlock = as VMBlock>::serialize(&genesis_block)?; - - let consensus = Consensus::new( - consensus_params, - Arc::new(dpc), - genesis_block, - ledger, - storage.clone(), - memory_pool, - ); - info!("Loaded Ledger"); - - if config.storage.scan_for_forks { - storage - .scan_forks(snarkos_consensus::OLDEST_FORK_THRESHOLD as u32) - .await?; - } - - if let Some(import_path) = config.storage.import { - info!("Importing canon blocks from {}", import_path.display()); - - let now = std::time::Instant::now(); - let mut blocks = std::io::Cursor::new(std::fs::read(import_path)?); - - let mut processed = 0usize; - let mut imported = 0usize; - while let Ok(block) = Block::::read_le(&mut blocks) { - let block = as VMBlock>::serialize(&block)?; - // Skip possible duplicate blocks etc. - if consensus.receive_block(block).await { - imported += 1; - } - processed += 1; - } - - info!( - "Processed {} canon blocks ({} imported) in {}ms", - processed, - imported, - now.elapsed().as_millis() - ); - } - - let sync = Sync::new( - consensus, - config.miner.is_miner, - Duration::from_secs(config.p2p.block_sync_interval.into()), - Duration::from_secs(config.p2p.mempool_sync_interval.into()), - ); - - node.set_sync(sync); - } + node.set_sync(sync); // Initialize metrics framework. node.initialize_metrics().await?; @@ -194,37 +95,16 @@ async fn start_server(config: Config) -> anyhow::Result<()> { // Start RPC thread, if the RPC configuration is enabled. if config.rpc.json_rpc { - let rpc_address = format!("{}:{}", config.rpc.ip, config.rpc.port) - .parse() - .expect("Invalid RPC server address!"); - - let rpc_handle = start_rpc_server( - rpc_address, - storage, - node.clone(), - config.rpc.username, - config.rpc.password, - ); - node.register_task(rpc_handle); - - info!("Listening for RPC requests on port {}", config.rpc.port); + let rpc_handle = init_rpc(&config, node.clone(), storage); } // Start the network services node.start_services().await; // Start the miner task if mining configuration is enabled. - tokio::time::sleep(std::time::Duration::from_secs(5)).await; if config.miner.is_miner { - match Address::::from_str(&config.miner.miner_address) { - Ok(miner_address) => { - let handle = MinerInstance::new(miner_address, node.clone()).spawn(); - node.register_task(handle); - } - Err(_) => info!( - "Miner not started. Please specify a valid miner address in your ~/.snarkOS/config.toml file or by using the --miner-address option in the CLI." - ), - } + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + init_miner(&config, node); } std::future::pending::<()>().await; diff --git a/snarkos/init.rs b/snarkos/init.rs index 649c56beac..224a2a2929 100644 --- a/snarkos/init.rs +++ b/snarkos/init.rs @@ -127,6 +127,108 @@ pub async fn init_storage(config: &Config) -> anyhow::Result> Ok(Some(storage)) } +pub async fn init_sync(config: &Config, storage: DynStorage) -> anyhow::Result { + let memory_pool = MemoryPool::new(); // from_storage(&storage).await?; + + debug!("Loading Aleo parameters..."); + let dpc = >>::load(!config.miner.is_miner)?; + info!("Loaded Aleo parameters"); + + // Fetch the set of valid inner circuit IDs. + let inner_snark_vk: <::InnerSNARK as SNARK>::VerifyingKey = + dpc.inner_snark_parameters.1.clone().into(); + let inner_snark_id = dpc + .system_parameters + .inner_circuit_id_crh + .hash(&to_bytes_le![inner_snark_vk]?)?; + + let authorized_inner_snark_ids = vec![to_bytes_le![inner_snark_id]?]; + + // Set the initial sync parameters. + let consensus_params = ConsensusParameters { + max_block_size: 1_000_000_000usize, + max_nonce: u32::max_value(), + target_block_time: 10i64, + network_id: Network::from_id(config.aleo.network_id), + verifier: PoswMarlin::verify_only().expect("could not instantiate PoSW verifier"), + authorized_inner_snark_ids, + }; + + let ledger_parameters = { + type Parameters = ::MerkleParameters; + let parameters: <::H as CRH>::Parameters = + FromBytes::read_le(&LedgerMerkleTreeParameters::load_bytes()?[..])?; + let crh = ::H::from(parameters); + Arc::new(Parameters::from(crh)) + }; + info!("Loading Ledger"); + let ledger_digests = storage.get_ledger_digests().await?; + let commitments = storage.get_commitments().await?; + let serial_numbers = storage.get_serial_numbers().await?; + let memos = storage.get_memos().await?; + info!("Initializing Ledger"); + let ledger = DynLedger(Box::new(MerkleLedger::new( + ledger_parameters, + &ledger_digests[..], + &commitments[..], + &serial_numbers[..], + &memos[..], + )?)); + + let genesis_block: Block = FromBytes::read_le(GenesisBlock::load_bytes().as_slice())?; + let genesis_block: SerialBlock = as VMBlock>::serialize(&genesis_block)?; + + let consensus = Consensus::new( + consensus_params, + Arc::new(dpc), + genesis_block, + ledger, + storage.clone(), + memory_pool, + ); + info!("Loaded Ledger"); + + if config.storage.scan_for_forks { + storage + .scan_forks(snarkos_consensus::OLDEST_FORK_THRESHOLD as u32) + .await?; + } + + if let Some(import_path) = &config.storage.import { + info!("Importing canon blocks from {}", import_path.display()); + + let now = std::time::Instant::now(); + let mut blocks = std::io::Cursor::new(std::fs::read(import_path)?); + + let mut processed = 0usize; + let mut imported = 0usize; + while let Ok(block) = Block::::read_le(&mut blocks) { + let block = as VMBlock>::serialize(&block)?; + // Skip possible duplicate blocks etc. + if consensus.receive_block(block).await { + imported += 1; + } + processed += 1; + } + + info!( + "Processed {} canon blocks ({} imported) in {}ms", + processed, + imported, + now.elapsed().as_millis() + ); + } + + let sync = Sync::new( + consensus, + config.miner.is_miner, + Duration::from_secs(config.p2p.block_sync_interval.into()), + Duration::from_secs(config.p2p.mempool_sync_interval.into()), + ); + + Ok(sync) +} + pub async fn init_node(config: &Config, storage: Option) -> anyhow::Result { let address = format!("{}:{}", config.node.ip, config.node.port); let desired_address = address.parse::()?; @@ -147,4 +249,33 @@ pub async fn init_node(config: &Config, storage: Option) -> anyhow:: Ok(node) } -// pub async fn init_sync(config: &Config) +pub async fn init_rpc(config: &Config, node: Node, storage: Option) -> anyhow::Result<()> { + let rpc_address = format!("{}:{}", config.rpc.ip, config.rpc.port) + .parse() + .expect("Invalid RPC server address!"); + + let rpc_handle = start_rpc_server( + rpc_address, + storage, + node.clone(), + config.rpc.username.clone(), + config.rpc.password.clone(), + ); + node.register_task(rpc_handle); + + info!("Listening for RPC requests on port {}", config.rpc.port); + + Ok(()) +} + +pub fn init_miner(config: &Config, node: Node) { + match Address::::from_str(&config.miner.miner_address) { + Ok(miner_address) => { + let handle = MinerInstance::new(miner_address, node.clone()).spawn(); + node.register_task(handle); + } + Err(_) => info!( + "Miner not started. Please specify a valid miner address in your ~/.snarkOS/config.toml file or by using the --miner-address option in the CLI." + ), + } +}