Skip to content

Commit

Permalink
Update to nails 0.11.0. (pantsbuild#11370)
Browse files Browse the repository at this point in the history
Update to nails 0.11.0 to pull in some type safety (and hopefully correctness) improvements around cancellation.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Dec 23, 2020
1 parent c06a2e8 commit 0d03530
Show file tree
Hide file tree
Showing 7 changed files with 74 additions and 81 deletions.
4 changes: 2 additions & 2 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/rust/engine/nailgun/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ async_latch = { path = "../async_latch" }
bytes = "0.5"
futures = "0.3"
log = "0.4"
nails = "0.8"
nails = "0.11"
os_pipe = "0.9"
task_executor = { path = "../task_executor" }
tokio = { version = "0.2.23", features = ["tcp", "fs", "sync", "io-std", "signal"] }
Expand Down
8 changes: 0 additions & 8 deletions src/rust/engine/nailgun/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,6 @@ async fn handle_client_output(
NailgunClientError::PostConnect(format!("Failed to flush stderr: {}", err))
})?
},
Some(ChildOutput::Exit(_)) => {
// NB: We ignore exit here and allow the main thread to handle exiting. This API is
// error prone: see https://github.com/stuhood/nails/issues/1 for more info.
}
None => break,
}
}
Expand Down Expand Up @@ -100,10 +96,6 @@ async fn handle_client_input(mut stdin_write: mpsc::Sender<ChildInput>) -> Resul
.await
.map_err(send_to_io)?;
}
stdin_write
.send(ChildInput::StdinEOF)
.await
.map_err(send_to_io)?;
Ok(())
}

Expand Down
112 changes: 48 additions & 64 deletions src/rust/engine/nailgun/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@ use std::sync::Arc;

use async_latch::AsyncLatch;
use bytes::Bytes;
use futures::channel::{mpsc, oneshot};
use futures::channel::oneshot;
use futures::{future, sink, stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
use log::{debug, error, info};
use nails::execution::{self, sink_for, stream_for, ChildInput, ChildOutput, ExitCode};
use log::{debug, info};
use nails::execution::{
self, child_channel, sink_for, stream_for, ChildInput, ChildOutput, ExitCode,
};
use nails::Nail;
use task_executor::Executor;
use tokio::fs::File;
Expand Down Expand Up @@ -227,85 +229,67 @@ struct RawFdNail {
}

impl Nail for RawFdNail {
fn spawn(
&self,
cmd: execution::Command,
input_stream: mpsc::Receiver<ChildInput>,
) -> Result<nails::server::Child, io::Error> {
fn spawn(&self, cmd: execution::Command) -> Result<nails::server::Child, io::Error> {
let env = cmd.env.iter().cloned().collect::<HashMap<_, _>>();

// Handle stdin. If the input stream closes, the run is cancelled.
let cancelled = AsyncLatch::new();
// Handle stdin.
let (stdin_handle, stdin_sink) = Self::input(Self::ttypath_from_env(&env, 0))?;
let accepts_stdin = {
let (accepts_stdin, input_stream_fut) = if let Some(mut stdin_sink) = stdin_sink {
// Forward all stdin to the child process.
(
true,
async move {
let mut input_stream = input_stream.filter_map(|child_input| {
Box::pin(async move {
match child_input {
ChildInput::Stdin(bytes) => Some(Ok(bytes)),
ChildInput::StdinEOF => None,
}
})
});
let _ = stdin_sink.send_all(&mut input_stream).await;
}
.boxed(),
)
} else {
// Stdin will be handled directly by the TTY. Only propagate cancellation.
(false, input_stream.fold((), |(), _| async {}).boxed())
};
// Spawn a task that will propagate the input stream, and then trigger cancellation.
let cancelled = cancelled.clone();
let _join = self.executor.spawn(input_stream_fut.map(move |_| {
cancelled.trigger();
}));
accepts_stdin
let maybe_stdin_write = if let Some(mut stdin_sink) = stdin_sink {
let (stdin_write, stdin_read) = child_channel::<ChildInput>();
// Spawn a task that will propagate the input stream.
let _join = self.executor.spawn(async move {
let mut input_stream = stdin_read.map(|child_input| match child_input {
ChildInput::Stdin(bytes) => Ok(bytes),
});
let _ = stdin_sink.send_all(&mut input_stream).await;
});
Some(stdin_write)
} else {
// Stdin will be handled directly by the TTY.
None
};

// And stdout/stderr.
let (stdout_stream, stdout_handle) = Self::output(Self::ttypath_from_env(&env, 1))?;
let (stderr_stream, stderr_handle) = Self::output(Self::ttypath_from_env(&env, 2))?;

// Set up a cancellation token that is triggered on client shutdown.
let cancelled = AsyncLatch::new();
let shutdown = {
let cancelled = cancelled.clone();
async move {
cancelled.trigger();
}
};

// Spawn the underlying function as a blocking task, and capture its exit code to append to the
// output stream.
let nail = self.clone();
let exit_code_future = self.executor.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
let exit_code = self
.executor
.spawn_blocking(move || {
// NB: This closure captures the stdio handles, and will drop/close them when it completes.
(nail.runner)(RawFdExecution {
cmd,
cancelled,
stdin_fd: stdin_handle.as_raw_fd(),
stdout_fd: stdout_handle.as_raw_fd(),
stderr_fd: stderr_handle.as_raw_fd(),
})
})
});
.boxed();

// Fully consume the stdout/stderr streams before waiting on the exit stream.
// Select a single stdout/stderr stream.
let stdout_stream = stdout_stream.map_ok(ChildOutput::Stdout);
let stderr_stream = stderr_stream.map_ok(ChildOutput::Stderr);
let exit_stream = exit_code_future
.into_stream()
.map(|exit_code| Ok(ChildOutput::Exit(exit_code)));
let output_stream = stream::select(stdout_stream, stderr_stream)
.chain(exit_stream)
.map(|res| match res {
Ok(o) => o,
Err(e) => {
error!("IO error interacting with the runner: {:?}", e);
ChildOutput::Exit(ExitCode(-1))
}
})
.boxed();
let output_stream = stream::select(stdout_stream, stderr_stream).boxed();

Ok(nails::server::Child {
Ok(nails::server::Child::new(
output_stream,
accepts_stdin,
})
maybe_stdin_write,
exit_code,
Some(shutdown.boxed()),
))
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ futures = "0.3"
hashing = { path = "../hashing" }
libc = "0.2.39"
log = "0.4"
nails = "0.8"
nails = "0.11"
sha2 = "0.9"
sharded_lmdb = { path = "../sharded_lmdb" }
shell-quote = "0.1.0"
Expand Down
12 changes: 11 additions & 1 deletion src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use fs::{
use futures::future::{BoxFuture, FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt, TryStreamExt};
use log::{debug, info};
use nails::execution::{ChildOutput, ExitCode};
use nails::execution::ExitCode;
use shell_quote::bash;
use store::{OneOffStoreFileByDigest, Snapshot, Store};
use tokio::process::{Child, Command};
Expand Down Expand Up @@ -178,6 +178,16 @@ impl HermeticCommand {
}
}

// TODO: A Stream that ends with `Exit` is error prone: we should consider creating a Child struct
// similar to nails::server::Child (which is itself shaped like `std::process::Child`).
// See https://github.com/stuhood/nails/issues/1 for more info.
#[derive(Debug, PartialEq, Eq)]
pub enum ChildOutput {
Stdout(Bytes),
Stderr(Bytes),
Exit(ExitCode),
}

///
/// The fully collected outputs of a completed child process.
///
Expand Down
15 changes: 11 additions & 4 deletions src/rust/engine/process_execution/src/nailgun/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use async_trait::async_trait;
use futures::future::{FutureExt, TryFutureExt};
use futures::stream::{BoxStream, StreamExt};
use log::{debug, trace};
use nails::execution::{child_channel, ChildInput, ChildOutput, Command};
use nails::execution::{self, child_channel, ChildInput, Command};
use tokio::net::TcpStream;

use crate::local::CapturedWorkdir;
use crate::local::{CapturedWorkdir, ChildOutput};
use crate::nailgun::nailgun_pool::NailgunProcessName;
use crate::{
Context, FallibleProcessResultWithPlatform, MultiPlatformProcess, NamedCaches, Platform, Process,
Expand Down Expand Up @@ -276,12 +276,19 @@ impl CapturedWorkdir for CommandRunner {
})
.await?;

let output_stream = child.output_stream.take().unwrap();
let output_stream = child
.output_stream
.take()
.unwrap()
.map(|output| match output {
execution::ChildOutput::Stdout(bytes) => Ok(ChildOutput::Stdout(bytes)),
execution::ChildOutput::Stderr(bytes) => Ok(ChildOutput::Stderr(bytes)),
});
let exit_code = child
.wait()
.map_ok(ChildOutput::Exit)
.map_err(|e| format!("Error communicating with server: {}", e));

Ok(futures::stream::select(output_stream.map(Ok), exit_code.into_stream()).boxed())
Ok(futures::stream::select(output_stream, exit_code.into_stream()).boxed())
}
}

0 comments on commit 0d03530

Please sign in to comment.