Skip to content

Commit

Permalink
Add SnapshotRejected event and fix the quota tests
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Aug 13, 2024
1 parent 67127f5 commit 80141dc
Show file tree
Hide file tree
Showing 10 changed files with 83 additions and 60 deletions.
2 changes: 1 addition & 1 deletion ffi/src/repository.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ pub(crate) fn subscribe(
loop {
match notification_rx.recv().await {
Ok(Event {
payload: Payload::BranchChanged(_) | Payload::BlockReceived { .. },
payload: Payload::SnapshotApproved(_) | Payload::BlockReceived { .. },
..
}) => (),
Ok(Event { .. }) => continue,
Expand Down
2 changes: 1 addition & 1 deletion lib/benches/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ async fn wait_for_event(rx: &mut broadcast::Receiver<Event>) {
loop {
match time::timeout(EVENT_TIMEOUT, rx.recv()).await {
Ok(Ok(Event {
payload: Payload::BranchChanged(_) | Payload::BlockReceived { .. },
payload: Payload::SnapshotApproved(_) | Payload::BlockReceived { .. },
..
}))
| Ok(Err(RecvError::Lagged(_))) => return,
Expand Down
3 changes: 2 additions & 1 deletion lib/src/branch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,8 @@ pub(crate) struct BranchEventSender {

impl BranchEventSender {
pub fn send(&self) {
self.event_tx.send(Payload::BranchChanged(self.branch_id));
self.event_tx
.send(Payload::SnapshotApproved(self.branch_id));
}
}

Expand Down
6 changes: 4 additions & 2 deletions lib/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ use tokio::sync::broadcast;
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Payload {
/// A new snapshot was created in the specified branch.
BranchChanged(PublicKey),
/// A new snapshot was approved in the specified branch.
SnapshotApproved(PublicKey),
/// A new snapshot was rejected in the specified branch.
SnapshotRejected(PublicKey),
/// A block with the specified id was received from a remote replica.
BlockReceived(BlockId),
/// The `maintain` worker job successfully completed. It won't perform any more work until
Expand Down
7 changes: 6 additions & 1 deletion lib/src/network/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,12 @@ impl Inner {

// Notify about newly approved snapshots
for branch_id in &status.approved_branches {
event_tx.send(Payload::BranchChanged(*branch_id));
event_tx.send(Payload::SnapshotApproved(*branch_id));
}

// Notify about newly rejected snapshots
for branch_id in &status.rejected_branches {
event_tx.send(Payload::SnapshotRejected(*branch_id));
}

status
Expand Down
4 changes: 2 additions & 2 deletions lib/src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,13 @@ impl Inner {
loop {
match event_rx.recv().await {
Ok(Event { payload, .. }) => match payload {
Payload::BranchChanged(branch_id) => {
Payload::SnapshotApproved(branch_id) => {
self.handle_branch_changed_event(branch_id).await?
}
Payload::BlockReceived(block_id) => {
self.handle_block_received_event(block_id).await?;
}
Payload::MaintenanceCompleted => continue,
Payload::SnapshotRejected(_) | Payload::MaintenanceCompleted => continue,
},
Err(RecvError::Lagged(_)) => self.handle_unknown_event().await?,
Err(RecvError::Closed) => return Ok(()),
Expand Down
4 changes: 2 additions & 2 deletions lib/src/network/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ async fn save_snapshot(
.await;

for branch_id in status.approved_branches {
vault.event_tx.send(Payload::BranchChanged(branch_id));
vault.event_tx.send(Payload::SnapshotApproved(branch_id));
}
}

Expand Down Expand Up @@ -552,7 +552,7 @@ async fn create_changeset(

tx.commit().await.unwrap();

vault.event_tx.send(Payload::BranchChanged(*writer_id));
vault.event_tx.send(Payload::SnapshotApproved(*writer_id));
}

async fn load_latest_root_node(vault: &Vault, writer_id: &PublicKey) -> Option<RootNode> {
Expand Down
10 changes: 5 additions & 5 deletions lib/src/repository/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(super) async fn run(shared: Arc<Shared>) {
future::ready(match event {
Ok(Event { scope, .. }) if scope == event_scope => None,
Ok(Event {
payload: Payload::BranchChanged(_),
payload: Payload::SnapshotApproved(_),
..
}) => Some(Command::Interrupt),
Ok(Event {
Expand All @@ -55,7 +55,7 @@ pub(super) async fn run(shared: Arc<Shared>) {
})
| Err(Lagged) => Some(Command::Wait),
Ok(Event {
payload: Payload::MaintenanceCompleted,
payload: Payload::SnapshotRejected(_) | Payload::MaintenanceCompleted,
..
}) => None,
})
Expand Down Expand Up @@ -92,16 +92,16 @@ pub(super) async fn run(shared: Arc<Shared>) {
event::into_stream(shared.vault.event_tx.subscribe()).filter_map(move |event| {
future::ready(match event {
Ok(Event {
payload: Payload::BranchChanged(_),
payload: Payload::SnapshotApproved(_),
scope,
}) if scope != event_scope => Some(Command::Interrupt),
Ok(Event {
payload: Payload::BranchChanged(_) | Payload::BlockReceived { .. },
payload: Payload::SnapshotApproved(_) | Payload::BlockReceived { .. },
..
})
| Err(Lagged) => Some(Command::Wait),
Ok(Event {
payload: Payload::MaintenanceCompleted,
payload: Payload::SnapshotRejected(_) | Payload::MaintenanceCompleted,
..
}) => None,
})
Expand Down
44 changes: 18 additions & 26 deletions lib/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,41 +462,33 @@ where
break;
}

wait(&mut rx).await
wait(&mut rx).await;
}
})
.await
.unwrap()
}

pub(crate) async fn wait(rx: &mut broadcast::Receiver<Event>) {
loop {
match time::timeout(*EVENT_TIMEOUT, rx.recv()).await {
Ok(event) => {
debug!(?event);

match event {
Ok(Event {
payload:
Payload::BranchChanged(_)
| Payload::BlockReceived { .. }
| Payload::MaintenanceCompleted,
..
})
| Err(RecvError::Lagged(_)) => return,
Ok(Event { .. }) => continue,
Err(RecvError::Closed) => panic!("notification channel unexpectedly closed"),
}
}
Err(_) => {
const MESSAGE: &str = "timeout waiting for notification";
/// Waits for an event to be received and returns it. Returns `None` if the received lagged.
pub(crate) async fn wait(rx: &mut broadcast::Receiver<Event>) -> Option<Payload> {
match time::timeout(*EVENT_TIMEOUT, rx.recv()).await {
Ok(event) => {
debug!(?event);

// NOTE: in release mode backtrace is useless so this trace helps us to locate the
// source of the panic:
error!("{}", MESSAGE);
panic!("{}", MESSAGE);
match event {
Ok(event) => Some(event.payload),
Err(RecvError::Lagged(_)) => None,
Err(RecvError::Closed) => panic!("notification channel unexpectedly closed"),
}
}
Err(_) => {
const MESSAGE: &str = "timeout waiting for notification";

// NOTE: in release mode backtrace is useless so this trace helps us to locate the
// source of the panic:
error!("{}", MESSAGE);
panic!("{}", MESSAGE);
}
}
}

Expand Down
61 changes: 42 additions & 19 deletions lib/tests/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,20 @@
#[macro_use]
mod common;

use self::common::{
actor, dump, sync_watch, traffic_monitor::TrafficMonitor, Env, Proto, DEFAULT_REPO,
};
use crate::common::wait;

use self::common::{actor, dump, sync_watch, Env, Proto, DEFAULT_REPO};
use assert_matches::assert_matches;
use backoff::{backoff::Backoff, ExponentialBackoffBuilder};
use metrics_ext::WatchRecorder;
use ouisync::{
Access, AccessMode, EntryType, Error, Repository, StorageSize, StoreError, VersionVector,
BLOB_HEADER_SIZE, BLOCK_SIZE,
Access, AccessMode, EntryType, Error, Payload, Repository, StorageSize, StoreError,
VersionVector, BLOB_HEADER_SIZE, BLOCK_SIZE,
};
use rand::Rng;
use std::{cmp::Ordering, io::SeekFrom, sync::Arc, time::Duration};
use std::{cmp::Ordering, collections::HashSet, io::SeekFrom, sync::Arc, time::Duration};
use tokio::{
sync::{broadcast, mpsc, Barrier},
time::sleep,
time::{self, sleep},
};
use tracing::{instrument, Instrument};

Expand Down Expand Up @@ -1139,12 +1138,9 @@ fn quota_exceed() {

env.actor("reader", {
async move {
let watch_recorder = WatchRecorder::new();
let mut traffic = TrafficMonitor::new(watch_recorder.subscriber());

let network = actor::create_network(Proto::Tcp).await;

let params = actor::get_repo_params(DEFAULT_REPO).with_recorder(watch_recorder);
let params = actor::get_repo_params(DEFAULT_REPO);
let secrets = actor::get_repo_secrets(DEFAULT_REPO);
let repo = Repository::create(
&params,
Expand All @@ -1164,10 +1160,18 @@ fn quota_exceed() {
assert!(size0 <= quota);

info!("read 0.dat");

let mut rx = repo.subscribe();

tx.send(()).await.unwrap();

// Wait for the traffic to settle
traffic.wait().await;
// Wait for the next snapshot to be rejected
loop {
match wait(&mut rx).await {
Some(Payload::SnapshotRejected(_)) => break,
_ => continue,
}
}

// The second file is rejected because it exceeds the quota
let size1 = repo.size().await.unwrap();
Expand Down Expand Up @@ -1214,14 +1218,11 @@ fn quota_concurrent_writes() {
}

env.actor("reader", async move {
let watch_recorder = WatchRecorder::new();
let mut traffic = TrafficMonitor::new(watch_recorder.subscriber());

let network = actor::create_network(Proto::Tcp).await;
network.add_user_provided_peer(&actor::lookup_addr("writer-0").await);
network.add_user_provided_peer(&actor::lookup_addr("writer-1").await);

let params = actor::get_repo_params(DEFAULT_REPO).with_recorder(watch_recorder);
let params = actor::get_repo_params(DEFAULT_REPO);
let secrets = actor::get_repo_secrets(DEFAULT_REPO);
let repo = Repository::create(
&params,
Expand All @@ -1231,9 +1232,31 @@ fn quota_concurrent_writes() {
.unwrap();
repo.set_quota(Some(quota)).await.unwrap();

let mut rx = repo.subscribe();

let _reg = network.register(repo.handle()).await;

traffic.wait().await;
// One snapshot is approved and one rejected.
let mut approved = HashSet::new();
let mut rejected = HashSet::new();

loop {
// HACK: wait 1 second after receiving the last event to ensure the sync has settled.
// TODO: Find a better way to do this.
match time::timeout(Duration::from_secs(1), wait(&mut rx)).await {
Ok(Some(Payload::SnapshotApproved(writer_id))) => {
approved.insert(writer_id);
}
Ok(Some(Payload::SnapshotRejected(writer_id))) => {
rejected.insert(writer_id);
}
Ok(_) => (),
Err(_) => break,
}
}

assert_eq!(approved.len(), 1);
assert_eq!(rejected.len(), 1);

let size = repo.size().await.unwrap();
assert!(size <= quota);
Expand Down

0 comments on commit 80141dc

Please sign in to comment.