Skip to content

Commit

Permalink
[decoupled-execution] We added a new struct OrderingStateComputer to …
Browse files Browse the repository at this point in the history
…bypass the execution and commit in the state computer;

We added two fields in ConsensusConfig:

+ consensus.decoupled: the global switch to turn on and off the decoupled execution feature.
+ consensus.channel_size: the size of the channel used to pass the blocks from ordering state computer to the next phase.

Related unit tests will be added later after we finish integration.

Closes: aptos-labs#8616
  • Loading branch information
Yu Xia authored and bors-libra committed Jun 29, 2021
1 parent 53ed41a commit 7574d45
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 0 deletions.
6 changes: 6 additions & 0 deletions config/src/config/consensus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub struct ConsensusConfig {
pub sync_only: bool,
// how many times to wait for txns from mempool when propose
pub mempool_poll_count: u64,
// global switch for the decoupling execution feature
// only when decoupled is true, the execution and committing will be pipelined in different phases
pub decoupled: bool,
pub channel_size: usize,
}

impl Default for ConsensusConfig {
Expand All @@ -42,6 +46,8 @@ impl Default for ConsensusConfig {
safety_rules: SafetyRulesConfig::default(),
sync_only: false,
mempool_poll_count: 1,
decoupled: false, // by default, we turn off the decoupling execution feature
channel_size: 30, // hard-coded
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ pub fn start_consensus(
node_config.consensus.mempool_executed_txn_timeout_ms,
));
let execution_correctness_manager = ExecutionCorrectnessManager::new(node_config);

let state_computer = Arc::new(ExecutionProxy::new(
execution_correctness_manager.client(),
state_sync_client,
));

let time_service = Arc::new(ClockTimeService::new(runtime.handle().clone()));

let (timeout_sender, timeout_receiver) = channel::new(1_024, &counters::PENDING_ROUND_TIMEOUTS);
Expand Down
4 changes: 4 additions & 0 deletions consensus/src/experimental/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

/// Ordering-only `StateComputer` implementation (`OrderingStateComputer`)
/// belonging to the experimental decoupled-execution feature.
pub mod ordering_state_computer;
82 changes: 82 additions & 0 deletions consensus/src/experimental/ordering_state_computer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::{error::StateSyncError, state_replication::StateComputer};
use anyhow::Result;
use channel::Sender;
use consensus_types::{block::Block, executed_block::ExecutedBlock};
use diem_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
use diem_types::ledger_info::LedgerInfoWithSignatures;
use executor_types::{Error as ExecutionError, StateComputeResult};
use fail::fail_point;
use futures::SinkExt;
use std::{boxed::Box, sync::Arc};

/// Ordering-only execution proxy that implements the `StateComputer` trait.
///
/// Instead of executing and committing blocks itself, it returns a placeholder
/// compute result and pushes committed blocks into a channel for a later phase.
/// Intended to be used only when the consensus config's `decoupled` flag is
/// true (the original note names `node_config.validator.consensus.decoupled`).
pub struct OrderingStateComputer {
// Sender side of the channel into which batches of ordered blocks, together
// with their finality proof, are poured for the real execution phase
// (per the original note, to be handled in `ExecutionPhase`).
executor_channel: Sender<(Vec<Block>, LedgerInfoWithSignatures)>,
}

impl OrderingStateComputer {
pub fn new(executor_channel: Sender<(Vec<Block>, LedgerInfoWithSignatures)>) -> Self {
Self { executor_channel }
}
}

#[async_trait::async_trait]
impl StateComputer for OrderingStateComputer {
    /// Bypasses execution: both arguments are ignored and a placeholder
    /// `StateComputeResult` is returned unconditionally.
    fn compute(
        &self,
        // The block to be executed.
        _block: &Block,
        // The parent block id.
        _parent_block_id: HashValue,
    ) -> Result<StateComputeResult, ExecutionError> {
        // Dummy result built from the accumulator placeholder hash plus
        // empty / zero values. Note: with the decoupled execution feature
        // turned on this will break the e2e smoke test for now, because
        // nothing is handling the next phase yet.
        let placeholder = StateComputeResult::new(
            *ACCUMULATOR_PLACEHOLDER_HASH,
            vec![],
            0,
            vec![],
            0,
            None,
            vec![],
            vec![],
            vec![],
        );
        Ok(placeholder)
    }

    /// Forwards the ordered blocks, paired with `finality_proof`, into the
    /// executor channel for the real execution phase.
    ///
    /// The returned future completes as soon as the send into the channel
    /// finishes. A send failure is reported as `ExecutionError::InternalError`
    /// carrying the send error's message.
    async fn commit(
        &self,
        blocks: &[Arc<ExecutedBlock>],
        finality_proof: LedgerInfoWithSignatures,
    ) -> Result<(), ExecutionError> {
        // Extract an owned copy of the underlying `Block` from every executed block.
        let ordered_blocks: Vec<Block> = blocks
            .iter()
            .map(|executed| executed.block().clone())
            .collect();

        // Sending requires a mutable sender, so work on a clone of the stored one.
        let mut sender = self.executor_channel.clone();
        sender
            .send((ordered_blocks, finality_proof))
            .await
            .map_err(|send_err| ExecutionError::InternalError {
                error: send_err.to_string(),
            })
    }

    /// Synchronize to a commit that is not present locally.
    ///
    /// Not implemented yet: apart from the `consensus::sync_to` fail point,
    /// which can inject an error, calling this panics via `unimplemented!`.
    async fn sync_to(&self, _target: LedgerInfoWithSignatures) -> Result<(), StateSyncError> {
        fail_point!("consensus::sync_to", |_| {
            Err(anyhow::anyhow!("Injected error in sync_to").into())
        });
        unimplemented!();
    }
}
1 change: 1 addition & 0 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod consensusdb;
mod counters;
mod epoch_manager;
mod error;
mod experimental;
mod liveness;
mod logging;
mod metrics_safety_rules;
Expand Down

0 comments on commit 7574d45

Please sign in to comment.