Skip to content

Commit

Permalink
[consensus] properly guard the buffer manager requests
Browse files Browse the repository at this point in the history
The previous implementation only guarded the `process` function. This commit guards the whole `request` life cycle instead, which avoids the race condition in which a request has been created but not yet dequeued while the counter still reads 0.
  • Loading branch information
zekun000 committed Jul 13, 2022
1 parent faf9493 commit 1143ceb
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 56 deletions.
27 changes: 16 additions & 11 deletions consensus/src/experimental/buffer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use crate::{
buffer_item::BufferItem,
execution_phase::{ExecutionRequest, ExecutionResponse},
persisting_phase::PersistingRequest,
pipeline_phase::CountedRequest,
signing_phase::{SigningRequest, SigningResponse},
},
network::NetworkSender,
Expand Down Expand Up @@ -75,18 +76,18 @@ pub struct BufferManager {
// the roots point to the first *unprocessed* item.
// None means no items ready to be processed (either all processed or no item finishes previous stage)
execution_root: BufferItemRootType,
execution_phase_tx: Sender<ExecutionRequest>,
execution_phase_tx: Sender<CountedRequest<ExecutionRequest>>,
execution_phase_rx: Receiver<ExecutionResponse>,

signing_root: BufferItemRootType,
signing_phase_tx: Sender<SigningRequest>,
signing_phase_tx: Sender<CountedRequest<SigningRequest>>,
signing_phase_rx: Receiver<SigningResponse>,

commit_msg_tx: NetworkSender,
commit_msg_rx: channel::aptos_channel::Receiver<AccountAddress, VerifiedEvent>,

// we don't hear back from the persisting phase
persisting_phase_tx: Sender<PersistingRequest>,
persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,

block_rx: UnboundedReceiver<OrderedBlocks>,
reset_rx: UnboundedReceiver<ResetRequest>,
Expand All @@ -100,13 +101,13 @@ pub struct BufferManager {
impl BufferManager {
pub fn new(
author: Author,
execution_phase_tx: Sender<ExecutionRequest>,
execution_phase_tx: Sender<CountedRequest<ExecutionRequest>>,
execution_phase_rx: Receiver<ExecutionResponse>,
signing_phase_tx: Sender<SigningRequest>,
signing_phase_tx: Sender<CountedRequest<SigningRequest>>,
signing_phase_rx: Receiver<SigningResponse>,
commit_msg_tx: NetworkSender,
commit_msg_rx: channel::aptos_channel::Receiver<AccountAddress, VerifiedEvent>,
persisting_phase_tx: Sender<PersistingRequest>,
persisting_phase_tx: Sender<CountedRequest<PersistingRequest>>,
block_rx: UnboundedReceiver<OrderedBlocks>,
reset_rx: UnboundedReceiver<ResetRequest>,
verifier: ValidatorVerifier,
Expand Down Expand Up @@ -141,6 +142,10 @@ impl BufferManager {
}
}

/// Wraps `req` in a [`CountedRequest`] bound to this manager's `ongoing_tasks` counter.
///
/// A clone of the `ongoing_tasks` handle is passed to `CountedRequest::new`;
/// the manager keeps its own handle.
fn create_new_request<Request>(&self, req: Request) -> CountedRequest<Request> {
    let task_counter = self.ongoing_tasks.clone();
    CountedRequest::new(req, task_counter)
}

/// process incoming ordered blocks
/// push them into the buffer and update the roots if they are none.
fn process_ordered_blocks(&mut self, ordered_blocks: OrderedBlocks) {
Expand All @@ -167,7 +172,7 @@ impl BufferManager {
if self.execution_root.is_some() {
let ordered_blocks = self.buffer.get(&self.execution_root).get_blocks().clone();
self.execution_phase_tx
.send(ExecutionRequest { ordered_blocks })
.send(self.create_new_request(ExecutionRequest { ordered_blocks }))
.await
.expect("Failed to send execution request")
}
Expand All @@ -188,10 +193,10 @@ impl BufferManager {
let item = self.buffer.get(&self.signing_root);
let executed_item = item.unwrap_executed_ref();
self.signing_phase_tx
.send(SigningRequest {
.send(self.create_new_request(SigningRequest {
ordered_ledger_info: executed_item.ordered_proof.clone(),
commit_ledger_info: executed_item.commit_proof.ledger_info().clone(),
})
}))
.await
.expect("Failed to send signing request");
}
Expand Down Expand Up @@ -226,7 +231,7 @@ impl BufferManager {
.await;
}
self.persisting_phase_tx
.send(PersistingRequest {
.send(self.create_new_request(PersistingRequest {
blocks: blocks_to_persist,
commit_ledger_info: aggregated_item.commit_proof,
// we use the last callback
Expand All @@ -236,7 +241,7 @@ impl BufferManager {
// the block_tree and storage are the same for all the callbacks in the current epoch
// the commit root is used in logging only.
callback: aggregated_item.callback,
})
}))
.await
.expect("Failed to send persist request");
debug!("Advance head to {:?}", self.buffer.head_cursor());
Expand Down
12 changes: 5 additions & 7 deletions consensus/src/experimental/decoupled_execution_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
buffer_manager::{create_channel, BufferManager, OrderedBlocks, ResetRequest},
execution_phase::{ExecutionPhase, ExecutionRequest, ExecutionResponse},
persisting_phase::{PersistingPhase, PersistingRequest},
pipeline_phase::PipelinePhase,
pipeline_phase::{CountedRequest, PipelinePhase},
signing_phase::{SigningPhase, SigningRequest, SigningResponse},
},
metrics_safety_rules::MetricsSafetyRules,
Expand Down Expand Up @@ -40,7 +40,7 @@ pub fn prepare_phases_and_buffer_manager(
) {
// Execution Phase
let (execution_phase_request_tx, execution_phase_request_rx) =
create_channel::<ExecutionRequest>();
create_channel::<CountedRequest<ExecutionRequest>>();
let (execution_phase_response_tx, execution_phase_response_rx) =
create_channel::<ExecutionResponse>();

Expand All @@ -51,11 +51,11 @@ pub fn prepare_phases_and_buffer_manager(
execution_phase_request_rx,
Some(execution_phase_response_tx),
Box::new(execution_phase_processor),
ongoing_tasks.clone(),
);

// Signing Phase
let (signing_phase_request_tx, signing_phase_request_rx) = create_channel::<SigningRequest>();
let (signing_phase_request_tx, signing_phase_request_rx) =
create_channel::<CountedRequest<SigningRequest>>();
let (signing_phase_response_tx, signing_phase_response_rx) =
create_channel::<SigningResponse>();

Expand All @@ -64,19 +64,17 @@ pub fn prepare_phases_and_buffer_manager(
signing_phase_request_rx,
Some(signing_phase_response_tx),
Box::new(signing_phase_processor),
ongoing_tasks.clone(),
);

// Persisting Phase
let (persisting_phase_request_tx, persisting_phase_request_rx) =
create_channel::<PersistingRequest>();
create_channel::<CountedRequest<PersistingRequest>>();

let persisting_phase_processor = PersistingPhase::new(persisting_proxy);
let persisting_phase = PipelinePhase::new(
persisting_phase_request_rx,
None,
Box::new(persisting_phase_processor),
ongoing_tasks.clone(),
);

(
Expand Down
41 changes: 33 additions & 8 deletions consensus/src/experimental/pipeline_phase.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,59 @@ pub trait StatelessPipeline: Send + Sync {
async fn process(&self, req: Self::Request) -> Self::Response;
}

/// Scope guard over a shared atomic counter.
///
/// Creating a guard adds one to the counter and dropping it subtracts one,
/// so the counter's value equals the number of guards currently alive.
struct TaskGuard {
    counter: Arc<AtomicU64>,
}

impl TaskGuard {
    /// Takes ownership of the `counter` handle, increments it by one, and
    /// returns the guard that will undo the increment when dropped.
    fn new(counter: Arc<AtomicU64>) -> Self {
        let guard = Self { counter };
        guard.counter.fetch_add(1, Ordering::SeqCst);
        guard
    }
}

impl Drop for TaskGuard {
    /// Reverses the increment performed in [`TaskGuard::new`].
    fn drop(&mut self) {
        let Self { counter } = self;
        counter.fetch_sub(1, Ordering::SeqCst);
    }
}

/// A request paired with a `TaskGuard`.
///
/// The guard lives exactly as long as this wrapper (or whoever the guard is
/// moved to after destructuring), so the shared counter stays incremented
/// from the moment the request is created until the guard is dropped.
pub struct CountedRequest<Request> {
    req: Request,
    guard: TaskGuard,
}

impl<Request> CountedRequest<Request> {
    /// Wraps `req` and registers it on `counter`.
    ///
    /// `counter` is incremented immediately, via `TaskGuard::new`, before
    /// this function returns.
    pub fn new(req: Request, counter: Arc<AtomicU64>) -> Self {
        Self {
            req,
            guard: TaskGuard::new(counter),
        }
    }
}

pub struct PipelinePhase<T: StatelessPipeline> {
rx: Receiver<T::Request>,
rx: Receiver<CountedRequest<T::Request>>,
maybe_tx: Option<Sender<T::Response>>,
processor: Box<T>,
ongoing_tasks: Arc<AtomicU64>,
}

impl<T: StatelessPipeline> PipelinePhase<T> {
pub fn new(
rx: Receiver<T::Request>,
rx: Receiver<CountedRequest<T::Request>>,
maybe_tx: Option<Sender<T::Response>>,
processor: Box<T>,
ongoing_tasks: Arc<AtomicU64>,
) -> Self {
Self {
rx,
maybe_tx,
processor,
ongoing_tasks,
}
}

pub async fn start(mut self) {
// main loop
while let Some(req) = self.rx.next().await {
self.ongoing_tasks.fetch_add(1, Ordering::SeqCst);
while let Some(counted_req) = self.rx.next().await {
let CountedRequest { req, guard: _guard } = counted_req;
let response = self.processor.process(req).await;
self.ongoing_tasks.fetch_sub(1, Ordering::SeqCst);
if let Some(tx) = &mut self.maybe_tx {
if tx.send(response).await.is_err() {
break;
Expand Down
25 changes: 11 additions & 14 deletions consensus/src/experimental/tests/execution_phase_tests.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

use std::sync::{atomic::AtomicU64, Arc};

use crate::{
experimental::{
buffer_manager::create_channel,
execution_phase::{ExecutionPhase, ExecutionRequest, ExecutionResponse},
pipeline_phase::{CountedRequest, PipelinePhase},
tests::phase_tester::PhaseTester,
},
test_utils::{consensus_runtime, RandomComputeResultStateComputer},
};
use aptos_crypto::HashValue;
use aptos_types::{ledger_info::LedgerInfo, validator_verifier::random_validator_verifier};
use consensus_types::{
Expand All @@ -12,16 +19,7 @@ use consensus_types::{
quorum_cert::QuorumCert,
};
use executor_types::{Error, StateComputeResult};

use crate::{
experimental::{
buffer_manager::create_channel,
execution_phase::{ExecutionPhase, ExecutionRequest, ExecutionResponse},
pipeline_phase::PipelinePhase,
tests::phase_tester::PhaseTester,
},
test_utils::{consensus_runtime, RandomComputeResultStateComputer},
};
use std::sync::Arc;

pub fn prepare_execution_phase() -> (HashValue, ExecutionPhase) {
let execution_proxy = Arc::new(RandomComputeResultStateComputer::new());
Expand Down Expand Up @@ -95,14 +93,13 @@ fn execution_phase_tests() {
unit_phase_tester.unit_test(&execution_phase);

// e2e tests
let (in_channel_tx, in_channel_rx) = create_channel::<ExecutionRequest>();
let (in_channel_tx, in_channel_rx) = create_channel::<CountedRequest<ExecutionRequest>>();
let (out_channel_tx, out_channel_rx) = create_channel::<ExecutionResponse>();

let execution_phase_pipeline = PipelinePhase::new(
in_channel_rx,
Some(out_channel_tx),
Box::new(execution_phase),
Arc::new(AtomicU64::new(0)),
);

runtime.spawn(execution_phase_pipeline.start());
Expand Down
13 changes: 10 additions & 3 deletions consensus/src/experimental/tests/phase_tester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
use crate::{
experimental::{
buffer_manager::{Receiver, Sender},
pipeline_phase::StatelessPipeline,
pipeline_phase::{CountedRequest, StatelessPipeline},
},
test_utils::{consensus_runtime, timed_block_on},
};
use futures::{SinkExt, StreamExt};
use std::sync::{atomic::AtomicU64, Arc};

pub struct PhaseTestCase<T: StatelessPipeline> {
index: usize,
Expand Down Expand Up @@ -76,7 +77,11 @@ impl<T: StatelessPipeline> PhaseTester<T> {

// e2e tests are for the pipeline phases
// this function consumes the tester
pub fn e2e_test(self, mut tx: Sender<T::Request>, mut rx: Receiver<T::Response>) {
pub fn e2e_test(
self,
mut tx: Sender<CountedRequest<T::Request>>,
mut rx: Receiver<T::Response>,
) {
let mut runtime = consensus_runtime();

timed_block_on(&mut runtime, async move {
Expand All @@ -93,7 +98,9 @@ impl<T: StatelessPipeline> PhaseTester<T> {
termion::style::Reset,
prompt.unwrap_or(format!("Test {}", index))
);
tx.send(input).await.ok();
tx.send(CountedRequest::new(input, Arc::new(AtomicU64::new(0))))
.await
.ok();
let resp = rx.next().await.unwrap();
judge(resp);
eprintln!(
Expand Down
19 changes: 6 additions & 13 deletions consensus/src/experimental/tests/signing_phase_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use crate::{
experimental::{
buffer_manager::{create_channel, Receiver, Sender},
pipeline_phase::PipelinePhase,
pipeline_phase::{CountedRequest, PipelinePhase},
signing_phase::{SigningPhase, SigningRequest, SigningResponse},
tests::{
phase_tester::PhaseTester,
Expand All @@ -23,28 +23,21 @@ use aptos_types::{
validator_signer::ValidatorSigner,
};
use safety_rules::Error;
use std::{
collections::BTreeMap,
sync::{atomic::AtomicU64, Arc},
};
use std::collections::BTreeMap;

pub fn prepare_signing_pipeline(
signing_phase: SigningPhase,
) -> (
Sender<SigningRequest>,
Sender<CountedRequest<SigningRequest>>,
Receiver<SigningResponse>,
PipelinePhase<SigningPhase>,
) {
// e2e tests
let (in_channel_tx, in_channel_rx) = create_channel::<SigningRequest>();
let (in_channel_tx, in_channel_rx) = create_channel::<CountedRequest<SigningRequest>>();
let (out_channel_tx, out_channel_rx) = create_channel::<SigningResponse>();

let signing_phase_pipeline = PipelinePhase::new(
in_channel_rx,
Some(out_channel_tx),
Box::new(signing_phase),
Arc::new(AtomicU64::new(0)),
);
let signing_phase_pipeline =
PipelinePhase::new(in_channel_rx, Some(out_channel_tx), Box::new(signing_phase));

(in_channel_tx, out_channel_rx, signing_phase_pipeline)
}
Expand Down

0 comments on commit 1143ceb

Please sign in to comment.