From e514965f7eab8d7374a9e6d3e154bf6cd9300230 Mon Sep 17 00:00:00 2001 From: Reid Kleckner Date: Wed, 5 Apr 2017 16:44:56 +0000 Subject: [PATCH] [lit] Use process pools for test execution by default Summary: This drastically reduces lit test execution startup time on Windows. Our previous strategy was to manually create one Process per job and manage the worker pool ourselves. Instead, let's use the worker pool provided by multiprocessing. multiprocessing.Pool(jobs) returns almost immediately, and initializes the appropriate number of workers, so they can all start executing tests immediately. This avoids the ramp-up period that the old implementation suffers from. This appears to speed up small test runs. Here are some timings of the llvm-readobj tests on Windows using the various execution strategies: # multiprocessing.Pool: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-process-pool |& grep real: ; done real: 0m1.156s real: 0m1.078s real: 0m1.094s # multiprocessing.Process: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-processes |& grep real: ; done real: 0m6.062s real: 0m5.860s real: 0m5.984s # threading.Thread: $ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-threads |& grep real: ; done real: 0m9.438s real: 0m10.765s real: 0m11.079s I kept the old code to launch processes in case this change doesn't work on all platforms that LLVM supports, but at some point I would like to remove both the threading and old multiprocessing execution strategies. Reviewers: modocache, rafael Subscribers: llvm-commits Differential Revision: https://reviews.llvm.org/D31677 git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@299560 91177308-0d34-0410-b5e6-96231b3b80d8 --- utils/lit/lit/main.py | 16 ++-- utils/lit/lit/run.py | 199 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 173 insertions(+), 42 deletions(-) diff --git a/utils/lit/lit/main.py b/utils/lit/lit/main.py index 95032c6931ac..689a2d55bcea 100755 --- a/utils/lit/lit/main.py +++ b/utils/lit/lit/main.py @@ -278,12 +278,15 @@ def main_with_tmp(builtinParameters): debug_group.add_argument("--show-tests", dest="showTests", help="Show all discovered tests", action="store_true", default=False) - debug_group.add_argument("--use-processes", dest="useProcesses", + debug_group.add_argument("--use-process-pool", dest="executionStrategy", + help="Run tests in parallel with a process pool", + action="store_const", const="PROCESS_POOL") + debug_group.add_argument("--use-processes", dest="executionStrategy", help="Run tests in parallel with processes (not threads)", - action="store_true", default=True) - debug_group.add_argument("--use-threads", dest="useProcesses", + action="store_const", const="PROCESSES") + debug_group.add_argument("--use-threads", dest="executionStrategy", help="Run tests in parallel with threads (not processes)", - action="store_false", default=True) + action="store_const", const="THREADS") opts = parser.parse_args() args = opts.test_paths @@ -298,6 +301,9 @@ def main_with_tmp(builtinParameters): if opts.numThreads is None: opts.numThreads = lit.util.detectCPUs() + if opts.executionStrategy is None: + opts.executionStrategy = 'PROCESS_POOL' + if opts.maxFailures == 0: parser.error("Setting --max-failures to 0 does not have any effect.") @@ -481,7 +487,7 @@ def main_with_tmp(builtinParameters): display = TestingProgressDisplay(opts, len(run.tests), progressBar) try: run.execute_tests(display, opts.numThreads, opts.maxTime, - opts.useProcesses) + opts.executionStrategy) except KeyboardInterrupt: sys.exit(2) display.finish() diff --git a/utils/lit/lit/run.py b/utils/lit/lit/run.py index 2be8a1133b94..10f21c7b2e8a 100644 --- a/utils/lit/lit/run.py +++ b/utils/lit/lit/run.py @@ -1,4 +1,5 @@ import os +import sys import threading import time import traceback @@ -84,11 +85,13 @@ def run(self): def run_test(self, test_index): test = self.run_instance.tests[test_index] try: - self.run_instance.execute_test(test) + execute_test(test, self.run_instance.lit_config, + self.run_instance.parallelism_semaphores) except KeyboardInterrupt: # This is a sad hack. Unfortunately subprocess goes # bonkers with ctrl-c and we start forking merrily. print('\nCtrl-C detected, goodbye.') + sys.stdout.flush() os.kill(0,9) self.consumer.update(test_index, test) @@ -167,6 +170,44 @@ def update(self, test): def handleFailures(provider, consumer, maxFailures): consumer.display = _Display(consumer.display, provider, maxFailures) +def execute_test(test, lit_config, parallelism_semaphores): + """Execute one test""" + pg = test.config.parallelism_group + if callable(pg): + pg = pg(test) + + result = None + semaphore = None + try: + if pg: + semaphore = parallelism_semaphores[pg] + if semaphore: + semaphore.acquire() + start_time = time.time() + result = test.config.test_format.execute(test, lit_config) + # Support deprecated result from execute() which returned the result + # code and additional output as a tuple. + if isinstance(result, tuple): + code, output = result + result = lit.Test.Result(code, output) + elif not isinstance(result, lit.Test.Result): + raise ValueError("unexpected result from test execution") + result.elapsed = time.time() - start_time + except KeyboardInterrupt: + raise + except: + if lit_config.debug: + raise + output = 'Exception during script execution:\n' + output += traceback.format_exc() + output += '\n' + result = lit.Test.Result(lit.Test.UNRESOLVED, output) + finally: + if semaphore: + semaphore.release() + + test.setResult(result) + class Run(object): """ This class represents a concrete, configured testing run. @@ -177,42 +218,10 @@ def __init__(self, lit_config, tests): self.tests = tests def execute_test(self, test): - pg = test.config.parallelism_group - if callable(pg): pg = pg(test) - - result = None - semaphore = None - try: - if pg: semaphore = self.parallelism_semaphores[pg] - if semaphore: semaphore.acquire() - start_time = time.time() - result = test.config.test_format.execute(test, self.lit_config) - - # Support deprecated result from execute() which returned the result - # code and additional output as a tuple. - if isinstance(result, tuple): - code, output = result - result = lit.Test.Result(code, output) - elif not isinstance(result, lit.Test.Result): - raise ValueError("unexpected result from test execution") - - result.elapsed = time.time() - start_time - except KeyboardInterrupt: - raise - except: - if self.lit_config.debug: - raise - output = 'Exception during script execution:\n' - output += traceback.format_exc() - output += '\n' - result = lit.Test.Result(lit.Test.UNRESOLVED, output) - finally: - if semaphore: semaphore.release() - - test.setResult(result) + return execute_test(test, self.lit_config, self.parallelism_semaphores) def execute_tests(self, display, jobs, max_time=None, - use_processes=False): + execution_strategy=None): """ execute_tests(display, jobs, [max_time]) @@ -234,6 +243,14 @@ def execute_tests(self, display, jobs, max_time=None, be given an UNRESOLVED result. """ + if execution_strategy == 'PROCESS_POOL': + self.execute_tests_with_mp_pool(display, jobs, max_time) + return + # FIXME: Standardize on the PROCESS_POOL execution strategy and remove + # the other two strategies. + + use_processes = execution_strategy == 'PROCESSES' + # Choose the appropriate parallel execution implementation. consumer = None if jobs != 1 and use_processes and multiprocessing: @@ -263,8 +280,8 @@ def execute_tests(self, display, jobs, max_time=None, provider = TestProvider(queue_impl, canceled_flag) handleFailures(provider, consumer, self.lit_config.maxFailures) - # Queue the tests outside the main thread because we can't guarantee - # that we can put() all the tests without blocking: + # Putting tasks into the threading or multiprocessing Queue may block, + # so do it in a separate thread. # https://docs.python.org/2/library/multiprocessing.html # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue # without taking any out. @@ -317,3 +334,111 @@ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs): # Wait for all the tasks to complete. for t in tasks: t.join() + + def execute_tests_with_mp_pool(self, display, jobs, max_time=None): + # Don't do anything if we aren't going to run any tests. + if not self.tests or jobs == 0: + return + + # Set up semaphores to limit parallelism of certain classes of tests. + # For example, some ASan tests require lots of virtual memory and run + # faster with less parallelism on OS X. + self.parallelism_semaphores = \ + {k: multiprocessing.Semaphore(v) for k, v in + self.lit_config.parallelism_groups.items()} + + # Save the display object on the runner so that we can update it from + # our task completion callback. + self.display = display + + # Start a process pool. Copy over the data shared between all test runs. + pool = multiprocessing.Pool(jobs, worker_initializer, + (self.lit_config, + self.parallelism_semaphores)) + + # Install a console-control signal handler on Windows. + if win32api is not None: + def console_ctrl_handler(type): + print "Ctr-C received, terminating" + pool.terminate() + pool.join() + os.kill(0,9) + return True + win32api.SetConsoleCtrlHandler(console_ctrl_handler, True) + + # FIXME: Implement max_time using .wait() timeout argument and a + # deadline. + + try: + async_results = [pool.apply_async(worker_run_one_test, + args=(test_index, test), + callback=self.consume_test_result) + for test_index, test in enumerate(self.tests)] + + # Wait for all results to come in. The callback that runs in the + # parent process will update the display. + for a in async_results: + a.wait() + if not a.successful(): + a.get() # Exceptions raised here come from the worker. + finally: + pool.terminate() + pool.join() + + # Mark any tests that weren't run as UNRESOLVED. + for test in self.tests: + if test.result is None: + test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0)) + + def consume_test_result(self, pool_result): + """Test completion callback for worker_run_one_test + + Updates the test result status in the parent process. Each task in the + pool returns the test index and the result, and we use the index to look + up the original test object. Also updates the progress bar as tasks + complete. + """ + (test_index, test_with_result) = pool_result + # Update the parent process copy of the test. This includes the result, + # XFAILS, REQUIRES, and UNSUPPORTED statuses. + assert self.tests[test_index].file_path == test_with_result.file_path, \ + "parent and child disagree on test path" + self.tests[test_index] = test_with_result + self.display.update(test_with_result) + +child_lit_config = None +child_parallelism_semaphores = None + +def worker_initializer(lit_config, parallelism_semaphores): + """Copy expensive repeated data into worker processes""" + global child_lit_config + child_lit_config = lit_config + global child_parallelism_semaphores + child_parallelism_semaphores = parallelism_semaphores + +def worker_run_one_test(test_index, test): + """Run one test in a multiprocessing.Pool + + Side effects in this function and functions it calls are not visible in the + main lit process. + + Arguments and results of this function are pickled, so they should be cheap + to copy. For efficiency, we copy all data needed to execute all tests into + each worker and store it in the child_* global variables. This reduces the + cost of each task. + + Returns an index and a Result, which the parent process uses to update + the display. + """ + try: + execute_test(test, child_lit_config, child_parallelism_semaphores) + return (test_index, test) + except KeyboardInterrupt as e: + # This is a sad hack. Unfortunately subprocess goes + # bonkers with ctrl-c and we start forking merrily. + print('\nCtrl-C detected, goodbye.') + traceback.print_exc() + sys.stdout.flush() + os.kill(0,9) + except: + traceback.print_exc()