Skip to content

Commit

Permalink
[decoupled-execution] Unhappy Path - Retry broadcasting messages
Browse files Browse the repository at this point in the history
  • Loading branch information
Yu Xia authored and bors-libra committed Jul 22, 2021
1 parent 44f91c9 commit c23b768
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 17 deletions.
6 changes: 3 additions & 3 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub struct ConsensusConfig {
pub mempool_poll_count: u64,
// global switch for the decoupling execution feature
// only when decoupled is true, the execution and committing will be pipelined in different phases
pub decoupled: bool,
pub decoupled_execution: bool,
pub channel_size: usize,
pub back_pressure_limit: u64,
}
Expand All @@ -47,8 +47,8 @@ impl Default for ConsensusConfig {
safety_rules: SafetyRulesConfig::default(),
sync_only: false,
mempool_poll_count: 1,
decoupled: false, // by default, we turn of the decoupling execution feature
channel_size: 30, // hard-coded
decoupled_execution: false, // by default, we turn of the decoupling execution feature
channel_size: 30, // hard-coded
back_pressure_limit: 1,
}
}
Expand Down
6 changes: 5 additions & 1 deletion consensus/consensus-types/src/experimental/commit_vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::common::{Author, Round};
use anyhow::Context;
use diem_crypto::ed25519::Ed25519Signature;
use diem_types::{
ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
block_info::BlockInfo, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
validator_verifier::ValidatorVerifier,
};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -91,4 +91,8 @@ impl CommitVote {
.verify(self.author(), &self.ledger_info, &self.signature)
.context("Failed to verify Commit Proposal")
}

/// Returns the `BlockInfo` reported by `commit_info()` on this vote's ledger info.
pub fn commit_info(&self) -> &BlockInfo {
    let ledger_info = self.ledger_info();
    ledger_info.commit_info()
}
}
10 changes: 10 additions & 0 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,13 @@ pub static DECOUPLED_EXECUTION__COMMIT_MESSAGE_CHANNEL: Lazy<IntGauge> = Lazy::n
)
.unwrap()
});

/// Gauge for the decoupled-execution channel that carries commit messages
/// from the commit phase back to itself when a timeout triggers.
///
/// Registration happens lazily on first access; a failed registration panics
/// because of the `unwrap()`.
pub static DECOUPLED_EXECUTION__COMMIT_MESSAGE_TIMEOUT_CHANNEL: Lazy<IntGauge> = Lazy::new(|| {
    let gauge = register_int_gauge!(
        "decoupled_execution__commit_message_timeout_channel",
        "Number of pending commit phase message timeouts (CommitVote/CommitDecision)"
    );
    gauge.unwrap()
});
4 changes: 2 additions & 2 deletions consensus/src/epoch_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl EpochManager {
let author = node_config.validator_network.as_ref().unwrap().peer_id();
let config = node_config.consensus.clone();
let sr_config = &node_config.consensus.safety_rules;
if sr_config.decoupled_execution != config.decoupled {
if sr_config.decoupled_execution != config.decoupled_execution {
panic!("Inconsistent decoupled-execution configuration of consensus and safety-rules\nMake sure consensus.decoupled = safety_rules.decoupled_execution.")
}
let safety_rules_manager = SafetyRulesManager::new(sr_config);
Expand Down Expand Up @@ -425,7 +425,7 @@ impl EpochManager {

let safety_rules_container = Arc::new(Mutex::new(safety_rules));

let mut processor = if self.config.decoupled {
let mut processor = if self.config.decoupled_execution {
let (round_manager, execution_phase, commit_phase) = self.prepare_decoupled_execution(
epoch,
recovery_data,
Expand Down
57 changes: 46 additions & 11 deletions consensus/src/experimental/commit_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
network::NetworkSender, network_interface::ConsensusMsg, round_manager::VerifiedEvent,
state_replication::StateComputer,
};
use channel::Receiver;
use channel::{Receiver, Sender};
use consensus_types::{
common::Author,
executed_block::ExecutedBlock,
Expand All @@ -24,12 +24,13 @@ use diem_types::{
validator_verifier::{ValidatorVerifier, VerifyError},
};
use executor_types::Error as ExecutionError;
use futures::{select, StreamExt};
use futures::{select, SinkExt, StreamExt};
use safety_rules::TSafetyRules;
use std::{
collections::BTreeMap,
sync::{atomic::AtomicU64, Arc},
};
use tokio::time;

/*
Commit phase takes in the executed blocks from the execution
Expand All @@ -42,6 +43,8 @@ decision message helps the slower nodes to quickly catch up without
having to collect the signatures.
*/

/// Retry timeout, in seconds, for commit phase messages.
const COMMIT_PHASE_TIMEOUT_SEC: u64 = 1;

#[derive(Clone)]
pub struct PendingBlocks {
blocks: Vec<ExecutedBlock>,
Expand Down Expand Up @@ -103,6 +106,8 @@ pub struct CommitPhase {
author: Author,
back_pressure: Arc<AtomicU64>,
network_sender: NetworkSender,
timeout_event_tx: Sender<CommitVote>,
timeout_event_rx: Receiver<CommitVote>,
}

/// Wrapper for ExecutionProxy.commit
Expand Down Expand Up @@ -131,6 +136,22 @@ macro_rules! report_err {
};
}

/// Shortcut for sending a commit vote followed by a timeout retry event.
///
/// Broadcasts `cv` as a `ConsensusMsg::CommitVoteMsg` through `network_sender`,
/// sleeps for `COMMIT_PHASE_TIMEOUT_SEC` seconds, and then pushes the same vote
/// into `notification`. A failure to push the timeout event is handed to
/// `report_err!` rather than propagated.
async fn broadcast_commit_vote_with_retry(
    mut network_sender: NetworkSender,
    cv: CommitVote,
    mut notification: Sender<CommitVote>,
) {
    // Keep the original vote so it can be forwarded as the timeout event below.
    let vote_msg = ConsensusMsg::CommitVoteMsg(Box::new(cv.clone()));
    network_sender.broadcast(vote_msg).await;

    let retry_delay = time::Duration::from_secs(COMMIT_PHASE_TIMEOUT_SEC);
    time::sleep(retry_delay).await;

    report_err!(
        notification.send(cv).await,
        "Error in sending timeout events"
    )
}

impl CommitPhase {
pub fn new(
commit_channel_recv: Receiver<(Vec<ExecutedBlock>, LedgerInfoWithSignatures)>,
Expand All @@ -142,6 +163,10 @@ impl CommitPhase {
back_pressure: Arc<AtomicU64>,
network_sender: NetworkSender,
) -> Self {
let (timeout_event_tx, timeout_event_rx) = channel::new::<CommitVote>(
2,
&counters::DECOUPLED_EXECUTION__COMMIT_MESSAGE_TIMEOUT_CHANNEL,
);
Self {
commit_channel_recv,
execution_proxy,
Expand All @@ -152,6 +177,8 @@ impl CommitPhase {
author,
back_pressure,
network_sender,
timeout_event_tx,
timeout_event_rx,
}
}

Expand Down Expand Up @@ -191,7 +218,7 @@ impl CommitPhase {
let commit_ledger_info = commit_decision.ledger_info();

// if the block infos do not match
if commit_ledger_info.ledger_info().commit_info() != pending_blocks.block_info() {
if commit_ledger_info.commit_info() != pending_blocks.block_info() {
return Err(Error::InconsistentBlockInfo(
commit_ledger_info.ledger_info().commit_info().clone(),
pending_blocks.block_info().clone(),
Expand Down Expand Up @@ -254,12 +281,8 @@ impl CommitPhase {
.lock()
.sign_commit_vote(ordered_ledger_info, commit_ledger_info.clone())?;

// if fails, it needs to resend, otherwise the liveness might compromise.
let msg = ConsensusMsg::CommitVoteMsg(Box::new(CommitVote::new_with_signature(
self.author,
commit_ledger_info.clone(),
signature,
)));
let commit_vote =
CommitVote::new_with_signature(self.author, commit_ledger_info.clone(), signature);

let commit_ledger_info_with_sig = LedgerInfoWithSignatures::new(
commit_ledger_info,
Expand All @@ -275,7 +298,12 @@ impl CommitPhase {

// asynchronously broadcast the message.
// note that this message will also reach the node itself
self.network_sender.broadcast(msg).await;
// if the message delivery fails, the message must be resent; otherwise liveness might be compromised.
tokio::spawn(broadcast_commit_vote_with_retry(
self.network_sender.clone(),
commit_vote,
self.timeout_event_tx.clone(),
));

Ok(())
}
Expand Down Expand Up @@ -317,7 +345,14 @@ impl CommitPhase {
}
};
}
// TODO: add a timer to repeat sending commit votes in later PR
retry_cv = self.timeout_event_rx.select_next_some() => {
if let Some(ref pending_blocks) = self.blocks {
if pending_blocks.block_info() == retry_cv.commit_info() {
// retry broadcasting the message if the blocks are still pending
tokio::spawn(broadcast_commit_vote_with_retry(self.network_sender.clone(), retry_cv, self.timeout_event_tx.clone()));
}
}
}
complete => break,
}
report_err!(
Expand Down
42 changes: 42 additions & 0 deletions consensus/src/experimental/tests/commit_phase_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,48 @@ mod commit_phase_e2e_tests {
});
}

/// Commit message retry test.
///
/// Feeds one batch of executed blocks into the commit phase and expects the
/// next two events on the self-loop channel to be commit vote messages that
/// compare equal (the original broadcast and its retry).
#[test]
fn test_retry() {
    let mut runtime = consensus_runtime();
    let (
        mut commit_tx,
        _msg_tx,
        _commit_result_rx,
        mut self_loop_rx,
        _safety_rules_container,
        signers,
        _state_computer,
        _validator,
        commit_phase,
    ) = prepare_commit_phase();

    runtime.spawn(commit_phase.start());

    timed_block_on(&mut runtime, async move {
        // send good commit arguments; a send failure is deliberately ignored
        let good_input = prepare_executed_blocks_with_ordered_ledger_info(&signers[0]);
        commit_tx.send(good_input).await.ok();

        // the first event from the self loop channel must be a commit vote
        let first_event = self_loop_rx.next().await.unwrap();
        let first_vote = match first_event {
            Event::Message(_, ConsensusMsg::CommitVoteMsg(vote)) => vote,
            _ => panic!(
                "We expect only commit vote messages from the self loop channel in this test."
            ),
        };

        // the second event must be the same commit vote again
        let second_event = self_loop_rx.next().await.unwrap();
        match second_event {
            Event::Message(_, ConsensusMsg::CommitVoteMsg(second_vote)) => {
                assert_eq!(first_vote, second_vote)
            }
            _ => panic!(
                "We expect only commit vote messages from the self loop channel in this test."
            ),
        }
    });
}

// [ Attention ]
// These e2e tests below are end-to-end negative tests.
// They might yield false negative results if now_or_never() is called
Expand Down
4 changes: 4 additions & 0 deletions types/src/ledger_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,10 @@ impl LedgerInfoWithV0 {
&self.ledger_info
}

/// Returns the `BlockInfo` reported by `commit_info()` on the wrapped ledger info.
pub fn commit_info(&self) -> &BlockInfo {
    let ledger_info = &self.ledger_info;
    ledger_info.commit_info()
}

pub fn add_signature(&mut self, validator: AccountAddress, signature: Ed25519Signature) {
self.signatures.entry(validator).or_insert(signature);
}
Expand Down

0 comments on commit c23b768

Please sign in to comment.