Skip to content

Commit

Permalink
Move NotifyRead to mysten-common crate (MystenLabs#9599)
Browse files Browse the repository at this point in the history
  • Loading branch information
andll authored Mar 20, 2023
1 parent f389b8e commit 4568b46
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 184 deletions.
3 changes: 1 addition & 2 deletions crates/mysten-common/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// Low level ultilities shared between Sui and Narwhal.
pub mod notify_once;
pub mod sync;
6 changes: 6 additions & 0 deletions crates/mysten-common/src/sync/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

/// Low level ultilities shared between Sui and Narwhal.
pub mod notify_once;
pub mod notify_read;
File renamed without changes.
178 changes: 178 additions & 0 deletions crates/mysten-common/src/sync/notify_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright (c) Mysten Labs, Inc.
// SPDX-License-Identifier: Apache-2.0

use parking_lot::Mutex;
use parking_lot::MutexGuard;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::task::{Context, Poll};
use tokio::sync::oneshot;

type Registrations<V> = Vec<oneshot::Sender<V>>;

pub struct NotifyRead<K, V> {
pending: Vec<Mutex<HashMap<K, Registrations<V>>>>,
count_pending: AtomicUsize,
}

impl<K: Eq + Hash + Clone, V: Clone> NotifyRead<K, V> {
pub fn new() -> Self {
let pending = (0..255).map(|_| Default::default()).collect();
let count_pending = Default::default();
Self {
pending,
count_pending,
}
}

/// Asynchronously notifies waiters and return number of remaining pending registration
pub fn notify(&self, key: &K, value: &V) -> usize {
let registrations = self.pending(key).remove(key);
let Some(registrations) = registrations else {
return self.count_pending.load(Ordering::Relaxed);
};
let rem = self
.count_pending
.fetch_sub(registrations.len(), Ordering::Relaxed);
for registration in registrations {
registration.send(value.clone()).ok();
}
rem
}

pub fn register_one(&self, key: &K) -> Registration<K, V> {
self.count_pending.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = oneshot::channel();
self.register(key, sender);
Registration {
this: self,
registration: Some((key.clone(), receiver)),
}
}

pub fn register_all(&self, keys: Vec<K>) -> Vec<Registration<K, V>> {
self.count_pending.fetch_add(keys.len(), Ordering::Relaxed);
let mut registrations = vec![];
for key in keys.iter() {
let (sender, receiver) = oneshot::channel();
self.register(key, sender);
let registration = Registration {
this: self,
registration: Some((key.clone(), receiver)),
};
registrations.push(registration);
}
registrations
}

fn register(&self, key: &K, sender: oneshot::Sender<V>) {
self.pending(key)
.entry(key.clone())
.or_default()
.push(sender);
}

fn pending(&self, key: &K) -> MutexGuard<HashMap<K, Registrations<V>>> {
let mut state = DefaultHasher::new();
key.hash(&mut state);
let hash = state.finish();
let pending = self
.pending
.get((hash % self.pending.len() as u64) as usize)
.unwrap();
pending.lock()
}

pub fn num_pending(&self) -> usize {
self.count_pending.load(Ordering::Relaxed)
}

fn cleanup(&self, key: &K) {
let mut pending = self.pending(key);
// it is possible that registration was fulfilled before we get here
let Some(registrations) = pending.get_mut(key) else { return; };
let mut count_deleted = 0usize;
registrations.retain(|s| {
let delete = s.is_closed();
if delete {
count_deleted += 1;
}
!delete
});
self.count_pending
.fetch_sub(count_deleted, Ordering::Relaxed);
if registrations.is_empty() {
pending.remove(key);
}
}
}

/// Registration resolves to the value but also provides safe cancellation
/// When Registration is dropped before it is resolved, we de-register from the pending list
pub struct Registration<'a, K: Eq + Hash + Clone, V: Clone> {
this: &'a NotifyRead<K, V>,
registration: Option<(K, oneshot::Receiver<V>)>,
}

impl<'a, K: Eq + Hash + Clone + Unpin, V: Clone + Unpin> Future for Registration<'a, K, V> {
type Output = V;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let receiver = self
.registration
.as_mut()
.map(|(_key, receiver)| receiver)
.expect("poll can not be called after drop");
let poll = Pin::new(receiver).poll(cx);
if poll.is_ready() {
// When polling complete we no longer need to cancel
self.registration.take();
}
poll.map(|r| r.expect("Sender never drops when registration is pending"))
}
}

impl<'a, K: Eq + Hash + Clone, V: Clone> Drop for Registration<'a, K, V> {
fn drop(&mut self) {
if let Some((key, receiver)) = self.registration.take() {
mem::drop(receiver);
// Receiver is dropped before cleanup
self.this.cleanup(&key)
}
}
}
impl<K: Eq + Hash + Clone, V: Clone> Default for NotifyRead<K, V> {
fn default() -> Self {
Self::new()
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::future::join_all;

#[tokio::test]
pub async fn test_notify_read() {
let notify_read = NotifyRead::<u64, u64>::new();
let mut registrations = notify_read.register_all(vec![1, 2, 3]);
assert_eq!(3, notify_read.count_pending.load(Ordering::Relaxed));
registrations.pop();
assert_eq!(2, notify_read.count_pending.load(Ordering::Relaxed));
notify_read.notify(&2, &2);
notify_read.notify(&1, &1);
let reads = join_all(registrations).await;
assert_eq!(0, notify_read.count_pending.load(Ordering::Relaxed));
assert_eq!(reads, vec![1, 2]);
// ensure cleanup is done correctly
for pending in &notify_read.pending {
assert!(pending.lock().is_empty());
}
}
}
171 changes: 1 addition & 170 deletions crates/sui-core/src/authority/authority_notify_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,12 @@ use async_trait::async_trait;
use either::Either;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use parking_lot::Mutex;
use parking_lot::MutexGuard;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::mem;
use std::pin::Pin;
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;
use sui_types::base_types::TransactionDigest;
use sui_types::error::SuiResult;
use sui_types::messages::{TransactionEffects, TransactionEffectsAPI};
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::debug;

#[async_trait]
Expand All @@ -44,140 +33,6 @@ pub trait EffectsNotifyRead: Send + Sync + 'static {
) -> SuiResult<Vec<Option<TransactionEffects>>>;
}

type Registrations<V> = Vec<oneshot::Sender<V>>;

pub(crate) struct NotifyRead<K, V> {
pending: Vec<Mutex<HashMap<K, Registrations<V>>>>,
count_pending: AtomicUsize,
}

impl<K: Eq + Hash + Clone, V: Clone> NotifyRead<K, V> {
pub fn new() -> Self {
let pending = (0..255).map(|_| Default::default()).collect();
let count_pending = Default::default();
Self {
pending,
count_pending,
}
}

/// Asynchronously notifies waiters and return number of remaining pending registration
pub fn notify(&self, key: &K, value: &V) -> usize {
let registrations = self.pending(key).remove(key);
let Some(registrations) = registrations else {
return self.count_pending.load(Ordering::Relaxed);
};
let rem = self
.count_pending
.fetch_sub(registrations.len(), Ordering::Relaxed);
for registration in registrations {
registration.send(value.clone()).ok();
}
rem
}

pub fn register_one(&self, key: &K) -> Registration<K, V> {
self.count_pending.fetch_add(1, Ordering::Relaxed);
let (sender, receiver) = oneshot::channel();
self.register(key, sender);
Registration {
this: self,
registration: Some((key.clone(), receiver)),
}
}

pub fn register_all(&self, keys: Vec<K>) -> Vec<Registration<K, V>> {
self.count_pending.fetch_add(keys.len(), Ordering::Relaxed);
let mut registrations = vec![];
for key in keys.iter() {
let (sender, receiver) = oneshot::channel();
self.register(key, sender);
let registration = Registration {
this: self,
registration: Some((key.clone(), receiver)),
};
registrations.push(registration);
}
registrations
}

fn register(&self, key: &K, sender: oneshot::Sender<V>) {
self.pending(key)
.entry(key.clone())
.or_default()
.push(sender);
}

fn pending(&self, key: &K) -> MutexGuard<HashMap<K, Registrations<V>>> {
let mut state = DefaultHasher::new();
key.hash(&mut state);
let hash = state.finish();
let pending = self
.pending
.get((hash % self.pending.len() as u64) as usize)
.unwrap();
pending.lock()
}

pub fn num_pending(&self) -> usize {
self.count_pending.load(Ordering::Relaxed)
}

fn cleanup(&self, key: &K) {
let mut pending = self.pending(key);
// it is possible that registration was fulfilled before we get here
let Some(registrations) = pending.get_mut(key) else { return; };
let mut count_deleted = 0usize;
registrations.retain(|s| {
let delete = s.is_closed();
if delete {
count_deleted += 1;
}
!delete
});
self.count_pending
.fetch_sub(count_deleted, Ordering::Relaxed);
if registrations.is_empty() {
pending.remove(key);
}
}
}

/// Registration resolves to the value but also provides safe cancellation
/// When Registration is dropped before it is resolved, we de-register from the pending list
pub struct Registration<'a, K: Eq + Hash + Clone, V: Clone> {
this: &'a NotifyRead<K, V>,
registration: Option<(K, oneshot::Receiver<V>)>,
}

impl<'a, K: Eq + Hash + Clone + Unpin, V: Clone + Unpin> Future for Registration<'a, K, V> {
type Output = V;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let receiver = self
.registration
.as_mut()
.map(|(_key, receiver)| receiver)
.expect("poll can not be called after drop");
let poll = Pin::new(receiver).poll(cx);
if poll.is_ready() {
// When polling complete we no longer need to cancel
self.registration.take();
}
poll.map(|r| r.expect("Sender never drops when registration is pending"))
}
}

impl<'a, K: Eq + Hash + Clone, V: Clone> Drop for Registration<'a, K, V> {
fn drop(&mut self) {
if let Some((key, receiver)) = self.registration.take() {
mem::drop(receiver);
// Receiver is dropped before cleanup
self.this.cleanup(&key)
}
}
}

#[async_trait]
impl EffectsNotifyRead for Arc<AuthorityStore> {
async fn notify_read_executed_effects(
Expand Down Expand Up @@ -231,27 +86,3 @@ impl EffectsNotifyRead for Arc<AuthorityStore> {
AuthorityStore::multi_get_executed_effects(self, digests)
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::future::join_all;

#[tokio::test]
pub async fn test_notify_read() {
let notify_read = NotifyRead::<u64, u64>::new();
let mut registrations = notify_read.register_all(vec![1, 2, 3]);
assert_eq!(3, notify_read.count_pending.load(Ordering::Relaxed));
registrations.pop();
assert_eq!(2, notify_read.count_pending.load(Ordering::Relaxed));
notify_read.notify(&2, &2);
notify_read.notify(&1, &1);
let reads = join_all(registrations).await;
assert_eq!(0, notify_read.count_pending.load(Ordering::Relaxed));
assert_eq!(reads, vec![1, 2]);
// ensure cleanup is done correctly
for pending in &notify_read.pending {
assert!(pending.lock().is_empty());
}
}
}
Loading

0 comments on commit 4568b46

Please sign in to comment.