Skip to content

Commit

Permalink
Add prover storage functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
howardwu committed Nov 30, 2021
1 parent 598a81c commit 1e28753
Show file tree
Hide file tree
Showing 15 changed files with 87 additions and 57 deletions.
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
**/target
**/.DS_Store
**/.ledger*
**/.prover*
**inner.proving*
**outer.proving*
**posw.proving*
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
**/target
**/.DS_Store
**/.ledger*
**/.prover*
**inner.proving*
**outer.proving*
**posw.proving*
Expand Down
34 changes: 17 additions & 17 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ default = []
test = []

[dependencies]
snarkvm = { git = "https://github.com/AleoHQ/snarkVM.git", rev = "29bfb3a" }
snarkvm = { git = "https://github.com/AleoHQ/snarkVM.git", rev = "8937da0" }
#snarkvm = { path = "../snarkVM" }

bytes = "1.0.0"
Expand All @@ -38,7 +38,7 @@ path = "./storage"
version = "2.0.0"

[dependencies.aleo-std]
version = "0.1.1"
version = "0.1.4"

[dependencies.anyhow]
version = "1"
Expand Down
2 changes: 1 addition & 1 deletion src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub trait Environment: 'static + Clone + Debug + Default + Send + Sync {
const SYNC_NODES: [&'static str; 13] = ["127.0.0.1:4131", "127.0.0.1:4133", "127.0.0.1:4134", "127.0.0.1:4135", "127.0.0.1:4136", "127.0.0.1:4137", "127.0.0.1:4138", "127.0.0.1:4139", "127.0.0.1:4140", "127.0.0.1:4141", "127.0.0.1:4142", "127.0.0.1:4143", "127.0.0.1:4144"];

/// The duration in seconds to sleep in between heartbeat executions.
const HEARTBEAT_IN_SECS: u64 = 9;
const HEARTBEAT_IN_SECS: u64 = 12;
/// The maximum duration in seconds permitted for establishing a connection with a node,
/// before dropping the connection; it should be no greater than the `HEARTBEAT_IN_SECS`.
const CONNECTION_TIMEOUT_IN_SECS: u64 = 1;
Expand Down
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ fn main() -> Result<()> {
.enable_all()
.thread_stack_size(8 * 1024 * 1024)
.worker_threads((num_cpus::get() / 8 * 2).max(1))
.max_blocking_threads((num_cpus::get() / 8).max(1))
.max_blocking_threads((num_cpus::get() / 8 * 2).max(1))
.build()?;

// Initialize the parallelization parameters.
rayon::ThreadPoolBuilder::new()
.stack_size(8 * 1024 * 1024)
.num_threads((num_cpus::get() / 8 * 3).max(1))
.num_threads((num_cpus::get() / 8 * 5).max(1))
.build_global()
.unwrap();

Expand Down
18 changes: 15 additions & 3 deletions src/network/prover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ use crate::{
PeersRequest,
PeersRouter,
};
use snarkos_storage::{storage::Storage, ProverState};
use snarkvm::dpc::prelude::*;

use anyhow::Result;
use rand::thread_rng;
use rayon::{ThreadPool, ThreadPoolBuilder};
use std::{
net::SocketAddr,
path::Path,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
Expand Down Expand Up @@ -65,6 +67,8 @@ pub enum ProverRequest<N: Network> {
///
#[derive(Debug)]
pub struct Prover<N: Network, E: Environment> {
/// The state storage of the prover.
state: Arc<ProverState<N>>,
/// The thread pool for the miner.
miner: Arc<ThreadPool>,
/// The prover router of the node.
Expand All @@ -85,8 +89,9 @@ pub struct Prover<N: Network, E: Environment> {

impl<N: Network, E: Environment> Prover<N, E> {
/// Initializes a new instance of the prover.
pub async fn new(
pub async fn open<S: Storage, P: AsRef<Path> + Copy>(
tasks: &mut Tasks<JoinHandle<()>>,
path: P,
miner: Option<Address<N>>,
local_ip: SocketAddr,
status: &Status,
Expand All @@ -105,6 +110,7 @@ impl<N: Network, E: Environment> Prover<N, E> {

// Initialize the prover.
let prover = Arc::new(Self {
state: Arc::new(ProverState::open_writer::<S, P>(path)?),
miner: Arc::new(pool),
prover_router,
memory_pool: RwLock::new(MemoryPool::new()),
Expand Down Expand Up @@ -149,6 +155,7 @@ impl<N: Network, E: Environment> Prover<N, E> {
prover.status.update(State::Mining);

// Prepare the unconfirmed transactions, terminator, and status.
let state = prover.state.clone();
let miner = prover.miner.clone();
let canon = prover.ledger_reader.clone(); // This is *safe* as the ledger only reads.
let unconfirmed_transactions = prover.memory_pool.read().await.transactions();
Expand Down Expand Up @@ -177,12 +184,17 @@ impl<N: Network, E: Environment> Prover<N, E> {
status.update(State::Ready);

match result {
Ok(Ok(block)) => {
Ok(Ok((block, coinbase_record))) => {
debug!("Miner has found an unconfirmed candidate for block {}", block.height());
// Store the coinbase record.
if let Err(error) = state.add_coinbase_record(&coinbase_record) {
warn!("[Miner] Failed to store coinbase record - {}", error);
}

// Broadcast the next block.
let request = LedgerRequest::UnconfirmedBlock(local_ip, block, prover_router.clone());
if let Err(error) = ledger_router.send(request).await {
warn!("Failed to broadcast mined block: {}", error);
warn!("Failed to broadcast mined block - {}", error);
}
}
Ok(Err(error)) | Err(error) => trace!("{}", error),
Expand Down
9 changes: 6 additions & 3 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@ impl<N: Network, E: Environment> Server<N, E> {
};

// Initialize the ledger storage path.
let storage_path = node.storage_path(local_ip);
let ledger_storage_path = node.ledger_storage_path(local_ip);
// Initialize the prover storage path.
let prover_storage_path = node.prover_storage_path(local_ip);
// Initialize the status indicator.
let status = Status::new();
// Initialize the terminator bit.
Expand All @@ -77,10 +79,11 @@ impl<N: Network, E: Environment> Server<N, E> {
// Initialize a new instance for managing peers.
let peers = Peers::new(&mut tasks, local_ip, None, &status).await;
// Initialize a new instance for managing the ledger.
let ledger = Ledger::<N, E>::open::<RocksDB, _>(&mut tasks, &storage_path, &status, &terminator, peers.router()).await?;
let ledger = Ledger::<N, E>::open::<RocksDB, _>(&mut tasks, &ledger_storage_path, &status, &terminator, peers.router()).await?;
// Initialize a new instance for managing the prover.
let prover = Prover::new(
let prover = Prover::open::<RocksDB, _>(
&mut tasks,
&prover_storage_path,
miner,
local_ip,
&status,
Expand Down
17 changes: 15 additions & 2 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ impl Node {
}
}

/// Returns the storage path of the node.
pub(crate) fn storage_path(&self, _local_ip: SocketAddr) -> PathBuf {
/// Returns the storage path of the ledger.
pub(crate) fn ledger_storage_path(&self, _local_ip: SocketAddr) -> PathBuf {
cfg_if::cfg_if! {
if #[cfg(feature = "test")] {
// Tests may use any available ports, and removes the storage artifacts afterwards,
Expand All @@ -113,6 +113,19 @@ impl Node {
}
}

/// Returns the storage path of the prover.
pub(crate) fn prover_storage_path(&self, _local_ip: SocketAddr) -> PathBuf {
cfg_if::cfg_if! {
if #[cfg(feature = "test")] {
// Tests may use any available ports, and removes the storage artifacts afterwards,
// so that there is no need to adhere to a specific number assignment logic.
PathBuf::from(format!("/tmp/snarkos-test-prover-{}", _local_ip.port()))
} else {
aleo_std::aleo_prover_dir(self.network, self.dev)
}
}
}

async fn start_server<N: Network, E: Environment>(&self) -> Result<()> {
let miner = match (E::NODE_TYPE, &self.miner) {
(NodeType::Miner, Some(address)) => {
Expand Down
8 changes: 4 additions & 4 deletions src/rpc/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ mod tests {
let address = account.address();

// Mine the next block.
let block_1 = ledger_state
let (block_1, _) = ledger_state
.mine_next_block(address, true, &[], &terminator, rng)
.expect("Failed to mine");
ledger_state.add_next_block(&block_1).expect("Failed to add next block to ledger");
Expand Down Expand Up @@ -829,7 +829,7 @@ mod tests {
let address = account.address();

// Mine the next block.
let block_1 = ledger_state
let (block_1, _) = ledger_state
.mine_next_block(address, true, &[], &terminator, rng)
.expect("Failed to mine");
ledger_state.add_next_block(&block_1).expect("Failed to add next block to ledger");
Expand Down Expand Up @@ -989,7 +989,7 @@ mod tests {
let address = account.address();

// Mine the next block.
let block_1 = ledger_state
let (block_1, _) = ledger_state
.mine_next_block(address, true, &[], &terminator, &mut rng)
.expect("Failed to mine");
ledger_state.add_next_block(&block_1).expect("Failed to add next block to ledger");
Expand Down Expand Up @@ -1161,7 +1161,7 @@ mod tests {
let address = account.address();

// Initialize a new transaction.
let transaction = Transaction::<Testnet2>::new_coinbase(address, AleoAmount(1234), true, &mut rng)
let (transaction, _) = Transaction::<Testnet2>::new_coinbase(address, AleoAmount(1234), true, &mut rng)
.expect("Failed to create a coinbase transaction");

// Initialize a new rpc.
Expand Down
2 changes: 1 addition & 1 deletion storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ license = "GPL-3.0"
edition = "2018"

[dependencies]
snarkvm = { git = "https://github.com/AleoHQ/snarkVM.git", rev = "29bfb3a" }
snarkvm = { git = "https://github.com/AleoHQ/snarkVM.git", rev = "8937da0" }
#snarkvm = { path = "../../snarkVM" }

[dependencies.anyhow]
Expand Down
Loading

0 comments on commit 1e28753

Please sign in to comment.