From c6302182151f1ccc48e1c52451d3f046e4a305fe Mon Sep 17 00:00:00 2001 From: Eric Arellano <14852634+Eric-Arellano@users.noreply.github.com> Date: Wed, 20 Jan 2021 16:28:26 -0700 Subject: [PATCH] Use speculation for remote cache reads (#11429) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes https://github.com/pantsbuild/pants/issues/11390. Some processes are extremely quick to run—such as parsing a Python file's imports—and running locally can often be faster than reading from the remote cache. We add counters so that we can get some insight on how much of a value-add remote caching is. Unlike remote execution's speculation, we do not use any delay. It's very cheap to check the remote cache and it does not tie up workers, unlike remote execution. --- .../process_execution/src/remote_cache.rs | 92 ++++++++++++------- .../src/remote_cache_tests.rs | 91 +++++++++++++++--- .../process_execution/src/remote_tests.rs | 4 +- .../engine/testutil/mock/src/action_cache.rs | 8 +- src/rust/engine/workunit_store/src/metrics.rs | 6 ++ 5 files changed, 148 insertions(+), 53 deletions(-) diff --git a/src/rust/engine/process_execution/src/remote_cache.rs b/src/rust/engine/process_execution/src/remote_cache.rs index fb05be33abb..0a0969e6773 100644 --- a/src/rust/engine/process_execution/src/remote_cache.rs +++ b/src/rust/engine/process_execution/src/remote_cache.rs @@ -450,47 +450,69 @@ impl crate::CommandRunner for CommandRunner { ) .await?; - // Check the remote Action Cache to see if this request was already computed. - // If so, return immediately with the result. 
- if self.cache_read { - let response = with_workunit( - context.workunit_store.clone(), - "check_action_cache".to_owned(), - WorkunitMetadata::with_level(Level::Debug), - crate::remote::check_action_cache( - action_digest, - &self.metadata, - self.platform, - &context, - self.action_cache_client.clone(), - self.store.clone(), - self.eager_fetch, - ), - |_, md| md, - ) - .await; - match response { - Ok(cached_response_opt) => { - log::debug!( - "remote cache response: digest={:?}: {:?}", + let mut local_execution_future = self.underlying.run(req, context.clone()); + + let result = if self.cache_read { + // A future to read from the cache and log the results accordingly. + let cache_read_future = async { + let response = with_workunit( + context.workunit_store.clone(), + "check_action_cache".to_owned(), + WorkunitMetadata::with_level(Level::Debug), + crate::remote::check_action_cache( action_digest, + &self.metadata, + self.platform, + &context, + self.action_cache_client.clone(), + self.store.clone(), + self.eager_fetch, + ), + |_, md| md, + ) + .await; + match response { + Ok(cached_response_opt) => { + log::debug!( + "remote cache response: digest={:?}: {:?}", + action_digest, + cached_response_opt + ); cached_response_opt - ); + } + Err(err) => { + log::warn!("Failed to read from remote cache: {}", err); + None + } + } + }; - if let Some(cached_response) = cached_response_opt { + // We speculate between reading from the remote cache vs. running locally. If there was a + // cache hit, we return early because there will be no need to write to the cache. Otherwise, + // we run the process locally and will possibly write it to the cache later. + tokio::select! 
{ + cache_result = cache_read_future => { + if let Some(cached_response) = cache_result { + context.workunit_store.increment_counter(Metric::RemoteCacheSpeculationRemoteCompletedFirst, 1); return Ok(cached_response); + } else { + // Note that we don't increment a counter here, as there is nothing of note in this + // scenario: the remote cache did not save unnecessary local work, nor was the remote + // trip unusually slow such that local execution was faster. + local_execution_future.await? } } - Err(err) => { - log::warn!("Failed to read from remote cache: {}", err); + local_result = &mut local_execution_future => { + context.workunit_store.increment_counter(Metric::RemoteCacheSpeculationLocalCompletedFirst, 1); + local_result? } - }; - } + } + } else { + local_execution_future.await? + }; - let result = self.underlying.run(req, context.clone()).await?; if result.exit_code == 0 && self.cache_write { - // Store the result in the remote cache if not the product of a remote execution. - if let Err(err) = self + let write_result = self .update_action_cache( &context, &request, @@ -500,13 +522,13 @@ impl crate::CommandRunner for CommandRunner { action_digest, command_digest, ) - .await - { + .await; + if let Err(err) = write_result { log::warn!("Failed to write to remote cache: {}", err); context .workunit_store .increment_counter(Metric::RemoteCacheWriteErrors, 1); - } + }; } Ok(result) diff --git a/src/rust/engine/process_execution/src/remote_cache_tests.rs b/src/rust/engine/process_execution/src/remote_cache_tests.rs index 8cc416e871a..28f8b055e19 100644 --- a/src/rust/engine/process_execution/src/remote_cache_tests.rs +++ b/src/rust/engine/process_execution/src/remote_cache_tests.rs @@ -15,6 +15,7 @@ use remexec::ActionResult; use store::{BackoffConfig, Store}; use tempfile::TempDir; use testutil::data::{TestData, TestDirectory, TestTree}; +use tokio::time::delay_for; use workunit_store::WorkunitStore; use crate::remote::{ensure_action_stored_locally, 
make_execute_request}; @@ -28,10 +29,15 @@ use crate::{ struct MockLocalCommandRunner { result: Result, call_counter: Arc, + delay: Duration, } impl MockLocalCommandRunner { - pub fn new(exit_code: i32, call_counter: Arc) -> MockLocalCommandRunner { + pub fn new( + exit_code: i32, + call_counter: Arc, + delay_ms: u64, + ) -> MockLocalCommandRunner { MockLocalCommandRunner { result: Ok(FallibleProcessResultWithPlatform { stdout_digest: EMPTY_DIGEST, @@ -42,6 +48,7 @@ impl MockLocalCommandRunner { platform: Platform::current().unwrap(), }), call_counter, + delay: Duration::from_millis(delay_ms), } } } @@ -53,6 +60,7 @@ impl CommandRunnerTrait for MockLocalCommandRunner { _req: MultiPlatformProcess, _context: Context, ) -> Result { + delay_for(self.delay).await; self.call_counter.fetch_add(1, Ordering::SeqCst); self.result.clone() } @@ -97,18 +105,26 @@ impl StoreSetup { } } -fn create_local_runner(exit_code: i32) -> (Box, Arc) { +fn create_local_runner( + exit_code: i32, + delay_ms: u64, +) -> (Box, Arc) { let call_counter = Arc::new(AtomicUsize::new(0)); - let local_runner = Box::new(MockLocalCommandRunner::new(exit_code, call_counter.clone())); + let local_runner = Box::new(MockLocalCommandRunner::new( + exit_code, + call_counter.clone(), + delay_ms, + )); (local_runner, call_counter) } fn create_cached_runner( local: Box, store: Store, + read_delay_ms: u64, eager_fetch: bool, ) -> (Box, StubActionCache) { - let action_cache = StubActionCache::new().unwrap(); + let action_cache = StubActionCache::new(read_delay_ms).unwrap(); let runner = Box::new( crate::remote_cache::CommandRunner::new( local.into(), @@ -164,9 +180,9 @@ fn insert_into_action_cache( async fn cache_read_success() { WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 20); let (cache_runner, action_cache) = - 
create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); + create_cached_runner(local_runner, store_setup.store.clone(), 0, false); let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); @@ -185,9 +201,9 @@ async fn cache_read_success() { async fn cache_read_skipped_on_errors() { WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 20); let (cache_runner, action_cache) = - create_cached_runner(local_runner.clone(), store_setup.store.clone(), false); + create_cached_runner(local_runner, store_setup.store.clone(), 0, false); let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); @@ -211,9 +227,9 @@ async fn cache_read_eager_fetch() { async fn run_process(eager_fetch: bool) -> (i32, usize) { let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 20); let (cache_runner, action_cache) = - create_cached_runner(local_runner.clone(), store_setup.store.clone(), eager_fetch); + create_cached_runner(local_runner, store_setup.store.clone(), 0, eager_fetch); let (process, action_digest) = create_process(&store_setup.store).await; insert_into_action_cache( @@ -243,13 +259,58 @@ async fn cache_read_eager_fetch() { assert_eq!(eager_local_call_count, 1); } +#[tokio::test] +async fn cache_read_speculation() { + WorkunitStore::setup_for_tests(); + + async fn run_process(local_delay_ms: u64, remote_delay_ms: u64, cache_hit: bool) -> (i32, usize) { + let store_setup = StoreSetup::new(); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 
local_delay_ms); + let (cache_runner, action_cache) = create_cached_runner( + local_runner, + store_setup.store.clone(), + remote_delay_ms, + false, + ); + + let (process, action_digest) = create_process(&store_setup.store).await; + if cache_hit { + insert_into_action_cache(&action_cache, &action_digest, 0, EMPTY_DIGEST, EMPTY_DIGEST); + } + + assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); + let remote_result = cache_runner + .run(process.clone().into(), Context::default()) + .await + .unwrap(); + + let final_local_count = local_runner_call_counter.load(Ordering::SeqCst); + (remote_result.exit_code, final_local_count) + } + + // Case 1: remote is faster than local. + let (exit_code, local_call_count) = run_process(20, 0, true).await; + assert_eq!(exit_code, 0); + assert_eq!(local_call_count, 0); + + // Case 2: local is faster than remote. + let (exit_code, local_call_count) = run_process(0, 20, true).await; + assert_eq!(exit_code, 1); + assert_eq!(local_call_count, 1); + + // Case 3: the remote lookup wins, but there is no cache entry so we fall back to local execution. 
+ let (exit_code, local_call_count) = run_process(20, 0, false).await; + assert_eq!(exit_code, 1); + assert_eq!(local_call_count, 1); +} + #[tokio::test] async fn cache_write_success() { WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(0); + let (local_runner, local_runner_call_counter) = create_local_runner(0, 20); let (cache_runner, action_cache) = - create_cached_runner(local_runner, store_setup.store.clone(), false); + create_cached_runner(local_runner, store_setup.store.clone(), 0, false); let (process, action_digest) = create_process(&store_setup.store).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -277,9 +338,9 @@ async fn cache_write_success() { async fn cache_write_not_for_failures() { WorkunitStore::setup_for_tests(); let store_setup = StoreSetup::new(); - let (local_runner, local_runner_call_counter) = create_local_runner(1); + let (local_runner, local_runner_call_counter) = create_local_runner(1, 20); let (cache_runner, action_cache) = - create_cached_runner(local_runner, store_setup.store.clone(), false); + create_cached_runner(local_runner, store_setup.store.clone(), 0, false); let (process, _action_digest) = create_process(&store_setup.store).await; assert_eq!(local_runner_call_counter.load(Ordering::SeqCst), 0); @@ -442,7 +503,7 @@ async fn make_action_result_basic() { .expect("Error saving directory"); let mock_command_runner = Arc::new(MockCommandRunner); - let action_cache = StubActionCache::new().unwrap(); + let action_cache = StubActionCache::new(0).unwrap(); let runner = crate::remote_cache::CommandRunner::new( mock_command_runner.clone(), ProcessMetadata::default(), diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index cca0aafd3b7..2f5840042a8 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ 
b/src/rust/engine/process_execution/src/remote_tests.rs @@ -1745,7 +1745,7 @@ async fn remote_workunits_are_stored() { .file(&TestData::roland()) .directory(&TestDirectory::containing_roland()) .build(); - let action_cache = mock::StubActionCache::new().unwrap(); + let action_cache = mock::StubActionCache::new(0).unwrap(); let (command_runner, _store) = create_command_runner(action_cache.address(), &cas, Platform::Linux); @@ -2301,7 +2301,7 @@ async fn extract_execute_response( operation: Operation, remote_platform: Platform, ) -> Result { - let action_cache = mock::StubActionCache::new().expect("failed to create action cache"); + let action_cache = mock::StubActionCache::new(0).expect("failed to create action cache"); let cas = mock::StubCAS::builder() .file(&TestData::roland()) diff --git a/src/rust/engine/testutil/mock/src/action_cache.rs b/src/rust/engine/testutil/mock/src/action_cache.rs index c346705eb52..234ce632723 100644 --- a/src/rust/engine/testutil/mock/src/action_cache.rs +++ b/src/rust/engine/testutil/mock/src/action_cache.rs @@ -31,6 +31,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use std::time::Duration; use bazel_protos::gen::build::bazel::remote::execution::v2 as remexec; use bazel_protos::require_digest; @@ -39,6 +40,7 @@ use hashing::{Digest, Fingerprint}; use parking_lot::Mutex; use remexec::action_cache_server::{ActionCache, ActionCacheServer}; use remexec::{ActionResult, GetActionResultRequest, UpdateActionResultRequest}; +use tokio::time::delay_for; use tonic::transport::Server; use tonic::{Request, Response, Status}; @@ -61,6 +63,7 @@ impl Drop for StubActionCache { struct ActionCacheResponder { action_map: Arc>>, always_errors: Arc, + read_delay: Duration, } #[tonic::async_trait] @@ -69,6 +72,8 @@ impl ActionCache for ActionCacheResponder { &self, request: Request, ) -> Result, Status> { + delay_for(self.read_delay).await; + let request = 
request.into_inner(); if self.always_errors.load(Ordering::SeqCst) { @@ -130,12 +135,13 @@ impl ActionCache for ActionCacheResponder { } impl StubActionCache { - pub fn new() -> Result { + pub fn new(read_delay_ms: u64) -> Result { let action_map = Arc::new(Mutex::new(HashMap::new())); let always_errors = Arc::new(AtomicBool::new(false)); let responder = ActionCacheResponder { action_map: action_map.clone(), always_errors: always_errors.clone(), + read_delay: Duration::from_millis(read_delay_ms), }; let addr = "127.0.0.1:0" diff --git a/src/rust/engine/workunit_store/src/metrics.rs b/src/rust/engine/workunit_store/src/metrics.rs index 529ffeaced8..09601926a43 100644 --- a/src/rust/engine/workunit_store/src/metrics.rs +++ b/src/rust/engine/workunit_store/src/metrics.rs @@ -40,6 +40,8 @@ pub enum Metric { RemoteCacheRequestsUncached, RemoteCacheReadErrors, RemoteCacheWriteErrors, + RemoteCacheSpeculationLocalCompletedFirst, + RemoteCacheSpeculationRemoteCompletedFirst, RemoteExecutionErrors, RemoteExecutionRequests, RemoteExecutionRPCErrors, @@ -66,6 +68,10 @@ impl Metric { RemoteCacheRequestsUncached => "remote_cache_requests_uncached", RemoteCacheReadErrors => "remote_cache_read_errors", RemoteCacheWriteErrors => "remote_cache_write_errors", + RemoteCacheSpeculationLocalCompletedFirst => "remote_cache_speculation_local_completed_first", + RemoteCacheSpeculationRemoteCompletedFirst => { + "remote_cache_speculation_remote_completed_first" + } RemoteExecutionErrors => "remote_execution_errors", RemoteExecutionRequests => "remote_execution_requests", RemoteExecutionRPCRetries => "remote_execution_rpc_retries",