Skip to content

Commit

Permalink
Always explicitly shut down executors (pantsbuild#18216)
Browse files Browse the repository at this point in the history
Always explicitly shut down executors, to avoid them being dropped on arbitrary threads (including under the GIL).

Fixes pantsbuild#18211.
  • Loading branch information
stuhood authored Feb 9, 2023
1 parent 2983f16 commit 34b2832
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 14 deletions.
2 changes: 1 addition & 1 deletion src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ def create(
bootstrap_options = options.bootstrap_option_values()
assert bootstrap_options is not None
scheduler = EngineInitializer.setup_graph(
bootstrap_options, build_config, dynamic_remote_options
bootstrap_options, build_config, dynamic_remote_options, executor
)
with options_initializer.handle_unknown_flags(options_bootstrapper, env, raise_=True):
global_options = options.for_global_scope()
Expand Down
20 changes: 14 additions & 6 deletions src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@
from typing import List, Mapping

from pants.base.exiter import ExitCode
from pants.engine.internals.native_engine import PantsdConnectionException, PyNailgunClient
from pants.engine.internals.native_engine import (
PantsdConnectionException,
PyExecutor,
PyNailgunClient,
)
from pants.option.global_options import GlobalOptions
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_client import PantsDaemonClient
Expand Down Expand Up @@ -114,15 +118,19 @@ def run(self, start_time: float) -> ExitCode:
pantsd_handle = self._client.maybe_launch()
logger.debug(f"Connecting to pantsd on port {pantsd_handle.port}")

return self._connect_and_execute(pantsd_handle, start_time)
executor = GlobalOptions.create_py_executor(self._bootstrap_options.for_global_scope())
try:
return self._connect_and_execute(pantsd_handle, executor, start_time)
finally:
executor.shutdown(3)

def _connect_and_execute(
self, pantsd_handle: PantsDaemonClient.Handle, start_time: float
self,
pantsd_handle: PantsDaemonClient.Handle,
executor: PyExecutor,
start_time: float,
) -> ExitCode:
global_options = self._bootstrap_options.for_global_scope()
# We do not explicitly shut this PyExecutor down, because the client should not run any long lived
# tasks which we would want to wait for (in particular: it runs no Python code). See #16105.
executor = GlobalOptions.create_py_executor(global_options)

# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = ttynames_to_env(sys.stdin, sys.stdout, sys.stderr)
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ def setup_graph(
bootstrap_options: OptionValueContainer,
build_configuration: BuildConfiguration,
dynamic_remote_options: DynamicRemoteOptions,
executor: PyExecutor | None = None,
executor: PyExecutor,
is_bootstrap: bool = False,
) -> GraphScheduler:
build_root = get_buildroot()
Expand Down
4 changes: 2 additions & 2 deletions src/rust/engine/src/externs/nailgun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ struct PyNailgunClient {
#[pymethods]
impl PyNailgunClient {
#[new]
fn __new__(port: u16, py_executor: PyExecutor) -> Self {
fn __new__(port: u16, py_executor: &PyExecutor) -> Self {
Self {
port,
executor: py_executor.0,
executor: py_executor.0.clone(),
}
}

Expand Down
16 changes: 13 additions & 3 deletions src/rust/engine/src/externs/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ extern "C" {
}

#[pyclass]
#[derive(Debug, Clone)]
#[derive(Debug)]
pub struct PyExecutor(pub task_executor::Executor);

#[pymethods]
Expand Down Expand Up @@ -52,7 +52,17 @@ impl PyExecutor {

/// Shut down this executor, waiting for all tasks to exit. Any tasks which have not exited at
/// the end of the timeout will be leaked.
fn shutdown(&self, duration_secs: f64) {
self.0.shutdown(Duration::from_secs_f64(duration_secs))
fn shutdown(&self, py: Python, duration_secs: f64) {
py.allow_threads(|| self.0.shutdown(Duration::from_secs_f64(duration_secs)))
}
}

impl Drop for PyExecutor {
  /// Logs a warning when this wrapper is dropped while the wrapped executor reports that it has
  /// not been shut down.
  fn drop(&mut self) {
    let already_shut_down = self.0.is_shutdown();
    if already_shut_down {
      return;
    }
    // Dropping a live executor can lead to hangs, since `Drop` will run on an arbitrary thread
    // under arbitrary locks. See #18211.
    log::warn!("Executor was not shut down explicitly.");
  }
}
2 changes: 1 addition & 1 deletion src/rust/engine/src/externs/testutil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl PyStubCASBuilder {
Ok(PyStubCASBuilder(self.0.clone()))
}

fn build(&mut self, py_executor: PyExecutor) -> PyResult<PyStubCAS> {
fn build(&mut self, py_executor: &PyExecutor) -> PyResult<PyStubCAS> {
let mut builder_opt = self.0.lock();
let builder = builder_opt
.take()
Expand Down
6 changes: 6 additions & 0 deletions src/rust/engine/task_executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ impl Executor {
log::warn!("Executor shutdown took unexpectedly long: tasks were likely leaked!");
}
}

/// Reports whether `shutdown` has already been called for this Executor, determined by checking
/// that the `runtime` slot is empty. Borrowed Executors always report true.
pub fn is_shutdown(&self) -> bool {
  // Hold the lock only long enough to inspect whether a runtime is still present.
  let runtime = self.runtime.lock();
  runtime.is_none()
}
}

/// Store "tail" tasks which are async tasks that can execute concurrently with regular
Expand Down

0 comments on commit 34b2832

Please sign in to comment.