diff --git a/src/python/pants/option/global_options.py b/src/python/pants/option/global_options.py index 143e220a162..b4ba5753266 100644 --- a/src/python/pants/option/global_options.py +++ b/src/python/pants/option/global_options.py @@ -190,6 +190,8 @@ def register_bootstrap_options(cls, register): help='The directory to log pantsd output to.') register('--pantsd-fs-event-workers', advanced=True, type=int, default=4, help='The number of workers to use for the filesystem event service executor pool.') + register('--pantsd-invalidation-globs', advanced=True, type=list, fromfile=True, default=[], + help='Filesystem events matching any of these globs will trigger a daemon restart.') # Watchman options. register('--watchman-version', advanced=True, default='4.9.0-pants1', help='Watchman version.') diff --git a/src/python/pants/option/options_fingerprinter.py b/src/python/pants/option/options_fingerprinter.py index ec4cf6304a9..57e2b6367fe 100644 --- a/src/python/pants/option/options_fingerprinter.py +++ b/src/python/pants/option/options_fingerprinter.py @@ -14,8 +14,8 @@ from pants.base.build_environment import get_buildroot from pants.base.hash_utils import stable_json_hash -from pants.option.custom_types import (UnsetBool, dict_with_files_option, dir_option, - file_option, target_option) +from pants.option.custom_types import (UnsetBool, dict_with_files_option, dir_option, file_option, + target_option) class Encoder(json.JSONEncoder): diff --git a/src/python/pants/pantsd/pants_daemon.py b/src/python/pants/pantsd/pants_daemon.py index 455342bf77b..513f6c18b6a 100644 --- a/src/python/pants/pantsd/pants_daemon.py +++ b/src/python/pants/pantsd/pants_daemon.py @@ -129,8 +129,19 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_helper, watchman :returns: A tuple of (`tuple` service_instances, `dict` port_map). 
""" - fs_event_service = FSEventService(watchman, build_root, bootstrap_options.pantsd_fs_event_workers) - scheduler_service = SchedulerService(fs_event_service, legacy_graph_helper) + fs_event_service = FSEventService( + watchman, + build_root, + bootstrap_options.pantsd_fs_event_workers + ) + + scheduler_service = SchedulerService( + fs_event_service, + legacy_graph_helper, + build_root, + bootstrap_options.pantsd_invalidation_globs + ) + pailgun_service = PailgunService( bind_addr=(bootstrap_options.pantsd_pailgun_host, bootstrap_options.pantsd_pailgun_port), exiter_class=DaemonExiter, @@ -138,6 +149,7 @@ def _setup_services(build_root, bootstrap_options, legacy_graph_helper, watchman target_roots_class=TargetRoots, scheduler_service=scheduler_service ) + store_gc_service = StoreGCService(legacy_graph_helper.scheduler) return ( diff --git a/src/python/pants/pantsd/service/BUILD b/src/python/pants/pantsd/service/BUILD index 08346403e4f..4d8f7490f1d 100644 --- a/src/python/pants/pantsd/service/BUILD +++ b/src/python/pants/pantsd/service/BUILD @@ -32,7 +32,7 @@ python_library( name = 'scheduler_service', sources = ['scheduler_service.py'], dependencies = [ - '3rdparty/python:six', + '3rdparty/python/twitter/commons:twitter.common.dirutil', ':pants_service' ] ) diff --git a/src/python/pants/pantsd/service/scheduler_service.py b/src/python/pants/pantsd/service/scheduler_service.py index 7cabc8190b8..ad8675be2e6 100644 --- a/src/python/pants/pantsd/service/scheduler_service.py +++ b/src/python/pants/pantsd/service/scheduler_service.py @@ -9,6 +9,8 @@ import Queue import threading +from twitter.common.dirutil import Fileset + from pants.pantsd.service.pants_service import PantsService @@ -22,41 +24,70 @@ class SchedulerService(PantsService): QUEUE_SIZE = 64 - def __init__(self, fs_event_service, legacy_graph_helper): + def __init__(self, fs_event_service, legacy_graph_helper, build_root, invalidation_globs): """ :param FSEventService fs_event_service: An unstarted 
FSEventService instance for setting up filesystem event handlers. :param LegacyGraphHelper legacy_graph_helper: The LegacyGraphHelper instance for graph construction. + :param str build_root: The current build root. + :param list invalidation_globs: A list of `globs` that when encountered in filesystem event + subscriptions will tear down the daemon. """ super(SchedulerService, self).__init__() self._fs_event_service = fs_event_service self._graph_helper = legacy_graph_helper + self._invalidation_globs = invalidation_globs + self._build_root = build_root self._scheduler = legacy_graph_helper.scheduler self._logger = logging.getLogger(__name__) self._event_queue = Queue.Queue(maxsize=self.QUEUE_SIZE) self._watchman_is_running = threading.Event() + self._invalidating_files = set() @property def change_calculator(self): """Surfaces the change calculator.""" return self._graph_helper.change_calculator + @staticmethod + def _combined_invalidating_fileset_from_globs(glob_strs, root): + return set.union(*(Fileset.globs(glob_str, root=root)() for glob_str in glob_strs)) + def setup(self, lifecycle_lock, fork_lock): """Service setup.""" super(SchedulerService, self).setup(lifecycle_lock, fork_lock) # Register filesystem event handlers on an FSEventService instance. self._fs_event_service.register_all_files_handler(self._enqueue_fs_event) + # N.B. We compute this combined set eagerly at launch with an assumption that files + # that exist at startup are the only ones that can affect the running daemon. + if self._invalidation_globs: + self._invalidating_files = self._combined_invalidating_fileset_from_globs( + self._invalidation_globs, + self._build_root + ) + self._logger.info('watching invalidating files: {}'.format(self._invalidating_files)) + def _enqueue_fs_event(self, event): """Watchman filesystem event handler for BUILD/requirements.txt updates. 
Called via a thread.""" self._logger.info('enqueuing {} changes for subscription {}' .format(len(event['files']), event['subscription'])) self._event_queue.put(event) + def _maybe_invalidate_scheduler(self, files): + invalidating_files = self._invalidating_files + if any(f in invalidating_files for f in files): + self._logger.fatal('saw file events covered by invalidation globs, terminating the daemon.') + self.terminate() + def _handle_batch_event(self, files): self._logger.debug('handling change event for: %s', files) + + with self.lifecycle_lock: + self._maybe_invalidate_scheduler(files) + with self.fork_lock: self._scheduler.invalidate_files(files) diff --git a/tests/python/pants_test/pantsd/test_pantsd_integration.py b/tests/python/pants_test/pantsd/test_pantsd_integration.py index b4c761f22b8..6d7a255e3ae 100644 --- a/tests/python/pants_test/pantsd/test_pantsd_integration.py +++ b/tests/python/pants_test/pantsd/test_pantsd_integration.py @@ -33,7 +33,7 @@ def _log(self): 'PantsDaemonMonitor: pid is {} is_alive={}'.format(self._pid, self.is_alive())) ) - def await_pantsd(self, timeout=3): + def assert_started(self, timeout=.1): self._process = None self._pid = self.await_pid(timeout) self.assert_running() @@ -64,6 +64,10 @@ def read_pantsd_log(workdir): yield line.strip() +def full_pantsd_log(workdir): + return '\n'.join(read_pantsd_log(workdir)) + + def launch_file_toucher(f): """Launch a loop to touch the given file, and return a function to call to stop and join it.""" executor = ThreadPoolExecutor(max_workers=1) @@ -85,21 +89,21 @@ def join(): class TestPantsDaemonIntegration(PantsRunIntegrationTest): @contextmanager - def pantsd_test_context(self, log_level='info'): + def pantsd_test_context(self, log_level='info', extra_config=None): with no_lingering_process_by_command('pantsd-runner'): with self.temporary_workdir() as workdir_base: pid_dir = os.path.join(workdir_base, '.pids') workdir = os.path.join(workdir_base, '.workdir.pants.d') print('\npantsd log 
is {}/pantsd/pantsd.log'.format(workdir)) pantsd_config = { - 'GLOBAL': { + 'GLOBAL': combined_dict({ 'enable_pantsd': True, # The absolute paths in CI can exceed the UNIX socket path limitation # (>104-108 characters), so we override that here with a shorter path. 'watchman_socket_path': '/tmp/watchman.{}.sock'.format(os.getpid()), 'level': log_level, 'pants_subprocessdir': pid_dir - } + }, extra_config or {}) } checker = PantsDaemonMonitor(pid_dir) self.assert_success_runner(workdir, pantsd_config, ['kill-pantsd']) @@ -114,8 +118,8 @@ def pantsd_test_context(self, log_level='info'): checker.assert_stopped() @contextmanager - def pantsd_successful_run_context(self, log_level='info'): - with self.pantsd_test_context(log_level) as (workdir, pantsd_config, checker): + def pantsd_successful_run_context(self, log_level='info', extra_config=None): + with self.pantsd_test_context(log_level, extra_config) as (workdir, pantsd_config, checker): yield ( functools.partial( self.assert_success_runner, @@ -162,12 +166,12 @@ def test_pantsd_compile(self): with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _): # This tests a deeper pantsd-based run by actually invoking a full compile. 
pantsd_run(['compile', 'examples/src/scala/org/pantsbuild/example/hello/welcome']) - checker.await_pantsd() + checker.assert_started() def test_pantsd_run(self): with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, workdir): pantsd_run(['list', '3rdparty:']) - checker.await_pantsd() + checker.assert_started() pantsd_run(['list', ':']) checker.assert_running() @@ -191,12 +195,12 @@ def test_pantsd_broken_pipe(self): with self.pantsd_test_context() as (workdir, pantsd_config, checker): run = self.run_pants_with_workdir('help | head -1', workdir, pantsd_config, shell=True) self.assertNotIn('broken pipe', run.stderr_data.lower()) - checker.await_pantsd() + checker.assert_started() def test_pantsd_stacktrace_dump(self): with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir): pantsd_run(['help']) - checker.await_pantsd() + checker.assert_started() os.kill(checker.pid, signal.SIGUSR2) @@ -216,7 +220,7 @@ def test_pantsd_pantsd_runner_doesnt_die_after_failed_run(self): workdir, pantsd_config) ) - checker.await_pantsd() + checker.assert_started() # Assert pantsd is in a good functional state. self.assert_success(self.run_pants_with_workdir(['help'], workdir, pantsd_config)) @@ -235,7 +239,7 @@ def test_pantsd_lifecycle_invalidation(self): for cmd in itertools.chain(*itertools.repeat(variants, 3)): # Run with a CLI flag. pantsd_run(['-l{}'.format(cmd[0]), cmd[1]]) - next_pid = checker.await_pantsd() + next_pid = checker.assert_started() if last_pid is not None: self.assertNotEqual(last_pid, next_pid) last_pid = next_pid @@ -255,7 +259,7 @@ def test_pantsd_lifecycle_non_invalidation(self): for cmd in itertools.chain(*itertools.repeat(variants, 3)): # Run with a CLI flag. 
pantsd_run(cmd) - next_pid = checker.await_pantsd() + next_pid = checker.assert_started() if last_pid is not None: self.assertEqual(last_pid, next_pid) last_pid = next_pid @@ -280,12 +284,12 @@ def test_pantsd_lifecycle_non_invalidation_on_config_string(self): for cmd in itertools.chain(*itertools.repeat(variants, 2)): pantsd_run(cmd) if not pantsd_pid: - pantsd_pid = checker.await_pantsd() + pantsd_pid = checker.assert_started() else: checker.assert_running() pantsd_run(['--pants-config-files={}'.format(invalidating_config), 'help']) - self.assertNotEqual(pantsd_pid, checker.await_pantsd()) + self.assertNotEqual(pantsd_pid, checker.assert_started()) def test_pantsd_stray_runners(self): # Allow env var overrides for local stress testing. @@ -295,7 +299,7 @@ def test_pantsd_stray_runners(self): with no_lingering_process_by_command('pantsd-runner'): with self.pantsd_successful_run_context('debug') as (pantsd_run, checker, _): pantsd_run(cmd) - checker.await_pantsd() + checker.assert_started() for _ in range(attempts): pantsd_run(cmd) checker.assert_running() @@ -317,7 +321,7 @@ def test_pantsd_aligned_output(self): with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir): daemon_runs = [pantsd_run(cmd) for cmd in cmds] - checker.await_pantsd() + checker.assert_started() for cmd, run in zip(cmds, daemon_runs): self.assertEqual(run.stderr_data.strip(), '', 'Non-empty stderr for {}'.format(cmd)) @@ -331,7 +335,7 @@ def test_pantsd_filesystem_invalidation(self): with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir): cmd = ['list', '::'] pantsd_run(cmd) - checker.await_pantsd() + checker.assert_started() # Launch a separate thread to poke files in 3rdparty. join = launch_file_toucher('3rdparty/BUILD') @@ -348,7 +352,7 @@ def test_pantsd_client_env_var_is_inherited_by_pantsd_runner_children(self): with self.pantsd_successful_run_context() as (pantsd_run, checker, workdir): # First, launch the daemon without any local env vars set. 
pantsd_run(['help']) - checker.await_pantsd() + checker.assert_started() # Then, set an env var on the secondary call. with environment_as(TEST_ENV_VAR_FOR_PANTSD_INTEGRATION_TEST=EXPECTED_VALUE): @@ -372,7 +376,7 @@ def test_pantsd_launch_env_var_is_not_inherited_by_pantsd_runner_children(self): workdir, pantsd_config) ) - checker.await_pantsd() + checker.assert_started() self.assert_failure( self.run_pants_with_workdir( @@ -382,3 +386,27 @@ def test_pantsd_launch_env_var_is_not_inherited_by_pantsd_runner_children(self): ) ) checker.assert_running() + + def test_pantsd_invalidation_file_tracking(self): + test_file = 'testprojects/src/python/print_env/main.py' + config = {'pantsd_invalidation_globs': '["testprojects/src/python/print_env/*"]'} + with self.pantsd_successful_run_context(extra_config=config) as (pantsd_run, checker, workdir): + pantsd_run(['help']) + checker.assert_started() + + # Let any fs events quiesce. + time.sleep(5) + + # Check the logs. + self.assertRegexpMatches( + full_pantsd_log(workdir), + r'watching invalidating files:.*{}'.format(test_file) + ) + + checker.assert_running() + touch(test_file) + # Permit ample time for the async file event to propagate in CI. + time.sleep(10) + checker.assert_stopped() + + self.assertIn('saw file events covered by invalidation globs', full_pantsd_log(workdir))