Skip to content

Commit

Permalink
Added try_send_multiple() in ipc_queue::Sender
Browse files Browse the repository at this point in the history
  • Loading branch information
mzohreva committed Sep 30, 2020
1 parent 137477e commit 5b2ba1e
Showing 1 changed file with 63 additions and 2 deletions.
65 changes: 63 additions & 2 deletions ipc-queue/src/interface_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,29 @@ impl<T: Transmittable, S: Synchronizer> Sender<T, S> {
})
}

/// Tries to send multiple values. Calling this function has the same
/// semantics as calling `try_send` for each item in order until an error
/// occurs, but it has the benefit of notifying the receiver at most once.
///
/// Returns the number of successfully sent items if any item was
/// successfully sent, otherwise returns an error.
pub fn try_send_multiple(&self, values: &[Identified<T>]) -> Result<usize, TrySendError> {
let mut wake_receiver = false;
let mut sent = 0;
for val in values {
wake_receiver |= match self.inner.try_send_impl(*val) {
Ok(wake_receiver) => wake_receiver,
Err(e) if sent == 0 => return Err(e),
Err(_) => break,
};
sent += 1;
}
if wake_receiver {
self.synchronizer.notify(QueueEvent::NotEmpty);
}
Ok(sent)
}

pub fn send(&self, val: Identified<T>) -> Result<(), SendError> {
loop {
match self.inner.try_send_impl(val) {
Expand Down Expand Up @@ -133,9 +156,9 @@ impl<'r, T: Transmittable, S: Synchronizer> Iterator for TryIter<'r, T, S> {

#[cfg(test)]
mod tests {
use crate::*;
use crate::test_support::TestValue;
use crate::test_support::pubsub::{Channel, Subscription};
use crate::test_support::TestValue;
use crate::*;
use std::thread;

fn do_single_sender(len: usize, n: u64) {
Expand Down Expand Up @@ -315,6 +338,44 @@ mod tests {
h.join().unwrap();
}

#[test]
fn try_send_multiple() {
let s = TestSynchronizer::new();
let (tx, rx) = bounded(32, s);
const SENDERS: usize = 4;
const N: usize = 1024;
let mut handles = Vec::with_capacity(SENDERS);

for t in 0..SENDERS {
let tx = tx.clone();
handles.push(thread::spawn(move || {
let mut to_send = Vec::with_capacity(N);
for i in 0..N {
let id = (t * N + i + 1) as u64;
to_send.push(Identified { id, data: TestValue(i as u64) });
}
let mut sent = 0;
while sent < to_send.len() {
match tx.try_send_multiple(&to_send[sent..]) {
Err(_) => thread::yield_now(),
Ok(n) => sent += n,
}
}
}));
}

let mut values = Vec::with_capacity(N * SENDERS);
for _ in 0..(N * SENDERS) {
values.push(rx.recv().unwrap());
}
values.sort_by_key(|v| v.id);
assert!(values.windows(2).all(|w| w[0].id < w[1].id));

for h in handles {
h.join().unwrap();
}
}

#[derive(Clone)]
pub struct TestSynchronizer {
not_empty: Subscription<()>,
Expand Down

0 comments on commit 5b2ba1e

Please sign in to comment.