diff --git a/build-support/bin/ci.py b/build-support/bin/ci.py
index df6837b0de0..8fbb09c4d30 100755
--- a/build-support/bin/ci.py
+++ b/build-support/bin/ci.py
@@ -223,13 +223,10 @@ def pants_command(
         "--no-v1",
         "--v2",
         "--pants-config-files=pants.remote.ini",
-        # We turn off speculation to reduce the risk of flakiness, where a test
-        # passes either locally or remotely and fails in the other environment.
-        "--process-execution-speculation-strategy=none",
         f"--remote-oauth-bearer-token-path={oauth_token_path}",
         "test",
         *sorted(targets),
-      ],
+      ]
     }[self]
     if shard is not None and self in [self.v1_no_chroot, self.v1_chroot]:  # type: ignore
       result.insert(2, f"--test-pytest-test-shard={shard}")
diff --git a/pants.remote.ini b/pants.remote.ini
index 18370055a5a..c993ad7cd96 100644
--- a/pants.remote.ini
+++ b/pants.remote.ini
@@ -26,6 +26,9 @@ remote_execution_extra_platform_properties: [
 # This should correspond to the number of workers running in Google RBE. See
 # https://console.cloud.google.com/apis/api/remotebuildexecution.googleapis.com/quotas?project=pants-remoting-beta&folder&organizationId&duration=PT6H.
 process_execution_remote_parallelism: 16
+process_execution_speculation_strategy: remote_first
+# The p95 of RBE execution appears to be ~2 seconds, but we also need to factor in local queue time, which can be much longer; we have no metrics for that yet.
+process_execution_speculation_delay: 15
 
 [python-setup]
 # TODO(#7735): This config is not ideal, that we must specify the PATH for both local and remote
diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py
index 465a7761eaa..51317d3ba22 100644
--- a/src/python/pants/option/global_options.py
+++ b/src/python/pants/option/global_options.py
@@ -100,7 +100,7 @@ def from_bootstrap_options(cls, bootstrap_options):
       process_execution_local_parallelism=multiprocessing.cpu_count()*2,
       process_execution_remote_parallelism=128,
       process_execution_cleanup_local_dirs=True,
-      process_execution_speculation_delay=.1,
+      process_execution_speculation_delay=1,
       process_execution_speculation_strategy='local_first',
      process_execution_use_local_cache=True,
      remote_execution_process_cache_namespace=None,
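Aside: a hypothetical sketch of how a strategy value like `local_first`/`remote_first`/`none` plus a delay in seconds could be mapped onto a primary runner, an optional secondary runner, and the speculation timeout that `SpeculatingCommandRunner` (below) consumes. The enum and function names here are illustrative only; the real wiring is not part of this patch.

```rust
use std::time::Duration;

enum Runner {
  Local,
  Remote,
}

// Returns (primary, optional (secondary, delay-before-speculating)).
fn plan(strategy: &str, delay_secs: u64) -> (Runner, Option<(Runner, Duration)>) {
  let delay = Duration::from_secs(delay_secs);
  match strategy {
    "remote_first" => (Runner::Remote, Some((Runner::Local, delay))),
    "local_first" => (Runner::Local, Some((Runner::Remote, delay))),
    // "none" disables speculation entirely: only one runner is used.
    _ => (Runner::Local, None),
  }
}

fn main() {
  // e.g. the values configured for remote CI above: remote_first with a 15s delay.
  let (_primary, speculation) = plan("remote_first", 15);
  assert!(speculation.is_some());
}
```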
diff --git a/src/rust/engine/Cargo.lock b/src/rust/engine/Cargo.lock
index 990f5c6ea17..901bddf12e9 100644
--- a/src/rust/engine/Cargo.lock
+++ b/src/rust/engine/Cargo.lock
@@ -48,7 +48,10 @@ name = "async_semaphore"
 version = "0.0.1"
 dependencies = [
  "futures 0.1.28 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-timer 0.2.11 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
diff --git a/src/rust/engine/async_semaphore/Cargo.toml b/src/rust/engine/async_semaphore/Cargo.toml
index faed1e3cb16..e5be635e04d 100644
--- a/src/rust/engine/async_semaphore/Cargo.toml
+++ b/src/rust/engine/async_semaphore/Cargo.toml
@@ -7,4 +7,10 @@ publish = false
 
 [dependencies]
 futures = "^0.1.16"
+log = "0.4"
 parking_lot = "0.6"
+
+
+[dev-dependencies]
+tokio-timer = "0.2"
+tokio = "0.1"
diff --git a/src/rust/engine/async_semaphore/src/lib.rs b/src/rust/engine/async_semaphore/src/lib.rs
index a9becedc8b7..530890e5bdb 100644
--- a/src/rust/engine/async_semaphore/src/lib.rs
+++ b/src/rust/engine/async_semaphore/src/lib.rs
@@ -34,9 +34,17 @@ use futures::task::{self, Task};
 use futures::{Async, Poll};
 use parking_lot::Mutex;
 
+struct Waiter {
+  id: usize,
+  task: Task,
+}
+
 struct Inner {
-  waiters: VecDeque<Task>,
+  waiters: VecDeque<Waiter>,
   available_permits: usize,
+  // Used as the source of ids for Waiters, because it is monotonically increasing
+  // and only incremented under the mutex lock.
+  next_waiter_id: usize,
 }
 
 #[derive(Clone)]
@@ -50,10 +58,16 @@ impl AsyncSemaphore {
       inner: Arc::new(Mutex::new(Inner {
         waiters: VecDeque::new(),
         available_permits: permits,
+        next_waiter_id: 0,
       })),
     }
   }
 
+  pub fn num_waiters(&self) -> usize {
+    let inner = self.inner.lock();
+    inner.waiters.len()
+  }
+
   ///
   /// Runs the given Future-creating function (and the Future it returns) under the semaphore.
   ///
@@ -62,11 +76,9 @@ impl AsyncSemaphore {
     F: FnOnce() -> B + Send + 'static,
     B: Future<Item = T, Error = ()> + Send + 'static,
   {
-    let permit = PermitFuture {
-      inner: Some(self.inner.clone()),
-    };
     Box::new(
-      permit
+      self
+        .acquire()
         .map_err(|()| panic!("Acquisition is infalliable."))
         .and_then(|permit| {
           f().map(move |t| {
@@ -76,6 +88,13 @@ impl AsyncSemaphore {
         }),
     )
   }
+
+  fn acquire(&self) -> PermitFuture {
+    PermitFuture {
+      inner: self.inner.clone(),
+      waiter_id: None,
+    }
+  }
 }
 
 pub struct Permit {
@@ -84,21 +103,34 @@ pub struct Permit {
 
 impl Drop for Permit {
   fn drop(&mut self) {
-    let task = {
-      let mut inner = self.inner.lock();
-      inner.available_permits += 1;
-      if let Some(task) = inner.waiters.pop_front() {
-        task
-      } else {
-        return;
-      }
-    };
-    task.notify();
+    let mut inner = self.inner.lock();
+    inner.available_permits += 1;
+    if let Some(waiter) = inner.waiters.front() {
+      waiter.task.notify()
+    }
   }
 }
 
+#[derive(Clone)]
 pub struct PermitFuture {
-  inner: Option<Arc<Mutex<Inner>>>,
+  inner: Arc<Mutex<Inner>>,
+  waiter_id: Option<usize>,
+}
+
+impl Drop for PermitFuture {
+  fn drop(&mut self) {
+    // If waiter_id is Some, then this PermitFuture was added to the waiters queue.
+    if let Some(waiter_id) = self.waiter_id {
+      let mut inner = self.inner.lock();
+      if let Some(waiter_index) = inner
+        .waiters
+        .iter()
+        .position(|waiter| waiter_id == waiter.id)
+      {
+        inner.waiters.remove(waiter_index);
+      }
+    }
+  }
 }
 
 impl Future for PermitFuture {
@@ -106,21 +138,51 @@ impl Future for PermitFuture {
   type Error = ();
 
   fn poll(&mut self) -> Poll<Permit, ()> {
-    let inner = self.inner.take().expect("cannot poll PermitFuture twice");
+    let inner = self.inner.clone();
     let acquired = {
       let mut inner = inner.lock();
+      if self.waiter_id.is_none() {
+        let waiter_id = inner.next_waiter_id;
+        let this_waiter = Waiter {
+          id: waiter_id,
+          task: task::current(),
+        };
+        self.waiter_id = Some(waiter_id);
+        inner.next_waiter_id += 1;
+        inner.waiters.push_back(this_waiter);
+      }
       if inner.available_permits == 0 {
-        inner.waiters.push_back(task::current());
         false
       } else {
-        inner.available_permits -= 1;
-        true
+        let will_issue_permit = {
+          if let Some(front_waiter) = inner.waiters.front() {
+            // This task is the one we notified, so remove it. Otherwise keep it on the
+            // waiters queue so that it doesn't get forgotten.
+            if front_waiter.id == self.waiter_id.unwrap() {
+              inner.waiters.pop_front();
+              // Set waiter_id to None to indicate that this task is no longer in the
+              // queue, so we don't have to waste time searching for it in the Drop
+              // handler.
+              self.waiter_id = None;
+              true
+            } else {
+              // Don't issue a permit to this task if it isn't at the head of the line;
+              // we added it as a waiter above.
+              false
+            }
+          } else {
+            false
+          }
+        };
+        if will_issue_permit {
+          inner.available_permits -= 1;
+        }
+        will_issue_permit
       }
     };
     if acquired {
       Ok(Async::Ready(Permit { inner }))
     } else {
-      self.inner = Some(inner);
       Ok(Async::NotReady)
     }
   }
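Aside: a standalone sketch of the queue bookkeeping that the poll()/Drop implementations above rely on. It assumes the `parking_lot` crate; the names mirror the diff but this is not the real `AsyncSemaphore`, just the idea of id-based waiter removal.

```rust
use parking_lot::Mutex;
use std::collections::VecDeque;
use std::sync::Arc;

// Each queued waiter gets a monotonically increasing id issued under the lock,
// so a dropped PermitFuture can find and remove exactly its own entry.
struct QueueInner {
  waiters: VecDeque<usize>, // waiter ids; the front of the queue is next in line
  next_waiter_id: usize,
}

#[derive(Clone)]
struct WaiterQueue {
  inner: Arc<Mutex<QueueInner>>,
}

impl WaiterQueue {
  // Mirrors the first poll() of a PermitFuture: take an id and join the queue.
  fn enqueue(&self) -> usize {
    let mut inner = self.inner.lock();
    let id = inner.next_waiter_id;
    inner.next_waiter_id += 1;
    inner.waiters.push_back(id);
    id
  }

  // Mirrors PermitFuture::drop: remove our entry wherever it is, so later
  // waiters are not stuck behind an entry that will never be polled again.
  fn remove(&self, id: usize) {
    let mut inner = self.inner.lock();
    if let Some(pos) = inner.waiters.iter().position(|&w| w == id) {
      inner.waiters.remove(pos);
    }
  }
}

fn main() {
  let q = WaiterQueue {
    inner: Arc::new(Mutex::new(QueueInner {
      waiters: VecDeque::new(),
      next_waiter_id: 0,
    })),
  };
  let a = q.enqueue();
  let _b = q.enqueue();
  q.remove(a); // dropping the first waiter leaves the second at the head of the queue
  assert_eq!(1, q.inner.lock().waiters.len());
}
```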
@@ -133,7 +195,9 @@ mod tests {
   use futures::{future, Future};
   use std::sync::mpsc;
   use std::thread;
-  use std::time::Duration;
+  use std::time::{Duration, Instant};
+
+  use tokio_timer::Delay;
 
   #[test]
   fn acquire_and_release() {
@@ -194,4 +258,146 @@ mod tests {
       .recv_timeout(Duration::from_secs(5))
       .expect("thread2 didn't acquire.");
   }
+
+  #[test]
+  fn drop_while_waiting() {
+    // This tests that a task in the waiters queue of the semaphore is removed
+    // from the queue when the future that it was polling gets dropped.
+    //
+    // First we acquire the semaphore with a "process" which hangs until we send
+    // it a signal via the unblock_thread1 channel. This means that any futures that
+    // try to acquire the semaphore will be queued up until we unblock thread1.
+    //
+    // Next we spawn a future on a second thread that tries to acquire the semaphore,
+    // and gets added to the waiters queue; we drop that future after a Delay timer
+    // completes. The drop should cause the task to be removed from the waiters queue.
+    //
+    // Then we spawn a 3rd future that tries to acquire the semaphore but cannot,
+    // because thread1 still has the only permit. After this future is added to the waiters
+    // queue, we unblock thread1 and wait for a signal from thread3 that it acquires.
+    //
+    // If the second future was not removed from the waiters queue we would not get a signal
+    // that thread3 acquired the lock, because the 2nd task would be blocking the queue trying to
+    // poll a non-existent future.
+    let mut runtime = tokio::runtime::Runtime::new().unwrap();
+    let sema = AsyncSemaphore::new(1);
+    let handle1 = sema.clone();
+    let handle2 = sema.clone();
+    let handle3 = sema.clone();
+
+    let (tx_thread1, acquired_thread1) = mpsc::channel();
+    let (unblock_thread1, rx_thread1) = mpsc::channel();
+    let (tx_inqueue_thread3, queued_thread3) = mpsc::channel();
+    let (tx_thread3, acquired_thread3) = mpsc::channel();
+    let (unblock_thread3, rx_thread3) = mpsc::channel();
+    let (tx_thread2_attempt_1, did_not_acquire_thread2_attempt_1) = mpsc::channel();
+
+    runtime.spawn(handle1.with_acquired(move || {
+      // Indicate that we've acquired, and then wait to be signaled to exit.
+      tx_thread1.send(()).unwrap();
+      rx_thread1.recv().unwrap();
+      future::ok::<_, ()>(())
+    }));
+
+    // Wait for thread1 to acquire, and then launch thread2.
+    acquired_thread1
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread1 didn't acquire.");
+
+    // thread2 will wait for a little while, but then drop its PermitFuture to give up on waiting.
+    runtime.spawn(future::lazy(move || {
+      let permit_future = handle2.acquire();
+      let delay_future = Delay::new(Instant::now() + Duration::from_millis(10));
+      delay_future
+        .select2(permit_future)
+        .map(move |raced_result| {
+          // We expect to have timed out, because the other Future will not resolve until asked.
+          match raced_result {
+            future::Either::B(_) => panic!("Expected to time out."),
+            future::Either::A(_) => {}
+          };
+          tx_thread2_attempt_1.send(()).unwrap();
+        })
+        .map_err(|_| panic!("Permit or duration failed."))
+    }));
+
+    runtime.spawn(future::lazy(move || {
+      tx_inqueue_thread3.send(()).unwrap();
+      handle3.with_acquired(move || {
+        // Indicate that we've acquired, and then wait to be signaled to exit.
+        tx_thread3.send(()).unwrap();
+        rx_thread3.recv().unwrap();
+        future::ok::<_, ()>(())
+      })
+    }));
+
+    queued_thread3
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread3 didn't ever queue up.");
+
+    // thread2 should signal that it did not successfully acquire for the first attempt.
+    did_not_acquire_thread2_attempt_1
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread2 should have failed to acquire by now.");
+
+    // Unblock thread1 and confirm that thread3 acquires.
+    unblock_thread1.send(()).unwrap();
+    acquired_thread3
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread3 didn't acquire.");
+    unblock_thread3.send(()).unwrap();
+  }
+
+  #[test]
+  fn dropped_future_is_removed_from_queue() {
+    let mut runtime = tokio::runtime::Runtime::new().unwrap();
+    let sema = AsyncSemaphore::new(1);
+    let handle1 = sema.clone();
+    let handle2 = sema.clone();
+
+    let (tx_thread1, acquired_thread1) = mpsc::channel();
+    let (unblock_thread1, rx_thread1) = mpsc::channel();
+    let (tx_thread2, acquired_thread2) = mpsc::channel();
+    let (unblock_thread2, rx_thread2) = mpsc::channel();
+
+    runtime.spawn(handle1.with_acquired(move || {
+      // Indicate that we've acquired, and then wait to be signaled to exit.
+      tx_thread1.send(()).unwrap();
+      rx_thread1.recv().unwrap();
+      future::ok::<_, ()>(())
+    }));
+
+    // Wait for thread1 to acquire, and then launch thread2.
+    acquired_thread1
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread1 didn't acquire.");
+    let waiter = handle2.with_acquired(move || future::ok::<_, ()>(()));
+    runtime.spawn(future::ok::<_, ()>(()).select(waiter).then(move |res| {
+      let mut waiter_fute = match res {
+        Ok((_, fute)) => fute,
+        Err(_) => panic!("future::ok is infallible"),
+      };
+      // We explicitly poll the future here because the select call resolves
+      // immediately when called on a future::ok result, and the second future
+      // is never polled.
+      let _waiter_res = waiter_fute.poll();
+      tx_thread2.send(()).unwrap();
+      rx_thread2.recv().unwrap();
+      drop(waiter_fute);
+      tx_thread2.send(()).unwrap();
+      rx_thread2.recv().unwrap();
+      future::ok::<_, ()>(())
+    }));
+    acquired_thread2
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread2 didn't acquire.");
+    assert_eq!(1, sema.num_waiters());
+    unblock_thread2.send(()).unwrap();
+    acquired_thread2
+      .recv_timeout(Duration::from_secs(5))
+      .expect("thread2 didn't drop future.");
+    assert_eq!(0, sema.num_waiters());
+    unblock_thread2.send(()).unwrap();
+    unblock_thread1.send(()).unwrap();
+  }
 }
diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs
index 7288d80ff1f..f7a9a45c16b 100644
--- a/src/rust/engine/process_execution/src/lib.rs
+++ b/src/rust/engine/process_execution/src/lib.rs
@@ -263,6 +263,10 @@ pub trait CommandRunner: Send + Sync {
     &self,
     req: &MultiPlatformExecuteProcessRequest,
   ) -> Option<ExecuteProcessRequest>;
+
+  fn num_waiters(&self) -> usize {
+    panic!("This method is abstract and not implemented for this type")
+  }
 }
 
 ///
@@ -282,6 +286,10 @@ impl BoundedCommandRunner {
 }
 
 impl CommandRunner for BoundedCommandRunner {
+  fn num_waiters(&self) -> usize {
+    self.inner.1.num_waiters()
+  }
+
   fn run(
     &self,
     req: MultiPlatformExecuteProcessRequest,
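Aside: a minimal sketch of the pattern used above, with illustrative names rather than the pants types: a default `num_waiters` on the trait that panics, overridden only by the wrapper that actually has a queue to report.

```rust
trait Runner {
  fn run(&self) -> String;

  // Default implementation, so existing runners don't need to change;
  // callers should only ask runners that are known to wrap a semaphore.
  fn num_waiters(&self) -> usize {
    panic!("This method is abstract and not implemented for this type")
  }
}

struct Bounded {
  queued: usize,
}

impl Runner for Bounded {
  fn run(&self) -> String {
    "ran".to_string()
  }

  // The bounded wrapper can delegate to its semaphore's queue length.
  fn num_waiters(&self) -> usize {
    self.queued
  }
}

fn main() {
  let r = Bounded { queued: 3 };
  assert_eq!(3, r.num_waiters());
}
```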
diff --git a/src/rust/engine/process_execution/src/speculate.rs b/src/rust/engine/process_execution/src/speculate.rs
index ea1efe0f88b..517fb6ac63e 100644
--- a/src/rust/engine/process_execution/src/speculate.rs
+++ b/src/rust/engine/process_execution/src/speculate.rs
@@ -3,7 +3,8 @@ use crate::{
   MultiPlatformExecuteProcessRequest,
 };
 use boxfuture::{BoxFuture, Boxable};
-use futures::future::{err, ok, Future};
+use futures::future::{err, ok, Either, Future};
+use log::{debug, trace};
 use std::sync::Arc;
 use std::time::{Duration, Instant};
 use tokio_timer::Delay;
@@ -33,19 +34,50 @@ impl SpeculatingCommandRunner {
     req: MultiPlatformExecuteProcessRequest,
     context: Context,
   ) -> BoxFuture<FallibleExecuteProcessResult, String> {
-    let command_runner = self.clone();
-    let req_2 = req.clone();
     let delay = Delay::new(Instant::now() + self.speculation_timeout);
+    let req2 = req.clone();
+    trace!(
+      "Primary command runner queue length: {:?}",
+      self.primary.num_waiters()
+    );
     self
       .primary
       .run(req, context.clone())
-      .select(delay.then(move |_| command_runner.secondary.run(req_2, context)))
+      .select2({
+        let command_runner = self.clone();
+        delay.then(move |_| {
+          trace!(
+            "Secondary command runner queue length: {:?}",
+            command_runner.secondary.num_waiters()
+          );
+          command_runner.secondary.run(req2, context)
+        })
+      })
       .then(|raced_result| match raced_result {
-        Ok((successful_res, _outstanding_req)) => {
-          ok::<FallibleExecuteProcessResult, String>(successful_res).to_boxed()
+        Ok(either_success) => {
+          // .split() extracts the homogeneous success type from either a primary or
+          // a secondary success.
+          ok::<FallibleExecuteProcessResult, String>(either_success.split().0).to_boxed()
         }
-        Err((failed_res, _outstanding_req)) => {
-          err::<FallibleExecuteProcessResult, String>(failed_res).to_boxed()
+        Err(Either::A((failed_primary_res, _))) => {
+          debug!("primary request FAILED, aborting");
+          err::<FallibleExecuteProcessResult, String>(failed_primary_res).to_boxed()
+        }
+        // We handle a failure of the secondary specially: we only want to surface a
+        // failure to the user if the primary execution source fails. This keeps the
+        // user-visible behavior consistent whether speculation is on or off.
+        Err(Either::B((_failed_secondary_res, outstanding_primary_request))) => {
+          debug!("secondary request FAILED, waiting for primary!");
+          outstanding_primary_request
+            .then(|primary_result| {
+              if primary_result.is_ok() {
+                debug!("primary request eventually SUCCEEDED after secondary failed");
+              } else {
+                debug!("primary request eventually FAILED after secondary failed");
+              }
+              primary_result
+            })
+            .to_boxed()
         }
       })
       .to_boxed()
@@ -152,12 +184,12 @@ mod tests {
   }
 
   #[test]
-  fn second_req_fast_fail() {
+  fn only_fail_on_primary_result() {
     let (result, call_counter, finished_counter) =
       run_speculation_test(1000, 0, 100, true, true, true, true);
     assert_eq![2, *call_counter.lock().unwrap()];
-    assert_eq![1, *finished_counter.lock().unwrap()];
-    assert_eq![result.unwrap_err(), Bytes::from("m2")]
+    assert_eq![2, *finished_counter.lock().unwrap()];
+    assert_eq![result.unwrap_err(), Bytes::from("m1")]
   }
 
   #[test]
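For reference, a self-contained sketch (assuming futures 0.1, with simplified item/error types rather than the pants types) of the select2/Either fallback pattern used in speculate.rs: a primary failure is surfaced immediately, while a secondary failure falls back to waiting on the still-pending primary.

```rust
use futures::future::{self, Either, Future};

fn race<P, S>(primary: P, secondary: S) -> Box<dyn Future<Item = u32, Error = String> + Send>
where
  P: Future<Item = u32, Error = String> + Send + 'static,
  S: Future<Item = u32, Error = String> + Send + 'static,
{
  Box::new(primary.select2(secondary).then(
    |raced| -> Box<dyn Future<Item = u32, Error = String> + Send> {
      match raced {
        // Whichever side finished first, split() recovers the common success value.
        Ok(either_success) => Box::new(future::ok(either_success.split().0)),
        // Either::A carries the primary's error plus the still-pending secondary:
        // report the primary failure right away.
        Err(Either::A((primary_err, _pending_secondary))) => Box::new(future::err(primary_err)),
        // Either::B carries the secondary's error plus the still-pending primary:
        // swallow the secondary failure and keep waiting for the primary.
        Err(Either::B((_secondary_err, pending_primary))) => Box::new(pending_primary),
      }
    },
  ))
}

fn main() {
  // The secondary fails, but the overall result is still the primary's success.
  let out = race(
    future::ok::<u32, String>(1),
    future::err::<u32, String>("boom".to_string()),
  )
  .wait();
  assert_eq!(Ok(1), out);
}
```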