Skip to content

Commit

Permalink
[authority] Batch crash robustness v3 (MystenLabs#835)
Browse files Browse the repository at this point in the history
* Added ticketing system
* Notifier based sequencer

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
gdanezis and George Danezis authored Mar 21, 2022
1 parent 6f8d8bb commit c0bc30a
Show file tree
Hide file tree
Showing 9 changed files with 710 additions and 459 deletions.
108 changes: 55 additions & 53 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender},
authority_batch::{BroadcastReceiver, BroadcastSender},
execution_engine,
};
use move_binary_format::CompiledModule;
Expand Down Expand Up @@ -52,9 +52,12 @@ pub use temporary_store::AuthorityTemporaryStore;
mod authority_store;
pub use authority_store::AuthorityStore;

pub mod authority_notifier;

// based on https://github.com/diem/move/blob/62d48ce0d8f439faa83d05a4f5cd568d4bfcb325/language/tools/move-cli/src/sandbox/utils/mod.rs#L50
const MAX_GAS_BUDGET: u64 = 18446744073709551615 / 1000 - 1;
const MAX_ITEMS_LIMIT: u64 = 10_000;
const BROADCAST_CAPACITY: usize = 10_000;

/// a Trait object for `signature::Signer` that is:
/// - Pin, i.e. confined to one place in memory (we don't want to copy private keys).
Expand All @@ -79,13 +82,16 @@ pub struct AuthorityState {
move_vm: Arc<adapter::MoveVM>,

/// The database
_database: Arc<AuthorityStore>,
pub(crate) _database: Arc<AuthorityStore>, // TODO: remove pub

// Structures needed for handling batching and notifications.
/// The sender to notify of new transactions
/// and create batches for this authority.
/// Keep as None if there is no need for this.
batch_channels: Option<(BatchSender, BroadcastSender)>,
pub(crate) batch_channels: BroadcastSender, // TODO: remove pub

// The Transaction notifier ticketing engine.
pub(crate) batch_notifier: Arc<authority_notifier::TransactionNotifier>, // TODO: remove pub

/// Ensures there can only be a single consensus client is updating the state.
pub consensus_guardrail: AtomicUsize,
Expand All @@ -98,28 +104,9 @@ pub struct AuthorityState {
///
/// Repeating valid commands should produce no changes and return no error.
impl AuthorityState {
/// Set a listener for transaction certificate updates. Returns an
/// error if a listener is already registered.
pub fn set_batch_sender(
&mut self,
batch_sender: BatchSender,
broadcast_sender: BroadcastSender,
) -> SuiResult {
if self.batch_channels.is_some() {
return Err(SuiError::AuthorityUpdateFailure);
}
self.batch_channels = Some((batch_sender, broadcast_sender));
Ok(())
}

/// Get a broadcast receiver for updates
pub fn subscribe(&self) -> Result<BroadcastReceiver, SuiError> {
self.batch_channels
.as_ref()
.map(|(_, tx)| tx.subscribe())
.ok_or_else(|| SuiError::GenericAuthorityError {
error: "No broadcast subscriptions allowed for this authority.".to_string(),
})
pub fn subscribe_batch(&self) -> BroadcastReceiver {
self.batch_channels.subscribe()
}

/// The logic to check one object against a reference, and return the object if all is well
Expand Down Expand Up @@ -537,17 +524,10 @@ impl AuthorityState {
&gas_object_id,
);
// Update the database in an atomic manner
let (seq, resp) = self
.update_state(temporary_store, certificate, signed_effects)
.instrument(tracing::debug_span!("db_update_state"))
.await?; // Returns the OrderInfoResponse

// If there is a notifier registered, notify:
if let Some((sender, _)) = &self.batch_channels {
sender.send_item(seq, transaction_digest).await?;
}

Ok(resp)
self.update_state(temporary_store, certificate, signed_effects)
.instrument(tracing::debug_span!("db_update_state"))
.await // Returns the OrderInfoResponse
}

/// Process certificates coming from the consensus. It is crucial that this function is only
Expand Down Expand Up @@ -752,14 +732,22 @@ impl AuthorityState {
genesis_packages: Vec<Vec<CompiledModule>>,
genesis_ctx: &mut TxContext,
) -> Self {
let state = AuthorityState::new_without_genesis(committee, name, secret, store).await;
let state =
AuthorityState::new_without_genesis(committee, name, secret, store.clone()).await;

for genesis_modules in genesis_packages {
state
.store_package_and_init_modules_for_genesis(genesis_ctx, genesis_modules)
.await
.expect("We expect publishing the Genesis packages to not fail");
// Only initialize an empty database.
if store
.database_is_empty()
.expect("Database read should not fail.")
{
for genesis_modules in genesis_packages {
state
.store_package_and_init_modules_for_genesis(genesis_ctx, genesis_modules)
.await
.expect("We expect publishing the Genesis packages to not fail");
}
}

state
}

Expand All @@ -769,31 +757,37 @@ impl AuthorityState {
secret: StableSyncAuthoritySigner,
store: Arc<AuthorityStore>,
) -> Self {
let (tx, _rx) = tokio::sync::broadcast::channel(BROADCAST_CAPACITY);
let native_functions =
sui_framework::natives::all_natives(MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS);

Self {
let mut state = AuthorityState {
committee,
name,
secret,
_native_functions: native_functions.clone(),
move_vm: adapter::new_move_vm(native_functions)
.expect("We defined natives to not fail here"),
_database: store,
batch_channels: None,
_database: store.clone(),
batch_channels: tx,
batch_notifier: Arc::new(
authority_notifier::TransactionNotifier::new(store)
.expect("Notifier cannot start."),
),
consensus_guardrail: AtomicUsize::new(0),
}
};

state
.init_batches_from_database()
.expect("Init batches failed!");

state
}

pub(crate) fn db(&self) -> Arc<AuthorityStore> {
self._database.clone()
}

#[cfg(test)]
pub(crate) fn batch_sender(&self) -> &BatchSender {
&self.batch_channels.as_ref().unwrap().0
}

async fn get_object(&self, object_id: &ObjectID) -> Result<Option<Object>, SuiError> {
self._database.get_object(object_id)
}
Expand Down Expand Up @@ -852,7 +846,7 @@ impl AuthorityState {
) {
return Err(*error);
};
self._database
self.db()
.update_objects_state_for_genesis(temporary_store, ctx.digest())
}

Expand Down Expand Up @@ -885,14 +879,22 @@ impl AuthorityState {
.set_transaction_lock(mutable_input_objects, signed_transaction)
}

/// Update state and signals that a new transactions has been processed
/// to the batch maker service.
async fn update_state(
&self,
temporary_store: AuthorityTemporaryStore<AuthorityStore>,
certificate: CertifiedTransaction,
signed_effects: SignedTransactionEffects,
) -> Result<(u64, TransactionInfoResponse), SuiError> {
self._database
.update_state(temporary_store, certificate, signed_effects)
) -> Result<TransactionInfoResponse, SuiError> {
let notifier_ticket = self.batch_notifier.ticket()?;
self._database.update_state(
temporary_store,
certificate,
signed_effects,
Some(notifier_ticket.seq()),
)
// implicitely we drop the ticket here and that notifes the batch manager
}

/// Get a read reference to an object/seq lock
Expand Down
Loading

0 comments on commit c0bc30a

Please sign in to comment.