Skip to content

Commit

Permalink
Move write_transaction_outputs to struct impl (MystenLabs#16956)
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark authored Mar 29, 2024
1 parent 9e79b6f commit d0ef3f8
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 91 deletions.
2 changes: 1 addition & 1 deletion crates/sui-core/src/execution_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sui_types::{
};
use tracing::instrument;

pub(crate) mod cached_version_map;
pub(crate) mod cache_types;
pub mod passthrough_cache;
pub mod writeback_cache;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,42 @@ impl<V> CachedVersionMap<V> {
}
}

// an iterator adapter that asserts that the wrapped iterator yields elements in order
pub(super) struct AssertOrdered<I: Iterator> {
iter: I,
last: Option<I::Item>,
}

impl<I: Iterator> AssertOrdered<I> {
fn new(iter: I) -> Self {
Self { iter, last: None }
}
}

impl<I: IntoIterator> From<I> for AssertOrdered<I::IntoIter> {
fn from(iter: I) -> Self {
Self::new(iter.into_iter())
}
}

impl<I: Iterator> Iterator for AssertOrdered<I>
where
I::Item: Ord + Copy,
{
type Item = I::Item;

fn next(&mut self) -> Option<Self::Item> {
let next = self.iter.next();
if let Some(next) = next {
if let Some(last) = &self.last {
assert!(*last < next, "iterator must yield elements in order");
}
self.last = Some(next);
}
next
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -245,4 +281,18 @@ mod tests {
let map: CachedVersionMap<&str> = CachedVersionMap::default();
assert!(map.get_highest().is_none());
}

#[test]
fn test_assert_order() {
let iter = AssertOrdered::from(1..=10);
let result: Vec<_> = iter.collect();
assert_eq!(result, (1..=10).collect::<Vec<_>>());
}

#[test]
#[should_panic(expected = "iterator must yield elements in order")]
fn test_assert_order_panics() {
let iter = AssertOrdered::from(vec![1, 3, 2]);
let _ = iter.collect::<Vec<_>>();
}
}
189 changes: 99 additions & 90 deletions crates/sui-core/src/execution_cache/writeback_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,11 @@ use sui_types::object::Object;
use sui_types::storage::{MarkerValue, ObjectKey, ObjectOrTombstone, ObjectStore, PackageObject};
use sui_types::sui_system_state::{get_sui_system_state, SuiSystemState};
use sui_types::transaction::{VerifiedSignedTransaction, VerifiedTransaction};
use tracing::{info, instrument};
use tracing::{debug, info, instrument, trace};

use super::ExecutionCacheAPI;
use super::{
cached_version_map::CachedVersionMap, implement_passthrough_traits, CheckpointCache,
cache_types::CachedVersionMap, implement_passthrough_traits, CheckpointCache,
ExecutionCacheCommit, ExecutionCacheMetrics, ExecutionCacheRead, ExecutionCacheReconfigAPI,
ExecutionCacheWrite, NotifyReadWrapper, StateSyncAPI,
};
Expand Down Expand Up @@ -475,7 +475,103 @@ impl WritebackCache {
)
}

#[instrument(level = "debug", skip_all)]
async fn write_transaction_outputs(
&self,
epoch_id: EpochId,
tx_outputs: Arc<TransactionOutputs>,
) -> SuiResult {
trace!(digest = ?tx_outputs.transaction.digest(), "writing transaction outputs to cache");

let TransactionOutputs {
transaction,
effects,
markers,
written,
deleted,
wrapped,
events,
..
} = &*tx_outputs;

// Deletions and wraps must be written first. The reason is that one of the deletes
// may be a child object, and if we write the parent object first, a reader may or may
// not see the previous version of the child object, instead of the deleted/wrapped
// tombstone, which would cause an execution fork
for ObjectKey(id, version) in deleted.iter() {
self.write_object_entry(id, *version, ObjectEntry::Deleted)
.await;
}

for ObjectKey(id, version) in wrapped.iter() {
self.write_object_entry(id, *version, ObjectEntry::Wrapped)
.await;
}

// Update all markers
for (object_key, marker_value) in markers.iter() {
self.write_marker_value(epoch_id, object_key, *marker_value)
.await;
}

// Write children before parents to ensure that readers do not observe a parent object
// before its most recent children are visible.
for (object_id, object) in written.iter() {
if object.is_child_object() {
self.write_object_entry(object_id, object.version(), object.clone().into())
.await;
}
}
for (object_id, object) in written.iter() {
if !object.is_child_object() {
self.write_object_entry(object_id, object.version(), object.clone().into())
.await;
if object.is_package() {
debug!("caching package: {:?}", object.compute_object_reference());
self.packages
.insert(*object_id, PackageObject::new(object.clone()));
}
}
}

let tx_digest = *transaction.digest();
let effects_digest = effects.digest();

self.dirty
.transaction_effects
.insert(effects_digest, effects.clone());

match self.dirty.transaction_events.entry(events.digest()) {
DashMapEntry::Occupied(mut occupied) => {
occupied.get_mut().0.insert(tx_digest);
}
DashMapEntry::Vacant(entry) => {
let mut txns = BTreeSet::new();
txns.insert(tx_digest);
entry.insert((txns, events.clone()));
}
}

self.dirty
.executed_effects_digests
.insert(tx_digest, effects_digest);

self.dirty
.pending_transaction_writes
.insert(tx_digest, tx_outputs);

self.executed_effects_digests_notify_read
.notify(&tx_digest, &effects_digest);

self.metrics
.pending_notify_read
.set(self.executed_effects_digests_notify_read.num_pending() as i64);

Ok(())
}

// Commits dirty data for the given TransactionDigest to the db.
#[instrument(level = "debug", skip(self))]
async fn commit_transaction_outputs(
&self,
epoch: EpochId,
Expand Down Expand Up @@ -1064,99 +1160,12 @@ impl ExecutionCacheWrite for WritebackCache {
todo!()
}

#[instrument(level = "debug", skip_all)]
fn write_transaction_outputs(
&self,
epoch_id: EpochId,
tx_outputs: Arc<TransactionOutputs>,
) -> BoxFuture<'_, SuiResult> {
async move {
let TransactionOutputs {
transaction,
effects,
markers,
written,
deleted,
wrapped,
events,
..
} = &*tx_outputs;

// Deletions and wraps must be written first. The reason is that one of the deletes
// may be a child object, and if we write the parent object first, a reader may or may
// not see the previous version of the child object, instead of the deleted/wrapped
// tombstone, which would cause an execution fork
for ObjectKey(id, version) in deleted.iter() {
self.write_object_entry(id, *version, ObjectEntry::Deleted)
.await;
}

for ObjectKey(id, version) in wrapped.iter() {
self.write_object_entry(id, *version, ObjectEntry::Wrapped)
.await;
}

// Update all markers
for (object_key, marker_value) in markers.iter() {
self.write_marker_value(epoch_id, object_key, *marker_value)
.await;
}

// Write children before parents to ensure that readers do not observe a parent object
// before its most recent children are visible.
for (object_id, object) in written.iter() {
if object.is_child_object() {
self.write_object_entry(object_id, object.version(), object.clone().into())
.await;
}
}
for (object_id, object) in written.iter() {
if !object.is_child_object() {
self.write_object_entry(object_id, object.version(), object.clone().into())
.await;
if object.is_package() {
self.packages
.insert(*object_id, PackageObject::new(object.clone()));
}
}
}

let tx_digest = *transaction.digest();
let effects_digest = effects.digest();

self.dirty
.transaction_effects
.insert(effects_digest, effects.clone());

match self.dirty.transaction_events.entry(events.digest()) {
DashMapEntry::Occupied(mut occupied) => {
occupied.get_mut().0.insert(tx_digest);
}
DashMapEntry::Vacant(entry) => {
let mut txns = BTreeSet::new();
txns.insert(tx_digest);
entry.insert((txns, events.clone()));
}
}

self.dirty
.executed_effects_digests
.insert(tx_digest, effects_digest);

self.dirty
.pending_transaction_writes
.insert(tx_digest, tx_outputs);

self.executed_effects_digests_notify_read
.notify(&tx_digest, &effects_digest);

self.metrics
.pending_notify_read
.set(self.executed_effects_digests_notify_read.num_pending() as i64);

Ok(())
}
.boxed()
WritebackCache::write_transaction_outputs(self, epoch_id, tx_outputs).boxed()
}
}

Expand Down

0 comments on commit d0ef3f8

Please sign in to comment.