Skip to content

Commit

Permalink
Make the number of threads used to run @rules in the Engine configura…
Browse files Browse the repository at this point in the history
…ble. (pantsbuild#11325)

### Problem

In situations where multiple (non-`pantsd`) instances of Pants need to run in parallel, it can be useful to constrain each of them to use fewer resources overall. Until now, however, there was no way to configure the number of threads used by the `tokio` `Runtime` that runs all async tasks (including `@rule`s).

### Solution

Make creation of the `Executor` that wraps a `Runtime` explicit, and allow the `core` and `max` threads to be configured.

[ci skip-build-wheels]
  • Loading branch information
stuhood authored Dec 17, 2020
1 parent 0bf6ac6 commit bce20b5
Show file tree
Hide file tree
Showing 20 changed files with 133 additions and 58 deletions.
7 changes: 6 additions & 1 deletion src/python/pants/bin/remote_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

from pants.base.exiter import ExitCode
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PyExecutor
from pants.init.options_initializer import OptionsInitializer
from pants.nailgun.nailgun_protocol import NailgunProtocol
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.pantsd.pants_daemon_client import PantsDaemonClient
Expand Down Expand Up @@ -101,6 +103,7 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
native = Native()

global_options = self._bootstrap_options.for_global_scope()
executor = PyExecutor(*OptionsInitializer.compute_executor_arguments(global_options))

# Merge the nailgun TTY capability environment variables with the passed environment dict.
ng_env = NailgunProtocol.ttynames_to_env(sys.stdin, sys.stdout.buffer, sys.stderr.buffer)
Expand Down Expand Up @@ -128,7 +131,9 @@ def _connect_and_execute(self, pantsd_handle: PantsDaemonClient.Handle) -> ExitC
# We ignore keyboard interrupts because the nailgun client will handle them.
with STTYSettings.preserved(), interrupts_ignored():
try:
return native.new_nailgun_client(port=port).execute(command, args, modified_env)
return native.new_nailgun_client(executor=executor, port=port).execute(
command, args, modified_env
)

# NailgunConnectionException represents a failure connecting to pantsd, so we retry
# up to the retry limit.
Expand Down
16 changes: 9 additions & 7 deletions src/python/pants/engine/internals/native.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ class Native(metaclass=SingletonMetaclass):
def __init__(self):
self.externs = Externs(self.lib)
self.lib.externs_set(self.externs)
self._executor = PyExecutor()

class BinaryLocationError(Exception):
pass
Expand Down Expand Up @@ -194,17 +193,19 @@ def nailgun_server_await_shutdown(self, nailgun_server) -> None:
Raises an exception if the server exited abnormally
"""
self.lib.nailgun_server_await_shutdown(self._executor, nailgun_server)
self.lib.nailgun_server_await_shutdown(nailgun_server)

def new_nailgun_server(self, port: int, runner: RawFdRunner) -> PyNailgunServer:
def new_nailgun_server(
self, executor: PyExecutor, port: int, runner: RawFdRunner
) -> PyNailgunServer:
"""Creates a nailgun server with a requested port.
Returns the server and the actual port it bound to.
"""
return cast(PyNailgunServer, self.lib.nailgun_server_create(self._executor, port, runner))
return cast(PyNailgunServer, self.lib.nailgun_server_create(executor, port, runner))

def new_nailgun_client(self, port: int) -> PyNailgunClient:
return cast(PyNailgunClient, self.lib.nailgun_client_create(self._executor, port))
def new_nailgun_client(self, executor: PyExecutor, port: int) -> PyNailgunClient:
return cast(PyNailgunClient, self.lib.nailgun_client_create(executor, port))

def new_tasks(self) -> PyTasks:
return PyTasks()
Expand Down Expand Up @@ -240,6 +241,7 @@ def new_scheduler(
ca_certs_path: Optional[str],
ignore_patterns: List[str],
use_gitignore: bool,
executor: PyExecutor,
execution_options,
types: PyTypes,
) -> PyScheduler:
Expand Down Expand Up @@ -286,7 +288,7 @@ def new_scheduler(
return cast(
PyScheduler,
self.lib.scheduler_create(
self._executor,
executor,
tasks,
types,
# Project tree.
Expand Down
2 changes: 1 addition & 1 deletion src/python/pants/engine/internals/native_engine.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class PyExecutionStrategyOptions:
def __init__(self, **kwargs: Any) -> None: ...

class PyExecutor:
def __init__(self, **kwargs: Any) -> None: ...
def __init__(self, core_threads: int, max_threads: int) -> None: ...

class PyGeneratorResponseBreak:
def __init__(self, **kwargs: Any) -> None: ...
Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/engine/internals/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
RemovePrefix,
Snapshot,
)
from pants.engine.internals.native_engine import PySessionCancellationLatch, PyTypes
from pants.engine.internals.native_engine import PyExecutor, PySessionCancellationLatch, PyTypes
from pants.engine.internals.nodes import Return, Throw
from pants.engine.internals.selectors import Params
from pants.engine.internals.session import SessionValues
Expand Down Expand Up @@ -100,6 +100,7 @@ def __init__(
rules: FrozenOrderedSet[Rule],
union_membership: UnionMembership,
execution_options: ExecutionOptions,
executor: PyExecutor,
include_trace_on_error: bool = True,
visualize_to_dir: Optional[str] = None,
validate_reachability: bool = True,
Expand Down Expand Up @@ -161,6 +162,7 @@ def __init__(
ca_certs_path=ca_certs_path,
ignore_patterns=ignore_patterns,
use_gitignore=use_gitignore,
executor=executor,
execution_options=execution_options,
types=types,
)
Expand Down
3 changes: 3 additions & 0 deletions src/python/pants/engine/internals/scheduler_test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from pants.base.file_system_project_tree import FileSystemProjectTree
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PyExecutor
from pants.engine.internals.scheduler import Scheduler
from pants.engine.unions import UnionMembership
from pants.option.global_options import DEFAULT_EXECUTION_OPTIONS, ExecutionOptions
Expand All @@ -21,6 +22,7 @@ class SchedulerTestBase:
"""

_native = Native()
_executor = PyExecutor(2, 4)

def _create_work_dir(self):
work_dir = safe_mkdtemp()
Expand Down Expand Up @@ -73,6 +75,7 @@ def mk_scheduler(
ca_certs_path=ca_certs_path,
rules=rules,
union_membership=UnionMembership({}),
executor=self._executor,
execution_options=execution_options or DEFAULT_EXECUTION_OPTIONS,
include_trace_on_error=include_trace_on_error,
)
Expand Down
4 changes: 4 additions & 0 deletions src/python/pants/engine/rules_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.internals.engine_testutil import assert_equal_with_printing
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PyExecutor
from pants.engine.internals.scheduler import Scheduler
from pants.engine.internals.selectors import GetConstraints, GetParseError
from pants.engine.rules import (
Expand All @@ -35,6 +36,8 @@
from pants.util.enums import match
from pants.util.logging import LogLevel

# Module-level executor passed to every Scheduler built by `create_scheduler` below,
# constructed with core_threads=2 and max_threads=4.
_EXECUTOR = PyExecutor(2, 4)


def create_scheduler(rules, validate=True, native=None):
"""Create a Scheduler."""
Expand All @@ -50,6 +53,7 @@ def create_scheduler(rules, validate=True, native=None):
ca_certs_path=None,
rules=rules,
union_membership=UnionMembership({}),
executor=_EXECUTOR,
execution_options=DEFAULT_EXECUTION_OPTIONS,
validate_reachability=validate,
)
Expand Down
11 changes: 9 additions & 2 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from pants.engine.goal import Goal
from pants.engine.internals import build_files, graph, options_parsing
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PySessionCancellationLatch
from pants.engine.internals.native_engine import PyExecutor, PySessionCancellationLatch
from pants.engine.internals.parser import Parser
from pants.engine.internals.scheduler import Scheduler, SchedulerSession
from pants.engine.internals.selectors import Params
Expand Down Expand Up @@ -167,14 +167,20 @@ def _make_goal_map_from_rules(rules):
def setup_graph(
options_bootstrapper: OptionsBootstrapper,
build_configuration: BuildConfiguration,
executor: Optional[PyExecutor] = None,
) -> GraphScheduler:
native = Native()
build_root = get_buildroot()
bootstrap_options = options_bootstrapper.bootstrap_options.for_global_scope()
executor = executor or PyExecutor(
*OptionsInitializer.compute_executor_arguments(bootstrap_options)
)
return EngineInitializer.setup_graph_extended(
options_bootstrapper,
build_configuration,
ExecutionOptions.from_bootstrap_options(bootstrap_options),
native=native,
executor=executor,
pants_ignore_patterns=OptionsInitializer.compute_pants_ignore(
build_root, bootstrap_options
),
Expand All @@ -184,7 +190,6 @@ def setup_graph(
named_caches_dir=bootstrap_options.named_caches_dir,
ca_certs_path=bootstrap_options.ca_certs_path,
build_root=build_root,
native=native,
include_trace_on_error=bootstrap_options.print_stacktrace,
)

Expand All @@ -195,6 +200,7 @@ def setup_graph_extended(
execution_options: ExecutionOptions,
native: Native,
*,
executor: PyExecutor,
pants_ignore_patterns: List[str],
use_gitignore: bool,
local_store_dir: str,
Expand Down Expand Up @@ -284,6 +290,7 @@ def ensure_optional_absolute_path(v: Optional[str]) -> Optional[str]:
ca_certs_path=ensure_optional_absolute_path(ca_certs_path),
rules=rules,
union_membership=union_membership,
executor=executor,
execution_options=execution_options,
include_trace_on_error=include_trace_on_error,
visualize_to_dir=bootstrap_options.native_engine_visualize_to,
Expand Down
17 changes: 17 additions & 0 deletions src/python/pants/init/options_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ def setup(self) -> BuildConfiguration:
class OptionsInitializer:
"""Initializes options."""

@staticmethod
def compute_executor_arguments(bootstrap_options: OptionValueContainer) -> Tuple[int, int]:
    """Computes the arguments to construct a PyExecutor.

    Does not directly construct a PyExecutor to avoid cycles.

    :param bootstrap_options: Option values exposing `rule_threads_core` and
        `rule_threads_max`.
    :returns: A `(core_threads, max_threads)` tuple. When `rule_threads_max` is unset
        (i.e. falsy), the maximum defaults to `4 * rule_threads_core`.
    :raises ValueError: If `rule_threads_core` is less than 2, or if an explicitly
        configured `rule_threads_max` is smaller than `rule_threads_core`.
    """
    core_threads = bootstrap_options.rule_threads_core
    if core_threads < 2:
        # TODO: This is a defense against deadlocks due to #11329: we only run one `@goal_rule`
        # at a time, and a `@goal_rule` will only block one thread.
        raise ValueError("--rule-threads-core values less than 2 are not supported.")

    # A falsy value (the `None` default, or 0) means "not configured": derive the maximum
    # from the core thread count.
    max_threads = bootstrap_options.rule_threads_max or 4 * core_threads
    if max_threads < core_threads:
        # Fail early with an actionable message rather than handing the executor an
        # inconsistent (core, max) pair.
        raise ValueError(
            f"--rule-threads-max ({max_threads}) must not be less than "
            f"--rule-threads-core ({core_threads})."
        )
    return core_threads, max_threads

@staticmethod
def compute_pants_ignore(buildroot, global_options):
"""Computes the merged value of the `--pants-ignore` flag.
Expand Down
35 changes: 32 additions & 3 deletions src/python/pants/option/global_options.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright 2014 Pants project contributors (see CONTRIBUTORS.md).
# Licensed under the Apache License, Version 2.0 (see LICENSE).

import multiprocessing
import os
import sys
import tempfile
Expand Down Expand Up @@ -124,6 +123,9 @@ def from_bootstrap_options(cls, bootstrap_options):
)


_CPU_COUNT = len(os.sched_getaffinity(0)) if hasattr(os, "sched_getaffinity") else os.cpu_count()


DEFAULT_EXECUTION_OPTIONS = ExecutionOptions(
remote_execution=False,
remote_store_server=[],
Expand All @@ -133,7 +135,7 @@ def from_bootstrap_options(cls, bootstrap_options):
remote_store_chunk_upload_timeout_seconds=60,
remote_store_rpc_retries=2,
remote_store_connection_limit=5,
process_execution_local_parallelism=multiprocessing.cpu_count(),
process_execution_local_parallelism=_CPU_COUNT,
process_execution_remote_parallelism=128,
process_execution_cache_namespace=None,
process_execution_cleanup_local_dirs=True,
Expand Down Expand Up @@ -547,6 +549,33 @@ def register_bootstrap_options(cls, register):
"Pants's own code, plugins, and `--pants-config-files` are inherently invalidated.",
)

process_execution_local_parallelism = "--process-execution-local-parallelism"
rule_threads_core = "--rule-threads-core"
rule_threads_max = "--rule-threads-max"

register(
rule_threads_core,
type=int,
default=max(2, _CPU_COUNT // 2),
advanced=True,
help=(
"The number of threads to keep active and ready to execute `@rule` logic (see "
f"also: `{rule_threads_max}`). Values less than 2 are not currently supported. "
"This value is independent of the number of processes that may be spawned in "
f"parallel locally (controlled by `{process_execution_local_parallelism}`)."
),
)
register(
rule_threads_max,
type=int,
default=None,
advanced=True,
help=(
"The maximum number of threads to use to execute `@rule` logic. Defaults to "
f"a small multiple of `{rule_threads_core}`."
),
)

cache_instructions = (
"The path may be absolute or relative. If the directory is within the build root, be "
"sure to include it in `--pants-ignore`."
Expand Down Expand Up @@ -605,7 +634,7 @@ def register_bootstrap_options(cls, register):
)

register(
"--process-execution-local-parallelism",
process_execution_local_parallelism,
type=int,
default=DEFAULT_EXECUTION_OPTIONS.process_execution_local_parallelism,
advanced=True,
Expand Down
7 changes: 6 additions & 1 deletion src/python/pants/pantsd/pants_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from pants.base.exception_sink import ExceptionSink
from pants.bin.daemon_pants_runner import DaemonPantsRunner
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PyExecutor
from pants.init.engine_initializer import GraphScheduler
from pants.init.logging import setup_logging, setup_logging_to_file
from pants.init.options_initializer import OptionsInitializer
Expand Down Expand Up @@ -57,9 +58,13 @@ def create(cls, options_bootstrapper: OptionsBootstrapper) -> PantsDaemon:
native = Native()
native.override_thread_logging_destination_to_just_pantsd()

core = PantsDaemonCore(cls._setup_services)
executor = PyExecutor(
*OptionsInitializer.compute_executor_arguments(bootstrap_options_values)
)
core = PantsDaemonCore(executor, cls._setup_services)

server = native.new_nailgun_server(
executor,
bootstrap_options_values.pantsd_pailgun_port,
DaemonPantsRunner(core),
)
Expand Down
8 changes: 6 additions & 2 deletions src/python/pants/pantsd/pants_daemon_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from typing_extensions import Protocol

from pants.engine.internals.native_engine import PyExecutor
from pants.init.engine_initializer import EngineInitializer, GraphScheduler
from pants.init.options_initializer import BuildConfigInitializer
from pants.option.option_value_container import OptionValueContainer
Expand Down Expand Up @@ -35,7 +36,8 @@ class PantsDaemonCore:
PantsServices.
"""

def __init__(self, services_constructor: PantsServicesConstructor):
def __init__(self, executor: PyExecutor, services_constructor: PantsServicesConstructor):
self._executor = executor
self._services_constructor = services_constructor
self._lifecycle_lock = threading.RLock()
# N.B. This Event is used as nothing more than an atomic flag - nothing waits on it.
Expand Down Expand Up @@ -74,7 +76,9 @@ def _init_scheduler(
if self._services:
self._services.shutdown()
build_config = BuildConfigInitializer.get(options_bootstrapper)
self._scheduler = EngineInitializer.setup_graph(options_bootstrapper, build_config)
self._scheduler = EngineInitializer.setup_graph(
options_bootstrapper, build_config, executor=self._executor
)
bootstrap_options_values = options_bootstrapper.bootstrap_options.for_global_scope()

self._services = self._services_constructor(bootstrap_options_values, self._scheduler)
Expand Down
6 changes: 6 additions & 0 deletions src/python/pants/testutil/rule_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import multiprocessing
import os
from dataclasses import dataclass
from io import StringIO
Expand Down Expand Up @@ -36,6 +37,7 @@
from pants.engine.fs import PathGlobs, PathGlobsAndRoot, Snapshot, Workspace
from pants.engine.goal import Goal
from pants.engine.internals.native import Native
from pants.engine.internals.native_engine import PyExecutor
from pants.engine.internals.scheduler import SchedulerSession
from pants.engine.internals.selectors import Get, Params
from pants.engine.internals.session import SessionValues
Expand Down Expand Up @@ -69,6 +71,9 @@
_O = TypeVar("_O")


# Module-level executor handed to the scheduler set up in this module: core_threads is the
# machine's CPU count and max_threads is four times that.
_EXECUTOR = PyExecutor(multiprocessing.cpu_count(), multiprocessing.cpu_count() * 4)


@dataclass(frozen=True)
class GoalRuleResult:
exit_code: int
Expand Down Expand Up @@ -143,6 +148,7 @@ def __init__(
options_bootstrapper=options_bootstrapper,
build_root=self.build_root,
build_configuration=self.build_config,
executor=_EXECUTOR,
execution_options=ExecutionOptions.from_bootstrap_options(global_options),
ca_certs_path=ca_certs_path,
).new_session(
Expand Down
Loading

0 comments on commit bce20b5

Please sign in to comment.