Skip to content

Commit

Permalink
Merge pull request ProvableHQ#3247 from AleoHQ/feat/store-pending-cer…
Browse files Browse the repository at this point in the history
…tificates

[Optimize] Store pending certificates to proposal cache file
  • Loading branch information
howardwu authored May 9, 2024
2 parents ec2d7fa + 7ed91da commit 9a6ac11
Show file tree
Hide file tree
Showing 6 changed files with 306 additions and 35 deletions.
2 changes: 1 addition & 1 deletion devnet.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ if [[ $clear_ledger == "y" ]]; then

for ((index = 0; index < $((total_validators + total_clients)); index++)); do
# Run 'snarkos clean' for each node in the background
snarkos clean --dev $index &
snarkos clean --network $network_id --dev $index &

# Store the process ID of the background task
clean_processes+=($!)
Expand Down
51 changes: 40 additions & 11 deletions node/bft/src/helpers/proposal_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
use crate::helpers::{Proposal, SignedProposals};

use snarkvm::{
console::{account::Address, network::Network},
prelude::{anyhow, bail, FromBytes, IoResult, Read, Result, ToBytes, Write},
console::{account::Address, network::Network, program::SUBDAG_CERTIFICATES_DEPTH},
ledger::narwhal::BatchCertificate,
prelude::{anyhow, bail, error, FromBytes, IoResult, Read, Result, ToBytes, Write},
};

use aleo_std::{aleo_ledger_dir, StorageMode};
use indexmap::IndexSet;
use std::{fs, path::PathBuf};

// Returns the path where a proposal cache file may be stored.
/// Returns the path where a proposal cache file may be stored.
pub fn proposal_cache_path(network: u16, dev: Option<u16>) -> PathBuf {
const PROPOSAL_CACHE_FILE_NAME: &str = "current-proposal-cache";

Expand All @@ -48,12 +50,19 @@ pub struct ProposalCache<N: Network> {
proposal: Option<Proposal<N>>,
/// The signed proposals this node has received.
signed_proposals: SignedProposals<N>,
/// The pending certificates in storage that have not been included in the ledger.
pending_certificates: IndexSet<BatchCertificate<N>>,
}

impl<N: Network> ProposalCache<N> {
/// Initializes a new instance of the proposal cache.
pub fn new(latest_round: u64, proposal: Option<Proposal<N>>, signed_proposals: SignedProposals<N>) -> Self {
Self { latest_round, proposal, signed_proposals }
pub fn new(
latest_round: u64,
proposal: Option<Proposal<N>>,
signed_proposals: SignedProposals<N>,
pending_certificates: IndexSet<BatchCertificate<N>>,
) -> Self {
Self { latest_round, proposal, signed_proposals, pending_certificates }
}

/// Ensure that the proposal and every signed proposal is associated with the `expected_signer`.
Expand Down Expand Up @@ -110,9 +119,9 @@ impl<N: Network> ProposalCache<N> {
Ok(())
}

/// Returns the latest round, proposal and signed proposals.
pub fn into(self) -> (u64, Option<Proposal<N>>, SignedProposals<N>) {
(self.latest_round, self.proposal, self.signed_proposals)
/// Returns the latest round, proposal, signed proposals, and pending certificates.
pub fn into(self) -> (u64, Option<Proposal<N>>, SignedProposals<N>, IndexSet<BatchCertificate<N>>) {
(self.latest_round, self.proposal, self.signed_proposals, self.pending_certificates)
}
}

Expand All @@ -127,6 +136,12 @@ impl<N: Network> ToBytes for ProposalCache<N> {
}
// Serialize the `signed_proposals`.
self.signed_proposals.write_le(&mut writer)?;
// Write the number of pending certificates.
u32::try_from(self.pending_certificates.len()).map_err(error)?.write_le(&mut writer)?;
// Serialize the pending certificates.
for certificate in &self.pending_certificates {
certificate.write_le(&mut writer)?;
}

Ok(())
}
Expand All @@ -144,15 +159,27 @@ impl<N: Network> FromBytes for ProposalCache<N> {
};
// Deserialize `signed_proposals`.
let signed_proposals = SignedProposals::read_le(&mut reader)?;
// Read the number of pending certificates.
let num_certificates = u32::read_le(&mut reader)?;
// Ensure the number of certificates is within bounds.
if num_certificates > 2u32.saturating_pow(SUBDAG_CERTIFICATES_DEPTH as u32) {
return Err(error(format!(
"Number of certificates ({num_certificates}) exceeds the maximum ({})",
2u32.saturating_pow(SUBDAG_CERTIFICATES_DEPTH as u32)
)));
};
// Deserialize the pending certificates.
let pending_certificates =
(0..num_certificates).map(|_| BatchCertificate::read_le(&mut reader)).collect::<IoResult<IndexSet<_>>>()?;

Ok(Self::new(latest_round, proposal, signed_proposals))
Ok(Self::new(latest_round, proposal, signed_proposals, pending_certificates))
}
}

impl<N: Network> Default for ProposalCache<N> {
/// Initializes a new instance of the proposal cache.
fn default() -> Self {
Self::new(0, None, Default::default())
Self::new(0, None, Default::default(), Default::default())
}
}

Expand All @@ -162,6 +189,7 @@ mod tests {
use crate::helpers::{proposal::tests::sample_proposal, signed_proposals::tests::sample_signed_proposals};
use snarkvm::{
console::{account::PrivateKey, network::MainnetV0},
ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificates,
utilities::TestRng,
};

Expand All @@ -176,8 +204,9 @@ mod tests {
let proposal = sample_proposal(rng);
let signed_proposals = sample_signed_proposals(signer, rng);
let round = proposal.round();
let pending_certificates = sample_batch_certificates(rng);

ProposalCache::new(round, Some(proposal), signed_proposals)
ProposalCache::new(round, Some(proposal), signed_proposals, pending_certificates)
}

#[test]
Expand Down
29 changes: 29 additions & 0 deletions node/bft/src/helpers/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,35 @@ impl<N: Network> Storage<N> {
}
}

/// Returns the certificates that have not yet been included in the ledger.
/// Note that the order of this set is by round and then insertion.
pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
let mut pending_certificates = IndexSet::new();

// Obtain the read locks.
let rounds = self.rounds.read();
let certificates = self.certificates.read();

// Iterate over the rounds.
for (_, certificates_for_round) in rounds.clone().sorted_by(|a, _, b, _| a.cmp(b)) {
// Iterate over the certificates for the round.
for (certificate_id, _, _) in certificates_for_round {
// Skip the certificate if it already exists in the ledger.
if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
continue;
}

// Add the certificate to the pending certificates.
match certificates.get(&certificate_id).cloned() {
Some(certificate) => pending_certificates.insert(certificate),
None => continue,
};
}
}

pending_certificates
}

/// Checks the given `batch_header` for validity, returning the missing transmissions from storage.
///
/// This method ensures the following invariants:
Expand Down
70 changes: 51 additions & 19 deletions node/bft/src/primary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use crate::{
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::DUMMY_SELF_IP;
use snarkvm::{
console::{
prelude::*,
Expand Down Expand Up @@ -120,19 +121,6 @@ impl<N: Network> Primary<N> {
// Initialize the sync module.
let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

// Fetch the signed proposals from the file system if it exists.
let proposal_cache = match ProposalCache::<N>::exists(dev) {
true => match ProposalCache::<N>::load(gateway.account().address(), dev) {
Ok(proposal) => proposal,
Err(err) => {
bail!("Failed to read the signed proposals from the file system - {err}.");
}
},
false => ProposalCache::default(),
};
// Extract the proposal and signed proposals.
let (latest_certificate_round, proposed_batch, signed_proposals) = proposal_cache.into();

// Initialize the primary instance.
Ok(Self {
sync,
Expand All @@ -141,14 +129,53 @@ impl<N: Network> Primary<N> {
ledger,
workers: Arc::from(vec![]),
bft_sender: Default::default(),
proposed_batch: Arc::new(RwLock::new(proposed_batch)),
proposed_batch: Default::default(),
latest_proposed_batch_timestamp: Default::default(),
signed_proposals: Arc::new(RwLock::new(signed_proposals)),
signed_proposals: Default::default(),
handles: Default::default(),
propose_lock: Arc::new(TMutex::new(latest_certificate_round)),
propose_lock: Default::default(),
})
}

/// Load the proposal cache file and update the Primary state with the stored data.
async fn load_proposal_cache(&self) -> Result<()> {
// Fetch the signed proposals from the file system if it exists.
match ProposalCache::<N>::exists(self.gateway.dev()) {
// If the proposal cache exists, then process the proposal cache.
true => match ProposalCache::<N>::load(self.gateway.account().address(), self.gateway.dev()) {
Ok(proposal_cache) => {
// Extract the proposal and signed proposals.
let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
proposal_cache.into();

// Write the proposed batch.
*self.proposed_batch.write() = proposed_batch;
// Write the signed proposals.
*self.signed_proposals.write() = signed_proposals;
// Writ the propose lock.
*self.propose_lock.lock().await = latest_certificate_round;

// Update the storage with the pending certificates.
for certificate in pending_certificates {
let batch_id = certificate.batch_id();
// We use a dummy IP because the node should not need to request from any peers.
// The storage should have stored all the transmissions. If not, we simply
// skip the certificate.
if let Err(err) = self.sync_with_certificate_from_peer(DUMMY_SELF_IP, certificate).await {
warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
}
}
Ok(())
}
Err(err) => {
bail!("Failed to read the signed proposals from the file system - {err}.");
}
},
// If the proposal cache does not exist, then return early.
false => Ok(()),
}
}

/// Run the primary instance.
pub async fn run(
&mut self,
Expand Down Expand Up @@ -192,8 +219,12 @@ impl<N: Network> Primary<N> {

// First, initialize the sync channels.
let (sync_sender, sync_receiver) = init_sync_channels();
// Next, initialize the sync module.
self.sync.run(bft_sender, sync_receiver).await?;
// Next, initialize the sync module and sync the storage from ledger.
self.sync.initialize(bft_sender).await?;
// Next, load and process the proposal cache before running the sync module.
self.load_proposal_cache().await?;
// Next, run the sync module.
self.sync.run(sync_receiver).await?;
// Next, initialize the gateway.
self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
// Lastly, start the primary handlers.
Expand Down Expand Up @@ -1627,7 +1658,8 @@ impl<N: Network> Primary<N> {
let proposal = self.proposed_batch.write().take();
let signed_proposals = self.signed_proposals.read().clone();
let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
ProposalCache::new(latest_round, proposal, signed_proposals)
let pending_certificates = self.storage.get_pending_certificates();
ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
};
if let Err(err) = proposal_cache.store(self.gateway.dev()) {
error!("Failed to store the current proposal cache: {err}");
Expand Down
Loading

0 comments on commit 9a6ac11

Please sign in to comment.