Skip to content

Commit

Permalink
Fix for hang with redirected stdio. (pantsbuild#16970)
Browse files Browse the repository at this point in the history
As described in pantsbuild#16969, our use of blocking IO for logging/stdio can cause a deadlock by preventing the task which consumes the other end of the stdio pipes from running.

This change introduces dedicated threads to consume each of `stdout` and `stderr`, which prevents them from ever being subject to the state of the tokio runtime.

As mentioned in pantsbuild#16969, this is really more of a "workaround" than a fix for the deeper issue. But it reliably fixes pantsbuild#16121, and will prevent any other such issues.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Sep 23, 2022
1 parent e6c6121 commit d2bbfe6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 11 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/rust/engine/nailgun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ nails = "0.13"
os_pipe = "1.0"
task_executor = { path = "../task_executor" }
tokio = { version = "1.16", features = ["fs", "io-std", "io-util", "net", "signal", "sync"] }
tokio-stream = "0.1"

[dev-dependencies]
tokio = { version = "1.16", features = ["io-std", "macros", "net", "rt-multi-thread"] }
49 changes: 38 additions & 11 deletions src/rust/engine/nailgun/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,13 @@ use bytes::Bytes;
use futures::channel::oneshot;
use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
use log::{debug, info};
use nails::execution::{
self, child_channel, sink_for, stream_for, ChildInput, ChildOutput, ExitCode,
};
use nails::execution::{self, child_channel, sink_for, ChildInput, ChildOutput, ExitCode};
use nails::Nail;
use task_executor::Executor;
use tokio::fs::File;
use tokio::net::TcpListener;
use tokio::sync::{Notify, RwLock};
use tokio::sync::{mpsc, Notify, RwLock};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub struct Server {
exit_sender: oneshot::Sender<()>,
Expand Down Expand Up @@ -284,10 +283,10 @@ impl RawFdNail {
if let Some(tty) = Self::try_open_tty(tty_path, OpenOptions::new().read(true)) {
Ok((Box::new(tty), None))
} else {
let (stdin_reader, stdin_writer) = os_pipe::pipe()?;
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
let write_handle =
File::from_std(unsafe { std::fs::File::from_raw_fd(stdin_writer.into_raw_fd()) });
Ok((Box::new(stdin_reader), Some(sink_for(write_handle))))
File::from_std(unsafe { std::fs::File::from_raw_fd(pipe_writer.into_raw_fd()) });
Ok((Box::new(pipe_reader), Some(sink_for(write_handle))))
}
}

Expand Down Expand Up @@ -316,10 +315,12 @@ impl RawFdNail {
) {
Ok((stream::empty().boxed(), Box::new(tty)))
} else {
let (stdin_reader, stdin_writer) = os_pipe::pipe()?;
let read_handle =
File::from_std(unsafe { std::fs::File::from_raw_fd(stdin_reader.into_raw_fd()) });
Ok((stream_for(read_handle).boxed(), Box::new(stdin_writer)))
let (pipe_reader, pipe_writer) = os_pipe::pipe()?;
let read_handle = unsafe { std::fs::File::from_raw_fd(pipe_reader.into_raw_fd()) };
Ok((
blocking_stream_for(read_handle).boxed(),
Box::new(pipe_writer),
))
}
}

Expand Down Expand Up @@ -349,3 +350,29 @@ impl RawFdNail {
.map(PathBuf::from)
}
}

// TODO: See https://github.com/pantsbuild/pants/issues/16969.
pub fn blocking_stream_for<R: io::Read + Send + Sized + 'static>(
mut r: R,
) -> impl futures::Stream<Item = Result<Bytes, io::Error>> {
let (sender, receiver) = mpsc::unbounded_channel();
std::thread::spawn(move || {
let mut buf = [0; 4096];
loop {
match r.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
if sender.send(Ok(Bytes::copy_from_slice(&buf[..n]))).is_err() {
break;
}
}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
Err(e) => {
let _ = sender.send(Err(e));
break;
}
}
}
});
UnboundedReceiverStream::new(receiver)
}

0 comments on commit d2bbfe6

Please sign in to comment.