Skip to content

Commit

Permalink
[jvm] Split nailgun digest from input file digests (pantsbuild#13813)
Browse files Browse the repository at this point in the history
As discussed in pantsbuild#13787, the `input_files` digest must currently include the `use_nailgun` digest, and that means that the input files for the server are materialized for every run.
* Add an `InputDigests` struct to encapsulate managing the collection of input digests for a `Process` (which will soon also include the immutable digests from pantsbuild#12716), and split the `use_nailgun` digest from the `input_files` digest.
* Move nailgun spawning (which uses `std::process` and `std::fs`, and so is synchronous) onto the `Executor`.
* Adjust the (still hardcoded) nailgun pool size to keep idle servers beyond the number that are currently active (e.g. to allow for idle `javac` processes while `scalac` processes are active).

Collectively, these changes make compilation 40% faster.

Fixes pantsbuild#13787.
  • Loading branch information
stuhood authored Dec 6, 2021
1 parent 559a086 commit 3442d08
Show file tree
Hide file tree
Showing 15 changed files with 304 additions and 195 deletions.
1 change: 0 additions & 1 deletion src/python/pants/backend/java/compile/javac.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ async def compile_java_source(
(
prefixed_direct_dependency_classpath_digest,
dest_dir_digest,
jdk_setup.digest,
*(
sources.snapshot.digest
for _, sources in component_members_and_java_source_files
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,15 +111,6 @@ async def analyze_java_source_dependencies(
)
),
)
merged_digest = await Get(
Digest,
MergeDigests(
(
tool_digest,
prefixed_source_files_digest,
)
),
)

analysis_output_path = "__source_analysis.json"

Expand All @@ -132,7 +123,7 @@ async def analyze_java_source_dependencies(
analysis_output_path,
source_path,
],
input_digest=merged_digest,
input_digest=prefixed_source_files_digest,
output_files=(analysis_output_path,),
use_nailgun=tool_digest,
append_only_caches=jdk_setup.append_only_caches,
Expand Down
29 changes: 15 additions & 14 deletions src/python/pants/backend/scala/compile/scalac.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,19 @@ async def compile_scala_source(
Digest, AddPrefix(merged_transitive_dependency_classpath_entries_digest, usercp)
)

merged_digest = await Get(
Digest,
MergeDigests(
(
prefixed_transitive_dependency_classpath_digest,
tool_classpath.digest,
jdk_setup.digest,
*(
sources.snapshot.digest
for _, sources in component_members_and_scala_source_files
),
)
merged_tool_digest, merged_input_digest = await MultiGet(
Get(Digest, MergeDigests((tool_classpath.digest, jdk_setup.digest))),
Get(
Digest,
MergeDigests(
(
prefixed_transitive_dependency_classpath_digest,
*(
sources.snapshot.digest
for _, sources in component_members_and_scala_source_files
),
)
),
),
)

Expand All @@ -183,8 +184,8 @@ async def compile_scala_source(
)
),
],
input_digest=merged_digest,
use_nailgun=jdk_setup.digest,
input_digest=merged_input_digest,
use_nailgun=merged_tool_digest,
output_files=(output_file,),
description=f"Compile {request.component} with scalac",
level=LogLevel.DEBUG,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,15 +256,6 @@ async def analyze_scala_source_dependencies(
)
),
)
merged_digest = await Get(
Digest,
MergeDigests(
(
tool_digest,
prefixed_source_files_digest,
)
),
)

analysis_output_path = "__source_analysis.json"

Expand All @@ -277,7 +268,7 @@ async def analyze_scala_source_dependencies(
analysis_output_path,
source_path,
],
input_digest=merged_digest,
input_digest=prefixed_source_files_digest,
output_files=(analysis_output_path,),
use_nailgun=tool_digest,
append_only_caches=jdk_setup.append_only_caches,
Expand Down
74 changes: 63 additions & 11 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use log::Level;
use protos::gen::build::bazel::remote::execution::v2 as remexec;
use remexec::ExecutedActionMetadata;
use serde::{Deserialize, Serialize};
use store::{SnapshotOps, SnapshotOpsError, Store};
use workunit_store::{in_workunit, RunId, RunningWorkunit, WorkunitMetadata, WorkunitStore};

pub mod cache;
Expand Down Expand Up @@ -177,6 +178,63 @@ fn serialize_level<S: serde::Serializer>(level: &log::Level, s: S) -> Result<S::
s.serialize_str(&level.to_string())
}

///
/// Input Digests for a process execution. The `complete` Digest is the computed union of all
/// inputs: the rest of the Digests should be disjoint (or have identical contents where they
/// overlap).
///
#[derive(Clone, Debug, Hash, Eq, PartialEq, Serialize)]
pub struct InputDigests {
  ///
  /// All of the input Digests, merged.
  ///
  pub complete: Digest,

  ///
  /// The input files for the process execution, which will be materialized as mutable inputs in a
  /// sandbox for the process.
  ///
  pub input_files: Digest,

  ///
  /// If non-empty, the Digest of a nailgun server to use to attempt to spawn the Process.
  ///
  pub use_nailgun: Digest,
}

impl InputDigests {
  ///
  /// Builds an `InputDigests` whose `complete` Digest is the merge (via the given `Store`) of
  /// `input_files` and `use_nailgun`.
  ///
  /// Fails with a `SnapshotOpsError` if the merge fails (e.g. on conflicting file contents).
  ///
  pub async fn new(
    store: &Store,
    input_files: Digest,
    use_nailgun: Digest,
  ) -> Result<Self, SnapshotOpsError> {
    let merged = store.merge(vec![input_files, use_nailgun]).await?;
    Ok(Self {
      complete: merged,
      input_files,
      use_nailgun,
    })
  }

  ///
  /// Constructs an `InputDigests` containing only input files: `complete` is identical to
  /// `input_files`, and the nailgun digest is empty.
  ///
  pub fn with_input_files(input_files: Digest) -> Self {
    Self {
      input_files,
      complete: input_files,
      use_nailgun: hashing::EMPTY_DIGEST,
    }
  }
}

impl Default for InputDigests {
fn default() -> Self {
Self {
complete: hashing::EMPTY_DIGEST,
input_files: hashing::EMPTY_DIGEST,
use_nailgun: hashing::EMPTY_DIGEST,
}
}
}

///
/// A process to be executed.
///
Expand Down Expand Up @@ -206,7 +264,10 @@ pub struct Process {
///
pub working_directory: Option<RelativePath>,

pub input_files: hashing::Digest,
///
/// All of the input digests for the process.
///
pub input_digests: InputDigests,

pub output_files: BTreeSet<RelativePath>,

Expand Down Expand Up @@ -251,14 +312,6 @@ pub struct Process {

pub platform_constraint: Option<Platform>,

///
/// If non-empty, the Digest of a nailgun server to use to attempt to spawn the Process.
///
/// TODO: Currently this Digest must be a subset of the `input_digest`, but we should consider
/// making it disjoint, and then automatically merging it.
///
pub use_nailgun: Digest,

pub cache_scope: ProcessCacheScope,
}

Expand All @@ -278,7 +331,7 @@ impl Process {
argv,
env: BTreeMap::new(),
working_directory: None,
input_files: hashing::EMPTY_DIGEST,
input_digests: InputDigests::default(),
output_files: BTreeSet::new(),
output_directories: BTreeSet::new(),
timeout: None,
Expand All @@ -287,7 +340,6 @@ impl Process {
append_only_caches: BTreeMap::new(),
jdk_home: None,
platform_constraint: None,
use_nailgun: hashing::EMPTY_DIGEST,
execution_slot_variable: None,
cache_scope: ProcessCacheScope::Successful,
}
Expand Down
29 changes: 16 additions & 13 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,18 @@ impl super::CommandRunner for CommandRunner {
}
};

// Prepare the workdir.
let exclusive_spawn = prepare_workdir(
workdir_path.clone(),
&req,
req.input_digests.complete,
context.clone(),
self.store.clone(),
self.executor.clone(),
self.named_caches(),
)
.await?;

workunit.increment_counter(Metric::LocalExecutionRequests, 1);
let res = self
.run_and_capture_workdir(
Expand All @@ -291,6 +303,7 @@ impl super::CommandRunner for CommandRunner {
self.executor.clone(),
workdir_path.clone(),
(),
exclusive_spawn,
self.platform(),
)
.map_err(|msg| {
Expand Down Expand Up @@ -453,21 +466,11 @@ pub trait CapturedWorkdir {
executor: task_executor::Executor,
workdir_path: PathBuf,
workdir_token: Self::WorkdirToken,
exclusive_spawn: bool,
platform: Platform,
) -> Result<FallibleProcessResultWithPlatform, String> {
let start_time = Instant::now();

// Prepare the workdir.
let exclusive_spawn = prepare_workdir(
workdir_path.clone(),
&req,
context.clone(),
store.clone(),
executor.clone(),
self.named_caches(),
)
.await?;

// Spawn the process.
// NB: We fully buffer up the `Stream` above into final `ChildResults` below and so could
// instead be using `CommandExt::output_async` above to avoid the `ChildResults::collect_from`
Expand Down Expand Up @@ -602,6 +605,7 @@ pub trait CapturedWorkdir {
pub async fn prepare_workdir(
workdir_path: PathBuf,
req: &Process,
input_digest: hashing::Digest,
context: Context,
store: Store,
executor: task_executor::Executor,
Expand All @@ -627,7 +631,6 @@ pub async fn prepare_workdir(
// non-determinism when paths overlap.
let store2 = store.clone();
let workdir_path_2 = workdir_path.clone();
let input_files = req.input_files;
in_workunit!(
context.workunit_store.clone(),
"setup_sandbox".to_owned(),
Expand All @@ -637,7 +640,7 @@ pub async fn prepare_workdir(
},
|_workunit| async move {
store2
.materialize_directory(workdir_path_2, input_files)
.materialize_directory(workdir_path_2, input_digest)
.await
},
)
Expand Down
6 changes: 3 additions & 3 deletions src/rust/engine/process_execution/src/local_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use testutil;

use crate::{
CacheDest, CacheName, CommandRunner as CommandRunnerTrait, Context,
FallibleProcessResultWithPlatform, NamedCaches, Platform, Process, RelativePath,
FallibleProcessResultWithPlatform, InputDigests, NamedCaches, Platform, Process, RelativePath,
};
use hashing::EMPTY_DIGEST;
use shell_quote::bash;
Expand Down Expand Up @@ -377,7 +377,7 @@ async fn test_directory_preservation() {

let mut process =
Process::new(argv.clone()).output_files(relative_paths(&["roland.ext"]).collect());
process.input_files = TestDirectory::nested().digest();
process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest());
process.working_directory = Some(RelativePath::new("cats").unwrap());

let result = run_command_locally_in_dir(
Expand Down Expand Up @@ -553,7 +553,7 @@ async fn working_directory() {
let mut process = Process::new(vec![find_bash(), "-c".to_owned(), "/bin/ls".to_string()]);
process.working_directory = Some(RelativePath::new("cats").unwrap());
process.output_directories = relative_paths(&["roland.ext"]).collect::<BTreeSet<_>>();
process.input_files = TestDirectory::nested().digest();
process.input_digests = InputDigests::with_input_files(TestDirectory::nested().digest());
process.timeout = one_second();
process.description = "confused-cat".to_string();

Expand Down
Loading

0 comments on commit 3442d08

Please sign in to comment.