Skip to content

Commit

Permalink
Out of order batch (MystenLabs#1853)
Browse files Browse the repository at this point in the history
* Out of order batch test

* Fix notifier out of order bug

* Test stream is empty before tx0 is sent

* Test order

Co-authored-by: George Danezis <[email protected]>
  • Loading branch information
lxfind and George Danezis authored May 7, 2022
1 parent 402ed43 commit 60723c9
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 8 deletions.
45 changes: 37 additions & 8 deletions sui_core/src/authority/authority_notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,29 @@
// SPDX-License-Identifier: Apache-2.0
use super::*;

use std::sync::atomic::{AtomicBool, AtomicU64};
use std::{
collections::BTreeSet,
sync::atomic::{AtomicBool, AtomicU64},
};
use sui_types::batch::TxSequenceNumber;

use tokio::sync::Notify;
use typed_store::traits::Map;

use parking_lot::Mutex;

pub struct TransactionNotifier {
state: Arc<AuthorityStore>,
low_watermark: AtomicU64,
high_watermark: AtomicU64,
notify: Notify,
has_stream: AtomicBool,
is_closed: AtomicBool,
inner: Mutex<LockedNotifier>,
}

struct LockedNotifier {
high_watermark: u64,
live_tickets: BTreeSet<TxSequenceNumber>,
}

impl TransactionNotifier {
Expand All @@ -24,10 +34,16 @@ impl TransactionNotifier {
Ok(TransactionNotifier {
state,
low_watermark: AtomicU64::new(seq),
high_watermark: AtomicU64::new(seq),
notify: Notify::new(),
has_stream: AtomicBool::new(false),
is_closed: AtomicBool::new(false),

// Keep a set of the tickets that are still being processed
// This is the size of the number of concurrent processes.
inner: Mutex::new(LockedNotifier {
high_watermark: seq,
live_tickets: BTreeSet::new(),
}),
})
}

Expand All @@ -41,9 +57,11 @@ impl TransactionNotifier {
return Err(SuiError::ClosedNotifierError);
}

let seq = self
.high_watermark
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let mut inner = self.inner.lock();
// Insert the ticket into the set of live tickets.
let seq = inner.high_watermark;
inner.high_watermark += 1;
inner.live_tickets.insert(seq);
Ok(TransactionNotifierTicket {
transaction_notifier: self.clone(),
seq,
Expand Down Expand Up @@ -101,7 +119,7 @@ impl TransactionNotifier {
.iter()
.skip_to(&next_seq)
{
// ... contued here with take_while. And expand the buffer with the new items.
// ... continued here with take_while. And expand the buffer with the new items.
temp_buffer.extend(
iter.take_while(|(tx_seq, _tx_digest)| *tx_seq < last_safe)
.map(|(tx_seq, _tx_digest)| (tx_seq, _tx_digest)),
Expand Down Expand Up @@ -172,9 +190,20 @@ impl TransactionNotifierTicket {
/// associated with this sequence number,
impl Drop for TransactionNotifierTicket {
fn drop(&mut self) {
let mut inner = self.transaction_notifier.inner.lock();
inner.live_tickets.remove(&self.seq);

// The new low watermark is either the lowest outstanding ticket
// or the high watermark.
let new_low_watermark = *inner
.live_tickets
.iter()
.next()
.unwrap_or(&inner.high_watermark);

self.transaction_notifier
.low_watermark
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
.store(new_low_watermark, std::sync::atomic::Ordering::SeqCst);
self.transaction_notifier.notify.notify_one();
}
}
Expand Down
77 changes: 77 additions & 0 deletions sui_core/src/unit_tests/batch_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,83 @@ async fn test_batch_manager_out_of_order() {
_join.await.expect("No errors in task").expect("ok");
}

#[tokio::test]
async fn test_batch_manager_drop_out_of_order() {
// Create a random directory to store the DB
let dir = env::temp_dir();
let path = dir.join(format!("DB_{:?}", ObjectID::random()));
fs::create_dir(&path).unwrap();

// Create an authority
let mut opts = rocksdb::Options::default();
opts.set_max_open_files(max_files_authority_tests());
let store = Arc::new(AuthorityStore::open(&path, Some(opts)));

// Make a test key pair
let seed = [1u8; 32];
let (committee, _, authority_key) =
init_state_parameters_from_rng(&mut StdRng::from_seed(seed));
let authority_state = Arc::new(init_state(committee, authority_key, store.clone()).await);

let inner_state = authority_state.clone();
let _join = tokio::task::spawn(async move {
inner_state
// Make sure that a batch will not be formed due to time, but will be formed
// when there are 4 transactions.
.run_batch_service(4, Duration::from_millis(10000))
.await
});
// Send transactions out of order
let mut rx = authority_state.subscribe_batch();

let t0 = authority_state.batch_notifier.ticket().expect("ok");
let t1 = authority_state.batch_notifier.ticket().expect("ok");
let t2 = authority_state.batch_notifier.ticket().expect("ok");
let t3 = authority_state.batch_notifier.ticket().expect("ok");

store.side_sequence(t1.seq(), &TransactionDigest::random());
drop(t1);
store.side_sequence(t3.seq(), &TransactionDigest::random());
drop(t3);
store.side_sequence(t2.seq(), &TransactionDigest::random());
drop(t2);

// Give a chance to send signals
tokio::task::yield_now().await;
// Still nothing has arrived out of order
assert_eq!(rx.len(), 0);

store.side_sequence(t0.seq(), &TransactionDigest::random());
drop(t0);

// Get transactions in order then batch.
assert!(matches!(
rx.recv().await.unwrap(),
UpdateItem::Transaction((0, _))
));

assert!(matches!(
rx.recv().await.unwrap(),
UpdateItem::Transaction((1, _))
));
assert!(matches!(
rx.recv().await.unwrap(),
UpdateItem::Transaction((2, _))
));
assert!(matches!(
rx.recv().await.unwrap(),
UpdateItem::Transaction((3, _))
));

// Then we (eventually) get a batch
assert!(matches!(rx.recv().await.unwrap(), UpdateItem::Batch(_)));

// When we close the sending channel we also also end the service task
authority_state.batch_notifier.close();

_join.await.expect("No errors in task").expect("ok");
}

#[tokio::test]
async fn test_handle_move_order_with_batch() {
let (sender, sender_key) = get_key_pair();
Expand Down

0 comments on commit 60723c9

Please sign in to comment.