Skip to content

Commit

Permalink
ref: refactor sync, rpc and miner init into helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
niklaslong committed Sep 16, 2021
1 parent ec06d66 commit f18c86c
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 129 deletions.
136 changes: 8 additions & 128 deletions bin/snarkos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use snarkos::{
config::{Config, ConfigCli},
display::{initialize_logger, print_welcome},
errors::NodeError,
init::{init_node, init_storage},
init::{init_miner, init_node, init_rpc, init_storage, init_sync},
};
use snarkos_consensus::{Consensus, ConsensusParameters, DeserializedLedger, DynLedger, MemoryPool, MerkleLedger};
use snarkos_network::{config::Config as NodeConfig, MinerInstance, Node, NodeType, Sync};
Expand Down Expand Up @@ -63,10 +63,9 @@ use tokio::runtime;
/// 1. Creates new storage database or uses existing.
/// 2. Creates new memory pool or uses existing from storage.
/// 3. Creates sync parameters.
/// 4. Creates network server.
/// 4. Creates network server and starts the listener.
/// 5. Starts rpc server thread.
/// 6. Starts miner thread.
/// 7. Starts network server listener.
///
async fn start_server(config: Config) -> anyhow::Result<()> {
initialize_logger(&config);
Expand All @@ -78,113 +77,15 @@ async fn start_server(config: Config) -> anyhow::Result<()> {
None => return Ok(()), // Return if no storage was returned (usually in case of validation).
};

let sync = init_sync(&config, storage.clone()).await?;

// Construct the node instance. Note this does not start the network services.
// This is done early on, so that the local address can be discovered
// before any other object (miner, RPC) needs to use it.
let mut node = init_node(&config, storage.clone()).await?;

// Enable the sync layer.
{
let memory_pool = MemoryPool::new(); // from_storage(&storage).await?;

debug!("Loading Aleo parameters...");
let dpc = <Testnet1DPC as DPCScheme<DeserializedLedger<'_, Components>>>::load(!config.miner.is_miner)?;
info!("Loaded Aleo parameters");

// Fetch the set of valid inner circuit IDs.
let inner_snark_vk: <<Components as Testnet1Components>::InnerSNARK as SNARK>::VerifyingKey =
dpc.inner_snark_parameters.1.clone().into();
let inner_snark_id = dpc
.system_parameters
.inner_circuit_id_crh
.hash(&to_bytes_le![inner_snark_vk]?)?;

let authorized_inner_snark_ids = vec![to_bytes_le![inner_snark_id]?];

// Set the initial sync parameters.
let consensus_params = ConsensusParameters {
max_block_size: 1_000_000_000usize,
max_nonce: u32::max_value(),
target_block_time: 10i64,
network_id: Network::from_id(config.aleo.network_id),
verifier: PoswMarlin::verify_only().expect("could not instantiate PoSW verifier"),
authorized_inner_snark_ids,
};

let ledger_parameters = {
type Parameters = <Components as Testnet1Components>::MerkleParameters;
let parameters: <<Parameters as MerkleParameters>::H as CRH>::Parameters =
FromBytes::read_le(&LedgerMerkleTreeParameters::load_bytes()?[..])?;
let crh = <Parameters as MerkleParameters>::H::from(parameters);
Arc::new(Parameters::from(crh))
};
info!("Loading Ledger");
let ledger_digests = storage.get_ledger_digests().await?;
let commitments = storage.get_commitments().await?;
let serial_numbers = storage.get_serial_numbers().await?;
let memos = storage.get_memos().await?;
info!("Initializing Ledger");
let ledger = DynLedger(Box::new(MerkleLedger::new(
ledger_parameters,
&ledger_digests[..],
&commitments[..],
&serial_numbers[..],
&memos[..],
)?));

let genesis_block: Block<Testnet1Transaction> = FromBytes::read_le(GenesisBlock::load_bytes().as_slice())?;
let genesis_block: SerialBlock = <Block<Testnet1Transaction> as VMBlock>::serialize(&genesis_block)?;

let consensus = Consensus::new(
consensus_params,
Arc::new(dpc),
genesis_block,
ledger,
storage.clone(),
memory_pool,
);
info!("Loaded Ledger");

if config.storage.scan_for_forks {
storage
.scan_forks(snarkos_consensus::OLDEST_FORK_THRESHOLD as u32)
.await?;
}

if let Some(import_path) = config.storage.import {
info!("Importing canon blocks from {}", import_path.display());

let now = std::time::Instant::now();
let mut blocks = std::io::Cursor::new(std::fs::read(import_path)?);

let mut processed = 0usize;
let mut imported = 0usize;
while let Ok(block) = Block::<Testnet1Transaction>::read_le(&mut blocks) {
let block = <Block<Testnet1Transaction> as VMBlock>::serialize(&block)?;
// Skip possible duplicate blocks etc.
if consensus.receive_block(block).await {
imported += 1;
}
processed += 1;
}

info!(
"Processed {} canon blocks ({} imported) in {}ms",
processed,
imported,
now.elapsed().as_millis()
);
}

let sync = Sync::new(
consensus,
config.miner.is_miner,
Duration::from_secs(config.p2p.block_sync_interval.into()),
Duration::from_secs(config.p2p.mempool_sync_interval.into()),
);

node.set_sync(sync);
}
node.set_sync(sync);

// Initialize metrics framework.
node.initialize_metrics().await?;
Expand All @@ -194,37 +95,16 @@ async fn start_server(config: Config) -> anyhow::Result<()> {

// Start RPC thread, if the RPC configuration is enabled.
if config.rpc.json_rpc {
let rpc_address = format!("{}:{}", config.rpc.ip, config.rpc.port)
.parse()
.expect("Invalid RPC server address!");

let rpc_handle = start_rpc_server(
rpc_address,
storage,
node.clone(),
config.rpc.username,
config.rpc.password,
);
node.register_task(rpc_handle);

info!("Listening for RPC requests on port {}", config.rpc.port);
let rpc_handle = init_rpc(&config, node.clone(), storage);
}

// Start the network services
node.start_services().await;

// Start the miner task if mining configuration is enabled.
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if config.miner.is_miner {
match Address::<Components>::from_str(&config.miner.miner_address) {
Ok(miner_address) => {
let handle = MinerInstance::new(miner_address, node.clone()).spawn();
node.register_task(handle);
}
Err(_) => info!(
"Miner not started. Please specify a valid miner address in your ~/.snarkOS/config.toml file or by using the --miner-address option in the CLI."
),
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
init_miner(&config, node);
}

std::future::pending::<()>().await;
Expand Down
133 changes: 132 additions & 1 deletion snarkos/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,108 @@ pub async fn init_storage(config: &Config) -> anyhow::Result<Option<DynStorage>>
Ok(Some(storage))
}

pub async fn init_sync(config: &Config, storage: DynStorage) -> anyhow::Result<Sync> {
let memory_pool = MemoryPool::new(); // from_storage(&storage).await?;

debug!("Loading Aleo parameters...");
let dpc = <Testnet1DPC as DPCScheme<DeserializedLedger<'_, Components>>>::load(!config.miner.is_miner)?;
info!("Loaded Aleo parameters");

// Fetch the set of valid inner circuit IDs.
let inner_snark_vk: <<Components as Testnet1Components>::InnerSNARK as SNARK>::VerifyingKey =
dpc.inner_snark_parameters.1.clone().into();
let inner_snark_id = dpc
.system_parameters
.inner_circuit_id_crh
.hash(&to_bytes_le![inner_snark_vk]?)?;

let authorized_inner_snark_ids = vec![to_bytes_le![inner_snark_id]?];

// Set the initial sync parameters.
let consensus_params = ConsensusParameters {
max_block_size: 1_000_000_000usize,
max_nonce: u32::max_value(),
target_block_time: 10i64,
network_id: Network::from_id(config.aleo.network_id),
verifier: PoswMarlin::verify_only().expect("could not instantiate PoSW verifier"),
authorized_inner_snark_ids,
};

let ledger_parameters = {
type Parameters = <Components as Testnet1Components>::MerkleParameters;
let parameters: <<Parameters as MerkleParameters>::H as CRH>::Parameters =
FromBytes::read_le(&LedgerMerkleTreeParameters::load_bytes()?[..])?;
let crh = <Parameters as MerkleParameters>::H::from(parameters);
Arc::new(Parameters::from(crh))
};
info!("Loading Ledger");
let ledger_digests = storage.get_ledger_digests().await?;
let commitments = storage.get_commitments().await?;
let serial_numbers = storage.get_serial_numbers().await?;
let memos = storage.get_memos().await?;
info!("Initializing Ledger");
let ledger = DynLedger(Box::new(MerkleLedger::new(
ledger_parameters,
&ledger_digests[..],
&commitments[..],
&serial_numbers[..],
&memos[..],
)?));

let genesis_block: Block<Testnet1Transaction> = FromBytes::read_le(GenesisBlock::load_bytes().as_slice())?;
let genesis_block: SerialBlock = <Block<Testnet1Transaction> as VMBlock>::serialize(&genesis_block)?;

let consensus = Consensus::new(
consensus_params,
Arc::new(dpc),
genesis_block,
ledger,
storage.clone(),
memory_pool,
);
info!("Loaded Ledger");

if config.storage.scan_for_forks {
storage
.scan_forks(snarkos_consensus::OLDEST_FORK_THRESHOLD as u32)
.await?;
}

if let Some(import_path) = &config.storage.import {
info!("Importing canon blocks from {}", import_path.display());

let now = std::time::Instant::now();
let mut blocks = std::io::Cursor::new(std::fs::read(import_path)?);

let mut processed = 0usize;
let mut imported = 0usize;
while let Ok(block) = Block::<Testnet1Transaction>::read_le(&mut blocks) {
let block = <Block<Testnet1Transaction> as VMBlock>::serialize(&block)?;
// Skip possible duplicate blocks etc.
if consensus.receive_block(block).await {
imported += 1;
}
processed += 1;
}

info!(
"Processed {} canon blocks ({} imported) in {}ms",
processed,
imported,
now.elapsed().as_millis()
);
}

let sync = Sync::new(
consensus,
config.miner.is_miner,
Duration::from_secs(config.p2p.block_sync_interval.into()),
Duration::from_secs(config.p2p.mempool_sync_interval.into()),
);

Ok(sync)
}

pub async fn init_node(config: &Config, storage: Option<DynStorage>) -> anyhow::Result<Node> {
let address = format!("{}:{}", config.node.ip, config.node.port);
let desired_address = address.parse::<SocketAddr>()?;
Expand All @@ -147,4 +249,33 @@ pub async fn init_node(config: &Config, storage: Option<DynStorage>) -> anyhow::
Ok(node)
}

// pub async fn init_sync(config: &Config)
pub async fn init_rpc(config: &Config, node: Node, storage: Option<DynStorage>) -> anyhow::Result<()> {
let rpc_address = format!("{}:{}", config.rpc.ip, config.rpc.port)
.parse()
.expect("Invalid RPC server address!");

let rpc_handle = start_rpc_server(
rpc_address,
storage,
node.clone(),
config.rpc.username.clone(),
config.rpc.password.clone(),
);
node.register_task(rpc_handle);

info!("Listening for RPC requests on port {}", config.rpc.port);

Ok(())
}

pub fn init_miner(config: &Config, node: Node) {
match Address::<Components>::from_str(&config.miner.miner_address) {
Ok(miner_address) => {
let handle = MinerInstance::new(miner_address, node.clone()).spawn();
node.register_task(handle);
}
Err(_) => info!(
"Miner not started. Please specify a valid miner address in your ~/.snarkOS/config.toml file or by using the --miner-address option in the CLI."
),
}
}

0 comments on commit f18c86c

Please sign in to comment.