Skip to content

Commit

Permalink
process_executor accepts action digests and buildbarn links (pantsbui…
Browse files Browse the repository at this point in the history
…ld#11283)

This allows you to take an action which ran (perhaps because you have metadata from the remote cluster) and reproduce it, locally or remotely. It also:
* Allows specifying prefixes for commands, so you can run under `strace` or similar.
* Allows specifying a CAS server without an execution server, so that you can fetch remote digests but perform execution locally.
* Broadens `expand_local_digests` to `expand_digests` with a local-only mode, so that if you're fetching the digest from a remote CAS, you don't get errors complaining that things it references aren't available locally.
  • Loading branch information
illicitonion authored Jan 13, 2021
1 parent a3f6119 commit e80767f
Show file tree
Hide file tree
Showing 10 changed files with 641 additions and 87 deletions.
63 changes: 60 additions & 3 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

48 changes: 35 additions & 13 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,9 @@ impl Store {
let store = self.clone();
let remote = remote.clone();
async move {
let ingested_digests = store.expand_local_digests(digests.iter(), false).await?;
let ingested_digests = store
.expand_digests(digests.iter(), LocalMissingBehavior::Fetch)
.await?;
let digests_to_upload =
if Store::upload_is_faster_than_checking_whether_to_upload(&ingested_digests) {
ingested_digests.keys().cloned().collect()
Expand Down Expand Up @@ -678,7 +680,9 @@ impl Store {
&self,
digests: Ds,
) -> Result<(), String> {
let reachable_digests_and_types = self.expand_local_digests(digests, true).await?;
let reachable_digests_and_types = self
.expand_digests(digests, LocalMissingBehavior::Ignore)
.await?;
self
.local
.lease_all(reachable_digests_and_types.into_iter())
Expand Down Expand Up @@ -727,15 +731,18 @@ impl Store {
}

///
/// Return all Digests reachable locally from the given root Digests (which may represent either
/// Return all Digests reachable from the given root Digests (which may represent either
/// Files or Directories).
///
/// If `allow_missing`, root digests which do not exist locally will be ignored.
/// `missing_behavior` defines what to do if the digests are not available locally.
///
/// If `missing_behavior` is `Fetch`, and one of the explicitly passed Digests was of a Directory
/// which was not known locally, this function may return an error.
///
pub async fn expand_local_digests<'a, Ds: Iterator<Item = &'a Digest>>(
pub async fn expand_digests<'a, Ds: Iterator<Item = &'a Digest>>(
&self,
digests: Ds,
allow_missing: bool,
missing_behavior: LocalMissingBehavior,
) -> Result<HashMap<Digest, EntryType>, String> {
// Expand each digest into either a single file digest, or a collection of recursive digests
// below a directory.
Expand All @@ -747,17 +754,21 @@ impl Store {
match store.local.entry_type(digest.0).await {
Ok(Some(EntryType::File)) => Ok(Either::Left(*digest)),
Ok(Some(EntryType::Directory)) => {
// Locally expand the directory.
let reachable = store.into_local_only().expand_directory(*digest).await?;
let store_for_expanding = match missing_behavior {
LocalMissingBehavior::Fetch => store,
LocalMissingBehavior::Error | LocalMissingBehavior::Ignore => {
store.into_local_only()
}
};
let reachable = store_for_expanding.expand_directory(*digest).await?;
Ok(Either::Right(reachable))
}
Ok(None) => {
if allow_missing {
Ok(Either::Right(HashMap::new()))
} else {
Ok(None) => match missing_behavior {
LocalMissingBehavior::Ignore => Ok(Either::Right(HashMap::new())),
LocalMissingBehavior::Fetch | LocalMissingBehavior::Error => {
Err(format!("Failed to expand digest {:?}: Not found", digest))
}
}
},
Err(err) => Err(format!("Failed to expand digest {:?}: {:?}", digest, err)),
}
}
Expand Down Expand Up @@ -1108,6 +1119,17 @@ impl Store {
}
}

/// Behavior in case a needed digest is missing in the local store.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LocalMissingBehavior {
/// Hard error that the digest is missing.
Error,
/// Attempt to fetch the digest from a remote, if one is present, and error if it couldn't be found.
Fetch,
/// Ignore the digest being missing, and try to proceed regardless.
Ignore,
}

#[async_trait]
impl StoreWrapper for Store {
async fn load_file_bytes_with<T: Send + 'static, F: Fn(&[u8]) -> T + Send + Sync + 'static>(
Expand Down
2 changes: 2 additions & 0 deletions src/rust/engine/process_execution/bazel_protos/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
&[
"protos/bazelbuild_remote-apis/build/bazel/remote/execution/v2/remote_execution.proto",
"protos/bazelbuild_remote-apis/build/bazel/semver/semver.proto",
"protos/buildbarn/cas.proto",
"protos/googleapis/google/bytestream/bytestream.proto",
"protos/googleapis/google/rpc/code.proto",
"protos/googleapis/google/rpc/error_details.proto",
Expand All @@ -24,6 +25,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
],
&[
"protos/bazelbuild_remote-apis",
"protos/buildbarn",
"protos/googleapis",
"protos/standard",
],
Expand Down
Loading

0 comments on commit e80767f

Please sign in to comment.