Skip to content

Commit

Permalink
Checkpoint integration test (MystenLabs#2343)
Browse files Browse the repository at this point in the history
checkpoint-consensus integration test
  • Loading branch information
asonnino authored Jun 6, 2022
1 parent d92a8e7 commit 893b4b4
Show file tree
Hide file tree
Showing 8 changed files with 166 additions and 56 deletions.
67 changes: 27 additions & 40 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -883,6 +883,10 @@ impl AuthorityState {
state
}

pub fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
self.checkpoints.clone()
}

pub(crate) fn insert_new_epoch_info(&self, new_committee: &Committee) -> SuiResult {
let current_epoch_info = self.database.get_last_epoch_info()?;
fp_ensure!(
Expand Down
38 changes: 35 additions & 3 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

use crate::{
authority::AuthorityState,
consensus_adapter::{ConsensusAdapter, ConsensusListener, ConsensusListenerMessage},
consensus_adapter::{
CheckpointConsensusAdapter, CheckpointSender, ConsensusAdapter, ConsensusListener,
ConsensusListenerMessage,
},
};
use anyhow::anyhow;
use anyhow::Result;
Expand All @@ -19,7 +22,10 @@ use sui_network::{
};

use sui_types::{crypto::VerificationObligation, error::*, messages::*};
use tokio::sync::mpsc::{channel, Sender};
use tokio::{
sync::mpsc::{channel, Sender},
task::JoinHandle,
};

use sui_types::messages_checkpoint::CheckpointRequest;
use sui_types::messages_checkpoint::CheckpointResponse;
Expand Down Expand Up @@ -129,6 +135,7 @@ impl AuthorityServer {
.add_service(ValidatorServer::new(ValidatorService {
state: self.state,
consensus_adapter: self.consensus_adapter,
_checkpoint_consensus_handle: None,
}))
.bind(&address)
.await
Expand All @@ -147,6 +154,7 @@ impl AuthorityServer {
pub struct ValidatorService {
state: Arc<AuthorityState>,
consensus_adapter: ConsensusAdapter,
_checkpoint_consensus_handle: Option<JoinHandle<()>>,
}

impl ValidatorService {
Expand Down Expand Up @@ -189,17 +197,41 @@ impl ValidatorService {
/* max_pending_transactions */ 1_000_000,
);

// The consensus adapter allows the authority to send user certificates through consensus.
let consensus_adapter = ConsensusAdapter::new(
state.clone(),
consensus_config.address().to_owned(),
state.clone_committee(),
tx_sui_to_consensus,
tx_sui_to_consensus.clone(),
/* max_delay */ Duration::from_millis(5_000),
);

// Update the checkpoint store with a consensus client.
let checkpoint_consensus_handle = if let Some(checkpoint_store) = state.checkpoints() {
let (tx_checkpoint_consensus_adapter, rx_checkpoint_consensus_adapter) = channel(1_000);
let consensus_sender = CheckpointSender::new(tx_checkpoint_consensus_adapter);
checkpoint_store
.lock()
.set_consensus(Box::new(consensus_sender))?;

let handle = CheckpointConsensusAdapter::new(
/* consensus_address */ consensus_config.address().to_owned(),
/* tx_consensus_listener */ tx_sui_to_consensus,
rx_checkpoint_consensus_adapter,
/* checkpoint_locals */ checkpoint_store.lock().get_locals(),
/* retry_delay */ Duration::from_millis(5_000),
/* max_pending_transactions */ 10_000,
)
.spawn();
Some(handle)
} else {
None
};

Ok(Self {
state,
consensus_adapter,
_checkpoint_consensus_handle: checkpoint_consensus_handle,
})
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -856,7 +856,7 @@ impl CheckpointStore {
// Helper write functions

/// Set the next checkpoint proposal.
fn set_proposal(&mut self) -> Result<CheckpointProposal, SuiError> {
pub fn set_proposal(&mut self) -> Result<CheckpointProposal, SuiError> {
// Check that:
// - there is no current proposal.
// - there are no unprocessed transactions.
Expand Down
20 changes: 13 additions & 7 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use crate::authority::AuthorityState;
use crate::checkpoints::CheckpointLocals;
use crate::checkpoints::ConsensusSender;
use arc_swap::ArcSwapOption;
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
Expand Down Expand Up @@ -278,6 +277,14 @@ pub struct CheckpointSender {
tx_checkpoint_consensus_adapter: Sender<CheckpointFragment>,
}

impl CheckpointSender {
pub fn new(tx_checkpoint_consensus_adapter: Sender<CheckpointFragment>) -> Self {
Self {
tx_checkpoint_consensus_adapter,
}
}
}

impl ConsensusSender for CheckpointSender {
fn send_to_consensus(&self, fragment: CheckpointFragment) -> SuiResult {
self.tx_checkpoint_consensus_adapter
Expand All @@ -295,7 +302,7 @@ pub struct CheckpointConsensusAdapter {
/// Receive new checkpoint fragments to sequence.
rx_checkpoint_consensus_adapter: Receiver<CheckpointFragment>,
/// A pointer to the checkpoints local store.
checkpoint_locals: ArcSwapOption<CheckpointLocals>,
checkpoint_locals: Arc<CheckpointLocals>,
/// The initial delay to wait before re-attempting a connection with consensus (in ms).
retry_delay: Duration,
/// The maximum number of checkpoint fragment pending sequencing.
Expand All @@ -310,7 +317,7 @@ impl CheckpointConsensusAdapter {
consensus_address: Multiaddr,
tx_consensus_listener: Sender<ConsensusListenerMessage>,
rx_checkpoint_consensus_adapter: Receiver<CheckpointFragment>,
checkpoint_locals: ArcSwapOption<CheckpointLocals>,
checkpoint_locals: Arc<CheckpointLocals>,
retry_delay: Duration,
max_pending_transactions: usize,
) -> Self {
Expand All @@ -332,8 +339,8 @@ impl CheckpointConsensusAdapter {
}

/// Spawn a `CheckpointConsensusAdapter` in a dedicated tokio task.
pub fn spawn(mut instance: Self) -> JoinHandle<()> {
tokio::spawn(async move { instance.run().await })
pub fn spawn(mut self) -> JoinHandle<()> {
tokio::spawn(async move { self.run().await })
}

/// Submit a transaction to consensus.
Expand Down Expand Up @@ -403,8 +410,7 @@ impl CheckpointConsensusAdapter {
// Cleanup the buffer.
if self.buffer.len() >= self.max_pending_transactions {
// Drop the earliest fragments. They are not needed for liveness.
let locals = self.checkpoint_locals.load_full();
if let Some(proposal) = &locals.as_ref().unwrap().current_proposal {
if let Some(proposal) = &self.checkpoint_locals.current_proposal {
let current_sequence_number = proposal.sequence_number();
self.buffer.retain(|(_, s)| s >= current_sequence_number);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/sui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ workspace-hack = { path = "../workspace-hack"}

[dev-dependencies]
pretty_assertions = "1.2.0"

tokio-util = { version = "0.7.2", features = ["codec"] }
typed-store = { git = "https://github.com/MystenLabs/mysten-infra", rev = "ff5c1d69057fe93be658377462ca2875a57a0223"}
test-utils = { path = "../test-utils" }

[features]
Expand Down
Loading

0 comments on commit 893b4b4

Please sign in to comment.