Commit

[pantsd] Daemon lifecycle invalidation on configurable glob watches. (pantsbuild#5550)

A quick fix for loose source plugin invalidation with the daemon.
kwlzn authored Mar 7, 2018
1 parent 1667c13 commit f2bee66
Showing 6 changed files with 99 additions and 26 deletions.
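In short: at launch, pantsd expands the new --pantsd-invalidation-globs option into a flat set of files that exist at that moment; every batch of watchman events is then checked against that set, and any hit tears the daemon down so the next client run restarts it with fresh state. Below is a minimal editor's sketch of that flow using only the standard library; the real change uses twitter.common.dirutil.Fileset and the pantsd service machinery shown in the diff, and the function names and paths here are illustrative only.

import glob
import os


def expand_invalidation_globs(invalidation_globs, build_root):
  """Eagerly expand globs into the set of matching files that exist at startup.

  Mirrors SchedulerService.setup() below: files created after launch are intentionally
  not tracked.
  """
  matched = set()
  for glob_str in invalidation_globs:
    for path in glob.glob(os.path.join(build_root, glob_str)):
      # The daemon's membership check uses build-root-relative paths, so store relative paths.
      matched.add(os.path.relpath(path, build_root))
  return matched


def should_restart(changed_files, invalidating_files):
  """True if any changed path in a watchman event batch is an invalidating file."""
  return any(f in invalidating_files for f in changed_files)


# Illustrative usage: watch pants.ini and any loose plugin sources for changes.
invalidating = expand_invalidation_globs(['pants.ini', 'src/python/*/register.py'], '/repo')
if should_restart(['src/python/myplugin/register.py'], invalidating):
  print('saw file events covered by invalidation globs, terminating the daemon.')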
2 changes: 2 additions & 0 deletions src/python/pants/option/global_options.py
@@ -190,6 +190,8 @@ def register_bootstrap_options(cls, register):
help='The directory to log pantsd output to.')
register('--pantsd-fs-event-workers', advanced=True, type=int, default=4,
help='The number of workers to use for the filesystem event service executor pool.')
register('--pantsd-invalidation-globs', advanced=True, type=list, fromfile=True, default=[],
help='Filesystem events matching any of these globs will trigger a daemon restart.')

# Watchman options.
register('--watchman-version', advanced=True, default='4.9.0-pants1', help='Watchman version.')
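For reference, this is a list-typed bootstrap option, so its value is written as a JSON-style list literal. The integration test added at the bottom of this commit passes it through the test harness's config dict in exactly that form; a hedged example with made-up glob values:

# Config-dict form as used by the pantsd integration tests below; the same value can be set
# under [GLOBAL] in pants.ini or with --pantsd-invalidation-globs on the command line.
extra_config = {
  'pantsd_invalidation_globs': '["pants.ini", "src/python/*/register.py"]',
}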
4 changes: 2 additions & 2 deletions src/python/pants/option/options_fingerprinter.py
@@ -14,8 +14,8 @@

from pants.base.build_environment import get_buildroot
from pants.base.hash_utils import stable_json_hash
from pants.option.custom_types import (UnsetBool, dict_with_files_option, dir_option,
file_option, target_option)
from pants.option.custom_types import (UnsetBool, dict_with_files_option, dir_option, file_option,
target_option)


class Encoder(json.JSONEncoder):
16 changes: 14 additions & 2 deletions src/python/pants/pantsd/pants_daemon.py
@@ -129,15 +129,27 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_helper, watchman
:returns: A tuple of (`tuple` service_instances, `dict` port_map).
"""
fs_event_service = FSEventService(watchman, build_root, bootstrap_options.pantsd_fs_event_workers)
scheduler_service = SchedulerService(fs_event_service, legacy_graph_helper)
fs_event_service = FSEventService(
watchman,
build_root,
bootstrap_options.pantsd_fs_event_workers
)

scheduler_service = SchedulerService(
fs_event_service,
legacy_graph_helper,
build_root,
bootstrap_options.pantsd_invalidation_globs
)

pailgun_service = PailgunService(
bind_addr=(bootstrap_options.pantsd_pailgun_host, bootstrap_options.pantsd_pailgun_port),
exiter_class=DaemonExiter,
runner_class=DaemonPantsRunner,
target_roots_class=TargetRoots,
scheduler_service=scheduler_service
)

store_gc_service = StoreGCService(legacy_graph_helper.scheduler)

return (
2 changes: 1 addition & 1 deletion src/python/pants/pantsd/service/BUILD
@@ -32,7 +32,7 @@ python_library(
name = 'scheduler_service',
sources = ['scheduler_service.py'],
dependencies = [
'3rdparty/python:six',
'3rdparty/python/twitter/commons:twitter.common.dirutil',
':pants_service'
]
)
33 changes: 32 additions & 1 deletion src/python/pants/pantsd/service/scheduler_service.py
@@ -9,6 +9,8 @@
import Queue
import threading

from twitter.common.dirutil import Fileset

from pants.pantsd.service.pants_service import PantsService


@@ -22,41 +24,70 @@ class SchedulerService(PantsService):

QUEUE_SIZE = 64

def __init__(self, fs_event_service, legacy_graph_helper):
def __init__(self, fs_event_service, legacy_graph_helper, build_root, invalidation_globs):
"""
:param FSEventService fs_event_service: An unstarted FSEventService instance for setting up
filesystem event handlers.
:param LegacyGraphHelper legacy_graph_helper: The LegacyGraphHelper instance for graph
construction.
:param str build_root: The current build root.
:param list invalidation_globs: A list of globs that, when encountered in filesystem event
subscriptions, will tear down the daemon.
"""
super(SchedulerService, self).__init__()
self._fs_event_service = fs_event_service
self._graph_helper = legacy_graph_helper
self._invalidation_globs = invalidation_globs
self._build_root = build_root

self._scheduler = legacy_graph_helper.scheduler
self._logger = logging.getLogger(__name__)
self._event_queue = Queue.Queue(maxsize=self.QUEUE_SIZE)
self._watchman_is_running = threading.Event()
self._invalidating_files = set()

@property
def change_calculator(self):
"""Surfaces the change calculator."""
return self._graph_helper.change_calculator

@staticmethod
def _combined_invalidating_fileset_from_globs(glob_strs, root):
return set.union(*(Fileset.globs(glob_str, root=root)() for glob_str in glob_strs))

def setup(self, lifecycle_lock, fork_lock):
"""Service setup."""
super(SchedulerService, self).setup(lifecycle_lock, fork_lock)
# Register filesystem event handlers on an FSEventService instance.
self._fs_event_service.register_all_files_handler(self._enqueue_fs_event)

# N.B. We compute this combined set eagerly at launch with an assumption that files
# that exist at startup are the only ones that can affect the running daemon.
if self._invalidation_globs:
self._invalidating_files = self._combined_invalidating_fileset_from_globs(
self._invalidation_globs,
self._build_root
)
self._logger.info('watching invalidating files: {}'.format(self._invalidating_files))

def _enqueue_fs_event(self, event):
"""Watchman filesystem event handler for BUILD/requirements.txt updates. Called via a thread."""
self._logger.info('enqueuing {} changes for subscription {}'
.format(len(event['files']), event['subscription']))
self._event_queue.put(event)

def _maybe_invalidate_scheduler(self, files):
invalidating_files = self._invalidating_files
if any(f in invalidating_files for f in files):
self._logger.fatal('saw file events covered by invalidation globs, terminating the daemon.')
self.terminate()

def _handle_batch_event(self, files):
self._logger.debug('handling change event for: %s', files)

with self.lifecycle_lock:
self._maybe_invalidate_scheduler(files)

with self.fork_lock:
self._scheduler.invalidate_files(files)

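The _combined_invalidating_fileset_from_globs helper above unions one fileset per glob, evaluated eagerly at setup, and, as the N.B. comment notes, only files that exist when the daemon starts can trigger invalidation. A standalone illustration of that expression, with a made-up build root and glob values:

from twitter.common.dirutil import Fileset

build_root = '/path/to/repo'
glob_strs = ['pants.ini', 'src/python/*/register.py']

# Each Fileset.globs(...) is lazy; calling it performs the glob, and the per-glob results
# are unioned into the single flat set that _maybe_invalidate_scheduler checks watchman
# events against (paths are expected to be relative to the build root).
invalidating_files = set.union(
  *(Fileset.globs(glob_str, root=build_root)() for glob_str in glob_strs)
)
print(invalidating_files)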
68 changes: 48 additions & 20 deletions tests/python/pants_test/pantsd/test_pantsd_integration.py
@@ -33,7 +33,7 @@ def _log(self):
'PantsDaemonMonitor: pid is {} is_alive={}'.format(self._pid, self.is_alive()))
)

def await_pantsd(self, timeout=3):
def assert_started(self, timeout=.1):
self._process = None
self._pid = self.await_pid(timeout)
self.assert_running()
@@ -64,6 +64,10 @@ def read_pantsd_log(workdir):
yield line.strip()


def full_pantsd_log(workdir):
return '\n'.join(read_pantsd_log(workdir))


def launch_file_toucher(f):
"""Launch a loop to touch the given file, and return a function to call to stop and join it."""
executor = ThreadPoolExecutor(max_workers=1)
@@ -85,21 +89,21 @@ def join():

class TestPantsDaemonIntegration(PantsRunIntegrationTest):
@contextmanager
def pantsd_test_context(self, log_level='info'):
def pantsd_test_context(self, log_level='info', extra_config=None):
with no_lingering_process_by_command('pantsd-runner'):
with self.temporary_workdir() as workdir_base:
pid_dir = os.path.join(workdir_base, '.pids')
workdir = os.path.join(workdir_base, '.workdir.pants.d')
print('\npantsd log is {}/pantsd/pantsd.log'.format(workdir))
pantsd_config = {
'GLOBAL': {
'GLOBAL': combined_dict({
'enable_pantsd': True,
# The absolute paths in CI can exceed the UNIX socket path limitation
# (>104-108 characters), so we override that here with a shorter path.
'watchman_socket_path': '/tmp/watchman.{}.sock'.format(os.getpid()),
'level': log_level,
'pants_subprocessdir': pid_dir
}
}, extra_config or {})
}
checker = PantsDaemonMonitor(pid_dir)
self.assert_success_runner(workdir, pantsd_config, ['kill-pantsd'])
@@ -114,8 +118,8 @@ def pantsd_test_context(self, log_level='info'):
checker.assert_stopped()

@contextmanager
def pantsd_successful_run_context(self, log_level='info'):
with self.pantsd_test_context(log_level) as (workdir, pantsd_config, checker):
def pantsd_successful_run_context(self, log_level='info', extra_config=None):
with self.pantsd_test_context(log_level, extra_config) as (workdir, pantsd_config, checker):
yield (
functools.partial(
self.assert_success_runner,
@@ -162,12 +166,12 @@ def test_pantsd_compile(self):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _):
# This tests a deeper pantsd-based run by actually invoking a full compile.
pantsd_run(['compile', 'examples/src/scala/org/pantsbuild/example/hello/welcome'])
checker.await_pantsd()
checker.assert_started()

def test_pantsd_run(self):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, workdir):
pantsd_run(['list', '3rdparty:'])
checker.await_pantsd()
checker.assert_started()

pantsd_run(['list', ':'])
checker.assert_running()
@@ -191,12 +195,12 @@ def test_pantsd_broken_pipe(self):
with self.pantsd_test_context() as (workdir, pantsd_config, checker):
run = self.run_pants_with_workdir('help | head -1', workdir, pantsd_config, shell=True)
self.assertNotIn('broken pipe', run.stderr_data.lower())
checker.await_pantsd()
checker.assert_started()

def test_pantsd_stacktrace_dump(self):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
pantsd_run(['help'])
checker.await_pantsd()
checker.assert_started()

os.kill(checker.pid, signal.SIGUSR2)

@@ -216,7 +220,7 @@ def test_pantsd_pantsd_runner_doesnt_die_after_failed_run(self):
workdir,
pantsd_config)
)
checker.await_pantsd()
checker.assert_started()

# Assert pantsd is in a good functional state.
self.assert_success(self.run_pants_with_workdir(['help'], workdir, pantsd_config))
@@ -235,7 +239,7 @@ def test_pantsd_lifecycle_invalidation(self):
for cmd in itertools.chain(*itertools.repeat(variants, 3)):
# Run with a CLI flag.
pantsd_run(['-l{}'.format(cmd[0]), cmd[1]])
next_pid = checker.await_pantsd()
next_pid = checker.assert_started()
if last_pid is not None:
self.assertNotEqual(last_pid, next_pid)
last_pid = next_pid
@@ -255,7 +259,7 @@ def test_pantsd_lifecycle_non_invalidation(self):
for cmd in itertools.chain(*itertools.repeat(variants, 3)):
# Run with a CLI flag.
pantsd_run(cmd)
next_pid = checker.await_pantsd()
next_pid = checker.assert_started()
if last_pid is not None:
self.assertEqual(last_pid, next_pid)
last_pid = next_pid
@@ -280,12 +284,12 @@ def test_pantsd_lifecycle_non_invalidation_on_config_string(self):
for cmd in itertools.chain(*itertools.repeat(variants, 2)):
pantsd_run(cmd)
if not pantsd_pid:
pantsd_pid = checker.await_pantsd()
pantsd_pid = checker.assert_started()
else:
checker.assert_running()

pantsd_run(['--pants-config-files={}'.format(invalidating_config), 'help'])
self.assertNotEqual(pantsd_pid, checker.await_pantsd())
self.assertNotEqual(pantsd_pid, checker.assert_started())

def test_pantsd_stray_runners(self):
# Allow env var overrides for local stress testing.
@@ -295,7 +299,7 @@ def test_pantsd_stray_runners(self):
with no_lingering_process_by_command('pantsd-runner'):
with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _):
pantsd_run(cmd)
checker.await_pantsd()
checker.assert_started()
for _ in range(attempts):
pantsd_run(cmd)
checker.assert_running()
@@ -317,7 +321,7 @@ def test_pantsd_aligned_output(self):

with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
daemon_runs = [pantsd_run(cmd) for cmd in cmds]
checker.await_pantsd()
checker.assert_started()

for cmd, run in zip(cmds, daemon_runs):
self.assertEqual(run.stderr_data.strip(), '', 'Non-empty stderr for {}'.format(cmd))
@@ -331,7 +335,7 @@ def test_pantsd_filesystem_invalidation(self):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
cmd = ['list', '::']
pantsd_run(cmd)
checker.await_pantsd()
checker.assert_started()

# Launch a separate thread to poke files in 3rdparty.
join = launch_file_toucher('3rdparty/BUILD')
@@ -348,7 +352,7 @@ def test_pantsd_client_env_var_is_inherited_by_pantsd_runner_children(self):
with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir):
# First, launch the daemon without any local env vars set.
pantsd_run(['help'])
checker.await_pantsd()
checker.assert_started()

# Then, set an env var on the secondary call.
with environment_as(TEST_ENV_VAR_FOR_PANTSD_INTEGRATION_TEST=EXPECTED_VALUE):
@@ -372,7 +376,7 @@ def test_pantsd_launch_env_var_is_not_inherited_by_pantsd_runner_children(self):
workdir,
pantsd_config)
)
checker.await_pantsd()
checker.assert_started()

self.assert_failure(
self.run_pants_with_workdir(
@@ -382,3 +386,27 @@ def test_pantsd_launch_env_var_is_not_inherited_by_pantsd_runner_children(self):
)
)
checker.assert_running()

def test_pantsd_invalidation_file_tracking(self):
test_file = 'testprojects/src/python/print_env/main.py'
config = {'pantsd_invalidation_globs': '["testprojects/src/python/print_env/*"]'}
with self.pantsd_successful_run_context(extra_config=config) as (pantsd_run, checker, workdir):
pantsd_run(['help'])
checker.assert_started()

# Let any fs events quiesce.
time.sleep(5)

# Check the logs.
self.assertRegexpMatches(
full_pantsd_log(workdir),
r'watching invalidating files:.*{}'.format(test_file)
)

checker.assert_running()
touch(test_file)
# Permit ample time for the async file events to propagate in CI.
time.sleep(10)
checker.assert_stopped()

self.assertIn('saw file events covered by invalidation globs', full_pantsd_log(workdir))
