Skip to content

Commit

Permalink
Remove committee from CheckpointStore (MystenLabs#2384)
Browse files Browse the repository at this point in the history
  • Loading branch information
lxfind authored Jun 3, 2022
1 parent 9f138b8 commit 394a352
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 183 deletions.
35 changes: 30 additions & 5 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ mod authority_store;
pub use authority_store::{
AuthorityStore, AuthorityStoreWrapper, GatewayStore, ReplicaStore, SuiDataStore,
};
use sui_types::messages_checkpoint::{
CheckpointRequest, CheckpointRequestType, CheckpointResponse,
};
use sui_types::object::Owner;
use sui_types::sui_system_state::SuiSystemState;

Expand Down Expand Up @@ -730,6 +733,32 @@ impl AuthorityState {
Ok((items, (should_subscribe, start, end)))
}

pub fn handle_checkpoint_request(
&self,
request: &CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
let mut checkpoint_store = self
.checkpoints
.as_ref()
.ok_or(SuiError::UnsupportedFeatureError {
error: "Checkpoint not supported".to_owned(),
})?
.lock();
match &request.request_type {
CheckpointRequestType::LatestCheckpointProposal => {
checkpoint_store.handle_latest_proposal(request)
}
CheckpointRequestType::PastCheckpoint(seq) => {
checkpoint_store.handle_past_checkpoint(request.detail, *seq)
}
CheckpointRequestType::SetCertificate(cert, opt_contents) => checkpoint_store
.handle_checkpoint_certificate(cert, opt_contents, &self.committee.load()),
CheckpointRequestType::SetFragment(fragment) => {
checkpoint_store.handle_receive_fragment(fragment, &self.committee.load())
}
}
}

pub async fn new(
committee: Committee,
name: AuthorityName,
Expand Down Expand Up @@ -873,10 +902,6 @@ impl AuthorityState {
Ok(())
}

pub(crate) fn checkpoints(&self) -> Option<Arc<Mutex<CheckpointStore>>> {
self.checkpoints.clone()
}

pub(crate) fn db(&self) -> Arc<AuthorityStore> {
self.database.clone()
}
Expand Down Expand Up @@ -1251,7 +1276,7 @@ impl ExecutionState for AuthorityState {
if let Some(checkpoint) = &self.checkpoints {
checkpoint
.lock()
.handle_internal_fragment(seq, *fragment)
.handle_internal_fragment(seq, *fragment, &self.committee.load())
.map_err(|e| SuiError::from(&e.to_string()[..]))?;

// NOTE: The method `handle_internal_fragment` is idempotent, so we don't need
Expand Down
35 changes: 22 additions & 13 deletions crates/sui-core/src/authority_active/checkpoint_driver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,11 @@ pub async fn checkpoint_process<A>(
// In either case try to upgrade the signed checkpoint to a certified one
// if possible
let result = {
state_checkpoints
.lock()
.handle_checkpoint_certificate(&checkpoint, &None)
state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&None,
&active_authority.state.committee.load(),
)
}; // unlock

if let Err(err) = result {
Expand All @@ -162,9 +164,11 @@ pub async fn checkpoint_process<A>(
.await
{
// Retry with contents
let _ = state_checkpoints
.lock()
.handle_checkpoint_certificate(&checkpoint, &Some(contents));
let _ = state_checkpoints.lock().handle_checkpoint_certificate(
&checkpoint,
&Some(contents),
&active_authority.state.committee.load(),
);
}
}
}
Expand Down Expand Up @@ -424,9 +428,10 @@ where
let (past, _contents) =
get_one_checkpoint(net.clone(), seq, false, &available_authorities).await?;

if let Err(err) = checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &None)
if let Err(err) =
checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &None, &net.committee)
{
warn!("Error handling certificate: {err:?}");
}
Expand All @@ -445,9 +450,10 @@ where
let (past, _contents) =
get_one_checkpoint(net.clone(), seq, true, &available_authorities).await?;

if let Err(err) = checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &_contents)
if let Err(err) =
checkpoint_db
.lock()
.handle_checkpoint_certificate(&past, &_contents, &net.committee)
{
warn!("Sync Err: {err:?}");
}
Expand Down Expand Up @@ -642,7 +648,10 @@ pub async fn diff_proposals<A>(
let proposer = &fragment.proposer.0.authority;
let other = &fragment.other.0.authority;
debug!("Send fragment: {proposer:?} -- {other:?}");
let _ = checkpoint_db.lock().handle_receive_fragment(&fragment);
let _ = checkpoint_db.lock().handle_receive_fragment(
&fragment,
&active_authority.state.committee.load(),
);
}
Err(err) => {
// TODO: some error occurred -- log it.
Expand Down
18 changes: 3 additions & 15 deletions crates/sui-core/src/authority_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,13 +312,7 @@ impl AuthorityAPI for LocalAuthorityClient {
) -> Result<CheckpointResponse, SuiError> {
let state = self.state.clone();

let result = state
.checkpoints
.as_ref()
.unwrap()
.lock()
.handle_checkpoint_request(&request);
result
state.handle_checkpoint_request(&request)
}
}

Expand Down Expand Up @@ -347,14 +341,8 @@ impl LocalAuthorityClient {
let store = Arc::new(AuthorityStore::open(&store_path, None));
let mut checkpoints_path = path.clone();
checkpoints_path.push("checkpoints");
let checkpoints = CheckpointStore::open(
&checkpoints_path,
None,
address,
committee.clone(),
secret.clone(),
)
.expect("Should not fail to open local checkpoint DB");
let checkpoints = CheckpointStore::open(&checkpoints_path, None, address, secret.clone())
.expect("Should not fail to open local checkpoint DB");

let state = AuthorityState::new(
committee.clone(),
Expand Down
16 changes: 6 additions & 10 deletions crates/sui-core/src/authority_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,17 +379,13 @@ impl Validator for ValidatorService {
&self,
request: tonic::Request<CheckpointRequest>,
) -> Result<tonic::Response<CheckpointResponse>, tonic::Status> {
if let Some(checkpoint) = &self.state.checkpoints() {
let request = request.into_inner();

let response = checkpoint
.lock()
.handle_checkpoint_request(&request)
.map_err(|e| tonic::Status::internal(e.to_string()))?;
let request = request.into_inner();

return Ok(tonic::Response::new(response));
}
let response = self
.state
.handle_checkpoint_request(&request)
.map_err(|e| tonic::Status::internal(e.to_string()))?;

Err(tonic::Status::internal("Unsupported".to_string()))
return Ok(tonic::Response::new(response));
}
}
52 changes: 19 additions & 33 deletions crates/sui-core/src/checkpoints/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use sui_types::{
messages::CertifiedTransaction,
messages_checkpoint::{
AuthenticatedCheckpoint, AuthorityCheckpointInfo, CertifiedCheckpoint, CheckpointContents,
CheckpointFragment, CheckpointRequest, CheckpointRequestType, CheckpointResponse,
CheckpointSequenceNumber, CheckpointSummary, SignedCheckpoint, SignedCheckpointProposal,
CheckpointFragment, CheckpointRequest, CheckpointResponse, CheckpointSequenceNumber,
CheckpointSummary, SignedCheckpoint, SignedCheckpointProposal,
},
};
use typed_store::{
Expand Down Expand Up @@ -79,8 +79,6 @@ pub struct CheckpointStore {
// Fixed size, static, identity of the authority
/// The name of this authority.
pub name: AuthorityName,
/// Committee of this Sui instance.
pub committee: Committee,
/// The signature key of the authority.
pub secret: StableSyncAuthoritySigner,

Expand Down Expand Up @@ -213,7 +211,6 @@ impl CheckpointStore {
path: P,
db_options: Option<Options>,
name: AuthorityName,
committee: Committee,
secret: StableSyncAuthoritySigner,
) -> Result<CheckpointStore, SuiError> {
let (options, point_lookup) = default_db_options(db_options);
Expand Down Expand Up @@ -260,7 +257,6 @@ impl CheckpointStore {

let mut checkpoint_db = CheckpointStore {
name,
committee,
secret,
transactions_to_checkpoint,
checkpoint_contents,
Expand All @@ -283,22 +279,6 @@ impl CheckpointStore {

// Define handlers for request

pub fn handle_checkpoint_request(
&mut self,
request: &CheckpointRequest,
) -> Result<CheckpointResponse, SuiError> {
match &request.request_type {
CheckpointRequestType::LatestCheckpointProposal => self.handle_latest_proposal(request),
CheckpointRequestType::PastCheckpoint(seq) => {
self.handle_past_checkpoint(request, *seq)
}
CheckpointRequestType::SetCertificate(cert, opt_contents) => {
self.handle_checkpoint_certificate(cert, opt_contents)
}
CheckpointRequestType::SetFragment(fragment) => self.handle_receive_fragment(fragment),
}
}

pub fn handle_latest_proposal(
&mut self,
request: &CheckpointRequest,
Expand Down Expand Up @@ -355,7 +335,7 @@ impl CheckpointStore {

pub fn handle_past_checkpoint(
&mut self,
request: &CheckpointRequest,
detail: bool,
seq: CheckpointSequenceNumber,
) -> Result<CheckpointResponse, SuiError> {
// Get the checkpoint with a given sequence number
Expand All @@ -367,7 +347,7 @@ impl CheckpointStore {
// If a checkpoint is found, and if requested, return the list of transaction digest in it.
let detail = if let &AuthenticatedCheckpoint::None = &checkpoint {
None
} else if request.detail {
} else if detail {
Some(CheckpointContents::new(
self.checkpoint_contents
.iter()
Expand Down Expand Up @@ -477,9 +457,10 @@ impl CheckpointStore {
pub fn handle_receive_fragment(
&mut self,
fragment: &CheckpointFragment,
committee: &Committee,
) -> Result<CheckpointResponse, SuiError> {
// Check structure is correct and signatures verify
fragment.verify(&self.committee)?;
fragment.verify(committee)?;

// Does the fragment event suggest it is for the current round?
let next_checkpoint_seq = self.next_checkpoint();
Expand Down Expand Up @@ -532,7 +513,7 @@ impl CheckpointStore {
// Since we seem to be missing information to complete it (ie there is a checkpoint
// but we are not included in it.)
loop {
let construct = self.attempt_to_construct_checkpoint();
let construct = self.attempt_to_construct_checkpoint(committee);
// Exit if checkpoint construction leads to an error or returns false
// (ie no new checkpoint is created.)
if construct.is_err() || !construct.unwrap() {
Expand All @@ -558,6 +539,7 @@ impl CheckpointStore {
&mut self,
_seq: ExecutionIndices,
_fragment: CheckpointFragment,
committee: &Committee,
) -> Result<(), FragmentInternalError> {
// Ensure we have not already processed this fragment.
if let Some((last_seq, _)) = self.fragments.iter().skip_to_last().next() {
Expand All @@ -569,7 +551,7 @@ impl CheckpointStore {

// Check structure is correct and signatures verify
_fragment
.verify(&self.committee)
.verify(committee)
.map_err(FragmentInternalError::Error)?;

// Save the new fragment in the DB
Expand Down Expand Up @@ -603,13 +585,16 @@ impl CheckpointStore {
}

// Attempt to move forward, as many times as we can
while self.attempt_to_construct_checkpoint()? {}
while self.attempt_to_construct_checkpoint(committee)? {}
Ok(())
}

/// Attempt to construct the next expected checkpoint, and return true if a new
/// checkpoint is created or false if it is not.
fn attempt_to_construct_checkpoint(&mut self) -> Result<bool, FragmentInternalError> {
fn attempt_to_construct_checkpoint(
&mut self,
committee: &Committee,
) -> Result<bool, FragmentInternalError> {
let next_sequence_number = self.next_checkpoint();
let fragments: Vec<_> = self
.fragments
Expand All @@ -620,7 +605,7 @@ impl CheckpointStore {
// Run the reconstruction logic to build a checkpoint.
let _potential_checkpoint = FragmentReconstruction::construct(
self.next_checkpoint(),
self.committee.clone(),
committee.clone(),
&fragments,
)
.map_err(FragmentInternalError::Error)?;
Expand Down Expand Up @@ -754,6 +739,7 @@ impl CheckpointStore {
&mut self,
checkpoint: &CertifiedCheckpoint,
contents: &Option<CheckpointContents>,
committee: &Committee,
) -> Result<CheckpointResponse, SuiError> {
// Get the record in our checkpoint database for this sequence number.
let current = self
Expand All @@ -769,7 +755,7 @@ impl CheckpointStore {
None => {
if let &Some(contents) = &contents {
// Check and process contents
checkpoint.verify_with_transactions(&self.committee, contents)?;
checkpoint.verify_with_transactions(committee, contents)?;
self.handle_internal_set_checkpoint(checkpoint.checkpoint.clone(), contents)?;
// Then insert it
self.checkpoints.insert(
Expand All @@ -780,7 +766,7 @@ impl CheckpointStore {
// Now that we have the new checkpoint we try to move forward the checkpoint creation
// process. We try to use fragments in the sequence to create past checkpoints.
loop {
let construct = self.attempt_to_construct_checkpoint();
let construct = self.attempt_to_construct_checkpoint(committee);
// Exit if checkpoint construction leads to an error or returns false
// (ie no new checkpoint is created.)
if construct.is_err() || !construct.unwrap() {
Expand All @@ -794,7 +780,7 @@ impl CheckpointStore {
// In this case we have an internal signed checkpoint so we promote it to a
// full certificate.
Some(AuthenticatedCheckpoint::Signed(_)) => {
checkpoint.verify(&self.committee)?;
checkpoint.verify(committee)?;
self.checkpoints.insert(
checkpoint.checkpoint.sequence_number(),
&AuthenticatedCheckpoint::Certified(checkpoint.clone()),
Expand Down
Loading

0 comments on commit 394a352

Please sign in to comment.