Skip to content

Commit

Permalink
Merge pull request #3 from winstonewert/master
Browse files Browse the repository at this point in the history
Improved error handling #2
  • Loading branch information
markschl authored Sep 14, 2020
2 parents 2895a57 + 85dfd37 commit af912ad
Show file tree
Hide file tree
Showing 5 changed files with 119 additions and 31 deletions.
7 changes: 7 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,10 @@ extern crate crossbeam;

pub mod read;
pub mod write;

fn unwrap_or_resume_unwind<V>(value: Result<V, Box<dyn std::any::Any + Send>>) -> V {
match value {
Ok(value) => value,
Err(error) => std::panic::resume_unwind(error),
}
}
28 changes: 10 additions & 18 deletions src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,16 +94,6 @@ impl Reader {
self.empty_send.send(None).ok();
}

/// return errors that may still be in the queue. This is still possible even if the
/// sender was dropped, as they are in the queue buffer (see docs for std::mpsc::Receiver::recv).
#[inline]
fn get_errors(&self) -> io::Result<()> {
for res in &self.full_recv {
res?;
}
Ok(())
}

// assumes that self.buffer is not None. Returns a tuple of the read result
// and a flag indicating if a new buffer should be received (cannot be done
// here due to borrow checker)
Expand Down Expand Up @@ -203,7 +193,7 @@ pub fn reader<R, F, O, E>(bufsize: usize, queuelen: usize, reader: R, func: F) -
where
F: FnOnce(&mut Reader) -> Result<O, E>,
R: io::Read + Send,
E: Send + From<io::Error>,
E: Send,
{
reader_init(bufsize, queuelen, || Ok(reader), func)
}
Expand Down Expand Up @@ -237,7 +227,7 @@ where
I: Send + FnOnce() -> Result<R, E>,
F: FnOnce(&mut Reader) -> Result<O, E>,
R: io::Read,
E: Send + From<io::Error>,
E: Send,
{
assert!(queuelen >= 1);
assert!(bufsize > 0);
Expand All @@ -255,15 +245,17 @@ where
Ok::<_, E>(())
});

let out = func(&mut reader)?;
let out = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(&mut reader)));

reader.done();

handle.join().unwrap()?;

reader.get_errors()?;

Ok(out)
// We deliberately ensure that errors from the background reading thread are given priority.
// This does NOT include errors returned from the actual I/O which are returned via the channels
// To the reader. It includes errors returned by init_reader() and panics that occured while reading.
// Either of those cases will have cause the reader to be in an unworkable state. Consequently, we want to
// surface the error that caused this.
crate::unwrap_or_resume_unwind(handle.join())?;
crate::unwrap_or_resume_unwind(out)
})
.unwrap()
}
20 changes: 16 additions & 4 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,13 @@ impl Writer {
if self.full_send.send(Message::Buffer(full)).is_err() {
self.get_errors()?;
}
Ok(())
} else {
self.get_errors()?;
// If we reach this point, we couldn't communicate with the background writer
// but there were no errors recorded in the queue. BrokenPipe seems to closest error to return.
Err(io::Error::from(io::ErrorKind::BrokenPipe))
}
Ok(())
}

#[inline]
Expand Down Expand Up @@ -293,12 +296,21 @@ where
Ok(None)
});

let out = func(&mut writer)?;
let out = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| func(&mut writer)));

let writer_result = writer.done();

writer.done()?;
let handle = handle.join();

let of = handle.join().unwrap()?;
// Prefer errors from the background thread. This doesn't include actual I/O errors from the writing
// because those are sent via the channel to the main thread. Instead, it returns errors from init_writer
// or panics from the writing thread. If either of those happen, writing in the main thread will fail
// but we want to return the underlying reason.
let of = crate::unwrap_or_resume_unwind(handle)?;
let out = crate::unwrap_or_resume_unwind(out)?;

// Report write errors that happened after the main thread stopped writing.
writer_result?;
writer.get_errors()?;

Ok((out, of.unwrap()))
Expand Down
60 changes: 54 additions & 6 deletions tests/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,28 @@ struct Reader<'a> {
// used to test what happens to errors that are
// stuck in the queue
fails_after: usize,
panic: bool,
}

impl<'a> Reader<'a> {
fn new(data: &'a [u8], block_size: usize, fails_after: usize) -> Reader {
fn new(data: &'a [u8], block_size: usize, fails_after: usize, panic: bool) -> Reader {
Reader {
data: data,
block_size: block_size,
fails_after: fails_after,
panic: panic,
}
}
}

impl<'a> Read for Reader<'a> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.fails_after == 0 {
return Err(io::Error::new(io::ErrorKind::Other, "read err"));
if self.panic {
panic!("read err");
} else {
return Err(io::Error::new(io::ErrorKind::Other, "read err"));
}
}
self.fails_after -= 1;
let amt = min(self.data.len(), min(buf.len(), self.block_size));
Expand Down Expand Up @@ -61,11 +67,11 @@ fn read() {
for out_bufsize in 1..len {
for queuelen in 1..len {
// test the mock reader itself
let mut rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX);
let rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX, false);
assert_eq!(read_chunks(rdr, out_bufsize).unwrap().as_slice(), &text[..]);

// test threaded reader
let mut rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX);
let rdr = Reader::new(text, rdr_block_size, ::std::usize::MAX, false);
let out = reader(channel_bufsize, queuelen, rdr, |r| {
read_chunks(r, out_bufsize)
})
Expand All @@ -91,7 +97,7 @@ fn read_fail() {
for channel_bufsize in 1..len {
for queuelen in 1..len {
let mut out = vec![0];
let mut rdr = Reader::new(text, channel_bufsize, len / channel_bufsize);
let rdr = Reader::new(text, channel_bufsize, len / channel_bufsize, false);
let res: io::Result<_> = reader(channel_bufsize, queuelen, rdr, |r| {
while r.read(&mut out)? > 0 {}
Ok(())
Expand All @@ -109,10 +115,52 @@ fn read_fail() {
}
}

#[test]
#[should_panic(expected = "read err")]
fn read_panic() {
let text = b"The quick brown fox";
let rdr = Reader::new(text, 1, 1, true);
let _res: io::Result<_> = reader(1, 1, rdr, |r| {
r.read_to_end(&mut Vec::new())?;
Ok(())
});
}

#[test]
fn read_fail_processing() {
let text = b"The quick brown fox";

let rdr = Reader::new(text, 1, 1, false);
let res: Result<(), &'static str> = reader(1, 1, rdr, |_r| Err("gave up"));

if let Err(e) = res {
assert_eq!(&format!("{}", e), "gave up");
} else {
panic!("read should fail");
}
}

#[test]
#[should_panic(expected = "gave up")]
fn read_panic_processing() {
let text = b"The quick brown fox";

let rdr = Reader::new(text, 1, 1, false);
let _res: Result<(), &'static str> = reader(1, 1, rdr, |_r| panic!("gave up"));
}

#[test]
fn reader_init_fail() {
let e = io::Error::new(io::ErrorKind::Other, "init err");
let res = reader_init(5, 2, || Err::<&[u8], _>(e), |_| Ok(()));
let res = reader_init(
5,
2,
|| Err::<&[u8], _>(e),
|reader| {
reader.read_to_end(&mut Vec::new())?;
Ok(())
},
);
if let Err(e) = res {
assert_eq!(&format!("{}", e), "init err");
} else {
Expand Down
35 changes: 32 additions & 3 deletions tests/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ fn write_thread() {
for writer_bufsize in 1..len {
for queuelen in 1..len {
// Test the writer: write without flushing, which should result in empty output
let mut w = writer_init_finish(
let w = writer_init_finish(
channel_bufsize,
queuelen,
|| Ok(Writer::new(false, false, writer_bufsize)),
Expand Down Expand Up @@ -106,7 +106,15 @@ fn write_thread() {
#[test]
fn writer_init_fail() {
let e = io::Error::new(io::ErrorKind::Other, "init err");
let res = writer_init(5, 2, || Err::<&mut [u8], _>(e), |_| Ok(()));
let res = writer_init(
5,
2,
|| Err::<&mut [u8], _>(e),
|writer| {
writer.write(b"let the cows come home")?;
Ok(())
},
);
if let Err(e) = res {
assert_eq!(&format!("{}", e), "init err");
} else {
Expand All @@ -123,7 +131,7 @@ fn write_fail() {
for writer_bufsize in 1..len {
for queuelen in 1..len {
let w = Writer::new(true, false, writer_bufsize);
let res = writer(channel_bufsize, queuelen, w, |w| w.write(text));
let res = writer(channel_bufsize, queuelen, w, |w| w.write_all(text));
if let Err(e) = res {
assert_eq!(&format!("{}", e), "write err");
} else {
Expand All @@ -141,3 +149,24 @@ fn write_fail() {
}
}
}

#[test]
fn write_source_fail() {
let w = Writer::new(true, false, 1);
let res: std::io::Result<()> = writer(1, 1, w, |_w| {
Err(std::io::Error::from(std::io::ErrorKind::AddrInUse))
});

if let Err(e) = res {
assert_eq!(e.kind(), std::io::ErrorKind::AddrInUse);
} else {
panic!("expected error")
}
}

#[test]
#[should_panic(expected = "all out of bubblegum")]
fn write_source_panic() {
let w = Writer::new(true, false, 1);
let _res: std::io::Result<()> = writer(1, 1, w, |_w| panic!("all out of bubblegum"));
}

0 comments on commit af912ad

Please sign in to comment.