Skip to content

Commit

Permalink
Use speculation for remote cache reads (pantsbuild#11429)
Browse files Browse the repository at this point in the history
Closes pantsbuild#11390.

Some processes are extremely quick to run—such as parsing a Python file's imports—and running locally can often be faster than reading from the remote cache.

We add counters so that we can get some insight on how much of a value-add remote caching is.

Unlike remote execution's speculation, we do not use any delay. It's very cheap to check the remote cache and it does not tie up workers, unlike remote execution.
  • Loading branch information
Eric-Arellano authored Jan 20, 2021
1 parent 8be9fd5 commit c630218
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 53 deletions.
92 changes: 57 additions & 35 deletions src/rust/engine/process_execution/src/remote_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,47 +450,69 @@ impl crate::CommandRunner for CommandRunner {
)
.await?;

// Check the remote Action Cache to see if this request was already computed.
// If so, return immediately with the result.
if self.cache_read {
let response = with_workunit(
context.workunit_store.clone(),
"check_action_cache".to_owned(),
WorkunitMetadata::with_level(Level::Debug),
crate::remote::check_action_cache(
action_digest,
&self.metadata,
self.platform,
&context,
self.action_cache_client.clone(),
self.store.clone(),
self.eager_fetch,
),
|_, md| md,
)
.await;
match response {
Ok(cached_response_opt) => {
log::debug!(
"remote cache response: digest={:?}: {:?}",
let mut local_execution_future = self.underlying.run(req, context.clone());

let result = if self.cache_read {
// A future to read from the cache and log the results accordingly.
let cache_read_future = async {
let response = with_workunit(
context.workunit_store.clone(),
"check_action_cache".to_owned(),
WorkunitMetadata::with_level(Level::Debug),
crate::remote::check_action_cache(
action_digest,
&self.metadata,
self.platform,
&context,
self.action_cache_client.clone(),
self.store.clone(),
self.eager_fetch,
),
|_, md| md,
)
.await;
match response {
Ok(cached_response_opt) => {
log::debug!(
"remote cache response: digest={:?}: {:?}",
action_digest,
cached_response_opt
);
cached_response_opt
);
}
Err(err) => {
log::warn!("Failed to read from remote cache: {}", err);
None
}
}
};

if let Some(cached_response) = cached_response_opt {
// We speculate between reading from the remote cache vs. running locally. If there was a
// cache hit, we return early because there will be no need to write to the cache. Otherwise,
// we run the process locally and will possibly write it to the cache later.
tokio::select! {
cache_result = cache_read_future => {
if let Some(cached_response) = cache_result {
context.workunit_store.increment_counter(Metric::RemoteCacheSpeculationRemoteCompletedFirst, 1);
return Ok(cached_response);
} else {
// Note that we don't increment a counter here, as there is nothing of note in this
// scenario: the remote cache did not save unnecessary local work, nor was the remote
// trip unusually slow such that local execution was faster.
local_execution_future.await?
}
}
Err(err) => {
log::warn!("Failed to read from remote cache: {}", err);
local_result = &mut local_execution_future => {
context.workunit_store.increment_counter(Metric::RemoteCacheSpeculationLocalCompletedFirst, 1);
local_result?
}
};
}
}
} else {
local_execution_future.await?
};

let result = self.underlying.run(req, context.clone()).await?;
if result.exit_code == 0 && self.cache_write {
// Store the result in the remote cache if not the product of a remote execution.
if let Err(err) = self
let write_result = self
.update_action_cache(
&context,
&request,
Expand All @@ -500,13 +522,13 @@ impl crate::CommandRunner for CommandRunner {
action_digest,
command_digest,
)
.await
{
.await;
if let Err(err) = write_result {
log::warn!("Failed to write to remote cache: {}", err);
context
.workunit_store
.increment_counter(Metric::RemoteCacheWriteErrors, 1);
}
};
}

Ok(result)
Expand Down
91 changes: 76 additions & 15 deletions src/rust/engine/process_execution/src/remote_cache_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use remexec::ActionResult;
use store::{BackoffConfig, Store};
use tempfile::TempDir;
use testutil::data::{TestData, TestDirectory, TestTree};
use tokio::time::delay_for;
use workunit_store::WorkunitStore;

use crate::remote::{ensure_action_stored_locally, make_execute_request};
Expand All @@ -28,10 +29,15 @@ use crate::{
struct MockLocalCommandRunner {
result: Result<FallibleProcessResultWithPlatform, String>,
call_counter: Arc<AtomicUsize>,
delay: Duration,
}

impl MockLocalCommandRunner {
pub fn new(exit_code: i32, call_counter: Arc<AtomicUsize>) -> MockLocalCommandRunner {
pub fn new(
exit_code: i32,
call_counter: Arc<AtomicUsize>,
delay_ms: u64,
) -> MockLocalCommandRunner {
MockLocalCommandRunner {
result: Ok(FallibleProcessResultWithPlatform {
stdout_digest: EMPTY_DIGEST,
Expand All @@ -42,6 +48,7 @@ impl MockLocalCommandRunner {
platform: Platform::current().unwrap(),
}),
call_counter,
delay: Duration::from_millis(delay_ms),
}
}
}
Expand All @@ -53,6 +60,7 @@ impl CommandRunnerTrait for MockLocalCommandRunner {
_req: MultiPlatformProcess,
_context: Context,
) -> Result<FallibleProcessResultWithPlatform, String> {
delay_for(self.delay).await;
self.call_counter.fetch_add(1, Ordering::SeqCst);
self.result.clone()
}
Expand Down Expand Up @@ -97,18 +105,26 @@ impl StoreSetup {
}
}

fn create_local_runner(exit_code: i32) -> (Box<MockLocalCommandRunner>, Arc<AtomicUsize>) {
fn create_local_runner(
exit_code: i32,
delay_ms: u64,
) -> (Box<MockLocalCommandRunner>, Arc<AtomicUsize>) {
let call_counter = Arc::new(AtomicUsize::new(0));
let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone()));
let local_runner = Box::new(MockLocalCommandRunner::new(
exit_code,
call_counter.clone(),
delay_ms,
));
(local_runner, call_counter)
}

fn create_cached_runner(
local: Box<dyn CommandRunnerTrait>,
store: Store,
read_delay_ms: u64,
eager_fetch: bool,
) -> (Box<dyn CommandRunnerTrait>, StubActionCache) {
let action_cache = StubActionCache::new().unwrap();
let action_cache = StubActionCache::new(read_delay_ms).unwrap();
let runner = Box::new(
crate::remote_cache::CommandRunner::new(
local.into(),
Expand Down Expand Up @@ -164,9 +180,9 @@ fn insert_into_action_cache(
async fn cache_read_success() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner.clone(), store_setup.store.clone(), false);
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
Expand All @@ -185,9 +201,9 @@ async fn cache_read_success() {
async fn cache_read_skipped_on_errors() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner.clone(), store_setup.store.clone(), false);
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
Expand All @@ -211,9 +227,9 @@ async fn cache_read_eager_fetch() {

async fn run_process(eager_fetch: bool) -> (i32, usize) {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner.clone(), store_setup.store.clone(), eager_fetch);
create_cached_runner(local_runner, store_setup.store.clone(), 0, eager_fetch);

let (process, action_digest) = create_process(&store_setup.store).await;
insert_into_action_cache(
Expand Down Expand Up @@ -243,13 +259,58 @@ async fn cache_read_eager_fetch() {
assert_eq!(eager_local_call_count, 1);
}

#[tokio::test]
async fn cache_read_speculation() {
WorkunitStore::setup_for_tests();

async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) {
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1, local_delay_ms);
let (cache_runner, action_cache) = create_cached_runner(
local_runner,
store_setup.store.clone(),
remote_delay_ms,
false,
);

let (process, action_digest) = create_process(&store_setup.store).await;
if cache_hit {
insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST);
}

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
let remote_result = cache_runner
.run(process.clone().into(), Context::default())
.await
.unwrap();

let final_local_count = local_runner_call_counter.load(Ordering::SeqCst);
(remote_result.exit_code, final_local_count)
}

// Case 1: remote is faster than local.
let (exit_code, local_call_count) = run_process(20, 0, true).await;
assert_eq!(exit_code, 0);
assert_eq!(local_call_count, 0);

// Case 2: local is faster than remote.
let (exit_code, local_call_count) = run_process(0, 20, true).await;
assert_eq!(exit_code, 1);
assert_eq!(local_call_count, 1);

// Case 3: the remote lookup wins, but there is no cache entry so we fallback to local execution.
let (exit_code, local_call_count) = run_process(20, 0, false).await;
assert_eq!(exit_code, 1);
assert_eq!(local_call_count, 1);
}

#[tokio::test]
async fn cache_write_success() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(0);
let (local_runner, local_runner_call_counter) = create_local_runner(0, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), false);
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (process, action_digest) = create_process(&store_setup.store).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
Expand Down Expand Up @@ -277,9 +338,9 @@ async fn cache_write_success() {
async fn cache_write_not_for_failures() {
WorkunitStore::setup_for_tests();
let store_setup = StoreSetup::new();
let (local_runner, local_runner_call_counter) = create_local_runner(1);
let (local_runner, local_runner_call_counter) = create_local_runner(1, 20);
let (cache_runner, action_cache) =
create_cached_runner(local_runner, store_setup.store.clone(), false);
create_cached_runner(local_runner, store_setup.store.clone(), 0, false);
let (process, _action_digest) = create_process(&store_setup.store).await;

assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0);
Expand Down Expand Up @@ -442,7 +503,7 @@ async fn make_action_result_basic() {
.expect("Error saving directory");

let mock_command_runner = Arc::new(MockCommandRunner);
let action_cache = StubActionCache::new().unwrap();
let action_cache = StubActionCache::new(0).unwrap();
let runner = crate::remote_cache::CommandRunner::new(
mock_command_runner.clone(),
ProcessMetadata::default(),
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/process_execution/src/remote_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1745,7 +1745,7 @@ async fn remote_workunits_are_stored() {
.file(&TestData::roland())
.directory(&TestDirectory::containing_roland())
.build();
let action_cache = mock::StubActionCache::new().unwrap();
let action_cache = mock::StubActionCache::new(0).unwrap();
let (command_runner, _store) =
create_command_runner(action_cache.address(), &cas, Platform::Linux);

Expand Down Expand Up @@ -2301,7 +2301,7 @@ async fn extract_execute_response(
operation: Operation,
remote_platform: Platform,
) -> Result<RemoteTestResult, ExecutionError> {
let action_cache = mock::StubActionCache::new().expect("failed to create action cache");
let action_cache = mock::StubActionCache::new(0).expect("failed to create action cache");

let cas = mock::StubCAS::builder()
.file(&TestData::roland())
Expand Down
8 changes: 7 additions & 1 deletion src/rust/engine/testutil/mock/src/action_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;

use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec;
use bazel_protos::require_digest;
Expand All @@ -39,6 +40,7 @@ use hashing::{Digest, Fingerprint};
use parking_lot::Mutex;
use remexec::action_cache_server::{ActionCache, ActionCacheServer};
use remexec::{ActionResult, GetActionResultRequest, UpdateActionResultRequest};
use tokio::time::delay_for;
use tonic::transport::Server;
use tonic::{Request, Response, Status};

Expand All @@ -61,6 +63,7 @@ impl Drop for StubActionCache {
struct ActionCacheResponder {
action_map: Arc<Mutex<HashMap<Fingerprint, ActionResult>>>,
always_errors: Arc<AtomicBool>,
read_delay: Duration,
}

#[tonic::async_trait]
Expand All @@ -69,6 +72,8 @@ impl ActionCache for ActionCacheResponder {
&self,
request: Request<GetActionResultRequest>,
) -> Result<Response<ActionResult>, Status> {
delay_for(self.read_delay).await;

let request = request.into_inner();

if self.always_errors.load(Ordering::SeqCst) {
Expand Down Expand Up @@ -130,12 +135,13 @@ impl ActionCache for ActionCacheResponder {
}

impl StubActionCache {
pub fn new() -> Result<Self, String> {
pub fn new(read_delay_ms: u64) -> Result<Self, String> {
let action_map = Arc::new(Mutex::new(HashMap::new()));
let always_errors = Arc::new(AtomicBool::new(false));
let responder = ActionCacheResponder {
action_map: action_map.clone(),
always_errors: always_errors.clone(),
read_delay: Duration::from_millis(read_delay_ms),
};

let addr = "127.0.0.1:0"
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/workunit_store/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub enum Metric {
RemoteCacheRequestsUncached,
RemoteCacheReadErrors,
RemoteCacheWriteErrors,
RemoteCacheSpeculationLocalCompletedFirst,
RemoteCacheSpeculationRemoteCompletedFirst,
RemoteExecutionErrors,
RemoteExecutionRequests,
RemoteExecutionRPCErrors,
Expand All @@ -66,6 +68,10 @@ impl Metric {
RemoteCacheRequestsUncached => "remote_cache_requests_uncached",
RemoteCacheReadErrors => "remote_cache_read_errors",
RemoteCacheWriteErrors => "remote_cache_write_errors",
RemoteCacheSpeculationLocalCompletedFirst => "remote_cache_speculation_local_completed_first",
RemoteCacheSpeculationRemoteCompletedFirst => {
"remote_cache_speculation_remote_completed_first"
}
RemoteExecutionErrors => "remote_execution_errors",
RemoteExecutionRequests => "remote_execution_requests",
RemoteExecutionRPCRetries => "remote_execution_rpc_retries",
Expand Down

0 comments on commit c630218

Please sign in to comment.