Skip to content

Commit

Permalink
[decoupled-execution] buffer manager main loop logic (w/o message retry)
Browse files Browse the repository at this point in the history
also removed the response from ther persisting phase (assuming state_computer's commit function handles the error by panics)

Closes: aptos-labs#9178
  • Loading branch information
yuxiamit authored and bors-libra committed Sep 15, 2021
1 parent 3387247 commit 1850ccb
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 25 deletions.
83 changes: 67 additions & 16 deletions consensus/src/experimental/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ use diem_types::{
};

use crate::{
counters,
experimental::{
buffer_item::BufferItem,
execution_phase::{ExecutionRequest, ExecutionResponse},
linkedlist::{get_elem, get_elem_mut, get_next, link_eq, set_elem, take_elem, Link, List},
persisting_phase::{PersistingRequest, PersistingResponse},
persisting_phase::PersistingRequest,
signing_phase::{SigningRequest, SigningResponse},
},
network::NetworkSender,
network_interface::ConsensusMsg,
round_manager::VerifiedEvent,
state_replication::StateComputerCommitCallBackType,
};
use diem_logger::prelude::*;
use futures::StreamExt;

pub type SyncAck = ();

Expand Down Expand Up @@ -74,8 +77,8 @@ pub struct StateManager {
commit_msg_tx: NetworkSender,
commit_msg_rx: channel::diem_channel::Receiver<AccountAddress, VerifiedEvent>,

// we don't hear back from the persisting phase
persisting_phase_tx: Sender<PersistingRequest>,
persisting_phase_rx: Receiver<PersistingResponse>,

block_rx: UnboundedReceiver<OrderedBlocks>,
sync_rx: UnboundedReceiver<SyncRequest>,
Expand All @@ -94,7 +97,6 @@ impl StateManager {
commit_msg_tx: NetworkSender,
commit_msg_rx: channel::diem_channel::Receiver<AccountAddress, VerifiedEvent>,
persisting_phase_tx: Sender<PersistingRequest>,
persisting_phase_rx: Receiver<PersistingResponse>,
block_rx: UnboundedReceiver<OrderedBlocks>,
sync_rx: UnboundedReceiver<SyncRequest>,
verifier: ValidatorVerifier,
Expand Down Expand Up @@ -122,7 +124,6 @@ impl StateManager {
commit_msg_rx,

persisting_phase_tx,
persisting_phase_rx,

block_rx,
sync_rx,
Expand All @@ -132,6 +133,8 @@ impl StateManager {
}
}

/// process incoming ordered blocks
/// push them into the buffer and update the roots if they are none.
async fn process_ordered_blocks(
&mut self,
ordered_blocks: OrderedBlocks,
Expand Down Expand Up @@ -164,6 +167,9 @@ impl StateManager {
Ok(())
}

/// check if the items at and after the execution root is already executed
/// if yes, move the execution root to the first *unexecuted* item.
/// if there is no such item, set it to none.
fn try_advance_executed_root(&mut self) {
let mut cursor = self.execution_root.clone();
while cursor.is_some() {
Expand All @@ -178,6 +184,9 @@ impl StateManager {
self.execution_root = cursor;
}

/// check if the items at and after the signing root is already signed
/// if yes, move the signing root to the first *unsigned* item.
/// if there is no such item, set it to none.
fn try_advance_signing_root(&mut self) {
let mut cursor = self.signing_root.clone();
while cursor.is_some() {
Expand All @@ -195,6 +204,8 @@ impl StateManager {
self.signing_root = cursor;
}

/// check if a prefix of the buffer is ready to persist,
/// if yes, send them to the persisting phase and dequeue the buffer items.
fn try_persisting_blocks(&mut self) {
let mut cursor = self.buffer.head.as_ref().cloned();
while cursor.is_some() {
Expand Down Expand Up @@ -222,6 +233,7 @@ impl StateManager {
}
}

/// update the root to make sure that they point to the first *unprocessed* item.
fn reset_all_roots(&mut self) {
// reset all the roots (in a better way)
self.signing_root = self.buffer.head.clone();
Expand All @@ -230,7 +242,11 @@ impl StateManager {
self.try_advance_executed_root();
}

async fn process_sync_req(&mut self, sync_event: SyncRequest) -> anyhow::Result<()> {
/// this function processes a sync request
/// if reconfig flag is set, it stops the main loop
/// otherwise, it empties the buffer till ledger_info, and update the roots
/// finally, it sends back an ack.
async fn process_sync_request(&mut self, sync_event: SyncRequest) -> anyhow::Result<()> {
let SyncRequest {
tx,
ledger_info,
Expand Down Expand Up @@ -326,9 +342,11 @@ impl StateManager {
Ok(())
}

/// this function handles the execution response and updates the buffer
/// if the execution fails: it re-collects a larger batch and retries.
async fn process_execution_resp(
/// this function handles the execution response
/// if the execution succeeded, it calls process_successful_execution_response
/// to update the buffer and sends an signing request.
/// if the execution fails: it re-collects a larger batch and retries an execution request.
async fn process_execution_response(
&mut self,
execution_resp: ExecutionResponse,
) -> anyhow::Result<()> {
Expand Down Expand Up @@ -373,7 +391,9 @@ impl StateManager {
}
}

async fn process_successful_signing_resp(
/// if the signing response is successful, update the signature of
/// the corresponding buffer item, and broadcast a commit vote message
async fn process_successful_signing_response(
&mut self,
sig: Ed25519Signature,
commit_ledger_info: LedgerInfo,
Expand Down Expand Up @@ -418,16 +438,15 @@ impl StateManager {
Ok(())
}

async fn process_signing_response(
&mut self,
signing_resp: SigningResponse,
) -> anyhow::Result<()> {
/// if the signing response is successful, call process_successful_signing_response
/// otherwise, retry the item pointed by the signing root.
async fn process_signing_response(&mut self, response: SigningResponse) -> anyhow::Result<()> {
let SigningResponse {
signature_result,
commit_ledger_info,
} = signing_resp;
} = response;
if let Ok(sig) = signature_result {
self.process_successful_signing_resp(sig, commit_ledger_info)
self.process_successful_signing_response(sig, commit_ledger_info)
.await
} else {
// try next signature if signing failure
Expand Down Expand Up @@ -464,6 +483,9 @@ impl StateManager {
}
}

/// process the commit vote messages
/// it scans the whole buffer for a matching blockinfo
/// if found, try advancing the item to be aggregated
async fn process_commit_msg(&mut self, commit_msg: VerifiedEvent) -> anyhow::Result<()> {
match commit_msg {
VerifiedEvent::CommitVote(cv) => {
Expand Down Expand Up @@ -502,8 +524,37 @@ impl StateManager {
Ok(())
}

async fn start(self) {
async fn start(mut self) {
info!("Buffer manager starts.");

while !self.end_epoch {
// TODO: retry sending commit votes periodically

// process new messages
if let Err(e) = tokio::select! {
Some(blocks) = self.block_rx.next() => {
self.process_ordered_blocks(blocks).await
}
Some(reset_event) = self.sync_rx.next() => {
self.process_sync_request(reset_event).await
}
Some(execution_resp) = self.execution_phase_rx.next() => {
self.process_execution_response(execution_resp).await
}
Some(signing_resp) = self.signing_phase_rx.next() => {
self.process_signing_response(signing_resp).await
}
Some(commit_msg) = self.commit_msg_rx.next() => {
self.process_commit_msg(commit_msg).await
}
else => {
break;
}
} {
counters::ERROR_COUNT.inc();
error!("BufferManager error: {}", e.to_string());
}
}
// loop receving new blocks or reset
// while !self.end_epoch {

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/experimental/execution_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl StatelessPipeline for ExecutionPhase {
};

ResponseWithInstruction {
resp: ExecutionResponse { inner: resp_inner },
response: ExecutionResponse { inner: resp_inner },
instruction,
}
}
Expand Down
12 changes: 8 additions & 4 deletions consensus/src/experimental/persisting_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
};

use crate::{
experimental::pipeline_phase::{ResponseWithInstruction, StatelessPipeline},
experimental::pipeline_phase::{Instruction, ResponseWithInstruction, StatelessPipeline},
state_replication::{StateComputer, StateComputerCommitCallBackType},
};
use async_trait::async_trait;
Expand Down Expand Up @@ -65,10 +65,14 @@ impl StatelessPipeline for PersistingPhase {
callback,
} = req;

ResponseWithInstruction::from(
self.persisting_handle
// we do not return a response
// assuming the commit function panicks if there is any error.
ResponseWithInstruction {
response: self
.persisting_handle
.commit(&blocks, commit_ledger_info, callback)
.await,
)
instruction: Instruction::NoResponse,
}
}
}
11 changes: 8 additions & 3 deletions consensus/src/experimental/pipeline_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,18 @@ use std::hint;
pub enum Instruction {
Ok,
Clear,
NoResponse,
}

pub struct ResponseWithInstruction<T> {
pub resp: T,
pub response: T,
pub instruction: Instruction,
}

impl<T> From<T> for ResponseWithInstruction<T> {
fn from(val: T) -> Self {
Self {
resp: val,
response: val,
instruction: Instruction::Ok,
}
}
Expand Down Expand Up @@ -58,10 +59,14 @@ impl<T: StatelessPipeline> PipelinePhase<T> {
pub async fn start(mut self) {
// main loop
while let Some(req) = self.rx.next().await {
let ResponseWithInstruction { resp, instruction } = self.processor.process(req).await;
let ResponseWithInstruction {
response: resp,
instruction,
} = self.processor.process(req).await;
match instruction {
Instruction::Ok => {}
Instruction::Clear => self.exhaust_requests_non_blocking(),
Instruction::NoResponse => continue,
}
if self.tx.send(resp).await.is_err() {
break;
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/experimental/tests/execution_phase_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ fn test_execution_phase_process() {

timed_block_on(&mut runtime, async move {
let ResponseWithInstruction {
resp,
response: resp,
instruction: _,
} = execution_phase
.process(ExecutionRequest {
Expand Down

0 comments on commit 1850ccb

Please sign in to comment.