Fix async_semaphore deadlock, and only fail speculation if primary command fails. (pantsbuild#8374)

### Problem

Speculation should not interfere with a user's current experience of Pants. Currently, if either of the dispatched commands fails, the failure is propagated to the user. Additionally, we've observed a deadlock in the presence of speculation above `BoundedCommandRunner`.

### Solution

Instead of failing fast on the first returned error: if the secondary request fails before the primary request, we let the primary request finish and take the result/error from that request. I.e., we only use successful results from the secondary.
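
As a rough illustration of the policy (a minimal sketch built on futures 0.1 combinators; `speculate` and the surrounding names are hypothetical, not code from this PR):

```rust
extern crate futures; // futures 0.1.x

use futures::future::{self, Either, Future};

/// Race a primary and a secondary request, but only let the primary's error be fatal.
fn speculate<T, E, P, S>(primary: P, secondary: S) -> Box<dyn Future<Item = T, Error = E> + Send>
where
  T: Send + 'static,
  E: Send + 'static,
  P: Future<Item = T, Error = E> + Send + 'static,
  S: Future<Item = T, Error = E> + Send + 'static,
{
  Box::new(primary.select2(secondary).then(|raced| {
    let next: Box<dyn Future<Item = T, Error = E> + Send> = match raced {
      // Primary finished first: its result (or error) is authoritative.
      Ok(Either::A((value, _secondary))) => Box::new(future::ok(value)),
      Err(Either::A((err, _secondary))) => Box::new(future::err(err)),
      // Secondary succeeded first: take the successful result.
      Ok(Either::B((value, _primary))) => Box::new(future::ok(value)),
      // Secondary failed first: swallow its error and keep waiting for the primary.
      Err(Either::B((_secondary_err, primary))) => Box::new(primary),
    };
    next
  }))
}
```

The last match arm is the behavioral change: a secondary failure no longer short-circuits the request, so only the primary command runner can produce a fatal error.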

### Result

If the user has enabled speculation, only failures from the primary command runner, which defaults to the local machine, will be fatal. Fixes pantsbuild#8089.
Henry Fuller authored and stuhood committed Oct 15, 2019
1 parent ba19f53 commit 0e99324
Showing 8 changed files with 293 additions and 38 deletions.
5 changes: 1 addition & 4 deletions build-support/bin/ci.py
@@ -223,13 +223,10 @@ def pants_command(
"--no-v1",
"--v2",
"--pants-config-files=pants.remote.ini",
# We turn off speculation to reduce the risk of flakiness, where a test
# passes either locally or remotely and fails in the other environment.
"--process-execution-speculation-strategy=none",
f"--remote-oauth-bearer-token-path={oauth_token_path}",
"test",
*sorted(targets),
],
]
}[self]
if shard is not None and self in [self.v1_no_chroot, self.v1_chroot]: # type: ignore
result.insert(2, f"--test-pytest-test-shard={shard}")
3 changes: 3 additions & 0 deletions pants.remote.ini
@@ -26,6 +26,9 @@ remote_execution_extra_platform_properties: [
# This should correspond to the number of workers running in Google RBE. See
# https://console.cloud.google.com/apis/api/remotebuildexecution.googleapis.com/quotas?project=pants-remoting-beta&folder&organizationId&duration=PT6H.
process_execution_remote_parallelism: 16
process_execution_speculation_strategy: remote_first
# p95 of RBE appears to be ~2 seconds, but we need to factor in local queue time, which can be much longer; we have no metrics on that yet.
process_execution_speculation_delay: 15

[python-setup]
# TODO(#7735): This config is not ideal, that we must specify the PATH for both local and remote
2 changes: 1 addition & 1 deletion src/python/pants/option/global_options.py
@@ -100,7 +100,7 @@ def from_bootstrap_options(cls, bootstrap_options):
process_execution_local_parallelism=multiprocessing.cpu_count()*2,
process_execution_remote_parallelism=128,
process_execution_cleanup_local_dirs=True,
process_execution_speculation_delay=.1,
process_execution_speculation_delay=1,
process_execution_speculation_strategy='local_first',
process_execution_use_local_cache=True,
remote_execution_process_cache_namespace=None,
3 changes: 3 additions & 0 deletions src/rust/engine/Cargo.lock


6 changes: 6 additions & 0 deletions src/rust/engine/async_semaphore/Cargo.toml
@@ -7,4 +7,10 @@ publish = false

[dependencies]
futures = "^0.1.16"
log = "0.4"
parking_lot = "0.6"


[dev-dependencies]
tokio-timer = "0.2"
tokio = "0.1"
250 changes: 228 additions & 22 deletions src/rust/engine/async_semaphore/src/lib.rs
@@ -34,9 +34,17 @@ use futures::task::{self, Task};
use futures::{Async, Poll};
use parking_lot::Mutex;

struct Waiter {
id: usize,
task: Task,
}

struct Inner {
waiters: VecDeque<Task>,
waiters: VecDeque<Waiter>,
available_permits: usize,
// Used as the source of ids for Waiters because
// it is monotonically increasing, and only incremented under the mutex lock.
next_waiter_id: usize,
}

#[derive(Clone)]
@@ -50,10 +58,16 @@ impl AsyncSemaphore {
inner: Arc::new(Mutex::new(Inner {
waiters: VecDeque::new(),
available_permits: permits,
next_waiter_id: 0,
})),
}
}

pub fn num_waiters(&self) -> usize {
let inner = self.inner.lock();
inner.waiters.len()
}

///
/// Runs the given Future-creating function (and the Future it returns) under the semaphore.
///
@@ -62,11 +76,9 @@ impl AsyncSemaphore {
F: FnOnce() -> B + Send + 'static,
B: Future<Item = T, Error = E> + Send + 'static,
{
let permit = PermitFuture {
inner: Some(self.inner.clone()),
};
Box::new(
permit
self
.acquire()
.map_err(|()| panic!("Acquisition is infallible."))
.and_then(|permit| {
f().map(move |t| {
@@ -76,6 +88,13 @@
}),
)
}

fn acquire(&self) -> PermitFuture {
PermitFuture {
inner: self.inner.clone(),
waiter_id: None,
}
}
}

pub struct Permit {
@@ -84,43 +103,86 @@ pub struct Permit {

impl Drop for Permit {
fn drop(&mut self) {
let task = {
let mut inner = self.inner.lock();
inner.available_permits += 1;
if let Some(task) = inner.waiters.pop_front() {
task
} else {
return;
}
};
task.notify();
let mut inner = self.inner.lock();
inner.available_permits += 1;
if let Some(waiter) = inner.waiters.front() {
waiter.task.notify()
}
}
}

#[derive(Clone)]
pub struct PermitFuture {
inner: Option<Arc<Mutex<Inner>>>,
inner: Arc<Mutex<Inner>>,
waiter_id: Option<usize>,
}

impl Drop for PermitFuture {
fn drop(&mut self) {
// If waiter_id is Some, then this PermitFuture was added to the waiters queue.
if let Some(waiter_id) = self.waiter_id {
let mut inner = self.inner.lock();
if let Some(waiter_index) = inner
.waiters
.iter()
.position(|waiter| waiter_id == waiter.id)
{
inner.waiters.remove(waiter_index);
}
}
}
}

impl Future for PermitFuture {
type Item = Permit;
type Error = ();

fn poll(&mut self) -> Poll<Permit, ()> {
let inner = self.inner.take().expect("cannot poll PermitFuture twice");
let inner = self.inner.clone();
let acquired = {
let mut inner = inner.lock();
if self.waiter_id.is_none() {
let waiter_id = inner.next_waiter_id;
let this_waiter = Waiter {
id: waiter_id,
task: task::current(),
};
self.waiter_id = Some(waiter_id);
inner.next_waiter_id += 1;
inner.waiters.push_back(this_waiter);
}
if inner.available_permits == 0 {
inner.waiters.push_back(task::current());
false
} else {
inner.available_permits -= 1;
true
let will_issue_permit = {
if let Some(front_waiter) = inner.waiters.front() {
// This task is the one we notified, so remove it. Otherwise keep it on the
// waiters queue so that it doesn't get forgotten.
if front_waiter.id == self.waiter_id.unwrap() {
inner.waiters.pop_front();
// Set waiter_id to None to indicate that the task is no longer in the
// queue, so we don't have to waste time searching for it in the Drop
// handler.
self.waiter_id = None;
true
} else {
// Don't issue a permit to this task if it isn't at the head of the line,
// we added it as a waiter above.
false
}
} else {
false
}
};
if will_issue_permit {
inner.available_permits -= 1;
}
will_issue_permit
}
};
if acquired {
Ok(Async::Ready(Permit { inner }))
} else {
self.inner = Some(inner);
Ok(Async::NotReady)
}
}
@@ -133,7 +195,9 @@ mod tests {
use futures::{future, Future};
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use std::time::{Duration, Instant};

use tokio_timer::Delay;

#[test]
fn acquire_and_release() {
@@ -194,4 +258,146 @@ mod tests {
.recv_timeout(Duration::from_secs(5))
.expect("thread2 didn't acquire.");
}

#[test]
fn drop_while_waiting() {
// This tests that a task in the waiters queue of the semaphore is removed
// from the queue when the future that it was polling gets dropped.
//
// First we acquire the semaphore with a "process" which hangs until we send
// it a signal via the unblock_thread1 channel. This means that any futures that
// try to acquire the semaphore will be queued up until we unblock thread1.
//
// Next we spawn a future on a second thread that tries to acquire the semaphore,
// and gets added to the waiters queue; we drop that future after a Delay timer
// completes. The drop should cause the task to be removed from the waiters queue.
//
// Then we spawn a 3rd future that tries to acquire the semaphore but cannot
// because thread1 still has the only permit. After this future is added to the waiters
// queue we unblock thread1 and wait for a signal from thread3 that it has acquired.
//
// If the SECOND future was not removed from the waiters queue, we would not get a signal
// that thread3 acquired the lock, because the 2nd task would be blocking the queue trying to
// poll a non-existent future.
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let sema = AsyncSemaphore::new(1);
let handle1 = sema.clone();
let handle2 = sema.clone();
let handle3 = sema.clone();

let (tx_thread1, acquired_thread1) = mpsc::channel();
let (unblock_thread1, rx_thread1) = mpsc::channel();
let (tx_inqueue_thread3, queued_thread3) = mpsc::channel();
let (tx_thread3, acquired_thread3) = mpsc::channel();
let (unblock_thread3, rx_thread3) = mpsc::channel();
let (tx_thread2_attempt_1, did_not_acquire_thread2_attempt_1) = mpsc::channel();

runtime.spawn(handle1.with_acquired(move || {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread1.send(()).unwrap();
rx_thread1.recv().unwrap();
future::ok::<_, ()>(())
}));

// Wait for thread1 to acquire, and then launch thread2.
acquired_thread1
.recv_timeout(Duration::from_secs(5))
.expect("thread1 didn't acquire.");

// thread2 will wait for a little while, but then drop its PermitFuture to give up on waiting.
runtime.spawn(future::lazy(move || {
let permit_future = handle2.acquire();
let delay_future = Delay::new(Instant::now() + Duration::from_millis(10));
delay_future
.select2(permit_future)
.map(move |raced_result| {
// We expect to have timed out, because the other Future will not resolve until asked.
match raced_result {
future::Either::B(_) => panic!("Expected to time out."),
future::Either::A(_) => {}
};
tx_thread2_attempt_1.send(()).unwrap();
})
.map_err(|_| panic!("Permit or duration failed."))
}));

runtime.spawn(future::lazy(move || {
tx_inqueue_thread3.send(()).unwrap();
handle3.with_acquired(move || {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread3.send(()).unwrap();
rx_thread3.recv().unwrap();
future::ok::<_, ()>(())
})
}));

queued_thread3
.recv_timeout(Duration::from_secs(5))
.expect("thread3 didn't ever queue up.");

// thread2 should signal that it did not successfully acquire for the first attempt.
did_not_acquire_thread2_attempt_1
.recv_timeout(Duration::from_secs(5))
.expect("thread2 should have failed to acquire by now.");

// Unblock thread1 and confirm that thread3 acquires.
unblock_thread1.send(()).unwrap();
acquired_thread3
.recv_timeout(Duration::from_secs(5))
.expect("thread3 didn't acquire.");
unblock_thread3.send(()).unwrap();
}

#[test]
fn dropped_future_is_removed_from_queue() {
let mut runtime = tokio::runtime::Runtime::new().unwrap();
let sema = AsyncSemaphore::new(1);
let handle1 = sema.clone();
let handle2 = sema.clone();

let (tx_thread1, acquired_thread1) = mpsc::channel();
let (unblock_thread1, rx_thread1) = mpsc::channel();
let (tx_thread2, acquired_thread2) = mpsc::channel();
let (unblock_thread2, rx_thread2) = mpsc::channel();

runtime.spawn(handle1.with_acquired(move || {
// Indicate that we've acquired, and then wait to be signaled to exit.
tx_thread1.send(()).unwrap();
rx_thread1.recv().unwrap();
future::ok::<_, ()>(())
}));

// Wait for thread1 to acquire, and then launch thread2.
acquired_thread1
.recv_timeout(Duration::from_secs(5))
.expect("thread1 didn't acquire.");
let waiter = handle2.with_acquired(move || future::ok::<_, ()>(()));
runtime.spawn(future::ok::<_, ()>(()).select(waiter).then(move |res| {
let mut waiter_fute = match res {
Ok((_, fute)) => fute,
Err(_) => panic!("future::ok is infallible"),
};
// We explicitly poll the future here because the select call resolves
// immediately when called on a future::ok result, and the second future
// is never polled.
let _waiter_res = waiter_fute.poll();
tx_thread2.send(()).unwrap();
rx_thread2.recv().unwrap();
drop(waiter_fute);
tx_thread2.send(()).unwrap();
rx_thread2.recv().unwrap();
future::ok::<_, ()>(())
}));
acquired_thread2
.recv_timeout(Duration::from_secs(5))
.expect("thread2 didn't acquire.");
assert_eq!(1, sema.num_waiters());
unblock_thread2.send(()).unwrap();
acquired_thread2
.recv_timeout(Duration::from_secs(5))
.expect("thread2 didn't drop future.");
assert_eq!(0, sema.num_waiters());
unblock_thread2.send(()).unwrap();
unblock_thread1.send(()).unwrap();
}
}
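
Since the public surface exercised by these tests is just `AsyncSemaphore::new`, `clone`, and `with_acquired`, here is a minimal, hypothetical usage sketch from a consuming crate (futures 0.1 and tokio 0.1, matching the dev-dependencies above; not part of this commit):

```rust
extern crate async_semaphore;
extern crate futures; // 0.1
extern crate tokio; // 0.1

use async_semaphore::AsyncSemaphore;
use futures::future::{self, Future};

fn main() {
  let mut runtime = tokio::runtime::Runtime::new().unwrap();
  // One permit, so the two closures below run under the semaphore one at a time.
  let sema = AsyncSemaphore::new(1);

  let first = sema.clone().with_acquired(|| future::ok::<_, ()>(1));
  let second = sema.clone().with_acquired(|| future::ok::<_, ()>(2));

  // `join` drives both futures to completion; the semaphore serializes their bodies.
  let (a, b) = runtime.block_on(first.join(second)).unwrap();
  assert_eq!((1, 2), (a, b));
}
```

The change that matters for the deadlock is visible in the diff above: a `PermitFuture` that has been polled (and therefore queued as a `Waiter`) removes itself from the waiters queue when dropped, and `Permit::drop` now notifies the head of the queue without popping it, so an abandoned waiter can no longer strand the tasks queued behind it.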