Skip to content

Commit

Permalink
Log aggregate statistics for remote executions (pantsbuild#6812)
Browse files Browse the repository at this point in the history
Also, tone down background logging.

This is not super structured, and at some point we should do something more structured, but this is useful for now.
  • Loading branch information
illicitonion authored Dec 13, 2018
1 parent 6f0694e commit 903c4cf
Show file tree
Hide file tree
Showing 6 changed files with 268 additions and 94 deletions.
1 change: 1 addition & 0 deletions src/rust/engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

36 changes: 24 additions & 12 deletions src/rust/engine/fs/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::io::Write;
use std::os::unix::fs::OpenOptionsExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, Instant};

use parking_lot::Mutex;
use pool::ResettablePool;
Expand All @@ -33,10 +33,12 @@ pub const DEFAULT_LOCAL_STORE_GC_TARGET_BYTES: usize = 4 * 1024 * 1024 * 1024;
// uploaded_file_{count, bytes}: Number and combined size of files uploaded to the remote
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq, Serialize)]
pub struct UploadSummary {
ingested_file_count: usize,
ingested_file_bytes: usize,
uploaded_file_count: usize,
uploaded_file_bytes: usize,
pub ingested_file_count: usize,
pub ingested_file_bytes: usize,
pub uploaded_file_count: usize,
pub uploaded_file_bytes: usize,
#[serde(skip)]
pub upload_wall_time: Duration,
}

///
Expand Down Expand Up @@ -282,6 +284,8 @@ impl Store {
&self,
digests: Vec<Digest>,
) -> BoxFuture<UploadSummary, String> {
let start_time = Instant::now();

let remote = match self.remote {
Some(ref remote) => remote,
None => {
Expand Down Expand Up @@ -343,7 +347,7 @@ impl Store {
}).collect::<Vec<_>>(),
).and_then(future::join_all)
.map(|uploaded_digests| (uploaded_digests, ingested_digests))
}).map(|(uploaded_digests, ingested_digests)| {
}).map(move |(uploaded_digests, ingested_digests)| {
let ingested_file_sizes = ingested_digests.iter().map(|(digest, _)| digest.1);
let uploaded_file_sizes = uploaded_digests.iter().map(|digest| digest.1);

Expand All @@ -352,6 +356,7 @@ impl Store {
ingested_file_bytes: ingested_file_sizes.sum(),
uploaded_file_count: uploaded_file_sizes.len(),
uploaded_file_bytes: uploaded_file_sizes.sum(),
upload_wall_time: start_time.elapsed(),
}
}).to_boxed()
}
Expand Down Expand Up @@ -3407,7 +3412,7 @@ mod tests {
.store_file_bytes(testcatnip.bytes(), false)
.wait()
.expect("Error storing file locally");
let summary = new_store(dir.path(), cas.address())
let mut summary = new_store(dir.path(), cas.address())
.ensure_remote_has_recursive(vec![testdir.digest()])
.wait()
.expect("Error uploading file");
Expand All @@ -3419,13 +3424,15 @@ mod tests {
testcatnip.digest().1,
];
let test_bytes = test_data.iter().sum();
summary.upload_wall_time = Duration::default();
assert_eq!(
summary,
UploadSummary {
ingested_file_count: test_data.len(),
ingested_file_bytes: test_bytes,
uploaded_file_count: test_data.len(),
uploaded_file_bytes: test_bytes
uploaded_file_bytes: test_bytes,
upload_wall_time: Duration::default(),
}
);
}
Expand Down Expand Up @@ -3455,36 +3462,41 @@ mod tests {
.expect("Error storing file locally");

// Store testroland first, which should return a summary of one file
let data_summary = new_store(dir.path(), cas.address())
let mut data_summary = new_store(dir.path(), cas.address())
.ensure_remote_has_recursive(vec![testroland.digest()])
.wait()
.expect("Error uploading file");
data_summary.upload_wall_time = Duration::default();

assert_eq!(
data_summary,
UploadSummary {
ingested_file_count: 1,
ingested_file_bytes: testroland.digest().1,
uploaded_file_count: 1,
uploaded_file_bytes: testroland.digest().1
uploaded_file_bytes: testroland.digest().1,
upload_wall_time: Duration::default(),
}
);

// Store the directory and catnip.
// It should see the digest of testroland already in cas,
// and not report it in uploads.
let dir_summary = new_store(dir.path(), cas.address())
let mut dir_summary = new_store(dir.path(), cas.address())
.ensure_remote_has_recursive(vec![testdir.digest()])
.wait()
.expect("Error uploading directory");

dir_summary.upload_wall_time = Duration::default();

assert_eq!(
dir_summary,
UploadSummary {
ingested_file_count: 3,
ingested_file_bytes: testdir.digest().1 + testroland.digest().1 + testcatnip.digest().1,
uploaded_file_count: 2,
uploaded_file_bytes: testdir.digest().1 + testcatnip.digest().1
uploaded_file_bytes: testdir.digest().1 + testcatnip.digest().1,
upload_wall_time: Duration::default(),
}
);
}
Expand Down
1 change: 1 addition & 0 deletions src/rust/engine/process_execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ protobuf = { version = "2.0.4", features = ["with-bytes"] }
sha2 = "0.8"
tempfile = "3"
futures-timer = "0.1"
time = "0.1.40"
tokio-codec = "0.1"
tokio-process = "0.2.1"

Expand Down
33 changes: 33 additions & 0 deletions src/rust/engine/process_execution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,17 @@ extern crate sha2;
extern crate tempfile;
#[cfg(test)]
extern crate testutil;
extern crate time;
extern crate tokio_codec;
extern crate tokio_process;

use boxfuture::BoxFuture;
use bytes::Bytes;
use std::collections::{BTreeMap, BTreeSet};
use std::ops::AddAssign;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use async_semaphore::AsyncSemaphore;

Expand Down Expand Up @@ -115,6 +118,36 @@ pub struct FallibleExecuteProcessResult {
// It's unclear whether this should be a Snapshot or a digest of a Directory. A Directory digest
// is handy, so let's try that out for now.
pub output_directory: hashing::Digest,

pub execution_attempts: Vec<ExecutionStats>,
}

#[cfg(test)]
impl FallibleExecuteProcessResult {
pub fn without_execution_attempts(mut self) -> Self {
self.execution_attempts = vec![];
self
}
}

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct ExecutionStats {
uploaded_bytes: usize,
uploaded_file_count: usize,
upload: Duration,
remote_queue: Option<Duration>,
remote_input_fetch: Option<Duration>,
remote_execution: Option<Duration>,
remote_output_store: Option<Duration>,
was_cache_hit: bool,
}

impl AddAssign<fs::UploadSummary> for ExecutionStats {
fn add_assign(&mut self, summary: fs::UploadSummary) {
self.uploaded_file_count += summary.uploaded_file_count;
self.uploaded_bytes += summary.uploaded_file_bytes;
self.upload += summary.upload_wall_time;
}
}

pub trait CommandRunner: Send + Sync {
Expand Down
12 changes: 12 additions & 0 deletions src/rust/engine/process_execution/src/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ impl super::CommandRunner for CommandRunner {
stderr: child_results.stderr,
exit_code: child_results.exit_code,
output_directory: snapshot.digest,
execution_attempts: vec![],
}).to_boxed()
}).then(move |result| {
// Force workdir not to get dropped until after we've ingested the outputs
Expand Down Expand Up @@ -337,6 +338,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: fs::EMPTY_DIGEST,
execution_attempts: vec![],
}
)
}
Expand All @@ -362,6 +364,7 @@ mod tests {
stderr: as_bytes("bar"),
exit_code: 1,
output_directory: fs::EMPTY_DIGEST,
execution_attempts: vec![],
}
)
}
Expand All @@ -388,6 +391,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: -15,
output_directory: fs::EMPTY_DIGEST,
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -489,6 +493,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: fs::EMPTY_DIGEST,
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -517,6 +522,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: TestDirectory::containing_roland().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -550,6 +556,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: TestDirectory::recursive().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -584,6 +591,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: TestDirectory::recursive().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -616,6 +624,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 1,
output_directory: TestDirectory::containing_roland().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -646,6 +655,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: TestDirectory::containing_roland().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -677,6 +687,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: TestDirectory::nested().digest(),
execution_attempts: vec![],
}
)
}
Expand Down Expand Up @@ -705,6 +716,7 @@ mod tests {
stderr: as_bytes(""),
exit_code: 0,
output_directory: fs::EMPTY_DIGEST,
execution_attempts: vec![],
})
)
}
Expand Down
Loading

0 comments on commit 903c4cf

Please sign in to comment.