Skip to content

Commit

Permalink
[Consensus] resend last block to peer when there is no new proposal (#…
Browse files Browse the repository at this point in the history
…16753)

## Description 

Blocks get acknowledged before persistence, so it is useful to resend
the latest block in case the peer restarted.

## Test Plan 

CI

---
If your changes are not user-facing and do not break anything, you can
skip the following section. Otherwise, please briefly describe what has
changed under the Release Notes section.

### Type of Change (Check all that apply)

- [ ] protocol change
- [ ] user-visible impact
- [ ] breaking change for a client SDKs
- [ ] breaking change for FNs (FN binary must upgrade)
- [ ] breaking change for validators or node operators (must upgrade
binaries)
- [ ] breaking change for on-chain data layout
- [ ] necessitate either a data wipe or data migration

### Release notes
  • Loading branch information
mwtian authored Mar 20, 2024
1 parent 597f3e7 commit 2b8ba9b
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 13 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ mime = "0.3"
mockall = "0.11.4"
moka = { version = "0.12", default-features = false, features = ["sync", "atomic64"] }
more-asserts = "0.3.1"
msim = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8", package = "msim" }
msim-macros = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8", package = "msim-macros" }
msim = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89", package = "msim" }
msim-macros = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89", package = "msim-macros" }
multiaddr = "0.17.0"
nexlint = { git = "https://github.com/nextest-rs/nexlint.git", rev = "94da5c787636dad779c340affa65219134d127f5" }
nexlint-lints = { git = "https://github.com/nextest-rs/nexlint.git", rev = "94da5c787636dad779c340affa65219134d127f5" }
Expand Down
65 changes: 60 additions & 5 deletions consensus/core/src/broadcaster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use tokio::{
use tracing::{trace, warn};

use crate::{
block::VerifiedBlock, context::Context, core::CoreSignalsReceivers, error::ConsensusResult,
block::{BlockAPI as _, VerifiedBlock},
context::Context,
core::CoreSignalsReceivers,
error::ConsensusResult,
network::NetworkClient,
};

Expand All @@ -34,6 +37,8 @@ pub(crate) struct Broadcaster {
}

impl Broadcaster {
const LAST_BLOCK_RETRY_INTERVAL: Duration = Duration::from_secs(2);

pub(crate) fn new<C: NetworkClient>(
context: Arc<Context>,
network_client: Arc<C>,
Expand Down Expand Up @@ -73,6 +78,16 @@ impl Broadcaster {
) {
let peer_hostname = context.committee.authority(peer).hostname.clone();

// Record the last block to be broadcasted, to retry in case no new block is produced for awhile.
// Even if the peer has acknowledged the last block, the block might have been dropped afterwards
// if the peer crashed.
let mut last_block: Option<VerifiedBlock> = None;

// Retry last block with an interval.
let mut retry_timer = tokio::time::interval(Self::LAST_BLOCK_RETRY_INTERVAL);
retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
retry_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

// Use a simple exponential-decay RTT estimator to adjust the timeout for each block sent.
// The estimation logic will be removed once the underlying transport switches to use
// streaming and the streaming implementation can be relied upon for retries.
Expand Down Expand Up @@ -118,13 +133,21 @@ impl Broadcaster {
continue;
}
};
requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
requests.push(send_block(network_client.clone(), peer, rtt_estimate, block.clone()));
if last_block.is_none() || last_block.as_ref().unwrap().round() < block.round() {
last_block = Some(block);
}
}

Some((resp, start, block)) = requests.next() => {
match resp {
Ok(Ok(_)) => {
let now = Instant::now();
rtt_estimate = rtt_estimate.mul_f64(RTT_ESTIMATE_DECAY) + (now - start).mul_f64(1.0 - RTT_ESTIMATE_DECAY);
// Avoid immediately retrying a successfully sent block.
// Resetting timer is unnecessary otherwise because there are
// additional inflight requests.
retry_timer.reset_after(Self::LAST_BLOCK_RETRY_INTERVAL);
},
Err(Elapsed { .. }) => {
rtt_estimate = rtt_estimate.mul_f64(TIMEOUT_RTT_INCREMENT_FACTOR);
Expand All @@ -135,7 +158,16 @@ impl Broadcaster {
},
};
}

_ = retry_timer.tick() => {
if requests.is_empty() {
if let Some(block) = last_block.clone() {
requests.push(send_block(network_client.clone(), peer, rtt_estimate, block));
}
}
}
};

// Limit RTT estimate to be between 5ms and 5s.
rtt_estimate = min(rtt_estimate, Duration::from_secs(5));
rtt_estimate = max(rtt_estimate, Duration::from_millis(5));
Expand All @@ -151,7 +183,7 @@ impl Broadcaster {

#[cfg(test)]
mod test {
use std::{collections::BTreeMap, time::Duration};
use std::{collections::BTreeMap, ops::DerefMut, time::Duration};

use async_trait::async_trait;
use bytes::Bytes;
Expand All @@ -176,7 +208,10 @@ mod test {
}

fn blocks_sent(&self) -> BTreeMap<AuthorityIndex, Vec<Bytes>> {
self.blocks_sent.lock().clone()
let mut blocks_sent = self.blocks_sent.lock();
let result = std::mem::take(blocks_sent.deref_mut());
blocks_sent.clear();
result
}
}

Expand Down Expand Up @@ -217,8 +252,28 @@ mod test {
"No subscriber active to receive the block"
);

sleep(Duration::from_secs(1)).await;
// block should be broadcasted immediately to all peers.
sleep(Duration::from_millis(1)).await;
let blocks_sent = network_client.blocks_sent();
for (index, _) in context.committee.authorities() {
if index == context.own_index {
continue;
}
assert_eq!(blocks_sent.get(&index).unwrap(), &vec![block.serialized()]);
}

// block should not be re-broadcasted ...
sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
let blocks_sent = network_client.blocks_sent();
for (index, _) in context.committee.authorities() {
if index == context.own_index {
continue;
}
assert!(blocks_sent.get(&index).is_none());
}

// ... until LAST_BLOCK_RETRY_INTERVAL
sleep(Broadcaster::LAST_BLOCK_RETRY_INTERVAL / 2).await;
let blocks_sent = network_client.blocks_sent();
for (index, _) in context.committee.authorities() {
if index == context.own_index {
Expand Down
4 changes: 2 additions & 2 deletions scripts/simtest/cargo-simtest
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ if [ -n "$LOCAL_MSIM_PATH" ]; then
else
cargo_patch_args=(
--config 'patch.crates-io.tokio.git = "https://github.com/MystenLabs/mysten-sim.git"'
--config 'patch.crates-io.tokio.rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8"'
--config 'patch.crates-io.tokio.rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89"'
--config 'patch.crates-io.futures-timer.git = "https://github.com/MystenLabs/mysten-sim.git"'
--config 'patch.crates-io.futures-timer.rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8"'
--config 'patch.crates-io.futures-timer.rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89"'
)
fi

Expand Down
4 changes: 2 additions & 2 deletions scripts/simtest/config-patch
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ index 3b1ca4591c..e4ff4d61d2 100644
include_dir = "0.7.3"
+
+[patch.crates-io]
+tokio = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8" }
+futures-timer = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "4d9040e5c6bb264bfe14900d09a5da4000154da8" }
+tokio = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89" }
+futures-timer = { git = "https://github.com/MystenLabs/mysten-sim.git", rev = "6f88ec84644cb1a6809c010f1f534d0d09e0cd89" }

0 comments on commit 2b8ba9b

Please sign in to comment.