Skip to content

Commit

Permalink
Handle termination of background writer correctly
Browse files Browse the repository at this point in the history
fetch_error() should only be called if no panic or error in main thread happened
  • Loading branch information
Markus Schlegel committed Sep 14, 2020
1 parent 8210e52 commit 4f3e16a
Showing 1 changed file with 22 additions and 14 deletions.
36 changes: 22 additions & 14 deletions src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,41 +33,46 @@ impl Writer {
bufsize: usize,
) -> Self {
let buffer = io::Cursor::new(vec![0; bufsize].into_boxed_slice());

Writer {
empty_recv,
full_send,
buffer,
}
}

/// Sends the current buffer to the background thread for writing. Returns
/// `false` if the background thread has terminated, which could have the
/// following reasons:
/// 1) There was a write/flush error or panic. These have to be handled
/// accordingly, here we do nothing more.
/// 2) Message::Done has already been sent, send_to_background() was called
/// after done()). This should not happen in the current code.
/// 3) Writer initialization failed.
#[inline]
fn send_to_background(&mut self) -> io::Result<()> {
fn send_to_background(&mut self) -> io::Result<bool> {
if let Ok(empty) = self.empty_recv.recv() {
let full = replace(&mut self.buffer, io::Cursor::new(empty?));
if self.full_send.send(Message::Buffer(full)).is_err() {
self.get_errors()?;
if self.full_send.send(Message::Buffer(full)).is_ok() {
return Ok(true);
}
Ok(())
} else {
self.get_errors()?;
// If we reach this point, we couldn't communicate with the background writer
// but there were no errors recorded in the queue. BrokenPipe seems to closest error to return.
Err(io::Error::from(io::ErrorKind::BrokenPipe))
}
Ok(false)
}

#[inline]
fn done(&mut self) -> io::Result<()> {
// send last buffer
self.send_to_background()?;
// Tell the background writer to finish up if it didn't already
self.full_send.send(Message::Done).ok();
Ok(())
}

// return errors that may still be in the queue
// Checks all items in the empty_recv queue for a possible error,
// throwing away all buffers. This should thus only be called at the
// very end.
#[inline]
fn get_errors(&self) -> io::Result<()> {
fn fetch_error(&self) -> io::Result<()> {
for res in &self.empty_recv {
res?;
}
Expand All @@ -82,7 +87,9 @@ impl Write for Writer {
let n = self.buffer.write(&buffer[written..])?;
written += n;
if n == 0 {
self.send_to_background()?;
if !self.send_to_background()? {
break;
}
}
}
Ok(written)
Expand Down Expand Up @@ -311,7 +318,8 @@ where

// Report write errors that happened after the main thread stopped writing.
writer_result?;
writer.get_errors()?;
// Return errors that may have occurred when writing the last few chunks.
writer.fetch_error()?;

Ok((out, of.unwrap()))
})
Expand Down

0 comments on commit 4f3e16a

Please sign in to comment.