diff --git a/src/python/pants/backend/java/compile/javac.py b/src/python/pants/backend/java/compile/javac.py index 6dc47c5d330..d3676ae47b0 100644 --- a/src/python/pants/backend/java/compile/javac.py +++ b/src/python/pants/backend/java/compile/javac.py @@ -158,7 +158,6 @@ async def compile_java_source( ( prefixed_direct_dependency_classpath_digest, dest_dir_digest, - jdk_setup.digest, *( sources.snapshot.digest for _, sources in component_members_and_java_source_files diff --git a/src/python/pants/backend/java/dependency_inference/java_parser.py b/src/python/pants/backend/java/dependency_inference/java_parser.py index c358daa1adc..dc4b1149302 100644 --- a/src/python/pants/backend/java/dependency_inference/java_parser.py +++ b/src/python/pants/backend/java/dependency_inference/java_parser.py @@ -111,15 +111,6 @@ async def analyze_java_source_dependencies( ) ), ) - merged_digest = await Get( - Digest, - MergeDigests( - ( - tool_digest, - prefixed_source_files_digest, - ) - ), - ) analysis_output_path = "__source_analysis.json" @@ -132,7 +123,7 @@ async def analyze_java_source_dependencies( analysis_output_path, source_path, ], - input_digest=merged_digest, + input_digest=prefixed_source_files_digest, output_files=(analysis_output_path,), use_nailgun=tool_digest, append_only_caches=jdk_setup.append_only_caches, diff --git a/src/python/pants/backend/scala/compile/scalac.py b/src/python/pants/backend/scala/compile/scalac.py index 48cec7ec108..0a2cb24d3db 100644 --- a/src/python/pants/backend/scala/compile/scalac.py +++ b/src/python/pants/backend/scala/compile/scalac.py @@ -145,18 +145,19 @@ async def compile_scala_source( Digest, AddPrefix(merged_transitive_dependency_classpath_entries_digest, usercp) ) - merged_digest = await Get( - Digest, - MergeDigests( - ( - prefixed_transitive_dependency_classpath_digest, - tool_classpath.digest, - jdk_setup.digest, - *( - sources.snapshot.digest - for _, sources in component_members_and_scala_source_files - ), - ) + merged_tool_digest, merged_input_digest = await MultiGet( + Get(Digest, MergeDigests((tool_classpath.digest, jdk_setup.digest))), + Get( + Digest, + MergeDigests( + ( + prefixed_transitive_dependency_classpath_digest, + *( + sources.snapshot.digest + for _, sources in component_members_and_scala_source_files + ), + ) + ), ), ) @@ -183,8 +184,8 @@ async def compile_scala_source( ) ), ], - input_digest=merged_digest, - use_nailgun=jdk_setup.digest, + input_digest=merged_input_digest, + use_nailgun=merged_tool_digest, output_files=(output_file,), description=f"Compile {request.component} with scalac", level=LogLevel.DEBUG, diff --git a/src/python/pants/backend/scala/dependency_inference/scala_parser.py b/src/python/pants/backend/scala/dependency_inference/scala_parser.py index e9d28a36988..d993bb2c414 100644 --- a/src/python/pants/backend/scala/dependency_inference/scala_parser.py +++ b/src/python/pants/backend/scala/dependency_inference/scala_parser.py @@ -256,15 +256,6 @@ async def analyze_scala_source_dependencies( ) ), ) - merged_digest = await Get( - Digest, - MergeDigests( - ( - tool_digest, - prefixed_source_files_digest, - ) - ), - ) analysis_output_path = "__source_analysis.json" @@ -277,7 +268,7 @@ async def analyze_scala_source_dependencies( analysis_output_path, source_path, ], - input_digest=merged_digest, + input_digest=prefixed_source_files_digest, output_files=(analysis_output_path,), use_nailgun=tool_digest, append_only_caches=jdk_setup.append_only_caches, diff --git a/src/rust/engine/process_execution/src/lib.rs 
b/src/rust/engine/process_execution/src/lib.rs index a01d11c32f2..a444dfca21c 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -41,6 +41,7 @@ use log::Level; use protos::gen::build::bazel::remote::execution::v2 as remexec; use remexec::ExecutedActionMetadata; use serde::{Deserialize, Serialize}; +use store::{SnapshotOps, SnapshotOpsError, Store}; use workunit_store::{in_workunit, RunId, RunningWorkunit, WorkunitMetadata, WorkunitStore}; pub mod cache; @@ -177,6 +178,63 @@ fn serialize_level(level: &log::Level, s: S) -> Result Result { + let complete = store.merge(vec![input_files, use_nailgun]).await?; + Ok(Self { + complete, + input_files, + use_nailgun, + }) + } + + pub fn with_input_files(input_files: Digest) -> Self { + Self { + complete: input_files, + input_files, + use_nailgun: hashing::EMPTY_DIGEST, + } + } +} + +impl Default for InputDigests { + fn default() -> Self { + Self { + complete: hashing::EMPTY_DIGEST, + input_files: hashing::EMPTY_DIGEST, + use_nailgun: hashing::EMPTY_DIGEST, + } + } +} + /// /// A process to be executed. /// @@ -206,7 +264,10 @@ pub struct Process { /// pub working_directory: Option, - pub input_files: hashing::Digest, + /// + /// All of the input digests for the process. + /// + pub input_digests: InputDigests, pub output_files: BTreeSet, @@ -251,14 +312,6 @@ pub struct Process { pub platform_constraint: Option, - /// - /// If non-empty, the Digest of a nailgun server to use to attempt to spawn the Process. - /// - /// TODO: Currently this Digest must be a subset of the `input_digest`, but we should consider - /// making it disjoint, and then automatically merging it. - /// - pub use_nailgun: Digest, - pub cache_scope: ProcessCacheScope, } @@ -278,7 +331,7 @@ impl Process { argv, env: BTreeMap::new(), working_directory: None, - input_files: hashing::EMPTY_DIGEST, + input_digests: InputDigests::default(), output_files: BTreeSet::new(), output_directories: BTreeSet::new(), timeout: None, @@ -287,7 +340,6 @@ impl Process { append_only_caches: BTreeMap::new(), jdk_home: None, platform_constraint: None, - use_nailgun: hashing::EMPTY_DIGEST, execution_slot_variable: None, cache_scope: ProcessCacheScope::Successful, } diff --git a/src/rust/engine/process_execution/src/local.rs b/src/rust/engine/process_execution/src/local.rs index d14ace84a2c..ff236e74127 100644 --- a/src/rust/engine/process_execution/src/local.rs +++ b/src/rust/engine/process_execution/src/local.rs @@ -282,6 +282,18 @@ impl super::CommandRunner for CommandRunner { } }; + // Prepare the workdir. + let exclusive_spawn = prepare_workdir( + workdir_path.clone(), + &req, + req.input_digests.complete, + context.clone(), + self.store.clone(), + self.executor.clone(), + self.named_caches(), + ) + .await?; + workunit.increment_counter(Metric::LocalExecutionRequests, 1); let res = self .run_and_capture_workdir( @@ -291,6 +303,7 @@ impl super::CommandRunner for CommandRunner { self.executor.clone(), workdir_path.clone(), (), + exclusive_spawn, self.platform(), ) .map_err(|msg| { @@ -453,21 +466,11 @@ pub trait CapturedWorkdir { executor: task_executor::Executor, workdir_path: PathBuf, workdir_token: Self::WorkdirToken, + exclusive_spawn: bool, platform: Platform, ) -> Result { let start_time = Instant::now(); - // Prepare the workdir. - let exclusive_spawn = prepare_workdir( - workdir_path.clone(), - &req, - context.clone(), - store.clone(), - executor.clone(), - self.named_caches(), - ) - .await?; - // Spawn the process. 
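// (Annotation, not part of this diff: `prepare_workdir` now runs in the
// callers above rather than inside `run_and_capture_workdir`, so
// `exclusive_spawn` arrives here as a parameter. A minimal sketch of the
// intended call order, with `runner`, `req`, `ctx`, `store`, `executor`,
// `workdir`, and `platform` as hypothetical values:)
//
//   let exclusive_spawn = prepare_workdir(
//     workdir.clone(),
//     &req,
//     req.input_digests.complete, // local runs materialize the merged digest
//     ctx.clone(),
//     store.clone(),
//     executor.clone(),
//     runner.named_caches(),
//   )
//   .await?;
//   runner
//     .run_and_capture_workdir(
//       req, ctx, store, executor, workdir, (), exclusive_spawn, platform,
//     )
//     .await?;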
// NB: We fully buffer up the `Stream` above into final `ChildResults` below and so could // instead be using `CommandExt::output_async` above to avoid the `ChildResults::collect_from` @@ -602,6 +605,7 @@ pub trait CapturedWorkdir { pub async fn prepare_workdir( workdir_path: PathBuf, req: &Process, + input_digest: hashing::Digest, context: Context, store: Store, executor: task_executor::Executor, @@ -627,7 +631,6 @@ pub async fn prepare_workdir( // non-determinism when paths overlap. let store2 = store.clone(); let workdir_path_2 = workdir_path.clone(); - let input_files = req.input_files; in_workunit!( context.workunit_store.clone(), "setup_sandbox".to_owned(), @@ -637,7 +640,7 @@ pub async fn prepare_workdir( }, |_workunit| async move { store2 - .materialize_directory(workdir_path_2, input_files) + .materialize_directory(workdir_path_2, input_digest) .await }, ) diff --git a/src/rust/engine/process_execution/src/local_tests.rs b/src/rust/engine/process_execution/src/local_tests.rs index f07e00a2a2c..8edeffc74ec 100644 --- a/src/rust/engine/process_execution/src/local_tests.rs +++ b/src/rust/engine/process_execution/src/local_tests.rs @@ -3,7 +3,7 @@ use testutil; use crate::{ CacheDest, CacheName, CommandRunner as CommandRunnerTrait, Context, - FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, RelativePath, + FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, RelativePath, }; use hashing::EMPTY_DIGEST; use shell_quote::bash; @@ -377,7 +377,7 @@ async fn test_directory_preservation() { let mut process = Process::new(argv.clone()).output_files(relative_paths(&["roland.ext"]).collect()); - process.input_files = TestDirectory::nested().digest(); + process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest()); process.working_directory = Some(RelativePath::new("cats").unwrap()); let result = run_command_locally_in_dir( @@ -553,7 +553,7 @@ async fn working_directory() { let mut process = Process::new(vec![find_bash(), "-c".to_owned(), "/bin/ls".to_string()]); process.working_directory = Some(RelativePath::new("cats").unwrap()); process.output_directories = relative_paths(&["roland.ext"]).collect::>(); - process.input_files = TestDirectory::nested().digest(); + process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest()); process.timeout = one_second(); process.description = "confused-cat".to_string(); diff --git a/src/rust/engine/process_execution/src/nailgun/mod.rs b/src/rust/engine/process_execution/src/nailgun/mod.rs index 17748e78ff7..b62a7b3b44f 100644 --- a/src/rust/engine/process_execution/src/nailgun/mod.rs +++ b/src/rust/engine/process_execution/src/nailgun/mod.rs @@ -1,4 +1,4 @@ -use std::collections::BTreeSet; +use std::collections::{BTreeMap, BTreeSet}; use std::net::SocketAddr; use std::path::{Path, PathBuf}; @@ -12,8 +12,10 @@ use task_executor::Executor; use tokio::net::TcpStream; use workunit_store::{in_workunit, Metric, RunningWorkunit, WorkunitMetadata}; -use crate::local::{CapturedWorkdir, ChildOutput}; -use crate::{Context, FallibleProcessResultWithPlatform, NamedCaches, Platform, Process}; +use crate::local::{prepare_workdir, CapturedWorkdir, ChildOutput}; +use crate::{ + Context, FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, +}; #[cfg(test)] pub mod tests; @@ -44,15 +46,21 @@ fn construct_nailgun_server_request( full_args.push(NAILGUN_MAIN_CLASS.to_string()); full_args.extend(ARGS_TO_START_NAILGUN.iter().map(|&a| a.to_string())); + // 
Strip the input_files, preserving only the use_nailgun digest. + let input_digests = InputDigests { + complete: client_request.input_digests.use_nailgun, + use_nailgun: client_request.input_digests.use_nailgun, + input_files: hashing::EMPTY_DIGEST, + }; + Process { argv: full_args, - input_files: client_request.use_nailgun, + input_digests, output_files: BTreeSet::new(), output_directories: BTreeSet::new(), timeout: None, description: format!("nailgun server for {}", nailgun_name), level: log::Level::Info, - use_nailgun: hashing::EMPTY_DIGEST, execution_slot_variable: None, env: client_request.env, append_only_caches: client_request.append_only_caches, @@ -69,6 +77,8 @@ fn construct_nailgun_client_request( Process { argv: client_args, jdk_home: None, + // The append_only_caches are created and preserved by the server. + append_only_caches: BTreeMap::new(), ..original_req } } @@ -122,7 +132,7 @@ impl super::CommandRunner for CommandRunner { workunit: &mut RunningWorkunit, req: Process, ) -> Result { - if req.use_nailgun == hashing::EMPTY_DIGEST { + if req.input_digests.use_nailgun == hashing::EMPTY_DIGEST { trace!("The request is not nailgunnable! Short-circuiting to regular process execution"); return self.inner.run(context, workunit, req).await; } @@ -141,17 +151,22 @@ impl super::CommandRunner for CommandRunner { |workunit| async move { workunit.increment_counter(Metric::LocalExecutionRequests, 1); - // Separate argument lists, to form distinct EPRs for (1) starting the nailgun server and (2) running the client in it. + // Separate argument lists, to form distinct EPRs for + // 1. starting the nailgun server + // 2. running the client against it let ParsedJVMCommandLines { nailgun_args, + client_args, client_main_class, .. } = ParsedJVMCommandLines::parse_command_lines(&req.argv)?; - let nailgun_name = CommandRunner::calculate_nailgun_name(&client_main_class); - - let nailgun_req = - construct_nailgun_server_request(&nailgun_name, nailgun_args, req.clone()); - trace!("Running request under nailgun:\n {:#?}", &nailgun_req); + let nailgun_req = construct_nailgun_server_request( + &CommandRunner::calculate_nailgun_name(&client_main_class), + nailgun_args, + req.clone(), + ); + let client_req = construct_nailgun_client_request(req, client_main_class, client_args); + trace!("Running request under nailgun:\n {:#?}", &client_req); // Get an instance of a nailgun server for this fingerprint, and then run in its directory. let mut nailgun_process = self @@ -160,14 +175,27 @@ impl super::CommandRunner for CommandRunner { .await .map_err(|e| format!("Failed to connect to nailgun! {}", e))?; + // Prepare the workdir. + let exclusive_spawn = prepare_workdir( + nailgun_process.workdir_path().to_owned(), + &client_req, + client_req.input_digests.input_files, + context.clone(), + self.inner.store.clone(), + self.executor.clone(), + self.named_caches(), + ) + .await?; + let res = self .run_and_capture_workdir( - req, + client_req, context, self.inner.store.clone(), self.executor.clone(), nailgun_process.workdir_path().to_owned(), (nailgun_process.name().to_owned(), nailgun_process.address()), + exclusive_spawn, Platform::current().unwrap(), ) .await; @@ -204,28 +232,20 @@ impl CapturedWorkdir for CommandRunner { workdir_path.to_path_buf() }; - let ParsedJVMCommandLines { - client_args, - client_main_class, - .. 
- } = ParsedJVMCommandLines::parse_command_lines(&req.argv)?; - let (name, addr) = workdir_token; debug!("Connected to nailgun instance {} at {}...", name, addr); let mut child = { // Run the client request in the nailgun we have active. - let client_req = construct_nailgun_client_request(req, client_main_class, client_args); let cmd = Command { - command: client_req.argv[0].clone(), - args: client_req.argv[1..].to_vec(), - env: client_req + command: req.argv[0].clone(), + args: req.argv[1..].to_vec(), + env: req .env .iter() .map(|(k, v)| (k.clone(), v.clone())) .collect(), working_dir: client_workdir, }; - trace!("Client request: {:#?}", client_req); TcpStream::connect(addr) .and_then(move |stream| { nails::client::handle_connection(nails::Config::default(), stream, cmd, async { diff --git a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs index e7766c0429f..0595eb685a1 100644 --- a/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs +++ b/src/rust/engine/process_execution/src/nailgun/nailgun_pool.rs @@ -1,6 +1,8 @@ // Copyright 2019 Pants project contributors (see CONTRIBUTORS.md). // Licensed under the Apache License, Version 2.0 (see LICENSE). +use std::collections::HashSet; +use std::ffi::OsString; use std::io::{self, BufRead, Read}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use std::os::unix::process::ExitStatusExt; @@ -10,6 +12,7 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use async_lock::{Mutex, MutexGuardArc}; +use futures::future; use hashing::Fingerprint; use lazy_static::lazy_static; use log::{debug, info}; @@ -253,12 +256,44 @@ pub struct NailgunProcess { pub name: String, fingerprint: NailgunProcessFingerprint, workdir: TempDir, + workdir_include_names: HashSet, port: Port, executor: task_executor::Executor, handle: std::process::Child, } -fn read_port(child: &mut std::process::Child) -> Result { +/// Spawn a nailgun process, and read its port from stdout. +/// +/// NB: Uses blocking APIs, so should be backgrounded on an executor. +fn spawn_and_read_port( + process: Process, + workdir: PathBuf, +) -> Result<(std::process::Child, Port), String> { + let cmd = process.argv[0].clone(); + // TODO: This is an expensive operation, and thus we info! it. + // If it becomes annoying, we can downgrade the logging to just debug! + info!( + "Starting new nailgun server with cmd: {:?}, args {:?}, in cwd {}", + cmd, + &process.argv[1..], + workdir.display() + ); + + let mut child = std::process::Command::new(&cmd) + .args(&process.argv[1..]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .env_clear() + .envs(&process.env) + .current_dir(&workdir) + .spawn() + .map_err(|e| { + format!( + "Failed to create child handle with cmd: {} options {:#?}: {}", + &cmd, &process, e + ) + })?; + let stdout = child .stdout .as_mut() @@ -291,13 +326,15 @@ fn read_port(child: &mut std::process::Child) -> Result { } let port_line = port_line?; - let port = &NAILGUN_PORT_REGEX + let port_str = &NAILGUN_PORT_REGEX .captures_iter(&port_line) .next() .ok_or_else(|| format!("Output for nailgun server was unexpected:\n{:?}", port_line))?[1]; - port + let port = port_str .parse::() - .map_err(|e| format!("Error parsing port {}! {}", &port, e)) + .map_err(|e| format!("Error parsing port {}! 
{}", port_str, e))?; + + Ok((child, port)) } impl NailgunProcess { @@ -316,57 +353,40 @@ impl NailgunProcess { .tempdir_in(workdir_base) .map_err(|err| format!("Error making tempdir for nailgun server: {:?}", err))?; + // Prepare the workdir, and then list it to identify the base set of names which should be + // preserved across runs. TODO: This is less efficient than computing the set of names + // directly from the Process (or returning them from `prepare_workdir`), but it's also much + // simpler. prepare_workdir( workdir.path().to_owned(), &startup_options, + startup_options.input_digests.use_nailgun, context.clone(), store.clone(), executor.clone(), named_caches, ) .await?; - store - .materialize_directory(workdir.path().to_owned(), startup_options.input_files) + let workdir_include_names = list_workdir(workdir.path()).await?; + + // Spawn the process and read its port from stdout. + let (child, port) = executor + .spawn_blocking({ + let workdir = workdir.path().to_owned(); + move || spawn_and_read_port(startup_options, workdir) + }) .await?; - - let cmd = startup_options.argv[0].clone(); - // TODO: This is an expensive operation, and thus we info! it. - // If it becomes annoying, we can downgrade the logging to just debug! - info!( - "Starting new nailgun server with cmd: {:?}, args {:?}, in cwd {:?}", - cmd, - &startup_options.argv[1..], - workdir.path() - ); - let mut child = std::process::Command::new(&cmd) - .args(&startup_options.argv[1..]) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .env_clear() - .envs(&startup_options.env) - .current_dir(&workdir) - .spawn() - .map_err(|e| { - format!( - "Failed to create child handle with cmd: {} options {:#?}: {}", - &cmd, &startup_options, e - ) - })?; - - let port = read_port(&mut child)?; debug!( "Created nailgun server process with pid {} and port {}", child.id(), port ); - // Now that we've started it, clear its directory before the first client can access it. - clear_workdir(workdir.path(), &executor).await?; - Ok(NailgunProcess { port, fingerprint: nailgun_server_fingerprint, workdir, + workdir_include_names, name, executor, handle: child, @@ -446,7 +466,12 @@ impl BorrowedNailgunProcess { .as_ref() .unwrap(); - clear_workdir(process.workdir.path(), &process.executor).await?; + clear_workdir( + &process.executor, + process.workdir.path(), + &process.workdir_include_names, + ) + .await?; // Once we've successfully cleaned up, remove the process. let _ = self.0.take(); @@ -467,7 +492,11 @@ impl Drop for BorrowedNailgunProcess { } } -async fn clear_workdir(workdir: &Path, executor: &Executor) -> Result<(), String> { +async fn clear_workdir( + executor: &Executor, + workdir: &Path, + exclude_names: &HashSet, +) -> Result<(), String> { // Move all content into a temporary directory. let garbage_dir = tempfile::Builder::new() .prefix("process-execution") @@ -478,30 +507,41 @@ async fn clear_workdir(workdir: &Path, executor: &Executor) -> Result<(), String err ) })?; + let moves = list_workdir(workdir) + .await? + .into_iter() + .filter(|n| !exclude_names.contains(n)) + .map(|name| async { + tokio::fs::rename(workdir.join(&name), garbage_dir.path().join(&name)) + .await + .map_err(|e| { + format!( + "Failed to move {} to garbage: {}", + workdir.join(name).display(), + e + ) + }) + }) + .collect::>(); + future::try_join_all(moves).await?; + + // And drop it in the background. 
+ let _ = executor.spawn_blocking(move || std::mem::drop(garbage_dir)); + + Ok(()) +} + +async fn list_workdir(workdir: &Path) -> Result, String> { let mut dir_entries = tokio::fs::read_dir(workdir) .await .map_err(|e| format!("Failed to read nailgun process directory: {}", e))?; + let mut names = HashSet::new(); while let Some(dir_entry) = dir_entries .next_entry() .await .map_err(|e| format!("Failed to read entry in nailgun process directory: {}", e))? { - tokio::fs::rename( - dir_entry.path(), - garbage_dir.path().join(dir_entry.file_name()), - ) - .await - .map_err(|e| { - format!( - "Failed to move {} to garbage: {}", - dir_entry.path().display(), - e - ) - })?; + names.insert(dir_entry.file_name()); } - - // And drop it in the background. - let _ = executor.spawn_blocking(move || std::mem::drop(garbage_dir)); - - Ok(()) + Ok(names) } diff --git a/src/rust/engine/process_execution/src/remote.rs b/src/rust/engine/process_execution/src/remote.rs index 5053a8ebdc1..c36c94e4058 100644 --- a/src/rust/engine/process_execution/src/remote.rs +++ b/src/rust/engine/process_execution/src/remote.rs @@ -791,7 +791,7 @@ impl crate::CommandRunner for CommandRunner { &self.store, command_digest, action_digest, - Some(request.input_files), + Some(request.input_digests.complete), ) .await?; @@ -1030,7 +1030,7 @@ pub fn make_execute_request( let mut action = remexec::Action { command_digest: Some((&digest(&command)?).into()), - input_root_digest: Some((&req.input_files).into()), + input_root_digest: Some((&req.input_digests.complete).into()), ..remexec::Action::default() }; diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index 9ff11e9acc5..bef105431d0 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -23,8 +23,8 @@ use workunit_store::{RunId, WorkunitStore}; use crate::remote::{digest, CommandRunner, ExecutionError, OperationOrStatus}; use crate::{ - CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, Platform, - Process, ProcessCacheScope, ProcessMetadata, + CommandRunner as CommandRunnerTrait, Context, FallibleProcessResultWithPlatform, InputDigests, + Platform, Process, ProcessCacheScope, ProcessMetadata, }; use std::any::type_name; use std::io::Cursor; @@ -76,7 +76,7 @@ async fn make_execute_request() { .into_iter() .collect(), working_directory: None, - input_files: input_directory.digest(), + input_digests: InputDigests::with_input_files(input_directory.digest()), // Intentionally poorly sorted: output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(), output_directories: relative_paths(&["directory/name"]).collect(), @@ -86,7 +86,6 @@ async fn make_execute_request() { append_only_caches: BTreeMap::new(), jdk_home: None, platform_constraint: None, - use_nailgun: EMPTY_DIGEST, execution_slot_variable: None, cache_scope: ProcessCacheScope::Always, }; @@ -153,7 +152,7 @@ async fn make_execute_request_with_instance_name() { .into_iter() .collect(), working_directory: None, - input_files: input_directory.digest(), + input_digests: InputDigests::with_input_files(input_directory.digest()), // Intentionally poorly sorted: output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(), output_directories: relative_paths(&["directory/name"]).collect(), @@ -163,7 +162,6 @@ async fn make_execute_request_with_instance_name() { append_only_caches: BTreeMap::new(), jdk_home: None, 
platform_constraint: None, - use_nailgun: EMPTY_DIGEST, execution_slot_variable: None, cache_scope: ProcessCacheScope::Always, }; @@ -243,7 +241,7 @@ async fn make_execute_request_with_cache_key_gen_version() { .into_iter() .collect(), working_directory: None, - input_files: input_directory.digest(), + input_digests: InputDigests::with_input_files(input_directory.digest()), // Intentionally poorly sorted: output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(), output_directories: relative_paths(&["directory/name"]).collect(), @@ -253,7 +251,6 @@ async fn make_execute_request_with_cache_key_gen_version() { append_only_caches: BTreeMap::new(), jdk_home: None, platform_constraint: None, - use_nailgun: EMPTY_DIGEST, execution_slot_variable: None, cache_scope: ProcessCacheScope::Always, }; @@ -331,7 +328,7 @@ async fn make_execute_request_with_jdk() { let mut req = Process::new(owned_string_vec(&["/bin/echo", "yo"])); req.jdk_home = Some(PathBuf::from("/tmp")); req.description = "some description".to_owned(); - req.input_files = input_directory.digest(); + req.input_digests = InputDigests::with_input_files(input_directory.digest()); let want_command = remexec::Command { arguments: vec!["/bin/echo".to_owned(), "yo".to_owned()], @@ -387,7 +384,7 @@ async fn make_execute_request_with_jdk() { async fn make_execute_request_with_jdk_and_extra_platform_properties() { let input_directory = TestDirectory::containing_roland(); let mut req = Process::new(owned_string_vec(&["/bin/echo", "yo"])); - req.input_files = input_directory.digest(); + req.input_digests = InputDigests::with_input_files(input_directory.digest()); req.description = "some description".to_owned(); req.jdk_home = Some(PathBuf::from("/tmp")); @@ -480,7 +477,7 @@ async fn make_execute_request_with_timeout() { .into_iter() .collect(), working_directory: None, - input_files: input_directory.digest(), + input_digests: InputDigests::with_input_files(input_directory.digest()), // Intentionally poorly sorted: output_files: relative_paths(&["path/to/file.ext", "other/file.ext"]).collect(), output_directories: relative_paths(&["directory/name"]).collect(), @@ -490,7 +487,6 @@ async fn make_execute_request_with_timeout() { append_only_caches: BTreeMap::new(), jdk_home: None, platform_constraint: None, - use_nailgun: EMPTY_DIGEST, execution_slot_variable: None, cache_scope: ProcessCacheScope::Always, }; @@ -2367,7 +2363,8 @@ pub(crate) fn assert_contains(haystack: &str, needle: &str) { pub(crate) fn cat_roland_request() -> Process { let argv = owned_string_vec(&["/bin/cat", "roland.ext"]); let mut process = Process::new(argv); - process.input_files = TestDirectory::containing_roland().digest(); + process.input_digests = + InputDigests::with_input_files(TestDirectory::containing_roland().digest()); process.timeout = one_second(); process.description = "cat a roland".to_string(); process diff --git a/src/rust/engine/process_executor/src/main.rs b/src/rust/engine/process_executor/src/main.rs index 23f0d1341e6..8eeb7773320 100644 --- a/src/rust/engine/process_executor/src/main.rs +++ b/src/rust/engine/process_executor/src/main.rs @@ -34,7 +34,9 @@ use std::time::Duration; use fs::RelativePath; use hashing::{Digest, Fingerprint, EMPTY_DIGEST}; -use process_execution::{Context, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata}; +use process_execution::{ + Context, InputDigests, NamedCaches, Platform, ProcessCacheScope, ProcessMetadata, +}; use prost::Message; use protos::gen::build::bazel::remote::execution::v2::{Action, 
Command}; use protos::gen::buildbarn::cas::UncachedActionResult; @@ -372,7 +374,7 @@ async fn make_request( args.buildbarn_url.as_ref(), ) { (Some(input_digest), Some(input_digest_length), None, None, None) => { - make_request_from_flat_args(args, Digest::new(input_digest, input_digest_length)) + make_request_from_flat_args(store, args, Digest::new(input_digest, input_digest_length)).await } (None, None, Some(action_fingerprint), Some(action_digest_length), None) => { @@ -390,9 +392,10 @@ async fn make_request( } } -fn make_request_from_flat_args( +async fn make_request_from_flat_args( + store: &Store, args: &Opt, - input_root_digest: Digest, + input_files: Digest, ) -> Result<(process_execution::Process, ProcessMetadata), String> { let output_files = args .command @@ -428,11 +431,15 @@ fn make_request_from_flat_args( _ => EMPTY_DIGEST, }; + let input_digests = InputDigests::new(store, input_files, use_nailgun) + .await + .map_err(|e| format!("Could not create input digest for process: {:?}", e))?; + let process = process_execution::Process { argv: args.command.argv.clone(), env: collection_from_keyvalues(args.command.env.iter()), working_directory, - input_files: input_root_digest, + input_digests, output_files, output_directories, timeout: Some(Duration::new(15 * 60, 0)), @@ -441,7 +448,6 @@ fn make_request_from_flat_args( append_only_caches: BTreeMap::new(), jdk_home: args.command.jdk.clone(), platform_constraint: None, - use_nailgun, execution_slot_variable: None, cache_scope: ProcessCacheScope::Always, }; @@ -492,12 +498,14 @@ async fn extract_request_from_action_digest( ) }; - let input_files = require_digest(&action.input_root_digest) - .map_err(|err| format!("Bad input root digest: {:?}", err))?; + let input_digests = InputDigests::with_input_files( + require_digest(&action.input_root_digest) + .map_err(|err| format!("Bad input root digest: {:?}", err))?, + ); // In case the local Store doesn't have the input root Directory, // have it fetch it and identify it as a Directory, so that it doesn't get confused about the unknown metadata. - store.load_directory_or_err(input_files).await?; + store.load_directory_or_err(input_digests.complete).await?; let process = process_execution::Process { argv: command.arguments, @@ -507,7 +515,7 @@ async fn extract_request_from_action_digest( .map(|env| (env.name.clone(), env.value.clone())) .collect(), working_directory, - input_files, + input_digests, output_files: command .output_files .iter() @@ -527,7 +535,6 @@ async fn extract_request_from_action_digest( append_only_caches: BTreeMap::new(), jdk_home: None, platform_constraint: None, - use_nailgun: EMPTY_DIGEST, cache_scope: ProcessCacheScope::Always, }; diff --git a/src/rust/engine/src/context.rs b/src/rust/engine/src/context.rs index 4ce1657df2f..6c26927132f 100644 --- a/src/rust/engine/src/context.rs +++ b/src/rust/engine/src/context.rs @@ -183,9 +183,12 @@ impl Core { local_execution_root_dir.to_path_buf(), store.clone(), executor.clone(), - // TODO: The nailgun pool size should almost certainly be configurable independent - // of concurrency, along with per-instance memory usage. - exec_strategy_opts.local_parallelism, + // We set the nailgun pool size to twice the number that will ever be active in order to + // keep warm but idle processes for task fingerprints which are not currently busy (e.g. + // while busy running scalac, keeping some javac processes idle). + // TODO: The nailgun pool size should be configurable independent of concurrency, along + // with per-instance memory usage. 
See https://github.com/pantsbuild/pants/issues/13067. + exec_strategy_opts.local_parallelism * 2, )) } else { Box::new(local_command_runner) diff --git a/src/rust/engine/src/intrinsics.rs b/src/rust/engine/src/intrinsics.rs index 835bbe0586a..63dac404c59 100644 --- a/src/rust/engine/src/intrinsics.rs +++ b/src/rust/engine/src/intrinsics.rs @@ -168,18 +168,12 @@ impl Intrinsics { fn process_request_to_process_result( context: Context, - args: Vec, + mut args: Vec, ) -> BoxFuture<'static, NodeResult> { async move { - let process_request = Python::with_gil(|py| { - let py_process = (*args[0]).as_ref(py); - ExecuteProcess::lift(py_process).map_err(|str| { - throw(format!( - "Error lifting MultiPlatformExecuteProcess: {}", - str - )) - }) - })?; + let process_request = ExecuteProcess::lift(&context.core.store(), args.pop().unwrap()) + .map_err(|e| throw(format!("Error lifting MultiPlatformExecuteProcess: {}", e))) + .await?; let result = context.get(process_request).await?.0; diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index c2d6013651e..7e3da389c4a 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -31,14 +31,15 @@ use fs::{ Vfs, }; use process_execution::{ - self, CacheDest, CacheName, Platform, Process, ProcessCacheScope, ProcessResultSource, + self, CacheDest, CacheName, InputDigests, Platform, Process, ProcessCacheScope, + ProcessResultSource, }; use crate::externs::engine_aware::{EngineAwareParameter, EngineAwareReturnType}; use crate::externs::fs::PyFileDigest; use graph::{Entry, Node, NodeError, NodeVisualizer}; use hashing::Digest; -use store::{self, StoreFileByDigest}; +use store::{self, Store, StoreFileByDigest}; use workunit_store::{ in_workunit, Level, Metric, ObservationMetric, RunningWorkunit, UserMetadataItem, WorkunitMetadata, @@ -282,17 +283,32 @@ pub struct ExecuteProcess { } impl ExecuteProcess { - fn lift_process(value: &PyAny) -> Result { + async fn lift_process_input_digests( + store: &Store, + value: &Value, + ) -> Result { + let input_digests_fut: Result<_, String> = Python::with_gil(|py| { + let value = (**value).as_ref(py); + let input_files = lift_directory_digest(externs::getattr(value, "input_digest").unwrap()) + .map_err(|err| format!("Error parsing input_digest {}", err))?; + let use_nailgun = lift_directory_digest(externs::getattr(value, "use_nailgun").unwrap()) + .map_err(|err| format!("Error parsing use_nailgun {}", err))?; + + Ok(InputDigests::new(store, input_files, use_nailgun)) + }); + + input_digests_fut? 
+ .await + .map_err(|e| format!("Failed to merge input digests for process: {:?}", e)) + } + + fn lift_process(value: &PyAny, input_digests: InputDigests) -> Result { let env = externs::getattr_from_str_frozendict(value, "env"); let working_directory = match externs::getattr_as_optional_string(value, "working_directory") { None => None, Some(dir) => Some(RelativePath::new(dir)?), }; - let py_digest = externs::getattr(value, "input_digest").unwrap(); - let digest = lift_directory_digest(py_digest) - .map_err(|err| format!("Error parsing input_digest {}", err))?; - let output_files = externs::getattr::>(value, "output_files") .unwrap() .into_iter() @@ -324,10 +340,6 @@ impl ExecuteProcess { let jdk_home = externs::getattr_as_optional_string(value, "jdk_home").map(PathBuf::from); - let py_use_nailgun = externs::getattr(value, "use_nailgun").unwrap(); - let use_nailgun = lift_directory_digest(py_use_nailgun) - .map_err(|err| format!("Error parsing use_nailgun {}", err))?; - let execution_slot_variable = externs::getattr_as_optional_string(value, "execution_slot_variable"); @@ -349,7 +361,7 @@ impl ExecuteProcess { argv: externs::getattr(value, "argv").unwrap(), env, working_directory, - input_files: digest, + input_digests, output_files, output_directories, timeout, @@ -358,15 +370,14 @@ impl ExecuteProcess { append_only_caches, jdk_home, platform_constraint, - use_nailgun, execution_slot_variable, cache_scope, }) } - pub fn lift(value: &PyAny) -> Result { - let process = Self::lift_process(value)?; - + pub async fn lift(store: &Store, value: Value) -> Result { + let input_digests = Self::lift_process_input_digests(store, &value).await?; + let process = Python::with_gil(|py| Self::lift_process((*value).as_ref(py), input_digests))?; Ok(Self { process }) } }
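
A closing note on the shape `ExecuteProcess::lift` adopts above: pyo3's `Python::with_gil` is synchronous, so the rule extracts the Python attributes and *builds* the `InputDigests::new(..)` future while holding the GIL, but only awaits it after the guard is released. A minimal self-contained sketch of that two-phase pattern, assuming pyo3 of the vintage used in this diff; the `name` attribute and `merge` helper are illustrative stand-ins, not Pants APIs:

    use pyo3::prelude::*;

    // Stand-in for async store work (e.g. Store::merge) that needs data
    // extracted from a Python object.
    async fn merge(name: String) -> Result<String, String> {
      Ok(format!("merged: {}", name))
    }

    async fn lift_two_phase(value: PyObject) -> Result<String, String> {
      // Phase 1: synchronous attribute extraction under the GIL. The closure
      // returns a future without awaiting it.
      let fut = Python::with_gil(|py| -> Result<_, String> {
        let name: String = value
          .as_ref(py)
          .getattr("name")
          .and_then(|v| v.extract())
          .map_err(|e| e.to_string())?;
        Ok(merge(name))
      })?;
      // Phase 2: await outside the GIL, so no await point holds it.
      fut.await
    }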