Optimize CreateDigest implementation. (pantsbuild#16617)
Closes pantsbuild#16570

* Use a `DigestTrie` to create all snapshots at once, instead of creating them individually
* Store all in-memory file contents in a single (batched) call, instead of storing them individually

[ci skip-build-wheels]

---

I restored the benchmark script from pantsbuild#14569 to test this. Across these sizes the new implementation is roughly 5-8x faster; a distilled sketch of the strategy follows the table.

| size | create_digest_before | create_digest_after |
| --- | --- | --- |
| 20000 | 608 | 130 |
| 40000 | 1164 | 268 |
| 60000 | 2260 | 475 |
| 80000 | 3582 | 674 |
| 100000 | 5085 | 862 |
| 120000 | 6765 | 1057 |
| 140000 | 8818 | 1067 |
| 160000 | 10752 | 1361 |
| 180000 | 12619 | 1604 |
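
For orientation, here is a distilled sketch of the new one-pass strategy used in the diff below. It is illustrative only: `build_trie` is a hypothetical helper name, and error handling plus the Python-binding plumbing from the real intrinsic are elided.

```rust
use std::collections::HashMap;
use std::path::PathBuf;

use bytes::Bytes;
use fs::{DigestTrie, PathStat};
use hashing::Digest;

// Hypothetical helper: hash each in-memory file once, then build the whole
// directory trie in a single pass instead of N snapshot-then-merge round trips.
fn build_trie(files: Vec<(PathBuf, Bytes)>) -> Result<DigestTrie, String> {
  let mut path_stats: Vec<PathStat> = Vec::with_capacity(files.len());
  let mut file_digests: HashMap<PathBuf, Digest> = HashMap::with_capacity(files.len());
  for (path, bytes) in files {
    // Hashing happens in memory; the store write can be batched separately
    // via the new `Store::store_file_bytes_batch`.
    let digest = Digest::of_bytes(&bytes);
    let stat = fs::File {
      path: path.clone(),
      is_executable: false,
    };
    path_stats.push(PathStat::file(path.clone(), stat));
    file_digests.insert(path, digest);
  }
  DigestTrie::from_path_stats(path_stats, &file_digests)
}
```

The pre-sized vectors mirror the diff's use of `Vec::with_capacity`, which avoids repeated reallocation when building digests with 100k+ entries.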
danxmoran authored Aug 24, 2022
1 parent 04aca82 commit cc3e78b
Showing 2 changed files with 60 additions and 37 deletions.
17 changes: 17 additions & 0 deletions src/rust/engine/fs/store/src/lib.rs
```diff
@@ -365,6 +365,23 @@ impl Store {
       .await
   }
 
+  ///
+  /// A convenience method for storing batches of small files.
+  ///
+  /// NB: This method should not be used for large blobs: prefer to stream them from their source
+  /// using `store_file`.
+  ///
+  pub async fn store_file_bytes_batch(
+    &self,
+    items: Vec<(Option<Digest>, Bytes)>,
+    initial_lease: bool,
+  ) -> Result<Vec<Digest>, String> {
+    self
+      .local
+      .store_bytes_batch(EntryType::File, items, initial_lease)
+      .await
+  }
+
   ///
   /// Store a file locally by streaming its contents.
   ///
```
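
A minimal call-site sketch for the new batch API (illustrative: the function name and parameters are assumptions; it presumes an existing `Store` from the `store` crate and an async runtime):

```rust
use bytes::Bytes;
use hashing::Digest;
use store::Store;

// Hypothetical call site; the function name and parameters are illustrative.
async fn store_small_files(store: Store, payloads: Vec<Bytes>) -> Result<(), String> {
  let items: Vec<(Option<Digest>, Bytes)> = payloads
    .into_iter()
    // Precompute each digest so the store does not have to hash the blob again.
    .map(|bytes| (Some(Digest::of_bytes(&bytes)), bytes))
    .collect();
  // `true` requests an initial lease, matching the CreateDigest call site below.
  let _digests = store.store_file_bytes_batch(items, true).await?;
  Ok(())
}
```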
80 changes: 43 additions & 37 deletions src/rust/engine/src/intrinsics.rs
```diff
@@ -1,6 +1,7 @@
 // Copyright 2021 Pants project contributors (see CONTRIBUTORS.md).
 // Licensed under the Apache License, Version 2.0 (see LICENSE).
 
+use std::collections::HashMap;
 use std::path::{Path, PathBuf};
 use std::process::Stdio;
 use std::str::FromStr;
@@ -18,14 +19,15 @@ use crate::tasks::Intrinsic;
 use crate::types::Types;
 use crate::Failure;
 
-use futures::future::{self, BoxFuture, FutureExt, TryFutureExt};
+use bytes::Bytes;
+use futures::future::{BoxFuture, FutureExt, TryFutureExt};
 use futures::try_join;
 use indexmap::IndexMap;
 use pyo3::{PyAny, PyRef, Python, ToPyObject};
 use tokio::process;
 
-use fs::{DirectoryDigest, RelativePath};
-use hashing::Digest;
+use fs::{DigestTrie, DirectoryDigest, PathStat, RelativePath};
+use hashing::{Digest, EMPTY_DIGEST};
 use process_execution::local::{apply_chroot, create_sandbox, prepare_workdir, KeepSandboxes};
 use process_execution::ManagedChild;
 use stdio::TryCloneAsFile;
@@ -406,6 +408,8 @@ fn create_digest_to_digest(
   context: Context,
   args: Vec<Value>,
 ) -> BoxFuture<'static, NodeResult<Value>> {
+  let mut new_file_count = 0;
+
   let items: Vec<CreateDigestItem> = {
     let gil = Python::acquire_gil();
     let py = gil.python();
@@ -419,6 +423,7 @@
         if obj.hasattr("content").unwrap() {
           let bytes = bytes::Bytes::from(externs::getattr::<Vec<u8>>(obj, "content").unwrap());
           let is_executable: bool = externs::getattr(obj, "is_executable").unwrap();
+          new_file_count += 1;
           CreateDigestItem::FileContent(path, bytes, is_executable)
         } else if obj.hasattr("file_digest").unwrap() {
           let py_file_digest: PyFileDigest = externs::getattr(obj, "file_digest").unwrap();
@@ -431,45 +436,46 @@
       .collect()
   };
 
-  // TODO: Rather than creating independent Digests and then merging them, this should use
-  // `DigestTrie::from_path_stats`.
-  // see https://github.com/pantsbuild/pants/pull/14569#issuecomment-1057286943
-  let digest_futures: Vec<_> = items
-    .into_iter()
-    .map(|item| {
-      let store = context.core.store();
-      async move {
-        match item {
-          CreateDigestItem::FileContent(path, bytes, is_executable) => {
-            let digest = store.store_file_bytes(bytes, true).await?;
-            let snapshot = store
-              .snapshot_of_one_file(path, digest, is_executable)
-              .await?;
-            let res: Result<DirectoryDigest, String> = Ok(snapshot.into());
-            res
-          }
-          CreateDigestItem::FileEntry(path, digest, is_executable) => {
-            let snapshot = store
-              .snapshot_of_one_file(path, digest, is_executable)
-              .await?;
-            let res: Result<_, String> = Ok(snapshot.into());
-            res
-          }
-          CreateDigestItem::Dir(path) => store
-            .create_empty_dir(&path)
-            .await
-            .map_err(|e| e.to_string()),
-        }
-      }
-    })
-    .collect();
+  let mut path_stats: Vec<PathStat> = Vec::with_capacity(items.len());
+  let mut file_digests: HashMap<PathBuf, Digest> = HashMap::with_capacity(items.len());
+  let mut bytes_to_store: Vec<(Option<Digest>, Bytes)> = Vec::with_capacity(new_file_count);
+
+  for item in items {
+    match item {
+      CreateDigestItem::FileContent(path, bytes, is_executable) => {
+        let digest = Digest::of_bytes(&bytes);
+        bytes_to_store.push((Some(digest), bytes));
+        let stat = fs::File {
+          path: path.to_path_buf(),
+          is_executable,
+        };
+        path_stats.push(PathStat::file(path.to_path_buf(), stat));
+        file_digests.insert(path.to_path_buf(), digest);
+      }
+      CreateDigestItem::FileEntry(path, digest, is_executable) => {
+        let stat = fs::File {
+          path: path.to_path_buf(),
+          is_executable,
+        };
+        path_stats.push(PathStat::file(path.to_path_buf(), stat));
+        file_digests.insert(path.to_path_buf(), digest);
+      }
+      CreateDigestItem::Dir(path) => {
+        let stat = fs::Dir(path.to_path_buf());
+        path_stats.push(PathStat::dir(path.to_path_buf(), stat));
+        file_digests.insert(path.to_path_buf(), EMPTY_DIGEST);
+      }
+    }
+  }
 
   let store = context.core.store();
   async move {
-    let digests = future::try_join_all(digest_futures).await?;
-    let digest = store.merge(digests).await?;
+    // The digests returned here are already in the `file_digests` map.
+    let _ = store.store_file_bytes_batch(bytes_to_store, true).await?;
+    let trie = DigestTrie::from_path_stats(path_stats, &file_digests)?;
 
     let gil = Python::acquire_gil();
-    let value = Snapshot::store_directory_digest(gil.python(), digest)?;
+    let value = Snapshot::store_directory_digest(gil.python(), trie.into())?;
     Ok(value)
   }
   .boxed()
```
