Fix initialization of next_shared_object_versions (MystenLabs#7044)
Previously, we attempted to initialize `next_shared_object_versions` the
first time we sequence a cert that touches a given object.
Unfortunately, this will not work if a transaction touching that object
has already been executed via checkpoint sync. See the referenced bug
below for details.

To fix this, we now initialize `next_shared_object_versions` for a
given object _before_ we set the shared locks for that object, regardless
of whether we are setting the locks in the consensus task or from
checkpoint sync. This ensures that, if the object already existed (i.e.
was created in a prior epoch), we are guaranteed to read the latest
version of the object from the previous checkpoint.

However, because we now initialize `next_shared_object_versions` from
multiple tasks, we need to account for races, which we handle with
optimistic transactions. When there is a race, one task succeeds in
initializing `next_shared_object_versions`; the other tasks fail to
commit and retry, notice that `next_shared_object_versions` has already
been initialized, and do not attempt to write to it again.
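
In sketch form, the retry pattern looks roughly like the following; this
is a hedged illustration, not the code in the diff itself. `init_once`,
`table`, `key`, and `initial_value` are hypothetical placeholders, while
`DBMap`, `transaction_with_snapshot`, `multi_get`, `insert_batch`, and
`retry_transaction_forever!` are the typed-store pieces this commit uses:

    use sui_types::base_types::{ObjectID, SequenceNumber};
    use sui_types::error::SuiResult;
    use typed_store::retry_transaction_forever;
    use typed_store::rocks::DBMap;

    // Initialize `key` exactly once, even when several tasks race to do it.
    fn init_once(
        table: &DBMap<ObjectID, SequenceNumber>,
        key: ObjectID,
        initial_value: SequenceNumber,
    ) -> SuiResult {
        retry_transaction_forever!({
            // Pin the transaction to a snapshot so the commit-time conflict
            // check covers everything read since this point.
            let tx = table.transaction_with_snapshot()?;
            if tx.multi_get(table, [key])?[0].is_some() {
                // Another task already initialized the key; nothing to do.
                return Ok(());
            }
            // commit() fails with TypedStoreError::TransactionWriteConflict
            // when a racing writer wins; the macro then re-runs this block,
            // which will hit the early return above.
            tx.insert_batch(table, [(key, initial_value)])?.commit()
        })?;
        Ok(())
    }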

Also expose the SetSnapshot function of rocksdb transactions.
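
The snapshot is what makes the conflict check sound: with SetSnapshot, an
optimistic transaction validates at commit time against the snapshot's
sequence number, so a competing write to the same keys at any point after
the snapshot is taken fails the commit, rather than only writes made after
the transaction first touched those keys. A minimal sketch, assuming the
typed-store wrapper is the `transaction_with_snapshot` method that appears
in the diff below:

    // Take a consistent snapshot for both reads and commit-time conflict
    // detection; a write by another task after this point will cause
    // commit() to fail with TransactionWriteConflict.
    let tx = next_shared_object_versions.transaction_with_snapshot()?;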

Fixes MystenLabs#5754
mystenmark authored Dec 29, 2022
1 parent e075c9f commit f77f58f
Showing 8 changed files with 416 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

188 changes: 156 additions & 32 deletions crates/sui-core/src/authority/authority_per_epoch_store.rs
@@ -9,7 +9,7 @@ use parking_lot::RwLock;
use parking_lot::{Mutex, RwLockReadGuard};
use rocksdb::Options;
use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::iter;
use std::path::{Path, PathBuf};
@@ -46,7 +46,7 @@ use sui_types::message_envelope::TrustedEnvelope;
use sui_types::messages_checkpoint::{CheckpointSequenceNumber, CheckpointSignatureMessage};
use sui_types::storage::{transaction_input_object_keys, ObjectKey, ParentSync};
use sui_types::temporary_store::InnerTemporaryStore;
use typed_store::Map;
use typed_store::{retry_transaction_forever, Map};
use typed_store_derive::DBMapUtils;

/// The key where the latest consensus index is stored in the database.
@@ -66,6 +66,7 @@ pub struct ExecutionIndicesWithHash {
pub struct AuthorityPerEpochStore {
committee: Committee,
tables: AuthorityEpochTables,

/// In-memory cache of the content from the reconfig_state db table.
reconfig_state_mem: RwLock<ReconfigState>,
consensus_notify_read: NotifyRead<ConsensusTransactionKey, ()>,
@@ -186,7 +187,7 @@ pub struct AuthorityEpochTables {

impl AuthorityEpochTables {
pub fn open(epoch: EpochId, parent_path: &Path, db_options: Option<Options>) -> Self {
Self::open_tables_read_write(Self::path(epoch, parent_path), db_options, None)
Self::open_tables_transactional(Self::path(epoch, parent_path), db_options, None)
}

pub fn open_readonly(epoch: EpochId, parent_path: &Path) -> AuthorityEpochTablesReadOnly {
@@ -427,14 +428,146 @@ impl AuthorityPerEpochStore {
Ok(())
}

pub fn set_assigned_shared_object_versions(
// For each id in objects_to_init, return the next version for that id as recorded in the
// next_shared_object_versions table.
//
// If any ids are missing, then we need to initialize the table. We first check if a previous
// version of that object has been written. If so, then the object was written in a previous
// epoch, and we initialize next_shared_object_versions to that value. If no version of the
// object has yet been written, we initialize the object to the initial version recorded in the
// certificate (which is a function of the lamport version computation of the transaction that
// created the shared object originally - a transaction that may not yet have been executed on
// this node).
//
// Because all paths that assign shared locks for a shared object transaction call this
// function, it is impossible for parent_sync to be updated before this function completes
// successfully for each affected object id.
async fn get_or_init_next_object_versions(
&self,
transaction_digest: &TransactionDigest,
certificate: &VerifiedCertificate,
objects_to_init: impl Iterator<Item = ObjectID> + Clone,
parent_sync_store: impl ParentSync,
) -> SuiResult<Vec<SequenceNumber>> {
// Since this can be called from consensus task, we must retry forever - the only other
// option is to panic. It is extremely unlikely that more than 2 retries will be needed, as
// the only two writers are the consensus task and checkpoint execution.
retry_transaction_forever!({
// This code may still be correct without using a transaction snapshot, but I couldn't
// convince myself of that.
let db_transaction = self
.tables
.next_shared_object_versions
.transaction_with_snapshot()?;

let next_versions = db_transaction.multi_get(
&self.tables.next_shared_object_versions,
objects_to_init.clone(),
)?;

let uninitialized_objects: Vec<ObjectID> = next_versions
.iter()
.zip(objects_to_init.clone())
.filter_map(|(next_version, id)| match next_version {
None => Some(id),
Some(_) => None,
})
.collect();

// The common case is that there are no uninitialized versions - this early return will
// happen every time except the first time an object is used in an epoch.
if uninitialized_objects.is_empty() {
// unwrap ok - we already verified that next_versions is not missing any keys.
return Ok(next_versions.into_iter().map(|v| v.unwrap()).collect());
}

// if the object has never been used before (in any epoch) the initial version comes
// from the cert.
let initial_versions: HashMap<_, _> = certificate
.shared_input_objects()
.map(|(id, v)| (*id, *v))
.collect();

let mut versions_to_write = Vec::new();
for id in &uninitialized_objects {
// Note: we don't actually need to read from the transaction here, as no writer
// can update parent_sync_store until after get_or_init_next_object_versions
// completes.
versions_to_write.push(
match parent_sync_store.get_latest_parent_entry_ref(*id)? {
Some(objref) => (*id, objref.1),
None => (
*id,
*initial_versions
.get(id)
.expect("object cannot be missing from shared_input_objects"),
),
},
);
}

debug!(
?versions_to_write,
"initializing next_shared_object_versions"
);
db_transaction
.insert_batch(&self.tables.next_shared_object_versions, versions_to_write)?
.commit()
})?;

// this case only occurs when there were uninitialized versions, which is rare, so it's much
// simpler to just re-read all the ids here.
let next_versions = self
.tables
.next_shared_object_versions
.multi_get(objects_to_init)?
.into_iter()
// unwrap ok - we just finished initializing all versions.
.map(|v| v.unwrap())
.collect();

Ok(next_versions)
}

pub async fn set_assigned_shared_object_versions(
&self,
certificate: &VerifiedCertificate,
assigned_versions: &Vec<(ObjectID, SequenceNumber)>,
parent_sync_store: impl ParentSync,
) -> SuiResult {
let tx_digest = certificate.digest();

debug!(
?tx_digest,
?assigned_versions,
"set_assigned_shared_object_versions"
);
self.get_or_init_next_object_versions(
certificate,
assigned_versions.iter().map(|(id, _)| *id),
parent_sync_store,
)
.await?;
self.tables
.assigned_shared_object_versions
.insert(transaction_digest, assigned_versions)?;
.insert(tx_digest, assigned_versions)?;
Ok(())
}

@@ -602,34 +735,18 @@ impl AuthorityPerEpochStore {
let transaction_digest = *certificate.digest();

// Make an iterator to update the locks of the transaction's shared objects.
let ids = certificate.shared_input_objects().map(|(id, _)| id);
let versions = self.multi_get_next_shared_object_versions(ids)?;
let ids: Vec<_> = certificate
.shared_input_objects()
.map(|(id, _)| *id)
.collect();

let versions = self
.get_or_init_next_object_versions(certificate, ids.iter().copied(), &parent_sync_store)
.await?;

let mut input_object_keys = transaction_input_object_keys(certificate)?;
let mut assigned_versions = Vec::new();
for ((id, initial_shared_version), v) in
certificate.shared_input_objects().zip(versions.iter())
{
// On epoch changes, the `next_shared_object_versions` table will be empty, and we rely on
parent sync to recover the current version of the object. However, if a validator was
previously aware of the object as owned, and it was upgraded to shared, the version
// in parent sync may be out of date, causing a fork. In that case, we know that the
// `initial_shared_version` will be greater than the version in parent sync, and we can
// use that. It is the version that the object was shared at, and can be trusted
// because it has been checked and signed by a quorum of other validators when creating
// the certificate.
let version = match v {
Some(v) => *v,
None => *initial_shared_version.max(
&parent_sync_store
// TODO: if we use an eventually consistent object store in the future,
// we must make this read strongly consistent somehow!
.get_latest_parent_entry_ref(*id)?
.map(|objref| objref.1)
.unwrap_or_default(),
),
};

for ((id, _), version) in certificate.shared_input_objects().zip(versions.into_iter()) {
assigned_versions.push((*id, version));
input_object_keys.push(ObjectKey(*id, version));
}
@@ -706,9 +823,16 @@ impl AuthorityPerEpochStore {
// TODO: clear the shared object locks per transaction after ensuring consistency.
let mut write_batch = self.tables.assigned_shared_object_versions.batch();

let tx_digest = *certificate.digest();

debug!(
?tx_digest,
?assigned_versions,
"finish_assign_shared_object_versions"
);
write_batch = write_batch.insert_batch(
&self.tables.assigned_shared_object_versions,
iter::once((certificate.digest(), assigned_versions)),
iter::once((tx_digest, assigned_versions)),
)?;

write_batch =
26 changes: 15 additions & 11 deletions crates/sui-core/src/authority/authority_store.rs
@@ -846,9 +846,10 @@ impl AuthorityStore {
// Insert each output object into the stores
write_batch = write_batch.insert_batch(
&self.perpetual_tables.objects,
written
.iter()
.map(|(_, (obj_ref, new_object, _kind))| (ObjectKey::from(obj_ref), new_object)),
written.iter().map(|(_, (obj_ref, new_object, _kind))| {
trace!(tx_digest=?transaction_digest, ?obj_ref, "writing object");
(ObjectKey::from(obj_ref), new_object)
}),
)?;

let new_locks_to_init: Vec<_> = written
@@ -1293,14 +1294,17 @@ impl AuthorityStore {
.epoch_store()
.acquire_tx_lock(certificate.digest())
.await;
self.epoch_store().set_assigned_shared_object_versions(
certificate.digest(),
&effects
.shared_objects
.iter()
.map(|(id, version, _)| (*id, *version))
.collect(),
)
self.epoch_store()
.set_assigned_shared_object_versions(
certificate,
&effects
.shared_objects
.iter()
.map(|(id, version, _)| (*id, *version))
.collect(),
self,
)
.await
}

pub fn get_transaction(
2 changes: 2 additions & 0 deletions crates/sui-core/tests/staged/sui.yaml
@@ -551,4 +551,6 @@ TypedStoreError:
CrossDBBatch: UNIT
4:
MetricsReporting: UNIT
5:
TransactionWriteConflict: UNIT

1 change: 1 addition & 0 deletions crates/typed-store/Cargo.toml
@@ -32,5 +32,6 @@ once_cell = "1.13.0"
proc-macro2 = "1.0.47"
quote = "1.0.23"
rstest = "0.16.0"
rand = "0.8.5"
syn = { version = "1.0.104", features = ["derive"] }
typed-store-derive = {path = "../typed-store-derive"}
2 changes: 2 additions & 0 deletions crates/typed-store/src/rocks/errors.rs
@@ -21,6 +21,8 @@ pub enum TypedStoreError {
CrossDBBatch,
#[error("Metric reporting thread failed with error")]
MetricsReporting,
#[error("Conflicting write detected")]
TransactionWriteConflict,
}

#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Hash, Debug, Error)]