Skip to content

Commit

Permalink
Immutably hardlink "large" files in a sandbox (pantsbuild#17520)
Browse files Browse the repository at this point in the history
Fixes pantsbuild#17282 and fixes pantsbuild#14070

This change represents the smallest footprint change to get support in for treating "large" files as immutable inputs.

- `immutable_inputs.rs` has been moved to `store` (to avoid circular reference)
- An additional method was added to support a hardlink _file_
- Directory materialization takes an `ImmutableInputs` ref and a list of paths to ensure mutable
- When materializing a file, if it's above our threshold and not being forced mutable, we hardlink it to the immutable inputs
- Process running seeds the mutable paths with the captured outputs

The future is primed for changes like:
- Eventually removing the `immutable_input_digests` field from a process, and letting the heuristic take over
- And then cleaning the code up after that's ripped out
- Adding more facilities to includelist/excludelist files from a `Process` object (e.g. we could includelist most/all PEXs since those shouldn't be mutated and we'd just have one top-level hardlink)
- Have a directory heuristic
- IDK more shenanigans 😄 

Tested 3 ways:
- `./pants --keep-sandboxes=always <something>` and inspected the sandbox between 2 different runs using the same daemon and ensured the large file was hardlinked in both
- Crafted an `experimental_shell_command` with a file in `outputs` that matches a large file and ensured the file in the sandbox wasn't hardlinked
- Crafted an `experimental_shell_command` with a dir in `outputs` that matches the containing dir of a large file and ensured the file in the sandbox wasn't hardlinked
  • Loading branch information
thejcannon authored Dec 6, 2022
1 parent 5c65d4d commit f8df117
Show file tree
Hide file tree
Showing 19 changed files with 380 additions and 86 deletions.
1 change: 1 addition & 0 deletions conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright 2022 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).


import pytest


Expand Down
18 changes: 15 additions & 3 deletions src/rust/engine/fs/fs_util/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use hashing::{Digest, Fingerprint};
use parking_lot::Mutex;
use protos::require_digest;
use serde_derive::Serialize;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use store::{
Snapshot, SnapshotOps, Store, StoreError, StoreFileByDigest, SubsetParams, UploadSummary,
};
Expand Down Expand Up @@ -535,7 +535,13 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
output_digest_opt.ok_or_else(|| ExitError("not found".into(), ExitCode::NotFound))?;
Ok(
store
.materialize_directory(destination, output_digest, Permissions::Writable)
.materialize_directory(
destination,
output_digest,
&BTreeSet::new(),
None,
Permissions::Writable,
)
.await?,
)
}
Expand All @@ -553,7 +559,13 @@ async fn execute(top_match: &clap::ArgMatches) -> Result<(), ExitError> {
let digest = DirectoryDigest::from_persisted_digest(Digest::new(fingerprint, size_bytes));
Ok(
store
.materialize_directory(destination, digest, Permissions::Writable)
.materialize_directory(
destination,
digest,
&BTreeSet::new(),
None,
Permissions::Writable,
)
.await?,
)
}
Expand Down
8 changes: 2 additions & 6 deletions src/rust/engine/fs/src/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,12 +780,8 @@ impl DigestTrie {
let mut prefix_iter = prefix.iter();
let mut tree = self;
while let Some(parent) = prefix_iter.next_back() {
let directory = Directory {
name: first_path_component_to_name(parent.as_ref())?,
digest: tree.compute_root_digest(),
tree,
};

let directory =
Directory::from_digest_tree(first_path_component_to_name(parent.as_ref())?, tree);
tree = DigestTrie(vec![Entry::Directory(directory)].into());
}

Expand Down
16 changes: 13 additions & 3 deletions src/rust/engine/fs/store/benches/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

use criterion::{criterion_group, criterion_main, Criterion};

use std::collections::HashSet;
use std::collections::{BTreeSet, HashSet};
use std::io::{BufRead, BufReader, BufWriter, Write};
use std::os::unix::ffi::OsStrExt;
use std::path::PathBuf;
Expand All @@ -43,7 +43,7 @@ use protos::gen::build::bazel::remote::execution::v2 as remexec;
use task_executor::Executor;
use tempfile::TempDir;

use store::{OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, SubsetParams};
use store::{ImmutableInputs, OneOffStoreFileByDigest, Snapshot, SnapshotOps, Store, SubsetParams};

fn executor() -> Executor {
Executor::global(num_cpus::get(), num_cpus::get() * 4, || ()).unwrap()
Expand All @@ -62,6 +62,10 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) {
let (store, _tempdir, digest) = snapshot(&executor, count, size);
let parent_dest = TempDir::new().unwrap();
let parent_dest_path = parent_dest.path();
let immutable_inputs_dest = TempDir::new().unwrap();
let immutable_inputs_path = immutable_inputs_dest.path();
let immutable_inputs = ImmutableInputs::new(store.clone(), immutable_inputs_path).unwrap();

cgroup
.sample_size(10)
.measurement_time(Duration::from_secs(30))
Expand All @@ -74,7 +78,13 @@ pub fn criterion_benchmark_materialize(c: &mut Criterion) {
let dest = new_temp.path().to_path_buf();
std::mem::forget(new_temp);
let _ = executor
.block_on(store.materialize_directory(dest, digest.clone(), perms))
.block_on(store.materialize_directory(
dest,
digest.clone(),
&BTreeSet::new(),
Some(&immutable_inputs),
perms,
))
.unwrap();
})
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::path::{Path, PathBuf};
use std::sync::Arc;

use async_oncecell::OnceCell;
use fs::{DirectoryDigest, Permissions, RelativePath};
use hashing::Digest;
use parking_lot::Mutex;
use store::{Store, StoreError};
use tempfile::TempDir;

use crate::WorkdirSymlink;
use crate::{Store, StoreError};

/// A symlink from a relative src to an absolute dst (outside of the workdir).
///
/// Used to expose an entry of the shared immutable-inputs cache inside a process
/// sandbox: `src` is the path within the sandbox, and `dst` points at the
/// materialized (read-only) location in the immutable inputs directory.
#[derive(Debug)]
pub struct WorkdirSymlink {
  // Path of the symlink, relative to the sandbox/workdir root.
  pub src: RelativePath,
  // Absolute target of the symlink, outside of the workdir.
  pub dst: PathBuf,
}

struct Inner {
store: Store,
Expand Down Expand Up @@ -49,7 +55,74 @@ impl ImmutableInputs {
}

/// Returns an absolute Path to immutably consume the given Digest from.
async fn path(&self, directory_digest: DirectoryDigest) -> Result<PathBuf, StoreError> {
/// Returns an absolute Path from which the given file Digest can be immutably consumed.
///
/// The file is materialized (once per Digest, memoized via a per-Digest `OnceCell`) into a
/// uniquely-named subdirectory of the immutable-inputs workdir, with read-only permissions,
/// and is never cleaned up for the life of this process (the backing TempDir is leaked on
/// success). Callers may hardlink or symlink to the returned path.
///
/// NOTE(review): `is_executable` is only applied on first materialization of a given
/// `digest`; later callers receive the cached path regardless of the flag they pass —
/// presumably callers agree on executability per digest. TODO confirm.
pub(crate) async fn path_for_file(
    &self,
    digest: Digest,
    is_executable: bool,
  ) -> Result<PathBuf, StoreError> {
    // Grab (or lazily create) the memoization cell for this Digest, cloning the Arc so the
    // lock is not held across the await below.
    let cell = self.0.contents.lock().entry(digest).or_default().clone();

    // We (might) need to initialize the value.
    //
    // Because this code executes a side-effect which could be observed elsewhere within this
    // process (other threads can observe the contents of the temporary directory), we need to
    // ensure that if this method is cancelled (via async Drop), whether the cell has been
    // initialized or not stays in sync with whether the side-effect is visible.
    //
    // Making the initialization "cancellation safe", involves either:
    //
    // 1. Adding a Drop guard to "undo" the side-effect if we're dropped before we fully
    //    initialize the cell.
    //     * This is challenging to do correctly in this case, because the `Drop` guard cannot
    //       be created until after initialization begins, but cannot be cleared until after the
    //       cell has been initialized (i.e., after `get_or_try_init` returns).
    // 2. Shielding ourselves from cancellation by `spawn`ing a new Task to guarantee that the
    //    cell initialization always runs to completion.
    //     * This would work, but would mean that we would finish initializing cells even when
    //       work was cancelled. Cancellation usually means that the work is no longer necessary,
    //       and so that could result in a lot of spurious IO (in e.g. warm cache cases which
    //       never end up actually needing any inputs).
    //     * An advanced variant of this approach would be to _pause_ work on materializing a
    //       Digest when demand for it disappeared, and resume the work if another caller
    //       requested that Digest.
    // 3. Using anonymous destination paths, such that multiple attempts to initialize cannot
    //    collide.
    //     * This means that although the side-effect is visible, it can never collide.
    //
    // We take the final approach here currently (for simplicity's sake), but the advanced variant
    // of approach 2 might eventually be worthwhile.
    cell
      .get_or_try_init(async {
        // An anonymous (randomly named) directory per initialization attempt: concurrent or
        // cancelled-then-retried attempts cannot collide (approach 3 above).
        let chroot = TempDir::new_in(self.0.workdir.path()).map_err(|e| {
          format!(
            "Failed to create a temporary directory for materialization of immutable input \
            digest {:?}: {}",
            digest, e
          )
        })?;

        // Name the file after its content hash so the final path is stable and self-describing.
        let dest = chroot.path().join(digest.hash.to_hex());
        self
          .0
          .store
          .materialize_file(dest.clone(), digest, Permissions::ReadOnly, is_executable)
          .await?;

        // Now that we've successfully initialized the destination, forget the TempDir so that it
        // is not cleaned up.
        let _ = chroot.into_path();

        Ok(dest)
      })
      .await
      .cloned()
  }

/// Returns an absolute Path to immutably consume the given Digest from.
pub(crate) async fn path_for_dir(
&self,
directory_digest: DirectoryDigest,
) -> Result<PathBuf, StoreError> {
let digest = directory_digest.as_digest();
let cell = self.0.contents.lock().entry(digest).or_default().clone();

Expand Down Expand Up @@ -96,7 +169,13 @@ impl ImmutableInputs {
self
.0
.store
.materialize_directory(dest.clone(), directory_digest, Permissions::ReadOnly)
.materialize_directory(
dest.clone(),
directory_digest,
&BTreeSet::new(),
Some(self),
Permissions::ReadOnly,
)
.await?;

// Now that we've successfully initialized the destination, forget the TempDir so that it
Expand All @@ -112,14 +191,14 @@ impl ImmutableInputs {
///
/// Returns symlinks to create for the given set of immutable cache paths.
///
pub(crate) async fn local_paths(
pub async fn local_paths(
&self,
immutable_inputs: &BTreeMap<RelativePath, DirectoryDigest>,
) -> Result<Vec<WorkdirSymlink>, StoreError> {
let dsts = futures::future::try_join_all(
immutable_inputs
.values()
.map(|d| self.path(d.clone()))
.map(|d| self.path_for_dir(d.clone()))
.collect::<Vec<_>>(),
)
.await?;
Expand Down
Loading

0 comments on commit f8df117

Please sign in to comment.