Skip to content

Commit

Permalink
[authority] Consensus client (MystenLabs#718)
Browse files Browse the repository at this point in the history
Consensus client & networked sequencer
  • Loading branch information
asonnino authored Mar 21, 2022
1 parent 2960c00 commit 86501f4
Show file tree
Hide file tree
Showing 16 changed files with 646 additions and 62 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ members = [
"sui_programmability/adapter",
"sui_programmability/framework",
"sui_types",
"test_utils"
]

[profile.release]
Expand Down
2 changes: 2 additions & 0 deletions sui_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ serde-reflection = "0.3.5"
serde_yaml = "0.8.23"
assert-str = "0.1.0"

test_utils = { path = "../test_utils" }

[[example]]
name = "generate-format"
path = "src/generate_format.rs"
Expand Down
37 changes: 26 additions & 11 deletions sui_core/src/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,18 @@
// Copyright (c) 2022, Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use crate::{
authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender},
execution_engine,
};
use move_binary_format::CompiledModule;
use move_bytecode_utils::module_cache::ModuleCache;
use move_core_types::{
language_storage::{ModuleId, StructTag},
resolver::{ModuleResolver, ResourceResolver},
};
use move_vm_runtime::native_functions::NativeFunctionTable;
use std::sync::atomic::AtomicUsize;
use std::{
collections::{BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
pin::Pin,
Expand All @@ -29,11 +34,6 @@ use sui_types::{
};
use tracing::*;

use crate::{
authority_batch::{BatchSender, BroadcastReceiver, BroadcastSender},
execution_engine,
};

#[cfg(test)]
#[path = "unit_tests/authority_tests.rs"]
pub mod authority_tests;
Expand Down Expand Up @@ -86,6 +86,9 @@ pub struct AuthorityState {
/// and create batches for this authority.
/// Keep as None if there is no need for this.
batch_channels: Option<(BatchSender, BroadcastSender)>,

/// Ensures there can only be a single consensus client is updating the state.
pub consensus_guardrail: AtomicUsize,
}

/// The authority state encapsulates all state, drives execution, and ensures safety.
Expand Down Expand Up @@ -552,11 +555,14 @@ impl AuthorityState {
pub async fn handle_consensus_certificate(
&self,
certificate: CertifiedTransaction,
last_consensus_index: SequenceNumber,
) -> SuiResult<()> {
// Ensure it is a shared object certificate
if !certificate.transaction.contains_shared_object() {
// TODO: Maybe add a warning here, no respectable authority should
// have sequenced this.
log::debug!(
"Transaction without shared object has been sequenced: {:?}",
certificate.transaction
);
return Ok(());
}

Expand All @@ -574,11 +580,15 @@ impl AuthorityState {
// Check the certificate.
certificate.check(&self.committee)?;

// Persist the certificate. We are about to lock one or more shared object.
// Persist the certificate since we are about to lock one or more shared object.
// We thus need to make sure someone (if not the client) can continue the protocol.
// Also atomically lock the shared objects for this particular transaction.
// Also atomically lock the shared objects for this particular transaction and
// increment the last consensus index. Note that a single process can ever call
// this function and that the last consensus index is also kept in memory. It is
// thus ok to only persist now (despite this function may have returned earlier).
// In the worst case, the synchronizer of the consensus client will catch up.
self._database
.persist_certificate_and_lock_shared_objects(&transaction_digest, certificate)
.persist_certificate_and_lock_shared_objects(certificate, last_consensus_index)
}

pub async fn handle_transaction_info_request(
Expand Down Expand Up @@ -762,7 +772,7 @@ impl AuthorityState {
let native_functions =
sui_framework::natives::all_natives(MOVE_STDLIB_ADDRESS, SUI_FRAMEWORK_ADDRESS);

AuthorityState {
Self {
committee,
name,
secret,
Expand All @@ -771,6 +781,7 @@ impl AuthorityState {
.expect("We defined natives to not fail here"),
_database: store,
batch_channels: None,
consensus_guardrail: AtomicUsize::new(0),
}
}

Expand Down Expand Up @@ -933,6 +944,10 @@ impl AuthorityState {
) -> Result<Option<(ObjectRef, TransactionDigest)>, SuiError> {
self._database.get_latest_parent_entry(object_id)
}

pub fn last_consensus_index(&self) -> SuiResult<SequenceNumber> {
self._database.last_consensus_index()
}
}

impl ModuleResolver for AuthorityState {
Expand Down
35 changes: 31 additions & 4 deletions sui_core/src/authority/authority_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ pub type ReplicaStore = SuiDataStore<true>;

const NUM_SHARDS: usize = 4096;

/// The key where the latest consensus index is stored in the database.
// TODO: Make a single table (e.g., called `variables`) storing all our lonely variables in one place.
const LAST_CONSENSUS_INDEX_ADDR: u64 = 0;

/// ALL_OBJ_VER determines whether we want to store all past
/// versions of every object in the store. Authority doesn't store
/// them, but other entities such as replicas will.
Expand Down Expand Up @@ -94,6 +98,12 @@ pub struct SuiDataStore<const ALL_OBJ_VER: bool> {
/// A sequence of batches indexing into the sequence of executed transactions.
pub batches: DBMap<TxSequenceNumber, SignedBatch>,

/// The following table is used to store a single value (the corresponding key is a constant). The value
/// represents the index of the latest consensus message this authority processed. This field is written
/// by a single process acting as consensus (light) client. It is used to ensure the authority processes
/// every message output by consensus (and in the right order).
last_consensus_index: DBMap<u64, SequenceNumber>,

/// The next available sequence number to use in the `executed sequence` table.
pub next_sequence_number: AtomicU64,
}
Expand Down Expand Up @@ -174,6 +184,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
("schedule", &options),
("executed_sequence", &options),
("batches", &options),
("last_consensus_index", &options),
],
)
.expect("Cannot open DB.");
Expand Down Expand Up @@ -205,6 +216,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
sequenced,
schedule,
batches,
last_consensus_index,
) = reopen! (
&db,
"objects";<ObjectID, Object>,
Expand All @@ -217,7 +229,8 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
"signed_effects";<TransactionDigest, SignedTransactionEffects>,
"sequenced";<(TransactionDigest, ObjectID), SequenceNumber>,
"schedule";<ObjectID, SequenceNumber>,
"batches";<TxSequenceNumber, SignedBatch>
"batches";<TxSequenceNumber, SignedBatch>,
"last_consensus_index";<u64, SequenceNumber>
);
AuthorityStore {
objects,
Expand All @@ -236,6 +249,7 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
.collect(),
executed_sequence,
batches,
last_consensus_index,
next_sequence_number,
}
}
Expand Down Expand Up @@ -800,27 +814,32 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {
Ok(write_batch)
}

/// Lock a sequence number for the shared objects of the input transaction.
/// Lock a sequence number for the shared objects of the input transaction. Also update the
/// last consensus index.
pub fn persist_certificate_and_lock_shared_objects(
&self,
transaction_digest: &TransactionDigest,
certificate: CertifiedTransaction,
global_certificate_index: SequenceNumber,
) -> Result<(), SuiError> {
let transaction_digest = certificate.transaction.digest();
let certificate_to_write = std::iter::once((transaction_digest, &certificate));

let mut sequenced_to_write = Vec::new();
let mut schedule_to_write = Vec::new();
for id in certificate.transaction.shared_input_objects() {
let version = self.schedule.get(id)?.unwrap_or_default();
sequenced_to_write.push(((*transaction_digest, *id), version));
sequenced_to_write.push(((transaction_digest, *id), version));
let next_version = version.increment();
schedule_to_write.push((*id, next_version));
}

let index_to_write = std::iter::once((LAST_CONSENSUS_INDEX_ADDR, global_certificate_index));

let mut write_batch = self.sequenced.batch();
write_batch = write_batch.insert_batch(&self.certificates, certificate_to_write)?;
write_batch = write_batch.insert_batch(&self.sequenced, sequenced_to_write)?;
write_batch = write_batch.insert_batch(&self.schedule, schedule_to_write)?;
write_batch = write_batch.insert_batch(&self.last_consensus_index, index_to_write)?;
write_batch.write().map_err(SuiError::from)
}

Expand Down Expand Up @@ -925,6 +944,14 @@ impl<const ALL_OBJ_VER: bool> SuiDataStore<ALL_OBJ_VER> {

Ok((batches, transactions))
}

/// Return the latest consensus index. It is used to bootstrap the consensus client.
pub fn last_consensus_index(&self) -> SuiResult<SequenceNumber> {
self.last_consensus_index
.get(&LAST_CONSENSUS_INDEX_ADDR)
.map(|x| x.unwrap_or_default())
.map_err(SuiError::from)
}
}

impl<const ALL_OBJ_VER: bool> BackingPackageStore for SuiDataStore<ALL_OBJ_VER> {
Expand Down
Loading

0 comments on commit 86501f4

Please sign in to comment.