diff --git a/node/bft/ledger-service/src/ledger.rs b/node/bft/ledger-service/src/ledger.rs index cae2da6695..d744fed7f8 100644 --- a/node/bft/ledger-service/src/ledger.rs +++ b/node/bft/ledger-service/src/ledger.rs @@ -12,11 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::LedgerService; +use crate::{spawn_blocking, LedgerService}; use snarkvm::{ ledger::{ block::{Block, Transaction}, - coinbase::{ProverSolution, PuzzleCommitment}, + coinbase::{CoinbaseVerifyingKey, ProverSolution, PuzzleCommitment}, committee::Committee, narwhal::{Data, Subdag, Transmission, TransmissionID}, store::ConsensusStorage, @@ -27,17 +27,19 @@ use snarkvm::{ use indexmap::IndexMap; use snarkvm::prelude::narwhal::BatchCertificate; -use std::{fmt, ops::Range}; +use std::{fmt, ops::Range, sync::Arc}; /// A core ledger service. pub struct CoreLedgerService> { ledger: Ledger, + coinbase_verifying_key: Arc>, } impl> CoreLedgerService { /// Initializes a new core ledger service. pub fn new(ledger: Ledger) -> Self { - Self { ledger } + let coinbase_verifying_key = Arc::new(ledger.coinbase_puzzle().coinbase_verifying_key().clone()); + Self { ledger, coinbase_verifying_key } } } @@ -169,22 +171,21 @@ impl> LedgerService for CoreLedgerService< solution: Data>, ) -> Result<()> { // Deserialize the solution. - let solution = tokio::task::spawn_blocking(move || solution.deserialize_blocking()).await??; + let solution = spawn_blocking!(solution.deserialize_blocking())?; // Ensure the puzzle commitment matches in the solution. if puzzle_commitment != solution.commitment() { bail!("Invalid solution - expected {puzzle_commitment}, found {}", solution.commitment()); } // Retrieve the coinbase verifying key. - let coinbase_verifying_key = self.ledger.coinbase_puzzle().coinbase_verifying_key(); + let coinbase_verifying_key = self.coinbase_verifying_key.clone(); // Compute the current epoch challenge. let epoch_challenge = self.ledger.latest_epoch_challenge()?; // Retrieve the current proof target. let proof_target = self.ledger.latest_proof_target(); // Ensure that the prover solution is valid for the given epoch. - // TODO(ljedrz): check if this operation requires a blocking task. - if !solution.verify(coinbase_verifying_key, &epoch_challenge, proof_target)? { + if !spawn_blocking!(solution.verify(&coinbase_verifying_key, &epoch_challenge, proof_target))? { bail!("Invalid prover solution '{puzzle_commitment}' for the current epoch."); } Ok(()) @@ -197,14 +198,14 @@ impl> LedgerService for CoreLedgerService< transaction: Data>, ) -> Result<()> { // Deserialize the transaction. - let transaction = tokio::task::spawn_blocking(move || transaction.deserialize_blocking()).await??; + let transaction = spawn_blocking!(transaction.deserialize_blocking())?; // Ensure the transaction ID matches in the transaction. if transaction_id != transaction.id() { bail!("Invalid transaction - expected {transaction_id}, found {}", transaction.id()); } // Check the transaction is well-formed. - // TODO(ljedrz): check if this operation requires a blocking task. - self.ledger.check_transaction_basic(&transaction, None) + let ledger = self.ledger.clone(); + spawn_blocking!(ledger.check_transaction_basic(&transaction, None)) } /// Checks the given block is valid next block. diff --git a/node/bft/ledger-service/src/lib.rs b/node/bft/ledger-service/src/lib.rs index 74a5a8a0d5..9a19b681df 100644 --- a/node/bft/ledger-service/src/lib.rs +++ b/node/bft/ledger-service/src/lib.rs @@ -49,3 +49,14 @@ pub fn fmt_id(id: impl ToString) -> String { } formatted_id } + +/// A helper macro to spawn a blocking task. +#[macro_export] +macro_rules! spawn_blocking { + ($expr:expr) => { + match tokio::task::spawn_blocking(move || $expr).await { + Ok(value) => value, + Err(error) => Err(snarkvm::prelude::anyhow!("[tokio::spawn_blocking] {error}")), + } + }; +} diff --git a/node/bft/src/helpers/ready.rs b/node/bft/src/helpers/ready.rs index 304c63a55f..09bff73dab 100644 --- a/node/bft/src/helpers/ready.rs +++ b/node/bft/src/helpers/ready.rs @@ -12,7 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use crate::helpers::Storage; use snarkvm::{ console::prelude::*, ledger::{ @@ -28,16 +27,21 @@ use std::sync::Arc; #[derive(Clone, Debug)] pub struct Ready { - /// The storage. - storage: Storage, /// The current map of `(transmission ID, transmission)` entries. transmissions: Arc, Transmission>>>, } +impl Default for Ready { + /// Initializes a new instance of the ready queue. + fn default() -> Self { + Self::new() + } +} + impl Ready { /// Initializes a new instance of the ready queue. - pub fn new(storage: Storage) -> Self { - Self { storage, transmissions: Default::default() } + pub fn new() -> Self { + Self { transmissions: Default::default() } } /// Returns `true` if the ready queue is empty. @@ -107,27 +111,14 @@ impl Ready { /// Returns `true` if the transmission is new, and was added to the ready queue. pub fn insert(&self, transmission_id: impl Into>, transmission: Transmission) -> bool { let transmission_id = transmission_id.into(); - - // TODO (howardwu): Ensure it does not exist in the ledger. Add a ledger service. - // Determine if the transmission is new. - let is_new = !self.contains(transmission_id) && !self.storage.contains_transmission(transmission_id); - // If the transmission is new, insert it. - if is_new { - // Insert the transmission ID. - self.transmissions.write().insert(transmission_id, transmission); - } + // Insert the transmission ID. + let is_new = self.transmissions.write().insert(transmission_id, transmission).is_none(); // Return whether the transmission is new. is_new } - /// Retains the transmissions that satisfy the specified predicate. - pub fn retain(&self, predicate: impl FnMut(&TransmissionID, &mut Transmission) -> bool) { - // Retain the transmissions. - self.transmissions.write().retain(predicate); - } - - /// Removes the specified number of transmissions and returns them. - pub fn take(&self, num_transmissions: usize) -> IndexMap, Transmission> { + /// Removes up to the specified number of transmissions and returns them. + pub fn drain(&self, num_transmissions: usize) -> IndexMap, Transmission> { // Acquire the write lock. let mut transmissions = self.transmissions.write(); // Determine the number of transmissions to drain. @@ -140,8 +131,6 @@ impl Ready { #[cfg(test)] mod tests { use super::*; - use crate::helpers::Storage; - use snarkos_node_bft_ledger_service::MockLedgerService; use snarkvm::ledger::{coinbase::PuzzleCommitment, narwhal::Data}; use ::bytes::Bytes; @@ -155,14 +144,8 @@ mod tests { // Sample random fake bytes. let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::()).collect::>())); - // Sample a committee. - let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); - // Initialize the ledger. - let ledger = Arc::new(MockLedgerService::new(committee)); - // Initialize the storage. - let storage = Storage::::new(ledger, 1); // Initialize the ready queue. - let ready = Ready::::new(storage); + let ready = Ready::::new(); // Initialize the commitments. let commitment_1 = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen())); @@ -198,7 +181,7 @@ mod tests { assert_eq!(ready.get(commitment_unknown), None); // Drain the ready queue. - let transmissions = ready.take(3); + let transmissions = ready.drain(3); // Check the number of transmissions. assert!(ready.is_empty()); @@ -224,14 +207,8 @@ mod tests { rng.fill_bytes(&mut vec); let data = Data::Buffer(Bytes::from(vec)); - // Sample a committee. - let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng); - // Initialize the ledger. - let ledger = Arc::new(MockLedgerService::new(committee)); - // Initialize the storage. - let storage = Storage::::new(ledger, 1); // Initialize the ready queue. - let ready = Ready::::new(storage); + let ready = Ready::::new(); // Initialize the commitments. let commitment = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen())); diff --git a/node/bft/src/primary.rs b/node/bft/src/primary.rs index 1fbd22faf3..068334173a 100644 --- a/node/bft/src/primary.rs +++ b/node/bft/src/primary.rs @@ -373,7 +373,42 @@ impl Primary { // Take the transmissions from the workers. let mut transmissions: IndexMap<_, _> = Default::default(); for worker in self.workers.iter() { - transmissions.extend(worker.take_candidates(num_transmissions_per_worker).await); + for (id, transmission) in worker.drain(num_transmissions_per_worker) { + // Check if the transmission has been stored already. + if self.storage.contains_transmission(id) { + trace!("Proposing - Skipping transmission '{}' - Already in a certificate", fmt_id(id)); + continue; + } + // Check if the ledger already contains the transmission. + if self.ledger.contains_transmission(&id).unwrap_or(true) { + trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id)); + continue; + } + // Check the transmission is still valid. + match (id, transmission.clone()) { + (TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => { + // Check if the solution is still valid. + if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await { + trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id)); + continue; + } + } + (TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => { + // Check if the transaction is still valid. + if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { + trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id)); + continue; + } + } + // Note: We explicitly forbid including ratifications, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => continue, + // All other combinations are clearly invalid. + _ => continue, + } + // Insert the transmission into the map. + transmissions.insert(id, transmission); + } } trace!("Proposing - {} transmissions", transmissions.len()); // Determine if there is at least one unconfirmed transaction to propose. diff --git a/node/bft/src/worker.rs b/node/bft/src/worker.rs index e2fc66d20d..0fad7207ae 100644 --- a/node/bft/src/worker.rs +++ b/node/bft/src/worker.rs @@ -32,7 +32,6 @@ use snarkvm::{ }, }; -use futures::StreamExt; use indexmap::{IndexMap, IndexSet}; use parking_lot::Mutex; use std::{future::Future, net::SocketAddr, sync::Arc, time::Duration}; @@ -75,10 +74,10 @@ impl Worker { Ok(Self { id, gateway, - storage: storage.clone(), + storage, ledger, proposed_batch, - ready: Ready::new(storage), + ready: Default::default(), pending: Default::default(), handles: Default::default(), }) @@ -192,44 +191,9 @@ impl Worker { Ok((transmission_id, transmission)) } - /// Removes the specified number of transmissions from the ready queue, and returns them. - pub(crate) async fn take_candidates( - &self, - num_transmissions: usize, - ) -> impl Iterator, Transmission)> { - // Iterate through the ready transmissions, and determine which should be retained. - let keep = futures::stream::iter(self.ready.transmissions()) - .filter_map(|(id, transmission)| async move { - // Check if the transmission has been stored already. - if self.storage.contains_transmission(id) { - return None; - } - // Check if the proposed batch already contains the transmission. - if let Some(proposed_batch) = self.proposed_batch.read().as_ref() { - if proposed_batch.contains_transmission(id) { - return None; - } - } - // Check if the ledger already contains the transmission. - if self.ledger.contains_transmission(&id).unwrap_or(false) { - return None; - } - // If the transmission is a solution, ensure the solution is still valid. - if let (TransmissionID::Solution(commitment), Transmission::Solution(solution)) = (id, transmission) { - // Check if the solution is still valid. - if self.ledger.check_solution_basic(commitment, solution).await.is_err() { - return None; - } - } - Some(id) - }) - .collect::>() - .await; - - // Retain the transmissions that are not in the storage or ledger. - self.ready.retain(|id, _| keep.contains(id)); - // Remove the specified number of transmissions from the ready queue. - self.ready.take(num_transmissions).into_iter() + /// Removes up to the specified number of transmissions from the ready queue, and returns them. + pub(crate) fn drain(&self, num_transmissions: usize) -> impl Iterator, Transmission)> { + self.ready.drain(num_transmissions).into_iter() } /// Reinserts the specified transmission into the ready queue. @@ -271,12 +235,13 @@ impl Worker { if self.ready.num_transmissions() > MAX_TRANSMISSIONS_PER_WORKER { return Ok(()); } - trace!("Worker {} - Found a new transmission ID '{}' from '{peer_ip}'", self.id, fmt_id(transmission_id)); // Send an transmission request to the peer. let (candidate_id, transmission) = self.send_transmission_request(peer_ip, transmission_id).await?; // Ensure the transmission ID matches. ensure!(candidate_id == transmission_id, "Invalid transmission ID"); // Insert the transmission into the ready queue. + // Note: This method checks `contains_transmission` again, because by the time the transmission is fetched, + // it could have already been inserted into the ready queue. self.process_transmission_from_peer(peer_ip, transmission_id, transmission); Ok(()) } @@ -288,10 +253,22 @@ impl Worker { transmission_id: TransmissionID, transmission: Transmission, ) { - // Check if the transmission ID exists. - if !self.contains_transmission(transmission_id) { - // Insert the transmission into the ready queue. - self.ready.insert(transmission_id, transmission); + // If the transmission ID already exists, then do not store it. + if self.contains_transmission(transmission_id) { + return; + } + // Ensure the transmission ID and transmission type matches. + let is_well_formed = match (&transmission_id, &transmission) { + (TransmissionID::Solution(_), Transmission::Solution(_)) => true, + (TransmissionID::Transaction(_), Transmission::Transaction(_)) => true, + // Note: We explicitly forbid inserting ratifications into the ready queue, + // as the protocol currently does not support ratifications. + (TransmissionID::Ratification, Transmission::Ratification) => false, + // All other combinations are clearly invalid. + _ => false, + }; + // If the transmission ID and transmission type matches, then insert the transmission into the ready queue. + if is_well_formed && self.ready.insert(transmission_id, transmission) { trace!("Worker {} - Added transmission '{}' from '{peer_ip}'", self.id, fmt_id(transmission_id)); } } @@ -303,22 +280,22 @@ impl Worker { puzzle_commitment: PuzzleCommitment, prover_solution: Data>, ) -> Result<()> { - // Construct the transmission. - let transmission = Transmission::Solution(prover_solution.clone()); - // Remove the puzzle commitment from the pending queue. - self.pending.remove(puzzle_commitment, Some(transmission.clone())); - // Check if the solution exists. if self.contains_transmission(puzzle_commitment) { bail!("Solution '{}' already exists.", fmt_id(puzzle_commitment)); } // Check that the solution is well-formed and unique. - if let Err(e) = self.ledger.check_solution_basic(puzzle_commitment, prover_solution).await { + if let Err(e) = self.ledger.check_solution_basic(puzzle_commitment, prover_solution.clone()).await { bail!("Invalid unconfirmed solution '{}': {e}", fmt_id(puzzle_commitment)); } + // Construct the transmission. + let transmission = Transmission::Solution(prover_solution); + // Remove the puzzle commitment from the pending queue. + self.pending.remove(puzzle_commitment, Some(transmission.clone())); // Adds the prover solution to the ready queue. - self.ready.insert(puzzle_commitment, transmission); - trace!("Worker {} - Added unconfirmed solution '{}'", self.id, fmt_id(puzzle_commitment)); + if self.ready.insert(puzzle_commitment, transmission) { + trace!("Worker {} - Added unconfirmed solution '{}'", self.id, fmt_id(puzzle_commitment)); + } Ok(()) } @@ -328,22 +305,22 @@ impl Worker { transaction_id: N::TransactionID, transaction: Data>, ) -> Result<()> { - // Construct the transmission. - let transmission = Transmission::Transaction(transaction.clone()); - // Remove the transaction from the pending queue. - self.pending.remove(&transaction_id, Some(transmission.clone())); - // Check if the transaction ID exists. if self.contains_transmission(&transaction_id) { bail!("Transaction '{}' already exists.", fmt_id(transaction_id)); } // Check that the transaction is well-formed and unique. - if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await { + if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await { bail!("Invalid unconfirmed transaction '{}': {e}", fmt_id(transaction_id)); } + // Construct the transmission. + let transmission = Transmission::Transaction(transaction); + // Remove the transaction from the pending queue. + self.pending.remove(&transaction_id, Some(transmission.clone())); // Adds the transaction to the ready queue. - self.ready.insert(&transaction_id, transmission); - trace!("Worker {} - Added unconfirmed transaction '{}'", self.id, fmt_id(transaction_id)); + if self.ready.insert(&transaction_id, transmission) { + trace!("Worker {} - Added unconfirmed transaction '{}'", self.id, fmt_id(transaction_id)); + } Ok(()) } } @@ -548,7 +525,7 @@ mod tests { assert!(worker.ready.contains(transmission_id)); assert_eq!(worker.get_transmission(transmission_id), Some(transmission)); // Take the transmission from the ready set. - let transmission: Vec<_> = worker.take_candidates(1).await.collect(); + let transmission: Vec<_> = worker.drain(1).collect(); assert_eq!(transmission.len(), 1); assert!(!worker.ready.contains(transmission_id)); }