Skip to content

Commit

Permalink
[consensus] return quorum rounds from round prober (#19703)
Browse files Browse the repository at this point in the history
## Description 

Return the per-authority quorum rounds from the round prober, so that they can be used for smart ancestor selection in
[PR#19605](#19605).

---

## Release notes

Check each box that your changes affect. If none of the boxes relate to
your changes, release notes aren't required.

For each box you select, include information after the relevant heading
that describes the impact of your changes that a user might notice and
any actions they must take to implement updates.

- [ ] Protocol: 
- [ ] Nodes (Validators and Full nodes): 
- [ ] Indexer: 
- [ ] JSON-RPC: 
- [ ] GraphQL: 
- [ ] CLI: 
- [ ] Rust SDK:
- [ ] REST API:
  • Loading branch information
arun-koshy authored Oct 4, 2024
1 parent 9d23ec4 commit 4c1d514
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 25 deletions.
11 changes: 8 additions & 3 deletions consensus/core/src/authority_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,20 +600,21 @@ async fn make_recv_future<T: Clone>(

#[cfg(test)]
mod tests {
use crate::commit::CommitRange;
use crate::test_dag_builder::DagBuilder;
use crate::{
authority_service::AuthorityService,
block::BlockAPI,
block::{BlockRef, SignedBlock, TestBlock, VerifiedBlock},
commit::CommitRange,
commit_vote_monitor::CommitVoteMonitor,
context::Context,
core_thread::{CoreError, CoreThreadDispatcher},
dag_state::DagState,
error::ConsensusResult,
network::{BlockStream, NetworkClient, NetworkService},
round_prober::QuorumRound,
storage::mem_store::MemStore,
synchronizer::Synchronizer,
test_dag_builder::DagBuilder,
Round,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -665,7 +666,11 @@ mod tests {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
// Stub: both arguments are ignored and calling this panics via `todo!()`.
todo!()
}

Expand Down
12 changes: 9 additions & 3 deletions consensus/core/src/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use crate::{
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
leader_schedule::LeaderSchedule,
round_prober::QuorumRound,
stake_aggregator::{QuorumThreshold, StakeAggregator},
threshold_clock::ThresholdClock,
transaction::TransactionConsumer,
Expand Down Expand Up @@ -645,7 +646,12 @@ impl Core {
}

/// Sets the delay by round for propagating blocks to a quorum.
pub(crate) fn set_propagation_delay(&mut self, delay: Round) {
// TODO: Will set the quorum round per authority in ancestor state manager.
pub(crate) fn set_propagation_delay_and_quorum_rounds(
&mut self,
delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) {
// Only `delay` is logged and stored here; `_quorum_rounds` is accepted
// but not used anywhere in this body.
info!("Propagation round delay set to: {delay}");
self.propagation_delay = delay;
}
Expand Down Expand Up @@ -1726,7 +1732,7 @@ mod test {
);

// Use a large propagation delay to disable proposing.
core.set_propagation_delay(1000);
core.set_propagation_delay_and_quorum_rounds(1000, vec![]);

// Make propagation delay the only reason for not proposing.
core.set_subscriber_exists(true);
Expand All @@ -1735,7 +1741,7 @@ mod test {
assert!(core.try_propose(true).unwrap().is_none());

// Let Core know there is no propagation delay.
core.set_propagation_delay(0);
core.set_propagation_delay_and_quorum_rounds(0, vec![]);

// Proposing now would succeed.
assert!(core.try_propose(true).unwrap().is_some());
Expand Down
49 changes: 33 additions & 16 deletions consensus/core/src/core_thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::{
core_thread::CoreError::Shutdown,
dag_state::DagState,
error::{ConsensusError, ConsensusResult},
round_prober::QuorumRound,
BlockAPI as _,
};

Expand Down Expand Up @@ -65,8 +66,13 @@ pub trait CoreThreadDispatcher: Sync + Send + 'static {
/// It is not a guarantee that produced blocks will be accepted by peers.
fn set_subscriber_exists(&self, exists: bool) -> Result<(), CoreError>;

/// Sets the estimated delay to propagate a block to a quorum of peers, in number of rounds.
fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError>;
/// Sets the estimated delay to propagate a block to a quorum of peers, in
/// number of rounds, and the quorum rounds for all authorities.
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError>;

fn set_last_known_proposed_round(&self, round: Round) -> Result<(), CoreError>;

Expand All @@ -91,7 +97,7 @@ struct CoreThread {
core: Core,
receiver: Receiver<CoreThreadCommand>,
rx_subscriber_exists: watch::Receiver<bool>,
rx_propagation_delay: watch::Receiver<Round>,
rx_propagation_delay_and_quorum_rounds: watch::Receiver<(Round, Vec<QuorumRound>)>,
rx_last_known_proposed_round: watch::Receiver<Round>,
context: Arc<Context>,
}
Expand Down Expand Up @@ -141,11 +147,11 @@ impl CoreThread {
self.core.new_block(Round::MAX, true)?;
}
}
_ = self.rx_propagation_delay.changed() => {
let _scope = monitored_scope("CoreThread::loop::set_propagation_delay");
_ = self.rx_propagation_delay_and_quorum_rounds.changed() => {
let _scope = monitored_scope("CoreThread::loop::set_propagation_delay_and_quorum_rounds");
let should_propose_before = self.core.should_propose();
let delay = *self.rx_propagation_delay.borrow();
self.core.set_propagation_delay(delay);
let (delay, quorum_rounds) = self.rx_propagation_delay_and_quorum_rounds.borrow().clone();
self.core.set_propagation_delay_and_quorum_rounds(delay, quorum_rounds);
if !should_propose_before && self.core.should_propose() {
// If core could not propose before but can propose now, try to produce a new block to ensure liveness,
// because block proposal could have been skipped.
Expand All @@ -164,7 +170,7 @@ pub(crate) struct ChannelCoreThreadDispatcher {
context: Arc<Context>,
sender: WeakSender<CoreThreadCommand>,
tx_subscriber_exists: Arc<watch::Sender<bool>>,
tx_propagation_delay: Arc<watch::Sender<Round>>,
tx_propagation_delay_and_quorum_rounds: Arc<watch::Sender<(Round, Vec<QuorumRound>)>>,
tx_last_known_proposed_round: Arc<watch::Sender<Round>>,
highest_received_rounds: Arc<Vec<AtomicU32>>,
}
Expand All @@ -190,16 +196,17 @@ impl ChannelCoreThreadDispatcher {
let (sender, receiver) =
channel("consensus_core_commands", CORE_THREAD_COMMANDS_CHANNEL_SIZE);
let (tx_subscriber_exists, mut rx_subscriber_exists) = watch::channel(false);
let (tx_propagation_delay, mut rx_propagation_delay) = watch::channel(0);
let (tx_propagation_delay_and_quorum_rounds, mut rx_propagation_delay_and_quorum_rounds) =
watch::channel((0, vec![(0, 0); context.committee.size()]));
let (tx_last_known_proposed_round, mut rx_last_known_proposed_round) = watch::channel(0);
rx_subscriber_exists.mark_unchanged();
rx_propagation_delay.mark_unchanged();
rx_propagation_delay_and_quorum_rounds.mark_unchanged();
rx_last_known_proposed_round.mark_unchanged();
let core_thread = CoreThread {
core,
receiver,
rx_subscriber_exists,
rx_propagation_delay,
rx_propagation_delay_and_quorum_rounds,
rx_last_known_proposed_round,
context: context.clone(),
};
Expand All @@ -221,7 +228,9 @@ impl ChannelCoreThreadDispatcher {
context,
sender: sender.downgrade(),
tx_subscriber_exists: Arc::new(tx_subscriber_exists),
tx_propagation_delay: Arc::new(tx_propagation_delay),
tx_propagation_delay_and_quorum_rounds: Arc::new(
tx_propagation_delay_and_quorum_rounds,
),
tx_last_known_proposed_round: Arc::new(tx_last_known_proposed_round),
highest_received_rounds: Arc::new(highest_received_rounds),
};
Expand Down Expand Up @@ -279,9 +288,13 @@ impl CoreThreadDispatcher for ChannelCoreThreadDispatcher {
.map_err(|e| Shutdown(e.to_string()))
}

fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> {
self.tx_propagation_delay
.send(delay)
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
// Publish the delay and the quorum rounds together as a single tuple on
// the watch channel; a failed send is mapped to `Shutdown` carrying the
// error's text.
self.tx_propagation_delay_and_quorum_rounds
.send((delay, quorum_rounds))
.map_err(|e| Shutdown(e.to_string()))
}

Expand Down Expand Up @@ -356,7 +369,11 @@ impl CoreThreadDispatcher for MockCoreThreadDispatcher {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
// Stub on the mock dispatcher: both arguments are ignored and calling
// this panics via `todo!()`.
todo!()
}

Expand Down
7 changes: 6 additions & 1 deletion consensus/core/src/leader_timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ mod tests {
use crate::core::CoreSignals;
use crate::core_thread::{CoreError, CoreThreadDispatcher};
use crate::leader_timeout::LeaderTimeoutTask;
use crate::round_prober::QuorumRound;

#[derive(Clone, Default)]
struct MockCoreThreadDispatcher {
Expand Down Expand Up @@ -171,7 +172,11 @@ mod tests {
todo!()
}

fn set_propagation_delay(&self, _delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
_delay: Round,
_quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
// Stub: both arguments are ignored and calling this panics via `todo!()`.
todo!()
}

Expand Down
7 changes: 7 additions & 0 deletions consensus/core/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ pub(crate) struct NodeMetrics {
pub(crate) commit_sync_fetch_once_latency: Histogram,
pub(crate) commit_sync_fetch_once_errors: IntCounterVec,
pub(crate) round_prober_quorum_round_gaps: IntGaugeVec,
pub(crate) round_prober_low_quorum_round: IntGaugeVec,
pub(crate) round_prober_current_round_gaps: IntGaugeVec,
pub(crate) round_prober_propagation_delays: Histogram,
pub(crate) round_prober_last_propagation_delay: IntGauge,
Expand Down Expand Up @@ -611,6 +612,12 @@ impl NodeMetrics {
&["authority"],
registry
).unwrap(),
round_prober_low_quorum_round: register_int_gauge_vec_with_registry!(
"round_prober_low_quorum_round",
"Low quorum round among peers for blocks proposed from each authority",
&["authority"],
registry
).unwrap(),
round_prober_current_round_gaps: register_int_gauge_vec_with_registry!(
"round_prober_current_round_gaps",
"Round gaps from local last proposed round to the low quorum round of each peer. Can be negative.",
Expand Down
33 changes: 31 additions & 2 deletions consensus/core/src/round_prober.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ impl<C: NetworkClient> RoundProber<C> {
.round_prober_quorum_round_gaps
.with_label_values(&[&authority.hostname])
.set((high - low) as i64);
node_metrics
.round_prober_low_quorum_round
.with_label_values(&[&authority.hostname])
.set(*low as i64);
// The gap can be negative if this validator is lagging behind the network.
node_metrics
.round_prober_current_round_gaps
Expand All @@ -231,7 +235,7 @@ impl<C: NetworkClient> RoundProber<C> {
.set(propagation_delay as i64);
if let Err(e) = self
.core_thread_dispatcher
.set_propagation_delay(propagation_delay)
.set_propagation_delay_and_quorum_rounds(propagation_delay, quorum_rounds.clone())
{
tracing::warn!(
"Failed to set propagation delay {propagation_delay} on Core: {:?}",
Expand Down Expand Up @@ -293,6 +297,7 @@ mod test {
use consensus_config::AuthorityIndex;
use parking_lot::{Mutex, RwLock};

use super::QuorumRound;
use crate::{
block::BlockRef,
commit::CommitRange,
Expand All @@ -309,19 +314,25 @@ mod test {
/// Test double that records the values handed to it so assertions can read
/// them back afterwards.
struct FakeThreadDispatcher {
/// Rounds supplied at construction time.
highest_received_rounds: Vec<Round>,
/// Most recent propagation delay stored on this fake; starts at 0.
propagation_delay: Mutex<Round>,
/// Most recent quorum rounds stored on this fake; starts empty.
quorum_rounds: Mutex<Vec<QuorumRound>>,
}

impl FakeThreadDispatcher {
    /// Builds a fake dispatcher holding the given highest received rounds,
    /// with the recorded propagation delay at 0 and no recorded quorum rounds.
    fn new(highest_received_rounds: Vec<Round>) -> Self {
        let propagation_delay = Mutex::new(0);
        let quorum_rounds = Mutex::new(Vec::new());
        Self {
            highest_received_rounds,
            propagation_delay,
            quorum_rounds,
        }
    }

    /// Returns a copy of the currently recorded propagation delay.
    fn propagation_delay(&self) -> Round {
        let recorded_delay = self.propagation_delay.lock();
        *recorded_delay
    }

    /// Returns a clone of the currently recorded quorum rounds.
    fn quorum_rounds(&self) -> Vec<QuorumRound> {
        let recorded_rounds = self.quorum_rounds.lock();
        recorded_rounds.clone()
    }
}

#[async_trait]
Expand All @@ -345,7 +356,13 @@ mod test {
unimplemented!()
}

fn set_propagation_delay(&self, delay: Round) -> Result<(), CoreError> {
fn set_propagation_delay_and_quorum_rounds(
&self,
delay: Round,
quorum_rounds: Vec<QuorumRound>,
) -> Result<(), CoreError> {
let mut quorum_round_per_authority = self.quorum_rounds.lock();
*quorum_round_per_authority = quorum_rounds;
let mut propagation_delay = self.propagation_delay.lock();
*propagation_delay = delay;
Ok(())
Expand Down Expand Up @@ -492,6 +509,18 @@ mod test {
]
);

assert_eq!(
core_thread_dispatcher.quorum_rounds(),
vec![
(100, 105),
(0, 115),
(103, 130),
(0, 0),
(105, 150),
(106, 160),
(107, 170)
]
);
// 110 - 100 = 10
assert_eq!(propagation_delay, 10);
assert_eq!(core_thread_dispatcher.propagation_delay(), 10);
Expand Down

0 comments on commit 4c1d514

Please sign in to comment.