Fix deadlock between Session and WorkunitStore. (pantsbuild#9959)
As described in pantsbuild#9926, we observed a deadlock with:
* One thread in `WorkunitStore::log_workunit_state`, which, via our logging mechanism, was trying to use the `Session` to write to stderr.
* Another thread in `Session::maybe_display_render` requesting `WorkunitStore::heavy_hitters`.

Do not acquire the `Session` lock in a logging callback (which might fire anywhere at all, and could cause other unidentified lock interleavings): instead, enqueue a message for `Scheduler::execute` on the main thread to write to the `Session` (the pattern is sketched below).

Fixes pantsbuild#9926. The particular deadlock described there is only possible now that we log workunit completions, but if this cherry-picks cleanly to `1.29.x`, we should apply it there as well to prevent any unanticipated interleaving.

[ci skip-jvm-tests]
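
The shape of the fix is easiest to see in isolation: the logging callback gives up direct access to the `Session` and owns only a channel sender, so the main thread remains the sole writer to the console. Below is a minimal, runnable sketch of that pattern; the `ExecutionEvent` name matches the new enum in `scheduler.rs`, but everything else here is a stand-in rather than the engine's actual API.

```rust
use std::sync::mpsc;
use std::thread;

// Stand-in for the engine's new event enum: either the result of the run,
// or a line of stderr to be written by the main thread.
enum ExecutionEvent {
    Completed(String),
    Stderr(String),
}

fn main() {
    let (sender, receiver) = mpsc::channel::<ExecutionEvent>();

    // The logging callback owns only a Sender, so it can fire on any thread
    // without acquiring the Session (or any other) lock.
    let log_sender = sender.clone();
    let log_handler = move |msg: &str| {
        log_sender
            .send(ExecutionEvent::Stderr(msg.to_owned()))
            .map_err(|_| ())
    };

    // A background thread does some work and logs along the way.
    thread::spawn(move || {
        let _ = log_handler("workunit completed");
        let _ = sender.send(ExecutionEvent::Completed("result".to_owned()));
    });

    // Only the main thread ever touches the console.
    loop {
        match receiver.recv() {
            Ok(ExecutionEvent::Stderr(msg)) => eprintln!("{}", msg),
            Ok(ExecutionEvent::Completed(res)) => {
                println!("done: {}", res);
                break;
            }
            Err(_) => break,
        }
    }
}
```

Because the callback never takes a lock, no lock-ordering cycle with `heavy_hitters` (or anything else) is possible; the worst case is a failed send after the loop has exited, which the logger handles by falling back to direct stderr.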
stuhood committed Jun 5, 2020
1 parent 2f18165 commit e2f9fb7
Showing 3 changed files with 67 additions and 33 deletions.
src/rust/engine/engine_cffi/src/lib.rs (5 changes: 4 additions & 1 deletion)
```diff
@@ -85,7 +85,10 @@ impl From<ExecutionTermination> for RawExecutionTermination {
   fn from(et: ExecutionTermination) -> Self {
     match et {
       ExecutionTermination::KeyboardInterrupt => RawExecutionTermination::KeyboardInterrupt,
-      ExecutionTermination::Timeout => RawExecutionTermination::Timeout,
+      ExecutionTermination::PollTimeout => RawExecutionTermination::Timeout,
+      // NB: 1.29.x does not make it easy to expose an accurate exception here, but this case
+      // should be "effectively impossible".
+      ExecutionTermination::Fatal(_) => RawExecutionTermination::Timeout,
     }
   }
 }
```
src/rust/engine/logging/src/logger.rs (16 changes: 10 additions & 6 deletions)
```diff
@@ -22,7 +22,7 @@ use uuid::Uuid;
 
 const TIME_FORMAT_STR: &str = "%H:%M:%S:%3f";
 
-pub type StdioHandler = Box<dyn Fn(&str) -> () + Send>;
+pub type StdioHandler = Box<dyn Fn(&str) -> Result<(), ()> + Send>;
 
 lazy_static! {
   pub static ref LOGGER: Logger = Logger::new();
@@ -162,14 +162,18 @@ impl Log for Logger {
         let log_string: String = format!("{} [{}] {}", cur_time, level, record.args());
 
         {
+          // If there are no handlers, or sending to any of the handlers failed, send to stderr
+          // directly.
           let handlers_map = self.stderr_handlers.lock();
-          if handlers_map.len() == 0 {
-            self.stderr_log.lock().log(record);
-          } else {
-            for callback in handlers_map.values() {
-              callback(&log_string);
+          let mut any_handler_failed = false;
+          for callback in handlers_map.values() {
+            if callback(&log_string).is_err() {
+              any_handler_failed = true;
             }
           }
+          if handlers_map.len() == 0 || any_handler_failed {
+            self.stderr_log.lock().log(record);
+          }
         }
       }
       Destination::Pantsd => self.pantsd_log.lock().log(record),
```
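
The handler signature changed from `()` to `Result<(), ()>` precisely to enable this fallback: a handler now has a way to say "I could not deliver this line", and the logger reacts by writing to stderr directly. Here is a small sketch of that contract, assuming nothing about the real `Logger` beyond the logic in the hunk above (the locked handler map is reduced to a plain slice):

```rust
use std::sync::mpsc;

// Same shape as the new type in logger.rs.
type StdioHandler = Box<dyn Fn(&str) -> Result<(), ()> + Send>;

// Mirror of the patched logic: try every handler, and fall back to direct
// stderr if there are no handlers or any handler failed.
fn log_line(handlers: &[StdioHandler], line: &str) {
    let mut any_handler_failed = false;
    for handler in handlers {
        if handler(line).is_err() {
            any_handler_failed = true;
        }
    }
    if handlers.is_empty() || any_handler_failed {
        eprintln!("{}", line);
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel::<String>();
    let handlers: Vec<StdioHandler> =
        vec![Box::new(move |msg| sender.send(msg.to_owned()).map_err(|_| ()))];

    log_line(&handlers, "delivered via the handler");
    println!("received: {}", receiver.recv().unwrap());

    // Once the receiving side is gone, the handler fails and we fall back.
    drop(receiver);
    log_line(&handlers, "falls back to stderr");
}
```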
src/rust/engine/src/scheduler.rs (79 changes: 53 additions & 26 deletions)
```diff
@@ -24,10 +24,24 @@ use watch::Invalidatable;
 use workunit_store::WorkunitStore;
 
 pub enum ExecutionTermination {
+  // Raised as a vanilla keyboard interrupt on the python side.
   KeyboardInterrupt,
-  Timeout,
+  // An execute-method specific timeout: raised as PollTimeout.
+  PollTimeout,
+  // No clear reason: possibly a panic on a background thread.
+  Fatal(String),
 }
 
+enum ExecutionEvent {
+  Completed(Vec<ObservedValueResult>),
+  Stderr(String),
+}
+
+type ObservedValueResult = Result<(Value, Option<LastObserved>), Failure>;
+
+// Root requests are limited to Select nodes, which produce (python) Values.
+type Root = Select;
+
 ///
 /// A Session represents a related series of requests (generally: one run of the pants CLI) on an
 /// underlying Scheduler, and is a useful scope for metrics.
@@ -175,13 +189,19 @@ impl Session {
     self.should_report_workunits() || self.should_record_zipkin_spans()
   }
 
-  fn maybe_display_initialize(&self, executor: &Executor) {
+  fn maybe_display_initialize(&self, executor: &Executor, sender: &mpsc::Sender<ExecutionEvent>) {
     if let Some(display) = &self.0.display {
-      let session = self.clone();
       let mut display = display.lock();
+      let sender = sender.clone();
       let res = display.initialize(
         executor.clone(),
-        Box::new(move |msg: &str| session.write_stderr(msg)),
+        Box::new(move |msg: &str| {
+          // If we fail to send, it's because the execute loop has exited: we fail the callback to
+          // have the logging module directly log to stderr at that point.
+          sender
+            .send(ExecutionEvent::Stderr(msg.to_owned()))
+            .map_err(|_| ())
+        }),
       );
       if let Err(e) = res {
         warn!("{}", e);
@@ -380,7 +400,7 @@ impl Scheduler {
     &self,
     request: &ExecutionRequest,
     session: &Session,
-    sender: mpsc::Sender<Vec<ObservedValueResult>>,
+    sender: mpsc::Sender<ExecutionEvent>,
   ) {
     let context = Context::new(self.core.clone(), session.clone());
     let roots = session.zip_last_observed(&request.roots);
@@ -398,7 +418,8 @@
         )
         .await;
 
-      let _ = sender.send(res);
+      // The receiver may have gone away due to timeout.
+      let _ = sender.send(ExecutionEvent::Completed(res));
     });
   }
 
@@ -442,24 +463,37 @@
       request.poll
     );
 
-    // Spawn and wait for all roots to complete. Failure here should be impossible, because each
-    // individual Future in the join was (eventually) mapped into success.
-    let (sender, receiver) = mpsc::channel();
-    self.execute_helper(request, session, sender);
-
     let interval = ConsoleUI::render_interval();
     let deadline = request.timeout.map(|timeout| Instant::now() + timeout);
 
-    session.maybe_display_initialize(&self.core.executor);
+    // Spawn and wait for all roots to complete.
+    let (sender, receiver) = mpsc::channel();
+    session.maybe_display_initialize(&self.core.executor, &sender);
+    self.execute_helper(request, session, sender);
     let result = loop {
-      if let Ok(res) = receiver.recv_timeout(Self::refresh_delay(interval, deadline)) {
-        // Completed successfully.
-        break Ok(Self::execute_record_results(&request.roots, &session, res));
-      } else if deadline.map(|d| d < Instant::now()).unwrap_or(false) {
-        // The timeout on the request has been exceeded.
-        break Err(ExecutionTermination::Timeout);
+      match receiver.recv_timeout(Self::refresh_delay(interval, deadline)) {
+        Ok(ExecutionEvent::Completed(res)) => {
+          // Completed successfully.
+          break Ok(Self::execute_record_results(&request.roots, &session, res));
+        }
+        Ok(ExecutionEvent::Stderr(stderr)) => {
+          session.write_stderr(&stderr);
+        }
+        Err(mpsc::RecvTimeoutError::Timeout) => {
+          if deadline.map(|d| d < Instant::now()).unwrap_or(false) {
+            // The timeout on the request has been exceeded.
+            break Err(ExecutionTermination::PollTimeout);
+          } else {
+            // Just a receive timeout. render and continue.
+            session.maybe_display_render();
+          }
+        }
+        Err(mpsc::RecvTimeoutError::Disconnected) => {
+          break Err(ExecutionTermination::Fatal(
+            "Execution threads exited early.".to_owned(),
+          ));
+        }
       }
-      session.maybe_display_render();
     };
     self
       .core
@@ -484,10 +518,3 @@ impl Drop for Scheduler {
     self.core.graph.clear();
   }
 }
-
-///
-/// Root requests are limited to Selectors that produce (python) Values.
-///
-type Root = Select;
-
-pub type ObservedValueResult = Result<(Value, Option<LastObserved>), Failure>;
```
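For completeness, here is the new `execute` control flow reduced to a runnable toy: one channel carries both stderr lines and the final result, `recv_timeout` doubles as the UI render tick, and a missed deadline or a dead sender maps to the two error outcomes. Plain `Duration`s stand in for `ConsoleUI::render_interval` and `Self::refresh_delay`, and strings stand in for real results.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

enum ExecutionEvent {
    Completed(&'static str),
    Stderr(String),
}

fn main() {
    let (sender, receiver) = mpsc::channel();
    thread::spawn(move || {
        let _ = sender.send(ExecutionEvent::Stderr("working...".to_owned()));
        thread::sleep(Duration::from_millis(300));
        let _ = sender.send(ExecutionEvent::Completed("ok"));
    });

    let interval = Duration::from_millis(100); // render tick
    let deadline = Some(Instant::now() + Duration::from_secs(2)); // poll deadline

    let result: Result<&str, &str> = loop {
        match receiver.recv_timeout(interval) {
            Ok(ExecutionEvent::Completed(res)) => break Ok(res),
            // Stderr is written only here, on the consuming thread.
            Ok(ExecutionEvent::Stderr(msg)) => eprintln!("{}", msg),
            Err(mpsc::RecvTimeoutError::Timeout) => {
                if deadline.map(|d| d < Instant::now()).unwrap_or(false) {
                    break Err("poll timeout");
                }
                // Otherwise this was just a render tick: refresh the UI and wait on.
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                break Err("execution threads exited early");
            }
        }
    };
    println!("{:?}", result);
}
```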