Skip to content

Commit

Permalink
Cleanup a few scheduler warts.
Browse files Browse the repository at this point in the history
Functions are now first class targets for a plan's execution.  The
example thrift planners are converted from tasks to functions to
exercise this.

Scheduler is now a proper `AbstractClass`.  A bad implementatiion of
`Plan.__getattr__` is fixed to allow this.  The same bad implementation
of `__getattr__` is also fixed in `Configuration`.

Testing Done:
CI went green here:
  https://travis-ci.org/pantsbuild/pants/builds/87148211

Bugs closed: 2413, 2453

Reviewed at https://rbcommons.com/s/twitter/r/3032/
  • Loading branch information
jsirois committed Oct 24, 2015
1 parent 9fc3dee commit 2e42869
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 79 deletions.
4 changes: 3 additions & 1 deletion src/python/pants/engine/exp/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ def validate_concrete(self):
"""

def __getattr__(self, item):
return self._kwargs[item]
if item in self._kwargs:
return self._kwargs[item]
raise AttributeError('{} does not have attribute {!r}'.format(self, item))

def _key(self):
if self._hashable_key is None:
Expand Down
18 changes: 9 additions & 9 deletions src/python/pants/engine/exp/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ def success(cls, root_products):
def failure(cls, exit_code):
return cls(exit_code=exit_code, root_products=None)

def __init__(self, global_scheduler):
def __init__(self, local_scheduler):
"""
:param global_scheduler: The global scheduler for creating execution graphs.
:type global_scheduler: :class:`GlobalScheduler`
:param local_scheduler: The local scheduler for creating execution graphs.
:type local_scheduler: :class:`pants.engine.exp.scheduler.LocalScheduler`
"""
self._global_scheduler = global_scheduler
self._local_scheduler = local_scheduler

def execute(self, build_request):
"""Executes the the requested build.
Expand All @@ -46,7 +46,7 @@ def execute(self, build_request):
:returns: The result of the run.
:rtype: :class:`Engine.Result`
"""
execution_graph = self._global_scheduler.execution_graph(build_request)
execution_graph = self._local_scheduler.execution_graph(build_request)
try:
root_products = self.reduce(execution_graph)
return self.Result.success(root_products)
Expand Down Expand Up @@ -96,14 +96,14 @@ def _execute_plan(func, product_type, subjects, *args, **kwargs):
class LocalMultiprocessEngine(Engine):
"""An engine that runs tasks locally and in parallel when possible using a process pool."""

def __init__(self, global_scheduler, pool_size=0):
def __init__(self, local_scheduler, pool_size=0):
"""
:param global_scheduler: The global scheduler for creating execution graphs.
:type global_scheduler: :class:`GlobalScheduler`
:param local_scheduler: The local scheduler for creating execution graphs.
:type local_scheduler: :class:`pants.engine.exp.scheduler.LocalScheduler`
:param pool: A multiprocessing process pool.
:type pool: :class:`multiprocessing.Pool`
"""
super(LocalMultiprocessEngine, self).__init__(global_scheduler)
super(LocalMultiprocessEngine, self).__init__(local_scheduler)
self._pool_size = pool_size if pool_size > 0 else multiprocessing.cpu_count()
self._pool = multiprocessing.Pool(self._pool_size)

Expand Down
50 changes: 29 additions & 21 deletions src/python/pants/engine/exp/examples/planners.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ def execute(self, **inputs):
return self.fake_product()


def printing_func(func):
@functools.wraps(func)
def wrapper(**inputs):
print('{} being executed with inputs: {}'.format(func.__name__, inputs))
return '<<<Fake{}Product>>>'.format(func.__name__)
return wrapper


class Requirement(Configuration):
"""A setuptools requirement."""

Expand Down Expand Up @@ -75,15 +83,15 @@ def plan(self, scheduler, product_type, subject, configuration=None):
if isinstance(subject, Jar):
# This plan is only used internally, the finalized plan will s/jar/jars/ for a single global
# resolve.
return Plan(task_type=IvyResolve, subjects=(subject,), jar=subject)
return Plan(func_or_task_type=IvyResolve, subjects=(subject,), jar=subject)

def finalize_plans(self, plans):
subjects = set()
jars = OrderedSet()
for plan in plans:
subjects.update(plan.subjects)
jars.add(plan.jar)
global_plan = Plan(task_type=IvyResolve, subjects=subjects, jars=list(jars))
global_plan = Plan(func_or_task_type=IvyResolve, subjects=subjects, jars=list(jars))
return [global_plan]


Expand Down Expand Up @@ -150,10 +158,10 @@ def extract_thrift_config(self, product_type, target, configuration=None):
"""

@abstractproperty
def gen_task_type(self):
"""Return the type of the code gen task.
def gen_func(self):
"""Return the code gen function.
:rtype: type
:rtype: function
"""

@abstractmethod
Expand All @@ -178,7 +186,7 @@ def plan(self, scheduler, product_type, subject, configuration=None):

subject = Subject(subject, alternate=Target(dependencies=config.deps))
inputs = self.plan_parameters(scheduler, product_type, subject, config)
return Plan(task_type=self.gen_task_type, subjects=(subject,), sources=thrift_sources, **inputs)
return Plan(func_or_task_type=self.gen_func, subjects=(subject,), sources=thrift_sources, **inputs)


class ApacheThriftConfiguration(ThriftConfiguration):
Expand All @@ -194,8 +202,8 @@ def __init__(self, rev=None, gen=None, strict=True, **kwargs):

class ApacheThriftPlanner(ThriftPlanner):
@property
def gen_task_type(self):
return ApacheThrift
def gen_func(self):
return gen_apache_thrift

@memoized_property
def _product_type_by_lang(self):
Expand Down Expand Up @@ -230,9 +238,9 @@ def plan_parameters(self, scheduler, product_type, subject, apache_thrift_config
strict=apache_thrift_config.strict)


class ApacheThrift(PrintingTask):
def execute(self, sources, rev, gen, strict):
return super(ApacheThrift, self).execute(sources=sources, rev=rev, gen=gen, strict=strict)
@printing_func
def gen_apache_thrift(sources, rev, gen, strict):
pass


class ScroogeConfiguration(ThriftConfiguration):
Expand All @@ -247,8 +255,8 @@ def __init__(self, rev=None, lang=None, strict=True, **kwargs):

class ScroogePlanner(ThriftPlanner):
@property
def gen_task_type(self):
return Scrooge
def gen_func(self):
return gen_scrooge_thrift

@memoized_property
def _product_type_by_lang(self):
Expand All @@ -274,18 +282,18 @@ def extract_thrift_config(self, product_type, target, configuration=None):
return configs[0]

def plan_parameters(self, scheduler, product_type, subject, scrooge_config):
# This will come via an option default.
# TODO(John Sirois): once the options system is plumbed, make the tool spec configurable.
# It could also just be pointed at the scrooge jar at that point.
scrooge_classpath = scheduler.promise(Address.parse('src/scala/scrooge'), Classpath)
return dict(scrooge_classpath=scrooge_classpath,
lang=scrooge_config.lang,
strict=scrooge_config.strict)


class Scrooge(PrintingTask):
def execute(self, sources, scrooge_classpath, lang, strict):
return super(Scrooge, self).execute(sources=sources,
scrooge_classpath=scrooge_classpath,
lang=lang,
strict=strict)
@printing_func
def gen_scrooge_thrift(sources, scrooge_classpath, lang, strict):
pass


class JvmCompilerPlanner(TaskPlanner):
Expand Down Expand Up @@ -345,7 +353,7 @@ def plan(self, scheduler, product_type, subject, configuration=None):
classpath = scheduler.promise(dep, Classpath, configuration=dep_config, required=True)
classpath_promises.append(classpath)

return Plan(task_type=self.compile_task_type,
return Plan(func_or_task_type=self.compile_task_type,
subjects=(subject,),
sources=sources,
classpath=classpath_promises)
Expand Down Expand Up @@ -385,7 +393,7 @@ def setup_json_scheduler(build_root):
"""Return a build graph and scheduler configured for BLD.json files under the given build root.
:rtype tuple of (:class:`pants.engine.exp.graph.Graph`,
:class:`pants.engine.exp.scheduler.GlobalScheduler`)
:class:`pants.engine.exp.scheduler.LocalScheduler`)
"""
symbol_table = {'apache_thrift_configuration': ApacheThriftConfiguration,
'jar': Jar,
Expand Down
12 changes: 6 additions & 6 deletions src/python/pants/engine/exp/examples/visualizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ def format_subject(subject):
return subject.primary.address.spec if subject.primary.address else repr(subject.primary)

def format_promise(promise):
return '{}({})'.format(promise._product_type.__name__, format_subject(promise.subject))
return '{}({})'.format(promise.product_type.__name__, format_subject(promise.subject))

def format_label(product_type, plan):
return '{}:{}'.format(plan._task_type.__name__, product_type.__name__)
return '{}:{}'.format(plan.func_or_task_type.value.__name__, product_type.__name__)

colorscheme = 'set312'
colors = {}
Expand All @@ -36,22 +36,22 @@ def color_index(key):
yield 'digraph plans {'
yield ' node[colorscheme={}];'.format(colorscheme)
yield ' concentrate=true;'
yield ' splines=polyline;'
yield ' rankdir=LR;'

for product_type, plan in execution_graph.walk():
label = format_label(product_type, plan)
color = color_index(plan._task_type)
color = color_index(plan.func_or_task_type)
if len(plan.subjects) > 1:
# NB: naming a subgraph cluster* triggers drawing of a box around the subgraph. We levarge
# this to highlight plans that chunk or are fully global.
# See: http://www.graphviz.org/pdf/dot.1.pdf
yield ' subgraph "cluster_{}" {{'.format(plan._task_type.__name__)
yield ' subgraph "cluster_{}" {{'.format(plan.func_or_task_type.value.__name__)
yield ' colorscheme={};'.format(colorscheme)
yield ' style=filled;'
yield ' fillcolor={};'.format(color)
yield ' label="{}";'.format(label)

subgraph_node_color = color_index((plan._task_type, plan.subjects))
subgraph_node_color = color_index((plan.func_or_task_type, plan.subjects))
for subject in plan.subjects:
yield (' node [style=filled, fillcolor={color}, label="{label}"] "{node}";'
.format(color=subgraph_node_color,
Expand Down
Loading

0 comments on commit 2e42869

Please sign in to comment.