Skip to content

Commit

Permalink
Parallize thrift linter
Browse files Browse the repository at this point in the history
Utilize `WorkerPool` to get more mileage out of `thrift-linter` with rougly 20% saving on time.
```bash
time for run in {1..10} ; do ./pants -q --no-cache-read clean-all thrift-linter <some large project> ;done
```
From 19m32s to 15m47s

Testing Done:
https://travis-ci.org/pantsbuild/pants/builds/173574314

Reviewed at https://rbcommons.com/s/twitter/r/4351/
  • Loading branch information
wisechengyi committed Nov 5, 2016
1 parent d88e759 commit 0093628
Show file tree
Hide file tree
Showing 6 changed files with 66 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@
from __future__ import (absolute_import, division, generators, nested_scopes, print_function,
unicode_literals, with_statement)

from multiprocessing import cpu_count

from pants.backend.jvm.tasks.nailgun_task import NailgunTask
from pants.base.exceptions import TaskError
from pants.base.worker_pool import Work, WorkerPool
from pants.base.workunit import WorkUnitLabel
from pants.option.ranked_value import RankedValue

Expand Down Expand Up @@ -67,10 +70,9 @@ def _is_strict(self, target):

return self._to_bool(self.get_options().strict_default)

def _lint(self, target):
def _lint(self, target, classpath):
self.context.log.debug('Linting {0}'.format(target.address.spec))

classpath = self.tool_classpath('scrooge-linter')
config_args = []

config_args.extend(self.get_options().linter_args)
Expand Down Expand Up @@ -102,13 +104,29 @@ def execute(self):

thrift_targets = self.context.targets(self._is_thrift)
with self.invalidated(thrift_targets) as invalidation_check:
errors = []
for vt in invalidation_check.invalid_vts:
try:
self._lint(vt.target)
except ThriftLintError as e:
errors.append(str(e))
else:
vt.update()
if errors:
raise TaskError('\n'.join(errors))
if not invalidation_check.invalid_vts:
return

with self.context.new_workunit('parallel-thrift-linter') as workunit:
worker_pool = WorkerPool(workunit.parent,
self.context.run_tracker,
cpu_count())

scrooge_linter_classpath = self.tool_classpath('scrooge-linter')
results = []
errors = []
for vt in invalidation_check.invalid_vts:
r = worker_pool.submit_async_work(Work(self._lint, [(vt.target, scrooge_linter_classpath)]))
results.append((r, vt))
for r, vt in results:
r.wait()
# MapResult will raise _value in `get` if the run is not successful.
try:
r.get()
except ThriftLintError as e:
errors.append(str(e))
else:
vt.update()

if errors:
raise TaskError('\n'.join(errors))
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def get_default_jvm_options():
expected_include_paths = {'src/thrift/tweet', 'src/thrift/users'}
expected_paths = {'src/thrift/tweet/a.thrift', 'src/thrift/tweet/b.thrift'}
mock_calculate_compile_sources.return_value = (expected_include_paths, expected_paths)
task._lint(thrift_target)
task._lint(thrift_target, task.tool_classpath('scrooge-linter'))

self._run_java_mock.assert_called_once_with(classpath='foo_classpath',
main='com.twitter.scrooge.linter.Main',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,23 @@ def test_bad_default_override(self):
self.assert_failure(pants_run)
self.assertIn(self.lint_error_token, pants_run.stdout_data)

def test_multiple_bad_strict_override(self):
# Using -q to make sure bad thrift files are in the final exception messages.
target_a = self.thrift_test_target('bad-thrift-strict')
target_b = self.thrift_test_target('bad-thrift-strict2')
cmd = ['-q',
'thrift-linter',
'--strict',
target_a,
target_b,
]
pants_run = self.run_pants(cmd)
self.assert_failure(pants_run)
self.assertIn('bad-strict2.thrift', pants_run.stdout_data)
self.assertIn('bad-strict.thrift', pants_run.stdout_data)
self.assertIn(target_a, pants_run.stdout_data)
self.assertIn(target_b, pants_run.stdout_data)

def test_bad_strict_override(self):
# thrift-linter passes with non-strict command line flag overriding the BUILD section.
cmd = ['thrift-linter', '--no-strict', self.thrift_test_target('bad-thrift-strict')]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ java_thrift_library(
thrift_linter_strict=True,
)

java_thrift_library(
name = 'bad-thrift-strict2',
sources = ['bad-strict2.thrift'],
thrift_linter_strict=True,
)

java_thrift_library(
name = 'bad-thrift-non-strict',
sources = ['bad-non-strict.thrift'],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
// No scala/java namespaces

struct DuckStrict2 {
1: optional string quack,
}
12 changes: 7 additions & 5 deletions src/python/pants/base/worker_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,15 @@ def add_shutdown_hook(self, hook):
def submit_async_work(self, work, workunit_parent=None, on_success=None, on_failure=None):
"""Submit work to be executed in the background.
- work: The work to execute.
- workunit_parent: If specified, work is accounted for under this workunit.
- on_success: If specified, a callable taking a single argument, which will be a list
:param work: The work to execute.
:param workunit_parent: If specified, work is accounted for under this workunit.
:param on_success: If specified, a callable taking a single argument, which will be a list
of return values of each invocation, in order. Called only if all work succeeded.
- on_failure: If specified, a callable taking a single argument, which is an exception
:param on_failure: If specified, a callable taking a single argument, which is an exception
thrown in the work.
:return: `multiprocessing.pool.MapResult`
Don't do work in on_success: not only will it block the result handling thread, but
that thread is not a worker and doesn't have a logging context etc. Use it just to
submit further work to the pool.
Expand All @@ -76,7 +78,7 @@ def submit_async_work(self, work, workunit_parent=None, on_success=None, on_fail
def do_work(*args):
self._do_work(work.func, *args, workunit_name=work.workunit_name,
workunit_parent=workunit_parent, on_failure=on_failure)
self._pool.map_async(do_work, work.args_tuples, chunksize=1, callback=on_success)
return self._pool.map_async(do_work, work.args_tuples, chunksize=1, callback=on_success)

def submit_async_work_chain(self, work_chain, workunit_parent, done_hook=None):
"""Submit work to be executed in the background.
Expand Down

0 comments on commit 0093628

Please sign in to comment.