Skip to content

Commit

Permalink
[authority] Move reconfiguration state into PerEpochStore (MystenLabs…
Browse files Browse the repository at this point in the history
…#6497)

The reconfiguration state information is tightly coupled to other epoch tables. For example, we do state transaction from CloseUserCerts => CloseAllCerts based on `epoch_tables.end_of_publish`.

If external event happens, for example due to state sync where we need to change an epoch any state regarding reconfiguration of previous epoch becomes irrelevant.

I did not think super well yet what to do with the RwLock holding the ReconfigurationState in regards with bigger picture. I think most likely the `epoch_tables` itself needs to be stored in the RwLock. (will do that in separate PR when I have more clarity on that)

Note that `open_all_certs` method is also deleted in this case - there is no way to transition back to open reconfiguration state, other than by switching to a new epoch(that will create empty reconfiguration state table).
  • Loading branch information
andll authored Nov 30, 2022
1 parent 56cb6c7 commit 81778e5
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 75 deletions.
50 changes: 6 additions & 44 deletions crates/sui-core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use move_core_types::identifier::Identifier;
use move_core_types::parser::parse_struct_tag;
use move_core_types::{language_storage::ModuleId, resolver::ModuleResolver};
use move_vm_runtime::{move_vm::MoveVM, native_functions::NativeFunctionTable};
use parking_lot::RwLock;
use prometheus::{
exponential_buckets, register_histogram_with_registry, register_int_counter_with_registry,
register_int_gauge_with_registry, Histogram, IntCounter, IntGauge, Registry,
Expand Down Expand Up @@ -557,9 +556,6 @@ pub struct AuthorityState {
pub(crate) batch_notifier: Arc<authority_notifier::TransactionNotifier>, // TODO: remove pub

pub metrics: Arc<AuthorityMetrics>,

/// In-memory cache of the content from the reconfig_state db table.
reconfig_state_mem: RwLock<ReconfigState>,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -1480,11 +1476,6 @@ impl AuthorityState {
.expect("Notifier cannot start."),
),
metrics,
reconfig_state_mem: RwLock::new(
store
.load_reconfig_state()
.expect("Load reconfig state at initialization cannot fail"),
),
};

prometheus_registry
Expand Down Expand Up @@ -1636,47 +1627,16 @@ impl AuthorityState {
self.committee().clone().deref().clone()
}

pub fn get_reconfig_state_read_lock_guard(
&self,
) -> parking_lot::RwLockReadGuard<ReconfigState> {
self.reconfig_state_mem.read()
}

pub fn get_reconfig_state_write_lock_guard(
&self,
) -> parking_lot::RwLockWriteGuard<ReconfigState> {
self.reconfig_state_mem.write()
}

// This method can only be called from ConsensusAdapter::begin_reconfiguration
pub fn close_user_certs(
&self,
mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
) {
lock_guard.close_user_certs();
self.database
.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
pub fn close_user_certs(&self, lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>) {
self.database.epoch_tables().close_user_certs(lock_guard)
}

pub async fn close_all_certs(
&self,
mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
) {
lock_guard.close_all_certs();
self.database
.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
}

pub async fn open_all_certs(
&self,
mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
) {
lock_guard.open_all_certs();
self.database
.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
self.database.epoch_tables().close_all_certs(lock_guard)
}

pub(crate) async fn get_object(
Expand Down Expand Up @@ -2300,6 +2260,8 @@ impl AuthorityState {
);

if !self
.database
.epoch_tables()
.get_reconfig_state_read_lock_guard()
.should_accept_consensus_certs()
{
Expand Down
68 changes: 67 additions & 1 deletion crates/sui-core/src/authority/authority_per_epoch_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use narwhal_executor::ExecutionIndices;
use parking_lot::RwLock;
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
Expand All @@ -16,6 +17,7 @@ use typed_store::rocks::{DBBatch, DBMap, DBOptions, TypedStoreError};
use typed_store::traits::TypedStoreDebug;

use crate::authority::authority_notify_read::{NotifyRead, Registration};
use crate::epoch::reconfiguration::ReconfigState;
use sui_types::message_envelope::{TrustedEnvelope, VerifiedEnvelope};
use typed_store::Map;
use typed_store_derive::DBMapUtils;
Expand All @@ -24,6 +26,8 @@ use typed_store_derive::DBMapUtils;
// TODO: Make a single table (e.g., called `variables`) storing all our lonely variables in one place.
const LAST_CONSENSUS_INDEX_ADDR: u64 = 0;

const RECONFIG_STATE_INDEX: u64 = 0;

#[derive(Serialize, Deserialize, Clone, Debug, Default, PartialEq, Eq)]
pub struct ExecutionIndicesWithHash {
pub index: ExecutionIndices,
Expand All @@ -34,6 +38,8 @@ pub struct AuthorityPerEpochStore<S> {
#[allow(dead_code)]
epoch_id: EpochId,
tables: AuthorityEpochTables<S>,
/// In-memory cache of the content from the reconfig_state db table.
reconfig_state_mem: RwLock<ReconfigState>,
consensus_notify_read: NotifyRead<ConsensusTransactionKey, ()>,
}

Expand Down Expand Up @@ -94,6 +100,9 @@ pub struct AuthorityEpochTables<S> {
/// consensus output index
checkpoint_boundary: DBMap<u64, u64>,

/// This table contains current reconfiguration state for validator for current epoch
reconfig_state: DBMap<u64, ReconfigState>,

/// Validators that have sent EndOfPublish message in this epoch
end_of_publish: DBMap<AuthorityName, ()>,
}
Expand All @@ -117,20 +126,40 @@ where
pub fn path(epoch: EpochId, parent_path: &Path) -> PathBuf {
parent_path.join(format!("epoch_{}", epoch))
}

fn load_reconfig_state(&self) -> SuiResult<ReconfigState> {
let state = self
.reconfig_state
.get(&RECONFIG_STATE_INDEX)?
.unwrap_or_default();
Ok(state)
}
}

impl<S> AuthorityPerEpochStore<S>
where
S: std::fmt::Debug + Serialize + for<'de> Deserialize<'de>,
{
pub fn new(epoch: EpochId, parent_path: &Path, db_options: Option<Options>) -> Self {
let tables = AuthorityEpochTables::open(epoch, parent_path, db_options);
let reconfig_state = tables
.load_reconfig_state()
.expect("Load reconfig state at initialization cannot fail");
Self {
epoch_id: epoch,
tables: AuthorityEpochTables::open(epoch, parent_path, db_options),
tables,
reconfig_state_mem: RwLock::new(reconfig_state),
consensus_notify_read: NotifyRead::new(),
}
}

pub fn store_reconfig_state(&self, new_state: &ReconfigState) -> SuiResult {
self.tables
.reconfig_state
.insert(&RECONFIG_STATE_INDEX, new_state)?;
Ok(())
}

pub fn insert_transaction(
&self,
transaction: VerifiedEnvelope<SenderSignedData, S>,
Expand Down Expand Up @@ -435,6 +464,43 @@ where
)?;
self.finish_consensus_transaction_process_with_batch(batch, key, consensus_index)
}

pub fn get_reconfig_state_read_lock_guard(
&self,
) -> parking_lot::RwLockReadGuard<ReconfigState> {
self.reconfig_state_mem.read()
}

pub fn get_reconfig_state_write_lock_guard(
&self,
) -> parking_lot::RwLockWriteGuard<ReconfigState> {
self.reconfig_state_mem.write()
}

// This method can only be called from ConsensusAdapter::begin_reconfiguration
pub fn close_user_certs(
&self,
mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
) {
lock_guard.close_user_certs();
self.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
}

pub fn close_all_certs(
&self,
mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>,
) {
lock_guard.close_all_certs();
self.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
}

pub fn open_all_certs(&self, mut lock_guard: parking_lot::RwLockWriteGuard<'_, ReconfigState>) {
lock_guard.open_all_certs();
self.store_reconfig_state(&lock_guard)
.expect("Updating reconfig state cannot fail");
}
}

fn transactions_table_default_config() -> DBOptions {
Expand Down
25 changes: 0 additions & 25 deletions crates/sui-core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ pub struct CertLockGuard(LockGuard);
const NUM_SHARDS: usize = 4096;
const SHARD_SIZE: usize = 128;

const RECONFIG_STATE_INDEX: u64 = 0;

/// ALL_OBJ_VER determines whether we want to store all past
/// versions of every object in the store. Authority doesn't store
/// them, but other entities such as replicas will.
Expand Down Expand Up @@ -991,29 +989,6 @@ impl<S: Eq + Debug + Serialize + for<'de> Deserialize<'de>> SuiDataStore<S> {
Ok(assigned_seq)
}

pub fn load_reconfig_state(&self) -> SuiResult<ReconfigState> {
let state = match self
.perpetual_tables
.reconfig_state
.get(&RECONFIG_STATE_INDEX)?
{
Some(state) => state,
None => {
let init_state = ReconfigState::default();
self.store_reconfig_state(&init_state)?;
init_state
}
};
Ok(state)
}

pub fn store_reconfig_state(&self, new_state: &ReconfigState) -> SuiResult {
self.perpetual_tables
.reconfig_state
.insert(&RECONFIG_STATE_INDEX, new_state)?;
Ok(())
}

/// This function is called at the end of epoch for each transaction that's
/// executed locally on the validator but didn't make to the last checkpoint.
/// The effects of the execution is reverted here.
Expand Down
2 changes: 0 additions & 2 deletions crates/sui-core/src/authority/authority_store_tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ pub struct AuthorityPerpetualTables<S> {

/// A sequence of batches indexing into the sequence of executed transactions.
pub batches: DBMap<TxSequenceNumber, SignedBatch>,

pub reconfig_state: DBMap<u64, ReconfigState>,
}

impl<S> AuthorityPerpetualTables<S>
Expand Down
11 changes: 8 additions & 3 deletions crates/sui-core/src/consensus_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ impl ConsensusAdapter {
#[allow(clippy::collapsible_if)] // This if can be collapsed but it will be ugly
if self
.authority
.database
.epoch_tables()
.get_reconfig_state_read_lock_guard()
.is_reject_user_certs()
{
Expand Down Expand Up @@ -287,8 +289,9 @@ impl ConsensusAdapter {
/// It transition reconfig state to reject new certificates from user
/// ConsensusAdapter will send EndOfPublish message once pending certificate queue is drained
pub fn close_epoch(self: &Arc<Self>) -> SuiResult {
let epoch_tables = self.authority.database.epoch_tables();
let send_end_of_publish = {
let reconfig_guard = self.authority.get_reconfig_state_write_lock_guard();
let reconfig_guard = epoch_tables.get_reconfig_state_write_lock_guard();
let pending_certificates = self.pending_certificates.lock();
let send_end_of_publish = pending_certificates.is_empty();
self.authority.close_user_certs(reconfig_guard);
Expand All @@ -313,8 +316,9 @@ impl ConsensusAdapter {
self: &Arc<Self>,
transaction: ConsensusTransaction,
) -> SuiResult<JoinHandle<()>> {
let epoch_tables = self.authority.database.epoch_tables();
let _lock = if transaction.is_user_certificate() {
let lock = self.authority.get_reconfig_state_read_lock_guard();
let lock = epoch_tables.get_reconfig_state_read_lock_guard();
if !lock.should_accept_user_certs() {
return Err(SuiError::ValidatorHaltedAtEpochEnd);
}
Expand Down Expand Up @@ -391,7 +395,8 @@ impl ConsensusAdapter {
}
let send_end_of_publish =
if let ConsensusTransactionKind::UserTransaction(certificate) = &transaction.kind {
let reconfig_guard = self.authority.get_reconfig_state_read_lock_guard();
let epoch_tables = self.authority.database.epoch_tables();
let reconfig_guard = epoch_tables.get_reconfig_state_read_lock_guard();
// note that pending_certificates lock is always acquired *after* reconfiguration lock
// acquiring locks in different order might lead to deadlocks
let mut pending_certificates = self.pending_certificates.lock();
Expand Down

0 comments on commit 81778e5

Please sign in to comment.