Skip to content

Commit

Permalink
[lit] Add support for multiprocessing, under --use-processes for now.
Browse files Browse the repository at this point in the history
git-svn-id: https://llvm.org/svn/llvm-project/llvm/trunk@189556 91177308-0d34-0410-b5e6-96231b3b80d8
  • Loading branch information
ddunbar committed Aug 29, 2013
1 parent df44de6 commit 4ac723b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 39 deletions.
6 changes: 2 additions & 4 deletions utils/lit/TODO
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ Infrastructure
module. This is currently blocked on:

* The external execution mode is faster in some situations, because it avoids
being bottlenecked on the GIL. We could fix this by moving to a good
multiprocessing model.
being bottlenecked on the GIL. This can hopefully be obviated simply by
using --use-processes.

* Some tests in LLVM/Clang are explicitly disabled with the internal shell
(because they use features specific to bash). We would need to rewrite these
Expand Down Expand Up @@ -158,8 +158,6 @@ Miscellaneous

* Add --show-unsupported, don't show by default?

* Optionally use multiprocessing.

* Support valgrind in all configs, and LLVM style valgrind.

* Support a timeout / ulimit.
Expand Down
9 changes: 8 additions & 1 deletion utils/lit/lit/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,12 @@ def main(builtinParameters = {}):
group.add_option("", "--show-tests", dest="showTests",
help="Show all discovered tests",
action="store_true", default=False)
group.add_option("", "--use-processes", dest="useProcesses",
help="Run tests in parallel with processes (not threads)",
action="store_true", default=False)
group.add_option("", "--use-threads", dest="useProcesses",
help="Run tests in parallel with threads (not processes)",
action="store_false", default=False)
parser.add_option_group(group)

(opts, args) = parser.parse_args()
Expand Down Expand Up @@ -264,7 +270,8 @@ def main(builtinParameters = {}):
startTime = time.time()
display = TestingProgressDisplay(opts, len(run.tests), progressBar)
try:
run.execute_tests(display, opts.numThreads, opts.maxTime)
run.execute_tests(display, opts.numThreads, opts.maxTime,
opts.useProcesses)
except KeyboardInterrupt:
sys.exit(2)
display.finish()
Expand Down
139 changes: 105 additions & 34 deletions utils/lit/lit/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,68 @@
import threading
import time
import traceback
try:
import Queue as queue
except ImportError:
import queue

try:
import win32api
except ImportError:
win32api = None

try:
import multiprocessing
except ImportError:
multiprocessing = None

import lit.Test

###
# Test Execution Implementation

class TestProvider(object):
def __init__(self, tests):
self.iter = iter(range(len(tests)))
class LockedValue(object):
def __init__(self, value):
self.lock = threading.Lock()
self.canceled = False
self._value = value

def cancel(self):
def _get_value(self):
self.lock.acquire()
self.canceled = True
self.lock.release()
try:
return self._value
finally:
self.lock.release()

def get(self):
# Check if we are cancelled.
def _set_value(self, value):
self.lock.acquire()
if self.canceled:
self.lock.release()
try:
self._value = value
finally:
self.lock.release()

value = property(_get_value, _set_value)

class TestProvider(object):
def __init__(self, tests, num_jobs, queue_impl, canceled_flag):
self.canceled_flag = canceled_flag

# Create a shared queue to provide the test indices.
self.queue = queue_impl()
for i in range(len(tests)):
self.queue.put(i)
for i in range(num_jobs):
self.queue.put(None)

def cancel(self):
self.canceled_flag.value = 1

def get(self):
# Check if we are canceled.
if self.canceled_flag.value:
return None

# Otherwise take the next test.
for item in self.iter:
break
else:
item = None
self.lock.release()
return item
return self.queue.get()

class Tester(object):
def __init__(self, run_instance, provider, consumer):
Expand All @@ -46,7 +72,7 @@ def __init__(self, run_instance, provider, consumer):
self.consumer = consumer

def run(self):
while 1:
while True:
item = self.provider.get()
if item is None:
break
Expand Down Expand Up @@ -82,6 +108,42 @@ def task_finished(self):
def handle_results(self):
pass

class MultiprocessResultsConsumer(object):
def __init__(self, run, display, num_jobs):
self.run = run
self.display = display
self.num_jobs = num_jobs
self.queue = multiprocessing.Queue()

def update(self, test_index, test):
# This method is called in the child processes, and communicates the
# results to the actual display implementation via an output queue.
self.queue.put((test_index, test.result))

def task_finished(self):
# This method is called in the child processes, and communicates that
# individual tasks are complete.
self.queue.put(None)

def handle_results(self):
# This method is called in the parent, and consumes the results from the
# output queue and dispatches to the actual display. The method will
# complete after each of num_jobs tasks has signalled completion.
completed = 0
while completed != self.num_jobs:
# Wait for a result item.
item = self.queue.get()
if item is None:
completed += 1
continue

# Update the test result in the parent process.
index,result = item
test = self.run.tests[index]
test.result = result

self.display.update(test)

def run_one_tester(run, provider, display):
tester = Tester(run, provider, display)
tester.run()
Expand Down Expand Up @@ -123,7 +185,8 @@ def execute_test(self, test):

test.setResult(result)

def execute_tests(self, display, jobs, max_time=None):
def execute_tests(self, display, jobs, max_time=None,
use_processes=False):
"""
execute_tests(display, jobs, [max_time])
Expand All @@ -145,8 +208,20 @@ def execute_tests(self, display, jobs, max_time=None):
be given an UNRESOLVED result.
"""

# Create the test provider object.
provider = TestProvider(self.tests)
# Choose the appropriate parallel execution implementation.
if jobs == 1 or not use_processes or multiprocessing is None:
task_impl = threading.Thread
queue_impl = queue.Queue
canceled_flag = LockedValue(0)
consumer = ThreadResultsConsumer(display)
else:
task_impl = multiprocessing.Process
queue_impl = multiprocessing.Queue
canceled_flag = multiprocessing.Value('i', 0)
consumer = MultiprocessResultsConsumer(self, display, jobs)

# Create the test provider.
provider = TestProvider(self.tests, jobs, queue_impl, canceled_flag)

# Install a console-control signal handler on Windows.
if win32api is not None:
Expand All @@ -162,8 +237,12 @@ def timeout_handler():
timeout_timer = threading.Timer(max_time, timeout_handler)
timeout_timer.start()

# Actually execute the tests.
self._execute_tests_with_provider(provider, display, jobs)
# If not using multiple tasks, just run the tests directly.
if jobs == 1:
run_one_tester(self, provider, consumer)
else:
# Otherwise, execute the tests in parallel
self._execute_tests_in_parallel(task_impl, provider, consumer, jobs)

# Cancel the timeout handler.
if max_time is not None:
Expand All @@ -174,18 +253,10 @@ def timeout_handler():
if test.result is None:
test.setResult(lit.Test.Result(lit.Test.UNRESOLVED, '', 0.0))

def _execute_tests_with_provider(self, provider, display, jobs):
consumer = ThreadResultsConsumer(display)

# If only using one testing thread, don't use tasks at all; this lets us
# profile, among other things.
if jobs == 1:
run_one_tester(self, provider, consumer)
return

def _execute_tests_in_parallel(self, task_impl, provider, consumer, jobs):
# Start all of the tasks.
tasks = [threading.Thread(target=run_one_tester,
args=(self, provider, consumer))
tasks = [task_impl(target=run_one_tester,
args=(self, provider, consumer))
for i in range(jobs)]
for t in tasks:
t.start()
Expand Down

0 comments on commit 4ac723b

Please sign in to comment.