Revert "Revert "Merge pull request AleoNet#3256 from AleoNet/fix/is-s…
Browse files Browse the repository at this point in the history
…yncing-check""

This reverts commit 223a1b5.
vicsn committed Jun 6, 2024
1 parent 81ca9cf commit 431bbc2
Showing 7 changed files with 89 additions and 11 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@
wasm/Cargo.lock
**/build
**.ledger-*
**.current-proposal-cache-*
**.logs-*
validator-*
**.bft-storage-*/
1 change: 1 addition & 0 deletions node/bft/src/helpers/storage.rs
@@ -788,6 +788,7 @@ impl<N: Network> Storage<N> {
/// Inserts the given `certificate` into storage.
///
/// Note: Do NOT use this in production. This is for **testing only**.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
// Retrieve the round.
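Note: with `#[cfg(test)]` on the helper above, the "testing only" warning in the doc comment becomes compiler-enforced: the method simply does not exist outside `cargo test` builds. The same gating appears on the `block_sync()` accessor added in node/bft/src/sync/mod.rs further down. A minimal standalone sketch of the pattern, using stand-in types rather than the snarkOS ones:

use std::collections::HashSet;

struct Storage {
    ids: HashSet<u64>,
}

impl Storage {
    /// Available in all builds.
    fn contains(&self, id: u64) -> bool {
        self.ids.contains(&id)
    }

    /// Compiled only under `cargo test`; any production caller becomes
    /// a compile-time error instead of a runtime hazard.
    #[cfg(test)]
    fn testing_only_insert(&mut self, id: u64) {
        self.ids.insert(id);
    }
}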
76 changes: 69 additions & 7 deletions node/bft/src/primary.rs
@@ -161,7 +161,8 @@ impl<N: Network> Primary<N> {
// We use a dummy IP because the node should not need to request from any peers.
// The storage should have stored all the transmissions. If not, we simply
// skip the certificate.
if let Err(err) = self.sync_with_certificate_from_peer(DUMMY_SELF_IP, certificate).await {
if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
{
warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
}
}
@@ -695,7 +696,7 @@ impl<N: Network> Primary<N> {
}

// If the peer is ahead, use the batch header to sync up to the peer.
let mut transmissions = self.sync_with_batch_header_from_peer(peer_ip, &batch_header).await?;
let mut transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

// Check that the transmission ids match and are not fee transactions.
if let Err(err) = cfg_iter_mut!(transmissions).try_for_each(|(transmission_id, transmission)| {
@@ -907,7 +908,7 @@ impl<N: Network> Primary<N> {
}

// Store the certificate, after ensuring it is valid.
self.sync_with_certificate_from_peer(peer_ip, certificate).await?;
self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

// If there are enough certificates to reach quorum threshold for the certificate round,
// then proceed to advance to the next round.
@@ -1426,7 +1427,7 @@ impl<N: Network> Primary<N> {
/// - Ensure the previous certificates have reached the quorum threshold.
/// - Ensure we have not already signed the batch ID.
#[async_recursion::async_recursion]
async fn sync_with_certificate_from_peer(
async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
&self,
peer_ip: SocketAddr,
certificate: BatchCertificate<N>,
@@ -1445,8 +1446,16 @@
return Ok(());
}

// If this is not a syncing call and the node is not yet synced, then return an error.
if !IS_SYNCING && !self.is_synced() {
bail!(
"Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
fmt_id(certificate.id())
);
}

// If the peer is ahead, use the batch header to sync up to the peer.
let missing_transmissions = self.sync_with_batch_header_from_peer(peer_ip, batch_header).await?;
let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

// Check if the certificate needs to be stored.
if !self.storage.contains_certificate(certificate.id()) {
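The `const IS_SYNCING: bool` parameter threads the caller's context through the recursion at compile time: the local proposal-cache reload passes `::<true>` to bypass the gate, while peer-driven paths pass `::<false>` and are rejected until the node is synced. A self-contained sketch of the mechanism, with illustrative names rather than the snarkOS API and a plain `String` error standing in for `bail!`:

fn process_certificate<const IS_SYNCING: bool>(id: u64, node_is_synced: bool) -> Result<(), String> {
    // Peer-driven calls (`IS_SYNCING = false`) are refused while the node
    // is still catching up; sync-pipeline calls bypass the gate.
    if !IS_SYNCING && !node_is_synced {
        return Err(format!("cannot process certificate {id}: node is syncing"));
    }
    Ok(())
}

fn main() {
    assert!(process_certificate::<true>(7, false).is_ok()); // sync pipeline
    assert!(process_certificate::<false>(7, false).is_err()); // gossip path, not yet synced
    assert!(process_certificate::<false>(7, true).is_ok()); // gossip path, synced node
}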
@@ -1467,7 +1476,7 @@
}

/// Recursively syncs using the given batch header.
async fn sync_with_batch_header_from_peer(
async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
&self,
peer_ip: SocketAddr,
batch_header: &BatchHeader<N>,
@@ -1480,6 +1489,14 @@
bail!("Round {batch_round} is too far in the past")
}

// If this is not a syncing call and the node is not yet synced, then return an error.
if !IS_SYNCING && !self.is_synced() {
bail!(
"Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
fmt_id(batch_header.batch_id())
);
}

// Determine if quorum threshold is reached on the batch round.
let is_quorum_threshold_reached = {
let certificates = self.storage.get_certificates_for_round(batch_round);
@@ -1515,7 +1532,7 @@
// Iterate through the missing previous certificates.
for batch_certificate in missing_previous_certificates {
// Store the batch certificate (recursively fetching any missing previous certificates).
self.sync_with_certificate_from_peer(peer_ip, batch_certificate).await?;
self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
}
Ok(missing_transmissions)
}
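The two sync functions call each other recursively, which is why `sync_with_certificate_from_peer` carries `#[async_recursion::async_recursion]`: a plain `async fn` cannot recurse, since its future type would be infinitely sized, so the macro boxes the recursive future. A minimal illustration with a hypothetical function, assuming the `async_recursion` and `tokio` crates are available:

use async_recursion::async_recursion;

// Stand-in for walking back through missing previous certificates:
// each round recursively resolves the round before it.
#[async_recursion]
async fn resolve_back_to_genesis(round: u64) -> u64 {
    if round == 0 {
        return 0;
    }
    resolve_back_to_genesis(round - 1).await + 1
}

#[tokio::main]
async fn main() {
    assert_eq!(resolve_back_to_genesis(3).await, 3);
}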
@@ -2070,13 +2087,48 @@ mod tests {

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should succeed.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_when_not_synced() {
let mut rng = TestRng::default();
let (primary, accounts) = primary_without_handlers(&mut rng).await;

// Create a valid proposal with an author that isn't the primary.
let round = 1;
let peer_account = &accounts[1];
let peer_ip = peer_account.0;
let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
let proposal = create_test_proposal(
&peer_account.1,
primary.ledger.current_committee().unwrap(),
round,
Default::default(),
timestamp,
&mut rng,
);

// Make sure the primary is aware of the transmissions in the proposal.
for (transmission_id, transmission) in proposal.transmissions() {
primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
}

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

// Try to process the batch proposal from the peer, should fail.
assert!(
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
);
}

#[tokio::test]
async fn test_batch_propose_from_peer_in_round() {
let round = 2;
@@ -2106,6 +2158,8 @@

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should succeed.
primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
@@ -2137,6 +2191,8 @@

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should error.
assert!(
@@ -2179,6 +2235,8 @@

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should error.
assert!(
@@ -2221,6 +2279,8 @@

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should error.
assert!(
@@ -2257,6 +2317,8 @@

// The author must be known to resolver to pass propose checks.
primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
// The primary must be considered synced.
primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

// Try to process the batch proposal from the peer, should error.
assert!(
7 changes: 7 additions & 0 deletions node/bft/src/sync/mod.rs
@@ -557,6 +557,13 @@ impl<N: Network> Sync<N> {
pub fn get_block_locators(&self) -> Result<BlockLocators<N>> {
self.block_sync.get_block_locators()
}

/// Returns the block sync module.
#[cfg(test)]
#[doc(hidden)]
pub(super) fn block_sync(&self) -> &BlockSync<N> {
&self.block_sync
}
}

// Methods to assist with fetching batch certificates from peers.
4 changes: 3 additions & 1 deletion node/bft/storage-service/src/memory.rs
@@ -120,7 +120,9 @@ impl<N: Network> StorageService<N> for BFTMemoryService<N> {
Entry::Vacant(vacant_entry) => {
// Retrieve the missing transmission.
let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
if !aborted_transmission_ids.contains(&transmission_id) {
if !aborted_transmission_ids.contains(&transmission_id)
&& !self.contains_transmission(transmission_id)
{
error!("Failed to provide a missing transmission {transmission_id}");
}
continue 'outer;
4 changes: 3 additions & 1 deletion node/bft/storage-service/src/persistent.rs
@@ -171,7 +171,9 @@ impl<N: Network> StorageService<N> for BFTPersistentStorage<N> {
Ok(None) => {
// Retrieve the missing transmission.
let Some(transmission) = missing_transmissions.remove(&transmission_id) else {
if !aborted_transmission_ids.contains(&transmission_id) {
if !aborted_transmission_ids.contains(&transmission_id)
&& !self.contains_transmission(transmission_id)
{
error!("Failed to provide a missing transmission {transmission_id}");
}
continue 'outer;
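In both the in-memory and persistent storage services, the change stops logging a spurious error when a transmission was not supplied by the caller but is already present in storage. A condensed let-else sketch of that control flow, with simplified stand-in types:

use std::collections::{HashMap, HashSet};

fn take_transmission(
    id: u64,
    missing: &mut HashMap<u64, Vec<u8>>,
    aborted: &HashSet<u64>,
    already_stored: &HashSet<u64>,
) -> Option<Vec<u8>> {
    // Mirror of the diff: only report an error when the transmission is
    // truly unaccounted for (not provided, not aborted, not yet stored).
    let Some(transmission) = missing.remove(&id) else {
        if !aborted.contains(&id) && !already_stored.contains(&id) {
            eprintln!("Failed to provide a missing transmission {id}");
        }
        return None; // corresponds to `continue 'outer`
    };
    Some(transmission)
}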
7 changes: 5 additions & 2 deletions node/sync/src/block_sync.rs
@@ -453,8 +453,11 @@ impl<N: Network> BlockSync<N> {
// Return the list of block requests.
(self.construct_requests(&sync_peers, min_common_ancestor), sync_peers)
} else {
// Update the state of `is_block_synced` for the sync module.
self.update_is_block_synced(0, MAX_BLOCKS_BEHIND);
// Update `is_block_synced` if there are no pending requests or responses.
if self.requests.read().is_empty() && self.responses.read().is_empty() {
// Update the state of `is_block_synced` for the sync module.
self.update_is_block_synced(0, MAX_BLOCKS_BEHIND);
}
// Return an empty list of block requests.
(Default::default(), Default::default())
}
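The new guard means `is_block_synced` is only updated while no block requests or responses are outstanding, so a node with in-flight sync work is never prematurely reported as synced. A simplified sketch of the invariant; the fields and names here are assumptions, not the snarkOS layout:

use std::collections::HashMap;
use std::sync::RwLock;
use std::sync::atomic::{AtomicBool, Ordering};

struct BlockSync {
    requests: RwLock<HashMap<u32, ()>>,
    responses: RwLock<HashMap<u32, ()>>,
    is_block_synced: AtomicBool,
}

impl BlockSync {
    fn try_mark_synced(&self) {
        // Only flip the synced flag once every request has been answered
        // and every response has been processed.
        if self.requests.read().unwrap().is_empty() && self.responses.read().unwrap().is_empty() {
            self.is_block_synced.store(true, Ordering::SeqCst);
        }
    }
}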
