[lit] Use process pools for test execution by default
Summary:
This drastically reduces lit test execution startup time on Windows. Our
previous strategy was to manually create one Process per job and manage
the worker pool ourselves. Instead, let's use the worker pool provided
by multiprocessing.  multiprocessing.Pool(jobs) returns almost
immediately, and initializes the appropriate number of workers, so they
can all start executing tests immediately. This avoids the ramp-up
period that the old implementation suffers from.  This appears to speed
up small test runs.
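
As a rough illustration of the difference (a minimal standalone sketch, not lit's code; the function and variable names here are made up): multiprocessing.Pool returns almost immediately and its workers start pulling tasks as soon as they come up, whereas creating and starting one Process per job pays the process-creation cost serially in the parent.

import multiprocessing
import time

def run_one(test_index):
    # Stand-in for running a single lit test.
    time.sleep(0.01)
    return test_index

if __name__ == '__main__':
    # The Pool constructor returns right away; workers initialize in the
    # background and begin executing tasks immediately, so there is no
    # per-job ramp-up in the parent process.
    pool = multiprocessing.Pool(8)
    results = pool.map(run_one, range(100))
    pool.close()
    pool.join()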

Here are some timings of the llvm-readobj tests on Windows using the
various execution strategies:

 # multiprocessing.Pool:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-process-pool |& grep real: ; done
real: 0m1.156s
real: 0m1.078s
real: 0m1.094s

 # multiprocessing.Process:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-processes |& grep real: ; done
real: 0m6.062s
real: 0m5.860s
real: 0m5.984s

 # threading.Thread:
$ for i in `seq 1 3`; do tim python ./bin/llvm-lit.py -sv ../llvm/test/tools/llvm-readobj/ --use-threads |& grep real: ; done
real: 0m9.438s
real: 0m10.765s
real: 0m11.079s

I kept the old code to launch processes in case this change doesn't work
on all platforms that LLVM supports, but at some point I would like to
remove both the threading and old multiprocessing execution strategies.

Reviewers: modocache, rafael

Subscribers: llvm-commits

Differential Revision: https://reviews.llvm.org/D31677

git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@299560 91177308-0d34-0410-b5e6-96231b3b80d8
rnk committed Apr 5, 2017
1 parent 9331ebe commit e514965
Showing 2 changed files with 173 additions and 42 deletions.
16 changes: 11 additions & 5 deletions utils/lit/lit/main.py
@@ -278,12 +278,15 @@ def main_with_tmp(builtinParameters):
     debug_group.add_argument("--show-tests", dest="showTests",
                       help="Show all discovered tests",
                       action="store_true", default=False)
-    debug_group.add_argument("--use-processes", dest="useProcesses",
+    debug_group.add_argument("--use-process-pool", dest="executionStrategy",
+                      help="Run tests in parallel with a process pool",
+                      action="store_const", const="PROCESS_POOL")
+    debug_group.add_argument("--use-processes", dest="executionStrategy",
                       help="Run tests in parallel with processes (not threads)",
-                      action="store_true", default=True)
-    debug_group.add_argument("--use-threads", dest="useProcesses",
+                      action="store_const", const="PROCESSES")
+    debug_group.add_argument("--use-threads", dest="executionStrategy",
                       help="Run tests in parallel with threads (not processes)",
-                      action="store_false", default=True)
+                      action="store_const", const="THREADS")
 
     opts = parser.parse_args()
     args = opts.test_paths
@@ -298,6 +301,9 @@ def main_with_tmp(builtinParameters):
     if opts.numThreads is None:
         opts.numThreads = lit.util.detectCPUs()
 
+    if opts.executionStrategy is None:
+        opts.executionStrategy = 'PROCESS_POOL'
+
     if opts.maxFailures == 0:
         parser.error("Setting --max-failures to 0 does not have any effect.")
 
@@ -481,7 +487,7 @@ def main_with_tmp(builtinParameters):
     display = TestingProgressDisplay(opts, len(run.tests), progressBar)
     try:
         run.execute_tests(display, opts.numThreads, opts.maxTime,
-                          opts.useProcesses)
+                          opts.executionStrategy)
     except KeyboardInterrupt:
         sys.exit(2)
     display.finish()
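
A note on the argparse pattern above: the three flags share one dest and use action="store_const", so whichever flag appears last wins, and when no flag is given the attribute stays None, which is why main() fills in 'PROCESS_POOL' afterwards. A minimal standalone sketch of that behavior (illustrative only, not lit code):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--use-process-pool", dest="executionStrategy",
                    action="store_const", const="PROCESS_POOL")
parser.add_argument("--use-processes", dest="executionStrategy",
                    action="store_const", const="PROCESSES")
parser.add_argument("--use-threads", dest="executionStrategy",
                    action="store_const", const="THREADS")

opts = parser.parse_args([])
assert opts.executionStrategy is None   # no flag given; caller applies the default
opts = parser.parse_args(["--use-threads"])
assert opts.executionStrategy == "THREADS"
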
199 changes: 162 additions & 37 deletions utils/lit/lit/run.py
@@ -1,4 +1,5 @@
 import os
+import sys
 import threading
 import time
 import traceback
@@ -84,11 +85,13 @@ def run(self):
     def run_test(self, test_index):
         test = self.run_instance.tests[test_index]
         try:
-            self.run_instance.execute_test(test)
+            execute_test(test, self.run_instance.lit_config,
+                         self.run_instance.parallelism_semaphores)
         except KeyboardInterrupt:
             # This is a sad hack. Unfortunately subprocess goes
             # bonkers with ctrl-c and we start forking merrily.
             print('\nCtrl-C detected, goodbye.')
+            sys.stdout.flush()
             os.kill(0,9)
         self.consumer.update(test_index, test)
 
@@ -167,6 +170,44 @@ def update(self, test):
 def handleFailures(provider, consumer, maxFailures):
     consumer.display = _Display(consumer.display, provider, maxFailures)
 
+def execute_test(test, lit_config, parallelism_semaphores):
+    """Execute one test"""
+    pg = test.config.parallelism_group
+    if callable(pg):
+        pg = pg(test)
+
+    result = None
+    semaphore = None
+    try:
+        if pg:
+            semaphore = parallelism_semaphores[pg]
+        if semaphore:
+            semaphore.acquire()
+        start_time = time.time()
+        result = test.config.test_format.execute(test, lit_config)
+        # Support deprecated result from execute() which returned the result
+        # code and additional output as a tuple.
+        if isinstance(result, tuple):
+            code, output = result
+            result = lit.Test.Result(code, output)
+        elif not isinstance(result, lit.Test.Result):
+            raise ValueError("unexpected result from test execution")
+        result.elapsed = time.time() - start_time
+    except KeyboardInterrupt:
+        raise
+    except:
+        if lit_config.debug:
+            raise
+        output = 'Exception during script execution:\n'
+        output += traceback.format_exc()
+        output += '\n'
+        result = lit.Test.Result(lit.Test.UNRESOLVED, output)
+    finally:
+        if semaphore:
+            semaphore.release()
+
+    test.setResult(result)
+
 class Run(object):
     """
     This class represents a concrete, configured testing run.
@@ -177,42 +218,10 @@ def __init__(self, lit_config, tests):
         self.tests = tests
 
     def execute_test(self, test):
-        pg = test.config.parallelism_group
-        if callable(pg): pg = pg(test)
-
-        result = None
-        semaphore = None
-        try:
-            if pg: semaphore = self.parallelism_semaphores[pg]
-            if semaphore: semaphore.acquire()
-            start_time = time.time()
-            result = test.config.test_format.execute(test, self.lit_config)
-
-            # Support deprecated result from execute() which returned the result
-            # code and additional output as a tuple.
-            if isinstance(result, tuple):
-                code, output = result
-                result = lit.Test.Result(code, output)
-            elif not isinstance(result, lit.Test.Result):
-                raise ValueError("unexpected result from test execution")
-
-            result.elapsed = time.time() - start_time
-        except KeyboardInterrupt:
-            raise
-        except:
-            if self.lit_config.debug:
-                raise
-            output = 'Exception during script execution:\n'
-            output += traceback.format_exc()
-            output += '\n'
-            result = lit.Test.Result(lit.Test.UNRESOLVED, output)
-        finally:
-            if semaphore: semaphore.release()
-
-        test.setResult(result)
+        return execute_test(test, self.lit_config, self.parallelism_semaphores)
 
     def execute_tests(self, display, jobs, max_time=None,
-                      use_processes=False):
+                      execution_strategy=None):
         """
         execute_tests(display, jobs, [max_time])
@@ -234,6 +243,14 @@ def execute_tests(self, display, jobs, max_time=None,
         be given an UNRESOLVED result.
         """
 
+        if execution_strategy == 'PROCESS_POOL':
+            self.execute_tests_with_mp_pool(display, jobs, max_time)
+            return
+        # FIXME: Standardize on the PROCESS_POOL execution strategy and remove
+        # the other two strategies.
+
+        use_processes = execution_strategy == 'PROCESSES'
+
         # Choose the appropriate parallel execution implementation.
         consumer = None
         if jobs != 1 and use_processes and multiprocessing:
@@ -263,8 +280,8 @@ def execute_tests(self, display, jobs, max_time=None,
         provider = TestProvider(queue_impl, canceled_flag)
         handleFailures(provider, consumer, self.lit_config.maxFailures)
 
-        # Queue the tests outside the main thread because we can't guarantee
-        # that we can put() all the tests without blocking:
+        # Putting tasks into the threading or multiprocessing Queue may block,
+        # so do it in a separate thread.
         # https://docs.python.org/2/library/multiprocessing.html
         # e.g: On Mac OS X, we will hang if we put 2^15 elements in the queue
         # without taking any out.
@@ -317,3 +334,111 @@ def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
         # Wait for all the tasks to complete.
         for t in tasks:
             t.join()
+
+    def execute_tests_with_mp_pool(self, display, jobs, max_time=None):
+        # Don't do anything if we aren't going to run any tests.
+        if not self.tests or jobs == 0:
+            return
+
+        # Set up semaphores to limit parallelism of certain classes of tests.
+        # For example, some ASan tests require lots of virtual memory and run
+        # faster with less parallelism on OS X.
+        self.parallelism_semaphores = \
+                {k: multiprocessing.Semaphore(v) for k, v in
+                 self.lit_config.parallelism_groups.items()}
+
+        # Save the display object on the runner so that we can update it from
+        # our task completion callback.
+        self.display = display
+
+        # Start a process pool. Copy over the data shared between all test runs.
+        pool = multiprocessing.Pool(jobs, worker_initializer,
+                                    (self.lit_config,
+                                     self.parallelism_semaphores))
+
+        # Install a console-control signal handler on Windows.
+        if win32api is not None:
+            def console_ctrl_handler(type):
+                print "Ctr-C received, terminating"
+                pool.terminate()
+                pool.join()
+                os.kill(0,9)
+                return True
+            win32api.SetConsoleCtrlHandler(console_ctrl_handler, True)
+
+        # FIXME: Implement max_time using .wait() timeout argument and a
+        # deadline.
+
+        try:
+            async_results = [pool.apply_async(worker_run_one_test,
+                                              args=(test_index, test),
+                                              callback=self.consume_test_result)
+                             for test_index, test in enumerate(self.tests)]
+
+            # Wait for all results to come in. The callback that runs in the
+            # parent process will update the display.
+            for a in async_results:
+                a.wait()
+                if not a.successful():
+                    a.get() # Exceptions raised here come from the worker.
+        finally:
+            pool.terminate()
+            pool.join()
+
+        # Mark any tests that weren't run as UNRESOLVED.
+        for test in self.tests:
+            if test.result is None:
+                test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))
+
+    def consume_test_result(self, pool_result):
+        """Test completion callback for worker_run_one_test
+
+        Updates the test result status in the parent process. Each task in the
+        pool returns the test index and the result, and we use the index to look
+        up the original test object. Also updates the progress bar as tasks
+        complete.
+        """
+        (test_index, test_with_result) = pool_result
+        # Update the parent process copy of the test. This includes the result,
+        # XFAILS, REQUIRES, and UNSUPPORTED statuses.
+        assert self.tests[test_index].file_path == test_with_result.file_path, \
+                "parent and child disagree on test path"
+        self.tests[test_index] = test_with_result
+        self.display.update(test_with_result)
+
+child_lit_config = None
+child_parallelism_semaphores = None
+
+def worker_initializer(lit_config, parallelism_semaphores):
+    """Copy expensive repeated data into worker processes"""
+    global child_lit_config
+    child_lit_config = lit_config
+    global child_parallelism_semaphores
+    child_parallelism_semaphores = parallelism_semaphores
+
+def worker_run_one_test(test_index, test):
+    """Run one test in a multiprocessing.Pool
+
+    Side effects in this function and functions it calls are not visible in the
+    main lit process.
+
+    Arguments and results of this function are pickled, so they should be cheap
+    to copy. For efficiency, we copy all data needed to execute all tests into
+    each worker and store it in the child_* global variables. This reduces the
+    cost of each task.
+
+    Returns an index and a Result, which the parent process uses to update
+    the display.
+    """
+    try:
+        execute_test(test, child_lit_config, child_parallelism_semaphores)
+        return (test_index, test)
+    except KeyboardInterrupt as e:
+        # This is a sad hack. Unfortunately subprocess goes
+        # bonkers with ctrl-c and we start forking merrily.
+        print('\nCtrl-C detected, goodbye.')
+        traceback.print_exc()
+        sys.stdout.flush()
+        os.kill(0,9)
+    except:
+        traceback.print_exc()
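
The worker_initializer/worker_run_one_test pair above uses a standard multiprocessing.Pool idiom: heavy shared state is passed once per worker through the pool's initializer and stashed in module globals, so per-task arguments and return values stay small and cheap to pickle. A minimal standalone sketch of that pattern (illustrative names, not lit code):

import multiprocessing

# Filled in once per worker process by the initializer.
_shared_config = None

def init_worker(config):
    # Runs in each worker at pool startup; config is pickled once per worker
    # rather than once per task.
    global _shared_config
    _shared_config = config

def run_task(index):
    # The per-task payload is just an int; heavy state comes from the global.
    return (index, len(_shared_config))

if __name__ == '__main__':
    config = {i: str(i) for i in range(10000)}  # stand-in for lit_config
    pool = multiprocessing.Pool(4, init_worker, (config,))
    print(pool.map(run_task, range(8)))
    pool.close()
    pool.join()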
