Skip to content

Commit

Permalink
Support optionally restarting interactive processes when input files …
Browse files Browse the repository at this point in the history
…change (pantsbuild#13178)

Adds `pex_binary(.., restartable=True)` and `repl --restartable` to optionally allow user processes to be restarted when files change. See `run_integration_test.py` for an example.

`InteractiveProcess` is used for any process that needs to run in the foreground, including those for `run`, and `repl`. To support restarting those processes, we make invoking an `InteractiveProcess` async, which allows it to (optionally) be restarted when files change.

To make `InteractiveProcess` async, we convert it into a rust-side `Intrinsic`, and introduce a new `Effect` awaitable type, which is used in place of `Get` when a type is `SideEffecting`. This also prepares the other `SideEffecting` param types to be converted to `Effect`s as well (in order to accomplish pantsbuild#10542).

Fixes pantsbuild#9462.
  • Loading branch information
stuhood authored Oct 11, 2021
1 parent 06f0727 commit a7619cf
Show file tree
Hide file tree
Showing 37 changed files with 618 additions and 446 deletions.
2 changes: 2 additions & 0 deletions src/python/pants/backend/docker/target_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from textwrap import dedent

from pants.backend.docker.registries import ALL_DEFAULT_REGISTRIES
from pants.core.goals.run import RestartableField
from pants.engine.target import (
COMMON_TARGET_FIELDS,
BoolField,
Expand Down Expand Up @@ -123,5 +124,6 @@ class DockerImage(Target):
DockerRegistriesField,
DockerRepository,
DockerSkipPushField,
RestartableField,
)
help = "A Docker image."
2 changes: 2 additions & 0 deletions src/python/pants/backend/go/target_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import Sequence

from pants.core.goals.package import OutputPathField
from pants.core.goals.run import RestartableField
from pants.engine.addresses import Address
from pants.engine.engine_aware import EngineAwareParameter
from pants.engine.fs import GlobExpansionConjunction, GlobMatchErrorBehavior, PathGlobs
Expand Down Expand Up @@ -288,5 +289,6 @@ class GoBinaryTarget(Target):
OutputPathField,
GoBinaryMainPackageField,
GoBinaryDependenciesField,
RestartableField,
)
help = "A Go binary."
4 changes: 3 additions & 1 deletion src/python/pants/backend/python/goals/pytest_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,9 @@ async def run_python_test(
async def debug_python_test(field_set: PythonTestFieldSet) -> TestDebugRequest:
setup = await Get(TestSetup, TestSetupRequest(field_set, is_debug=True))
return TestDebugRequest(
InteractiveProcess.from_process(setup.process, forward_signals_to_process=False)
InteractiveProcess.from_process(
setup.process, forward_signals_to_process=False, restartable=True
)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from pants.core.util_rules import config_files, distdir
from pants.engine.addresses import Address
from pants.engine.fs import CreateDigest, Digest, DigestContents, FileContent
from pants.engine.process import InteractiveRunner
from pants.engine.rules import Get, rule
from pants.engine.target import Target
from pants.engine.unions import UnionRule
Expand Down Expand Up @@ -108,9 +107,7 @@ def run_pytest(
debug_request = rule_runner.request(TestDebugRequest, inputs)
if debug_request.process is not None:
with mock_console(rule_runner.options_bootstrapper):
debug_result = InteractiveRunner(rule_runner.scheduler, _enforce_effects=False).run(
debug_request.process
)
debug_result = rule_runner.run_interactive_process(debug_request.process)
assert test_result.exit_code == debug_result.exit_code
return test_result

Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/backend/python/target_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from pants.backend.python.macros.python_artifact import PythonArtifact
from pants.base.deprecated import warn_or_error
from pants.core.goals.package import OutputPathField
from pants.core.goals.run import RestartableField
from pants.core.goals.test import RuntimePackageDependenciesField
from pants.engine.addresses import Address, Addresses
from pants.engine.target import (
Expand Down Expand Up @@ -523,6 +524,7 @@ class PexBinary(Target):
PexEmitWarningsField,
PexExecutionModeField,
PexIncludeToolsField,
RestartableField,
)
help = (
"A Python target that can be converted into an executable PEX file.\n\nPEX files are "
Expand Down
4 changes: 3 additions & 1 deletion src/python/pants/backend/shell/shunit2_test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ async def run_tests_with_shunit2(
async def setup_shunit2_debug_test(field_set: Shunit2FieldSet) -> TestDebugRequest:
setup = await Get(TestSetup, TestSetupRequest(field_set))
return TestDebugRequest(
InteractiveProcess.from_process(setup.process, forward_signals_to_process=False)
InteractiveProcess.from_process(
setup.process, forward_signals_to_process=False, restartable=True
)
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from pants.engine.addresses import Address
from pants.engine.fs import FileContent
from pants.engine.internals.scheduler import ExecutionError
from pants.engine.process import InteractiveRunner
from pants.engine.target import Target
from pants.testutil.rule_runner import QueryRule, RuleRunner, mock_console

Expand Down Expand Up @@ -96,9 +95,7 @@ def run_shunit2(
debug_request = rule_runner.request(TestDebugRequest, inputs)
if debug_request.process is not None:
with mock_console(rule_runner.options_bootstrapper):
debug_result = InteractiveRunner(rule_runner.scheduler, _enforce_effects=False).run(
debug_request.process
)
debug_result = rule_runner.run_interactive_process(debug_request.process)
assert test_result.exit_code == debug_result.exit_code
return test_result

Expand Down
53 changes: 21 additions & 32 deletions src/python/pants/core/goals/publish.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,16 @@ async def publish_example(request: PublishToMyRepoRequest, ...) -> PublishProces
from abc import ABCMeta
from dataclasses import dataclass
from itertools import chain
from typing import ClassVar, Generic, Iterable, Type, TypeVar
from typing import ClassVar, Generic, Type, TypeVar

from typing_extensions import final

from pants.core.goals.package import BuiltPackage, PackageFieldSet
from pants.engine.collection import Collection
from pants.engine.console import Console
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.process import InteractiveProcess, InteractiveRunner
from pants.engine.rules import Get, MultiGet, collect_rules, goal_rule, rule
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.rules import Effect, Get, MultiGet, collect_rules, goal_rule, rule
from pants.engine.target import (
FieldSet,
NoApplicableTargetsBehavior,
Expand Down Expand Up @@ -155,7 +155,7 @@ class Publish(Goal):


@goal_rule
async def run_publish(console: Console, interactive_runner: InteractiveRunner) -> Publish:
async def run_publish(console: Console) -> Publish:
target_roots_to_package_field_sets, target_roots_to_publish_field_sets = await MultiGet(
Get(
TargetRootsToFieldSets,
Expand Down Expand Up @@ -189,41 +189,19 @@ async def run_publish(console: Console, interactive_runner: InteractiveRunner) -
)

# Run all processes interactively.
exit_code, results = run_publish_processes(
console, interactive_runner, chain.from_iterable(processes)
)

console.print_stderr("")
if not results:
sigil = console.sigil_skipped()
console.print_stderr(f"{sigil} Nothing published.")

# We collect all results to the end, so all output from the interactive processes are done,
# before printing the results.
for line in results:
console.print_stderr(line)

return Publish(exit_code)


def run_publish_processes(
console: Console,
interactive_runner: InteractiveRunner,
publish_processes: Iterable[PublishPackages],
) -> tuple[int, list[str]]:
exit_code = 0
output = []
for pub in publish_processes:
results = []
for pub in chain.from_iterable(processes):
if not pub.process:
sigil = console.sigil_skipped()
status = "skipped"
if pub.description:
status += f" {pub.description}"
for name in pub.names:
output.append(f"{sigil} {name} {status}.")
results.append(f"{sigil} {name} {status}.")
continue

res = interactive_runner.run(pub.process)
res = await Effect(InteractiveProcessResult, InteractiveProcess, pub.process)
if res.exit_code == 0:
sigil = console.sigil_succeeded()
status = "published"
Expand All @@ -238,8 +216,19 @@ def run_publish_processes(
status += f" {prep} {pub.description}"

for name in pub.names:
output.append(f"{sigil} {name} {status}.")
return exit_code, output
results.append(f"{sigil} {name} {status}.")

console.print_stderr("")
if not results:
sigil = console.sigil_skipped()
console.print_stderr(f"{sigil} Nothing published.")

# We collect all results to the end, so all output from the interactive processes are done,
# before printing the results.
for line in results:
console.print_stderr(line)

return Publish(exit_code)


@rule
Expand Down
33 changes: 26 additions & 7 deletions src/python/pants/core/goals/repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
from pants.engine.environment import CompleteEnvironment
from pants.engine.fs import Digest, Workspace
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.process import InteractiveProcess, InteractiveRunner
from pants.engine.rules import Get, collect_rules, goal_rule
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.rules import Effect, Get, collect_rules, goal_rule
from pants.engine.target import Targets, TransitiveTargets, TransitiveTargetsRequest
from pants.engine.unions import UnionMembership, union
from pants.option.global_options import GlobalOptions
Expand Down Expand Up @@ -51,13 +51,23 @@ def register_options(cls, register) -> None:
"--shell",
type=str,
default=None,
help="Override the automatically-detected REPL program for the target(s) specified. ",
help="Override the automatically-detected REPL program for the target(s) specified.",
)
register(
"--restartable",
type=bool,
default=False,
help="True if the REPL should be restarted if its inputs have changed.",
)

@property
def shell(self) -> Optional[str]:
return cast(Optional[str], self.options.shell)

@property
def restartable(self) -> bool:
return cast(bool, self.options.restartable)


class Repl(Goal):
subsystem_cls = ReplSubsystem
Expand Down Expand Up @@ -86,7 +96,6 @@ def __init__(
async def run_repl(
console: Console,
workspace: Workspace,
interactive_runner: InteractiveRunner,
repl_subsystem: ReplSubsystem,
all_specified_addresses: Addresses,
build_root: BuildRoot,
Expand Down Expand Up @@ -118,11 +127,21 @@ async def run_repl(
request = await Get(ReplRequest, ReplImplementation, repl_impl)

workspace.write_digest(
request.digest, path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix()
request.digest,
path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix(),
# We don't want to influence whether the InteractiveProcess is able to restart. Because
# we're writing into a temp directory, we can safely mark this side_effecting=False.
side_effecting=False,
)
env = {**complete_env, **request.extra_env}
result = interactive_runner.run(
InteractiveProcess(argv=request.args, env=env, run_in_workspace=True)
result = await Effect(
InteractiveProcessResult,
InteractiveProcess(
argv=request.args,
env=env,
run_in_workspace=True,
restartable=repl_subsystem.restartable,
),
)
return Repl(result.exit_code)

Expand Down
55 changes: 37 additions & 18 deletions src/python/pants/core/goals/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@
from typing import Iterable, Mapping, Optional, Tuple

from pants.base.build_root import BuildRoot
from pants.build_graph.address import Address
from pants.engine.console import Console
from pants.engine.environment import CompleteEnvironment
from pants.engine.fs import Digest, Workspace
from pants.engine.goal import Goal, GoalSubsystem
from pants.engine.process import InteractiveProcess, InteractiveRunner
from pants.engine.rules import Get, collect_rules, goal_rule
from pants.engine.process import InteractiveProcess, InteractiveProcessResult
from pants.engine.rules import Effect, Get, collect_rules, goal_rule
from pants.engine.target import (
BoolField,
FieldSet,
NoApplicableTargetsBehavior,
TargetRootsToFieldSets,
TargetRootsToFieldSetsRequest,
WrappedTarget,
)
from pants.engine.unions import union
from pants.option.custom_types import shell_str
Expand All @@ -32,6 +35,15 @@ class RunFieldSet(FieldSet, metaclass=ABCMeta):
"""The fields necessary from a target to run a program/script."""


class RestartableField(BoolField):
alias = "restartable"
default = False
help = (
"If true, runs of this target with the `run` goal may be interrupted and "
"restarted when its input files change."
)


@frozen_after_init
@dataclass(unsafe_hash=True)
class RunRequest:
Expand All @@ -56,8 +68,12 @@ def __init__(
class RunSubsystem(GoalSubsystem):
name = "run"
help = (
"Runs a binary target.\n\nThis goal propagates the return code of the underlying "
"executable. Run `echo $?` to inspect the resulting return code."
"Runs a binary target.\n\n"
"This goal propagates the return code of the underlying executable.\n\n"
"If your application can safely be restarted while it is running, you can pass "
"`restartable=True` on your binary target (for supported types), and the `run` goal "
"will automatically restart them as all relevant files change. This can be particularly "
"useful for server applications."
)

required_union_implementations = (RunFieldSet,)
Expand Down Expand Up @@ -88,7 +104,6 @@ async def run(
run_subsystem: RunSubsystem,
global_options: GlobalOptions,
console: Console,
interactive_runner: InteractiveRunner,
workspace: Workspace,
build_root: BuildRoot,
complete_env: CompleteEnvironment,
Expand All @@ -104,26 +119,30 @@ async def run(
)
field_set = targets_to_valid_field_sets.field_sets[0]
request = await Get(RunRequest, RunFieldSet, field_set)
wrapped_target = await Get(WrappedTarget, Address, field_set.address)
restartable = wrapped_target.target.get(RestartableField).value

with temporary_dir(root_dir=global_options.options.pants_workdir, cleanup=True) as tmpdir:
workspace.write_digest(
request.digest, path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix()
request.digest,
path_prefix=PurePath(tmpdir).relative_to(build_root.path).as_posix(),
# We don't want to influence whether the InteractiveProcess is able to restart. Because
# we're writing into a temp directory, we can safely mark this side_effecting=False.
side_effecting=False,
)

args = (arg.format(chroot=tmpdir) for arg in request.args)
env = {**complete_env, **{k: v.format(chroot=tmpdir) for k, v in request.extra_env.items()}}
try:
result = interactive_runner.run(
InteractiveProcess(
argv=(*args, *run_subsystem.args),
env=env,
run_in_workspace=True,
)
)
exit_code = result.exit_code
except Exception as e:
console.print_stderr(f"Exception when attempting to run {field_set.address}: {e!r}")
exit_code = -1
result = await Effect(
InteractiveProcessResult,
InteractiveProcess(
argv=(*args, *run_subsystem.args),
env=env,
run_in_workspace=True,
restartable=restartable,
),
)
exit_code = result.exit_code

return Run(exit_code)

Expand Down
Loading

0 comments on commit a7619cf

Please sign in to comment.