Skip to content

Commit

Permalink
Move transmission filtering into proposal
Browse files Browse the repository at this point in the history
  • Loading branch information
howardwu committed Oct 22, 2023
1 parent a829212 commit e22cb34
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 114 deletions.
23 changes: 12 additions & 11 deletions node/bft/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::LedgerService;
use crate::{spawn_blocking, LedgerService};
use snarkvm::{
ledger::{
block::{Block, Transaction},
coinbase::{ProverSolution, PuzzleCommitment},
coinbase::{CoinbaseVerifyingKey, ProverSolution, PuzzleCommitment},
committee::Committee,
narwhal::{Data, Subdag, Transmission, TransmissionID},
store::ConsensusStorage,
Expand All @@ -27,17 +27,19 @@ use snarkvm::{

use indexmap::IndexMap;
use snarkvm::prelude::narwhal::BatchCertificate;
use std::{fmt, ops::Range};
use std::{fmt, ops::Range, sync::Arc};

/// A core ledger service.
pub struct CoreLedgerService<N: Network, C: ConsensusStorage<N>> {
ledger: Ledger<N, C>,
coinbase_verifying_key: Arc<CoinbaseVerifyingKey<N>>,
}

impl<N: Network, C: ConsensusStorage<N>> CoreLedgerService<N, C> {
/// Initializes a new core ledger service.
pub fn new(ledger: Ledger<N, C>) -> Self {
Self { ledger }
let coinbase_verifying_key = Arc::new(ledger.coinbase_puzzle().coinbase_verifying_key().clone());
Self { ledger, coinbase_verifying_key }
}
}

Expand Down Expand Up @@ -169,22 +171,21 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
solution: Data<ProverSolution<N>>,
) -> Result<()> {
// Deserialize the solution.
let solution = tokio::task::spawn_blocking(move || solution.deserialize_blocking()).await??;
let solution = spawn_blocking!(solution.deserialize_blocking())?;
// Ensure the puzzle commitment matches in the solution.
if puzzle_commitment != solution.commitment() {
bail!("Invalid solution - expected {puzzle_commitment}, found {}", solution.commitment());
}

// Retrieve the coinbase verifying key.
let coinbase_verifying_key = self.ledger.coinbase_puzzle().coinbase_verifying_key();
let coinbase_verifying_key = self.coinbase_verifying_key.clone();
// Compute the current epoch challenge.
let epoch_challenge = self.ledger.latest_epoch_challenge()?;
// Retrieve the current proof target.
let proof_target = self.ledger.latest_proof_target();

// Ensure that the prover solution is valid for the given epoch.
// TODO(ljedrz): check if this operation requires a blocking task.
if !solution.verify(coinbase_verifying_key, &epoch_challenge, proof_target)? {
if !spawn_blocking!(solution.verify(&coinbase_verifying_key, &epoch_challenge, proof_target))? {
bail!("Invalid prover solution '{puzzle_commitment}' for the current epoch.");
}
Ok(())
Expand All @@ -197,14 +198,14 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
transaction: Data<Transaction<N>>,
) -> Result<()> {
// Deserialize the transaction.
let transaction = tokio::task::spawn_blocking(move || transaction.deserialize_blocking()).await??;
let transaction = spawn_blocking!(transaction.deserialize_blocking())?;
// Ensure the transaction ID matches in the transaction.
if transaction_id != transaction.id() {
bail!("Invalid transaction - expected {transaction_id}, found {}", transaction.id());
}
// Check the transaction is well-formed.
// TODO(ljedrz): check if this operation requires a blocking task.
self.ledger.check_transaction_basic(&transaction, None)
let ledger = self.ledger.clone();
spawn_blocking!(ledger.check_transaction_basic(&transaction, None))
}

/// Checks the given block is valid next block.
Expand Down
11 changes: 11 additions & 0 deletions node/bft/ledger-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,14 @@ pub fn fmt_id(id: impl ToString) -> String {
}
formatted_id
}

/// A helper macro to spawn a blocking task.
#[macro_export]
macro_rules! spawn_blocking {
($expr:expr) => {
match tokio::task::spawn_blocking(move || $expr).await {
Ok(value) => value,
Err(error) => Err(snarkvm::prelude::anyhow!("[tokio::spawn_blocking] {error}")),
}
};
}
55 changes: 16 additions & 39 deletions node/bft/src/helpers/ready.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::helpers::Storage;
use snarkvm::{
console::prelude::*,
ledger::{
Expand All @@ -28,16 +27,21 @@ use std::sync::Arc;

#[derive(Clone, Debug)]
pub struct Ready<N: Network> {
/// The storage.
storage: Storage<N>,
/// The current map of `(transmission ID, transmission)` entries.
transmissions: Arc<RwLock<IndexMap<TransmissionID<N>, Transmission<N>>>>,
}

impl<N: Network> Default for Ready<N> {
/// Initializes a new instance of the ready queue.
fn default() -> Self {
Self::new()
}
}

impl<N: Network> Ready<N> {
/// Initializes a new instance of the ready queue.
pub fn new(storage: Storage<N>) -> Self {
Self { storage, transmissions: Default::default() }
pub fn new() -> Self {
Self { transmissions: Default::default() }
}

/// Returns `true` if the ready queue is empty.
Expand Down Expand Up @@ -107,27 +111,14 @@ impl<N: Network> Ready<N> {
/// Returns `true` if the transmission is new, and was added to the ready queue.
pub fn insert(&self, transmission_id: impl Into<TransmissionID<N>>, transmission: Transmission<N>) -> bool {
let transmission_id = transmission_id.into();

// TODO (howardwu): Ensure it does not exist in the ledger. Add a ledger service.
// Determine if the transmission is new.
let is_new = !self.contains(transmission_id) && !self.storage.contains_transmission(transmission_id);
// If the transmission is new, insert it.
if is_new {
// Insert the transmission ID.
self.transmissions.write().insert(transmission_id, transmission);
}
// Insert the transmission ID.
let is_new = self.transmissions.write().insert(transmission_id, transmission).is_none();
// Return whether the transmission is new.
is_new
}

/// Retains the transmissions that satisfy the specified predicate.
pub fn retain(&self, predicate: impl FnMut(&TransmissionID<N>, &mut Transmission<N>) -> bool) {
// Retain the transmissions.
self.transmissions.write().retain(predicate);
}

/// Removes the specified number of transmissions and returns them.
pub fn take(&self, num_transmissions: usize) -> IndexMap<TransmissionID<N>, Transmission<N>> {
/// Removes up to the specified number of transmissions and returns them.
pub fn drain(&self, num_transmissions: usize) -> IndexMap<TransmissionID<N>, Transmission<N>> {
// Acquire the write lock.
let mut transmissions = self.transmissions.write();
// Determine the number of transmissions to drain.
Expand All @@ -140,8 +131,6 @@ impl<N: Network> Ready<N> {
#[cfg(test)]
mod tests {
use super::*;
use crate::helpers::Storage;
use snarkos_node_bft_ledger_service::MockLedgerService;
use snarkvm::ledger::{coinbase::PuzzleCommitment, narwhal::Data};

use ::bytes::Bytes;
Expand All @@ -155,14 +144,8 @@ mod tests {
// Sample random fake bytes.
let data = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.gen::<u8>()).collect::<Vec<_>>()));

// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
// Initialize the ledger.
let ledger = Arc::new(MockLedgerService::new(committee));
// Initialize the storage.
let storage = Storage::<CurrentNetwork>::new(ledger, 1);
// Initialize the ready queue.
let ready = Ready::<CurrentNetwork>::new(storage);
let ready = Ready::<CurrentNetwork>::new();

// Initialize the commitments.
let commitment_1 = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
Expand Down Expand Up @@ -198,7 +181,7 @@ mod tests {
assert_eq!(ready.get(commitment_unknown), None);

// Drain the ready queue.
let transmissions = ready.take(3);
let transmissions = ready.drain(3);

// Check the number of transmissions.
assert!(ready.is_empty());
Expand All @@ -224,14 +207,8 @@ mod tests {
rng.fill_bytes(&mut vec);
let data = Data::Buffer(Bytes::from(vec));

// Sample a committee.
let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
// Initialize the ledger.
let ledger = Arc::new(MockLedgerService::new(committee));
// Initialize the storage.
let storage = Storage::<CurrentNetwork>::new(ledger, 1);
// Initialize the ready queue.
let ready = Ready::<CurrentNetwork>::new(storage);
let ready = Ready::<CurrentNetwork>::new();

// Initialize the commitments.
let commitment = TransmissionID::Solution(PuzzleCommitment::from_g1_affine(rng.gen()));
Expand Down
37 changes: 36 additions & 1 deletion node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,42 @@ impl<N: Network> Primary<N> {
// Take the transmissions from the workers.
let mut transmissions: IndexMap<_, _> = Default::default();
for worker in self.workers.iter() {
transmissions.extend(worker.take_candidates(num_transmissions_per_worker).await);
for (id, transmission) in worker.drain(num_transmissions_per_worker) {
// Check if the transmission has been stored already.
if self.storage.contains_transmission(id) {
trace!("Proposing - Skipping transmission '{}' - Already in a certificate", fmt_id(id));
continue;
}
// Check if the ledger already contains the transmission.
if self.ledger.contains_transmission(&id).unwrap_or(true) {
trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
continue;
}
// Check the transmission is still valid.
match (id, transmission.clone()) {
(TransmissionID::Solution(solution_id), Transmission::Solution(solution)) => {
// Check if the solution is still valid.
if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
continue;
}
}
(TransmissionID::Transaction(transaction_id), Transmission::Transaction(transaction)) => {
// Check if the transaction is still valid.
if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
continue;
}
}
// Note: We explicitly forbid including ratifications,
// as the protocol currently does not support ratifications.
(TransmissionID::Ratification, Transmission::Ratification) => continue,
// All other combinations are clearly invalid.
_ => continue,
}
// Insert the transmission into the map.
transmissions.insert(id, transmission);
}
}
trace!("Proposing - {} transmissions", transmissions.len());
// Determine if there is at least one unconfirmed transaction to propose.
Expand Down
Loading

0 comments on commit e22cb34

Please sign in to comment.