Skip to content

Commit

Permalink
Merge pull request AleoNet#2550 from ljedrz/narwhal_tweaks
Browse files Browse the repository at this point in the history
[Narwhal] Another round of perf tweaks
  • Loading branch information
howardwu authored Jul 18, 2023
2 parents 1bf3695 + f129a4e commit a5952b1
Show file tree
Hide file tree
Showing 9 changed files with 40 additions and 34 deletions.
2 changes: 1 addition & 1 deletion node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl<N: Network, C: ConsensusStorage<N>> Consensus<N, C> {
// Initialize the Narwhal storage.
let storage = NarwhalStorage::new(committee, MAX_GC_ROUNDS);
// Initialize the ledger service.
let ledger_service = Box::new(CoreLedgerService::<N, C>::new(ledger.clone()));
let ledger_service = Arc::new(CoreLedgerService::<N, C>::new(ledger.clone()));
// Initialize the BFT.
let bft = BFT::new(account, storage, ledger_service, None, dev)?;
// Return the consensus.
Expand Down
4 changes: 2 additions & 2 deletions node/narwhal/examples/simple_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ pub async fn start_bft(
// Initialize the components.
let (storage, account) = initialize_components(node_id, num_nodes)?;
// Initialize the mock ledger service.
let ledger = Box::new(MockLedgerService::new());
let ledger = Arc::new(MockLedgerService::new());
// Initialize the gateway IP and dev mode.
let (ip, dev) = match peers.get(&node_id) {
Some(ip) => (Some(*ip), None),
Expand Down Expand Up @@ -139,7 +139,7 @@ pub async fn start_primary(
// Initialize the components.
let (storage, account) = initialize_components(node_id, num_nodes)?;
// Initialize the mock ledger service.
let ledger = Box::new(MockLedgerService::new());
let ledger = Arc::new(MockLedgerService::new());
// Initialize the gateway IP and dev mode.
let (ip, dev) = match peers.get(&node_id) {
Some(ip) => (Some(*ip), None),
Expand Down
2 changes: 2 additions & 0 deletions node/narwhal/ledger-service/src/ledger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
let proof_target = self.ledger.latest_proof_target();

// Ensure that the prover solution is valid for the given epoch.
// TODO(ljedrz): check if this operation requires a blocking task.
if !solution.verify(coinbase_verifying_key, &epoch_challenge, proof_target)? {
bail!("Invalid prover solution '{puzzle_commitment}' for the current epoch.");
}
Expand All @@ -96,6 +97,7 @@ impl<N: Network, C: ConsensusStorage<N>> LedgerService<N> for CoreLedgerService<
bail!("Invalid transaction - expected {transaction_id}, found {}", transaction.id());
}
// Check the transaction is well-formed.
// TODO(ljedrz): check if this operation requires a blocking task.
self.ledger.check_transaction_basic(&transaction, None)
}
}
7 changes: 4 additions & 3 deletions node/narwhal/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,8 @@ impl<N: Network> Gateway<N> {
}

/// Broadcasts the given event to all connected peers.
// TODO(ljedrz): the event should be checked for the presence of Data::Object, and
// serialized in advance if it's there.
pub(crate) fn broadcast(&self, event: Event<N>) {
// Ensure there are connected peers.
if self.number_of_connected_peers() > 0 {
Expand Down Expand Up @@ -988,10 +990,9 @@ pub mod prop_tests {
// Construct the worker channels.
let (tx_worker, rx_worker) = init_worker_channels();
// Construct the worker instance.
let ledger = Box::new(MockLedgerService::new());
let ledger = Arc::new(MockLedgerService::new());
let worker =
Worker::new(id, gateway.clone(), worker_storage.clone(), Arc::new(ledger), Default::default())
.unwrap();
Worker::new(id, gateway.clone(), worker_storage.clone(), ledger, Default::default()).unwrap();
// Run the worker instance.
worker.run(rx_worker);

Expand Down
26 changes: 14 additions & 12 deletions node/narwhal/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use snarkvm::{
prelude::{bail, ensure, Address, Field, Network, Result},
};

use indexmap::{indexmap, IndexMap, IndexSet};
use indexmap::{indexmap, map::Entry, IndexMap, IndexSet};
use parking_lot::RwLock;
use std::{
collections::{HashMap, HashSet},
Expand Down Expand Up @@ -249,7 +249,7 @@ impl<N: Network> Storage<N> {
/// If the batch ID does not exist in storage, `None` is returned.
pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
// Get the round.
self.batch_ids.read().get(&batch_id).cloned()
self.batch_ids.read().get(&batch_id).copied()
}

/// Returns the certificate round for the given `certificate ID`.
Expand Down Expand Up @@ -567,16 +567,18 @@ impl<N: Network> Storage<N> {
// If this is the last certificate ID for the transmission ID, remove the transmission.
for transmission_id in certificate.transmission_ids() {
// Remove the certificate ID for the transmission ID, and determine if there are any more certificate IDs.
let is_empty = transmissions.get_mut(transmission_id).map_or(false, |(_, certificate_ids)| {
// Remove the certificate ID for the transmission ID.
certificate_ids.remove(&certificate_id);
// Determine if there are any more certificate IDs for the transmission ID.
certificate_ids.is_empty()
});
// If there are no more certificate IDs for the transmission ID, remove the transmission.
if is_empty {
// Remove the entry for the transmission ID.
transmissions.remove(transmission_id);
match transmissions.entry(*transmission_id) {
Entry::Occupied(mut occupied_entry) => {
let (_, certificate_ids) = occupied_entry.get_mut();
// Remove the certificate ID for the transmission ID.
certificate_ids.remove(&certificate_id);
// If there are no more certificate IDs for the transmission ID, remove the transmission.
if certificate_ids.is_empty() {
// Remove the entry for the transmission ID.
occupied_entry.remove();
}
}
Entry::Vacant(_) => {}
}
}
// Return successfully.
Expand Down
17 changes: 9 additions & 8 deletions node/narwhal/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use tokio::{
/// A helper type for an optional proposed batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
/// A helper type for the ledger service.
pub type Ledger<N> = Box<dyn LedgerService<N>>;
pub type Ledger<N> = Arc<dyn LedgerService<N>>;

#[derive(Clone)]
pub struct Primary<N: Network> {
Expand All @@ -74,9 +74,9 @@ pub struct Primary<N: Network> {
/// The storage.
storage: Storage<N>,
/// The ledger service.
ledger: Arc<Ledger<N>>,
ledger: Ledger<N>,
/// The workers.
workers: Arc<Vec<Worker<N>>>,
workers: Arc<[Worker<N>]>,
/// The BFT sender.
bft_sender: Arc<OnceCell<BFTSender<N>>>,
/// The batch proposal, if the primary is currently proposing a batch.
Expand All @@ -99,8 +99,8 @@ impl<N: Network> Primary<N> {
Ok(Self {
gateway: Gateway::new(account, storage.clone(), ip, dev)?,
storage,
ledger: Arc::from(ledger),
workers: Default::default(),
ledger,
workers: Arc::from(vec![]),
bft_sender: Default::default(),
proposed_batch: Default::default(),
pending: Default::default(),
Expand Down Expand Up @@ -148,7 +148,7 @@ impl<N: Network> Primary<N> {
tx_workers.insert(id, tx_worker);
}
// Set the workers.
self.workers = Arc::new(workers);
self.workers = Arc::from(workers);

// Initialize the gateway.
self.gateway.run(tx_workers).await;
Expand Down Expand Up @@ -190,12 +190,12 @@ impl<N: Network> Primary<N> {
}

/// Returns the workers.
pub const fn workers(&self) -> &Arc<Vec<Worker<N>>> {
pub const fn workers(&self) -> &Arc<[Worker<N>]> {
&self.workers
}

/// Returns the batch proposal of our primary, if one currently exists.
pub fn proposed_batch(&self) -> &Arc<RwLock<Option<Proposal<N>>>> {
pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
&self.proposed_batch
}
}
Expand Down Expand Up @@ -259,6 +259,7 @@ impl<N: Network> Primary<N> {
// rebroadcast the batch header to the non-signers, and return early.
if let Some(proposal) = self.proposed_batch.read().as_ref() {
// Construct the event.
// TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
let event = Event::BatchPropose(proposal.batch_header().clone().into());
// Iterate through the non-signers.
for address in proposal.nonsigners() {
Expand Down
12 changes: 6 additions & 6 deletions node/narwhal/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ pub struct Worker<N: Network> {
/// The storage.
storage: Storage<N>,
/// The ledger service.
ledger: Arc<Ledger<N>>,
ledger: Ledger<N>,
/// The proposed batch.
proposed_batch: Arc<ProposedBatch<N>>,
/// The ready queue.
Expand All @@ -64,7 +64,7 @@ impl<N: Network> Worker<N> {
id: u8,
gateway: Gateway<N>,
storage: Storage<N>,
ledger: Arc<Ledger<N>>,
ledger: Ledger<N>,
proposed_batch: Arc<ProposedBatch<N>>,
) -> Result<Self> {
// Ensure the worker ID is valid.
Expand Down Expand Up @@ -465,8 +465,8 @@ mod prop_tests {
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let ledger: Ledger<CurrentNetwork> = Box::new(MockLedgerService::new());
let worker = Worker::new(id, gateway, storage, Arc::new(ledger), Default::default()).unwrap();
let ledger: Ledger<CurrentNetwork> = Arc::new(MockLedgerService::new());
let worker = Worker::new(id, gateway, storage, ledger, Default::default()).unwrap();
assert_eq!(worker.id(), id);
}

Expand All @@ -476,8 +476,8 @@ mod prop_tests {
gateway: Gateway<CurrentNetwork>,
storage: Storage<CurrentNetwork>,
) {
let ledger: Ledger<CurrentNetwork> = Box::new(MockLedgerService::new());
let worker = Worker::new(id, gateway, storage, Arc::new(ledger), Default::default());
let ledger: Ledger<CurrentNetwork> = Arc::new(MockLedgerService::new());
let worker = Worker::new(id, gateway, storage, ledger, Default::default());
// TODO once Worker implements Debug, simplify this with `unwrap_err`
if let Err(error) = worker {
assert_eq!(error.to_string(), format!("Invalid worker ID '{}'", id));
Expand Down
2 changes: 1 addition & 1 deletion node/narwhal/tests/common/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl TestNetwork {
let mut validators = HashMap::with_capacity(config.num_nodes as usize);
for (id, account) in accounts.into_iter().enumerate() {
let storage = Storage::new(committee.clone(), MAX_GC_ROUNDS);
let ledger = Box::new(MockLedgerService::new());
let ledger = Arc::new(MockLedgerService::new());
let primary = Primary::<CurrentNetwork>::new(account, storage, ledger, None, Some(id as u16)).unwrap();

let test_validator = TestValidator { id: id as u16, primary, sender: None, handles: Default::default() };
Expand Down
2 changes: 1 addition & 1 deletion node/narwhal/tests/e2e.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn test_state_coherence() {
fire_cannons: true,

// Set this to Some(0..=4) to see the logs.
log_level: None,
log_level: Some(0),
log_connections: true,
});

Expand Down

0 comments on commit a5952b1

Please sign in to comment.