Skip to content

Commit

Permalink
Stream large remote cache downloads directly to disk (pantsbuild#18054)
Browse files Browse the repository at this point in the history
This fixes pantsbuild#17065 by allowing remote cache loads to be streamed to
disk. In essence, the remote store now has a `load_file` method in addition to
`load_bytes`, and thus the caller can decide to download to a file instead.

This doesn't make progress towards pantsbuild#18048 (this PR doesn't touch the local
store at all), but I think it will help with integrating the remote store with
that code: in theory the `File` could be provided in a way that can be part of
the "large file pool" directly (and indeed, the decision about whether to
download to a file or into memory ties into that).

This also does a theoretically unnecessary extra pass over the data (as
discussed in pantsbuild#18231) to verify the digest, but I think it'd make sense to
remove that pass as a future optimisation, since doing so will require refactoring more deeply
(down into `sharded_lmdb` and `hashing`, I think) and is best to build on
pantsbuild#18153 once that lands.
  • Loading branch information
huonw authored Feb 12, 2023
1 parent 4f986e9 commit 7631913
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 82 deletions.
78 changes: 51 additions & 27 deletions src/rust/engine/fs/store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use std::fmt::{self, Debug, Display};
use std::fs::hard_link;
use std::fs::OpenOptions;
use std::future::Future;
use std::io::{self, Read, Write};
use std::io::{self, Read, Seek, Write};
use std::os::unix::fs::{symlink, OpenOptionsExt, PermissionsExt};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -237,30 +237,60 @@ impl RemoteStore {
}

/// Download the digest to the local byte store from this remote store. The function `f_remote`
/// can be used to validate the bytes.
async fn download_digest_to_local<
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
/// can be used to validate the bytes (NB. if provided, the whole value will be buffered into
/// memory to provide the `Bytes` argument, and thus `f_remote` should only be used for small digests).
async fn download_digest_to_local(
&self,
local_store: local::ByteStore,
digest: Digest,
entry_type: EntryType,
f_remote: FRemote,
f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>,
) -> Result<(), StoreError> {
let remote_store = self.store.clone();
let create_missing = || {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
};
self
.maybe_download(digest, async move {
let bytes = remote_store.load_bytes(digest).await?.ok_or_else(|| {
StoreError::MissingDigest(
"Was not present in either the local or remote store".to_owned(),
digest,
)
})?;
let stored_digest = if digest.size_bytes <= IMMUTABLE_FILE_SIZE_LIMIT || f_remote.is_some()
{
// (if there's a function to call, always just buffer fully into memory)
let bytes = remote_store
.load_bytes(digest)
.await?
.ok_or_else(create_missing)?;
if let Some(f_remote) = f_remote {
f_remote(bytes.clone())?;
}
local_store
.store_bytes(entry_type, None, bytes, true)
.await?
} else {
assert!(f_remote.is_none());
// TODO(#18048): choose a file that can be plopped into the local store directly, when
// large files are stored there
let file = tokio::task::spawn_blocking(tempfile::tempfile)
.await
.map_err(|e| e.to_string())??;
let file = tokio::fs::File::from_std(file);

f_remote(bytes.clone())?;
let stored_digest = local_store
.store_bytes(entry_type, None, bytes, true)
.await?;
let file = remote_store
.load_file(digest, file)
.await?
.ok_or_else(create_missing)?;

let file = file.into_std().await;
local_store
.store(entry_type, true, true, move || {
let mut file = file.try_clone()?;
file.rewind()?;
Ok(file)
})
.await?
};
if digest == stored_digest {
Ok(())
} else {
Expand Down Expand Up @@ -510,12 +540,7 @@ impl Store {
) -> Result<T, StoreError> {
// No transformation or verification is needed for files.
self
.load_bytes_with(
EntryType::File,
digest,
move |v: &[u8]| Ok(f(v)),
|_: Bytes| Ok(()),
)
.load_bytes_with(EntryType::File, digest, move |v: &[u8]| Ok(f(v)), None)
.await
}

Expand Down Expand Up @@ -687,13 +712,13 @@ impl Store {
},
// Eagerly verify that CAS-returned Directories are canonical, so that we don't write them
// into our local store.
move |bytes: Bytes| {
Some(&move |bytes| {
let directory = remexec::Directory::decode(bytes).map_err(|e| {
format!("CAS returned Directory proto for {digest:?} which was not valid: {e:?}")
})?;
protos::verify_directory_canonical(digest, &directory)?;
Ok(())
},
}),
)
.await
}
Expand Down Expand Up @@ -721,13 +746,12 @@ impl Store {
async fn load_bytes_with<
T: Send + 'static,
FLocal: Fn(&[u8]) -> Result<T, String> + Clone + Send + Sync + 'static,
FRemote: Fn(Bytes) -> Result<(), String> + Send + Sync + 'static,
>(
&self,
entry_type: EntryType,
digest: Digest,
f_local: FLocal,
f_remote: FRemote,
f_remote: Option<&(dyn Fn(Bytes) -> Result<(), String> + Send + Sync + 'static)>,
) -> Result<T, StoreError> {
if let Some(bytes_res) = self
.local
Expand Down Expand Up @@ -997,7 +1021,7 @@ impl Store {
.into_iter()
.map(|file_digest| async move {
if let Err(e) = remote
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, |_| Ok(()))
.download_digest_to_local(self.local.clone(), file_digest, EntryType::File, None)
.await
{
log::debug!("Missing file digest from remote store: {:?}", file_digest);
Expand Down
Loading

0 comments on commit 7631913

Please sign in to comment.