[engine] no longer content address subject
TL;DR: subjects are no longer content-addressed; nodes are content-addressed instead,
and the next review will be smarter about when that happens.

Still part of [1]. At present we content-address everything; the subject is something
we can skip, because the only way for a task to view a subject is by `Select`ing it,
and in that case it is content-addressed as `Return(subject)` anyway.

With the subject removed from content addressing, because of [2] we would have to
either make all subjects orderable (certainly a big restriction, and not preferred)
or content-address the node itself. This review does the latter. The next review will
make content-addressing nodes (as well as other things) happen only when it is needed
for caching or for multi-process execution.
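
To make the change concrete, here is a minimal, self-contained sketch of content-addressing
the node itself. `ToyStorage`, `Key`, and the simplified `SelectNode` below are illustrative
stand-ins only, not the real APIs in `pants.engine.exp.storage` and `pants.engine.exp.nodes`.

```python
import hashlib
import pickle
from collections import namedtuple

# Illustrative stand-ins; the real Storage/Key/SelectNode are richer.
Key = namedtuple('Key', ['digest'])
SelectNode = namedtuple('SelectNode', ['subject', 'product', 'variants', 'variant_key'])


class ToyStorage(object):
  """Content-addresses whole objects: equal contents always map to the same Key."""

  def __init__(self):
    self._blobs = {}

  def put(self, obj):
    blob = pickle.dumps(obj, protocol=2)
    key = Key(hashlib.sha1(blob).hexdigest())
    self._blobs[key] = blob
    return key

  def get(self, key):
    return pickle.loads(self._blobs[key])


storage = ToyStorage()
# The node carries its subject directly; the node as a whole is what gets
# content-addressed, so the subject never needs its own key or ordering.
node = SelectNode(subject='src/scala/scrooge', product='Classpath',
                  variants=None, variant_key=None)
assert storage.put(node) == storage.put(node)  # equal nodes -> identical keys
```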

Other changes included:

* Override the default `AddressMapper.__repr__`, so that when an `AddressMapper` is
  used as a subject in debug mode its repr no longer embeds the machine address,
  which caused cache misses (see the sketch after this list).
* `Node`s are content-addressed in the `Scheduler`, which might be questionable.
  The previous thinking was to keep the scheduler out of the content-addressing
  business; that can still be achieved by adding an extra conversion in the
  `Engine`, and we can revisit it if such a need arises.
* No more special handling of the cyclic `Noop`: previously the scheduler
  introduced a `State` without content-addressing it; now it does.
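
As a sketch of the `__repr__` point above: assuming (as the bullet implies) that
debug-mode keys are derived from a stringified subject, a repr that embeds the
instance's memory address changes on every run, while one built purely from the
mapper's fields stays stable. The class names below are hypothetical stand-ins;
the real fix is the `__repr__`/`__str__` added to
`pants.engine.exp.mapper.AddressMapper` in this change.

```python
class DefaultReprMapper(object):
  """Falls back to object.__repr__, which embeds the instance's memory address."""

  def __init__(self, parser_cls, symbol_table_cls):
    self.parser_cls = parser_cls
    self.symbol_table_cls = symbol_table_cls


class StableReprMapper(DefaultReprMapper):
  def __repr__(self):
    # Built purely from the mapper's fields, so equal mappers repr identically
    # across instances and across processes.
    return 'AddressMapper(parser={}, symbol_table={})'.format(
        self.parser_cls, self.symbol_table_cls)


a, b = DefaultReprMapper(dict, list), DefaultReprMapper(dict, list)
print(repr(a) == repr(b))  # False: '<... object at 0x7f...>' differs per instance

c, d = StableReprMapper(dict, list), StableReprMapper(dict, list)
print(repr(c) == repr(d))  # True: a repr-derived debug key would now hit the cache
```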

[1] Content address performance pantsbuild#3066
[2] Randomness as in https://rbcommons.com/s/twitter/r/3593/

Testing Done:
https://travis-ci.org/peiyuwang/pants/builds/118313227

Bugs closed: 3066, 3090

Reviewed at https://rbcommons.com/s/twitter/r/3604/
peiyuwang committed Mar 24, 2016
1 parent b7514f6 commit 899ce53
Showing 15 changed files with 136 additions and 139 deletions.
21 changes: 11 additions & 10 deletions src/python/pants/engine/exp/examples/planners.py
@@ -406,7 +406,8 @@ def table(cls):
def setup_json_scheduler(build_root, debug=True):
"""Return a build graph and scheduler configured for BLD.json files under the given build root.
:rtype :class:`pants.engine.exp.scheduler.LocalScheduler`
:rtype A tuple of :class:`pants.engine.exp.scheduler.LocalScheduler`,
:class:`pants.engine.exp.storage.Storage`.
"""

storage = Storage.create(debug=debug)
@@ -415,11 +416,11 @@ def setup_json_scheduler(build_root, debug=True):

# Register "literal" subjects required for these tasks.
# TODO: Replace with `Subsystems`.
address_mapper_key = storage.put(AddressMapper(symbol_table_cls=symbol_table_cls,
address_mapper = AddressMapper(symbol_table_cls=symbol_table_cls,
build_pattern=r'^BLD.json$',
parser_cls=JsonParser))
source_roots_key = storage.put(SourceRoots(('src/java','src/scala')))
scrooge_tool_address_key = storage.put(Address.parse('src/scala/scrooge'))
parser_cls=JsonParser)
source_roots = SourceRoots(('src/java','src/scala'))
scrooge_tool_address = Address.parse('src/scala/scrooge')

goals = {
'compile': Classpath,
@@ -445,12 +446,12 @@ def setup_json_scheduler(build_root, debug=True):
(ScalaSources,
[Select(ThriftSources),
SelectVariant(ScroogeScalaConfiguration, 'thrift'),
SelectLiteral(scrooge_tool_address_key, Classpath)],
SelectLiteral(scrooge_tool_address, Classpath)],
gen_scrooge_thrift),
(JavaSources,
[Select(ThriftSources),
SelectVariant(ScroogeJavaConfiguration, 'thrift'),
SelectLiteral(scrooge_tool_address_key, Classpath)],
SelectLiteral(scrooge_tool_address, Classpath)],
gen_scrooge_thrift),
] + [
# scala dependency inference
@@ -467,7 +468,7 @@ def setup_json_scheduler(build_root, debug=True):
select_package_address),
(PathGlobs,
[Select(JVMPackageName),
SelectLiteral(source_roots_key, SourceRoots)],
SelectLiteral(source_roots, SourceRoots)],
calculate_package_search_path),
] + [
# Remote dependency resolution
@@ -503,10 +504,10 @@ def setup_json_scheduler(build_root, debug=True):
[Select(UnpickleableOutput)],
unpickleable_input),
] + (
create_graph_tasks(address_mapper_key, symbol_table_cls)
create_graph_tasks(address_mapper, symbol_table_cls)
) + (
create_fs_tasks()
)

project_tree = FileSystemProjectTree(build_root)
return LocalScheduler(goals, tasks, symbol_table_cls, project_tree), storage
return LocalScheduler(goals, tasks, symbol_table_cls, storage, project_tree), storage
7 changes: 3 additions & 4 deletions src/python/pants/engine/exp/examples/visualizer.py
@@ -26,11 +26,10 @@ def format_type(node):


def format_subject(node):
subject = node.subject_key.string
if node.variants:
return '({})@{}'.format(subject, ','.join('{}={}'.format(k, v) for k, v in node.variants))
return '({})@{}'.format(node.subject, ','.join('{}={}'.format(k, v) for k, v in node.variants))
else:
return '({})'.format(subject)
return '({})'.format(node.subject)


def format_product(node):
@@ -98,7 +97,7 @@ def visualize_execution_graph(scheduler, storage, request):

def visualize_build_request(build_root, goals, subjects):
scheduler, storage = setup_json_scheduler(build_root)
execution_request = scheduler.build_request(goals, storage.puts(subjects))
execution_request = scheduler.build_request(goals, subjects)
# NB: Calls `reduce` independently of `execute`, in order to render a graph before validating it.
engine = LocalSerialEngine(scheduler, storage)
engine.start()
9 changes: 5 additions & 4 deletions src/python/pants/engine/exp/legacy/commands.py
@@ -50,19 +50,19 @@ def setup():

# Register "literal" subjects required for these tasks.
# TODO: Replace with `Subsystems`.
address_mapper_key = storage.put(AddressMapper(symbol_table_cls=symbol_table_cls,
parser_cls=LegacyPythonCallbacksParser))
address_mapper = AddressMapper(symbol_table_cls=symbol_table_cls,
parser_cls=LegacyPythonCallbacksParser)

# Create a Scheduler containing graph and filesystem tasks, with no installed goals. The ExpGraph
# will explicitly request the products it needs.
tasks = (
create_legacy_graph_tasks() +
create_fs_tasks() +
create_graph_tasks(address_mapper_key, symbol_table_cls)
create_graph_tasks(address_mapper, symbol_table_cls)
)

return (
LocalScheduler(dict(), tasks, symbol_table_cls, project_tree),
LocalScheduler(dict(), tasks, symbol_table_cls, storage, project_tree),
storage,
spec_roots,
symbol_table_cls
@@ -80,6 +80,7 @@ def dependencies():
graph = ExpGraph(scheduler, engine, symbol_table_cls)
for address in graph.inject_specs_closure(spec_roots):
print(address)
print('Cache stats: {}'.format(engine._cache.get_stats()), file=sys.stderr)
finally:
engine.close()

4 changes: 2 additions & 2 deletions src/python/pants/engine/exp/legacy/graph.py
@@ -52,7 +52,7 @@ def _index(self, roots):
if state_key.type is Throw:
# TODO: get access to `Storage` instance in order to `to-str` more effectively here.
raise AddressLookupError(
'Build graph construction failed for {}:\n {}'.format(node.subject_key, state.exc))
'Build graph construction failed for {}:\n {}'.format(node.subject, state.exc))
elif state_key.type is not Return:
State.raise_unrecognized(state_key.type)
if node.product is not LegacyBuildGraphNode:
@@ -124,7 +124,7 @@ def inject_address_closure(self, address):

def inject_specs_closure(self, specs, fail_fast=None):
# Request loading of these specs.
request = self._scheduler.execution_request([LegacyBuildGraphNode], self._engine.storage.puts(specs))
request = self._scheduler.execution_request([LegacyBuildGraphNode], specs)
result = self._engine.execute(request)
if result.error:
raise result.error
7 changes: 7 additions & 0 deletions src/python/pants/engine/exp/mapper.py
@@ -185,3 +185,10 @@ def __ne__(self, other):
def __hash__(self):
# Compiled regexes are not hashable.
return hash((self.symbol_table_cls, self.parser_cls))

def __repr__(self):
return 'AddressMapper(parser={}, symbol_table={}, build_pattern={})'.format(
self.parser_cls, self.symbol_table_cls, self.build_pattern.pattern)

def __str__(self):
return repr(self)
55 changes: 24 additions & 31 deletions src/python/pants/engine/exp/nodes.py
@@ -11,7 +11,6 @@
from pants.engine.exp.addressable import parse_variants
from pants.engine.exp.fs import (DirectoryListing, FileContent, Path, PathLiteral, Paths,
file_content, list_directory, path_exists)
from pants.engine.exp.storage import Key
from pants.engine.exp.struct import HasStructs, Variants
from pants.util.meta import AbstractClass
from pants.util.objects import datatype
@@ -71,11 +70,9 @@ class Node(AbstractClass):
def validate_node(cls, node):
if not isinstance(node, Node):
raise ValueError('Value {} is not a Node.'.format(node))
if type(node.subject_key) is not Key:
raise ValueError('Node {} has a non-Key subject.'.format(node))

@abstractproperty
def subject_key(self):
def subject(self):
"""The subject for this Node."""

@abstractproperty
@@ -91,7 +88,7 @@ def is_cacheable(self):
"""Whether node should be cached or not."""

@abstractmethod
def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
"""Given a dict of the dependency States for this Node, returns the current State of the Node.
The NodeBuilder parameter provides a way to construct Nodes that require information about
@@ -106,7 +103,7 @@ def step(self, subject, dependency_states, step_context):
"""


class SelectNode(datatype('SelectNode', ['subject_key', 'product', 'variants', 'variant_key']), Node):
class SelectNode(datatype('SelectNode', ['subject', 'product', 'variants', 'variant_key']), Node):
"""A Node that selects a product for a subject.
A Select can be satisfied by multiple sources, but fails if multiple sources produce a value. The
@@ -124,7 +121,7 @@ def _variants_node(self):
# TODO: This super-broad check is crazy expensive. Should reduce to just doing Variants
# lookups for literal/addressable products.
if self.product != Variants:
return SelectNode(self.subject_key, Variants, self.variants, None)
return SelectNode(self.subject, Variants, self.variants, None)
return None

def _select_literal(self, candidate, variant_value):
@@ -150,7 +147,7 @@ def items():
return item
return None

def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
# Request default Variants for the subject, so that if there are any we can propagate
# them to task nodes.
variants = self.variants
@@ -176,13 +173,13 @@ def step(self, subject, dependency_states, step_context):
variant_value = variant_values[0]

# If the Subject "is a" or "has a" Product, then we're done.
literal_value = self._select_literal(subject, variant_value)
literal_value = self._select_literal(self.subject, variant_value)
if literal_value is not None:
return Return(literal_value)

# Else, attempt to use a configured task to compute the value.
has_waiting_dep = False
dependencies = list(step_context.gen_nodes(self.subject_key, self.product, variants))
dependencies = list(step_context.gen_nodes(self.subject, self.product, variants))
matches = {}
for dep in dependencies:
dep_state = dependency_states.get(dep, None)
@@ -205,13 +202,13 @@ def step(self, subject, dependency_states, step_context):
# TODO: Multiple successful tasks are not currently supported. We should allow for this
# by adding support for "mergeable" products. see:
# https://github.com/pantsbuild/pants/issues/2526
return Throw(ConflictingProducersError.create(subject, self.product, matches))
return Throw(ConflictingProducersError.create(self.subject, self.product, matches))
elif len(matches) == 1:
return Return(matches.values()[0])
return Noop('No source of {}.'.format(self))


class DependenciesNode(datatype('DependenciesNode', ['subject_key', 'product', 'variants', 'dep_product', 'field']), Node):
class DependenciesNode(datatype('DependenciesNode', ['subject', 'product', 'variants', 'dep_product', 'field']), Node):
"""A Node that selects the given Product for each of the items in a field `field` on this subject.
Begins by selecting the `dep_product` for the subject, and then selects a product for each
@@ -226,7 +223,7 @@ def is_cacheable(self):
return True

def _dep_product_node(self):
return SelectNode(self.subject_key, self.dep_product, self.variants, None)
return SelectNode(self.subject, self.dep_product, self.variants, None)

def _dependency_nodes(self, step_context, dep_product):
for dependency in getattr(dep_product, self.field or 'dependencies'):
@@ -235,9 +232,9 @@ def _dependency_nodes(self, step_context, dep_product):
# If a subject has literal variants for particular dependencies, they win over all else.
dependency, literal_variants = parse_variants(dependency)
variants = Variants.merge(variants, literal_variants)
yield SelectNode(step_context.introduce_subject(dependency), self.product, variants, None)
yield SelectNode(dependency, self.product, variants, None)

def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
# Request the product we need in order to request dependencies.
dep_product_node = self._dep_product_node()
dep_product_state = dependency_states.get(dep_product_node, None)
@@ -268,7 +265,7 @@ def step(self, subject, dependency_states, step_context):
return Return([dependency_states[d].value for d in dependencies])


class ProjectionNode(datatype('ProjectionNode', ['subject_key', 'product', 'variants', 'projected_subject', 'fields', 'input_product']), Node):
class ProjectionNode(datatype('ProjectionNode', ['subject', 'product', 'variants', 'projected_subject', 'fields', 'input_product']), Node):
"""A Node that selects the given input Product for the Subject, and then selects for a new subject.
TODO: This is semantically very similar to DependenciesNode (which might be considered to be a
@@ -280,12 +277,12 @@ def is_cacheable(self):
return True

def _input_node(self):
return SelectNode(self.subject_key, self.input_product, self.variants, None)
return SelectNode(self.subject, self.input_product, self.variants, None)

def _output_node(self, step_context, projected_subject):
return SelectNode(step_context.introduce_subject(projected_subject), self.product, self.variants, None)
return SelectNode(projected_subject, self.product, self.variants, None)

def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
# Request the product we need to compute the subject.
input_node = self._input_node()
input_state = dependency_states.get(input_node, None)
@@ -323,18 +320,18 @@ def step(self, subject, dependency_states, step_context):
raise State.raise_unrecognized(output_state)


class TaskNode(datatype('TaskNode', ['subject_key', 'product', 'variants', 'func', 'clause']), Node):
class TaskNode(datatype('TaskNode', ['subject', 'product', 'variants', 'func', 'clause']), Node):

@property
def is_cacheable(self):
return True

def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
# Compute dependencies.
dep_values = []
dependencies = []
for select in self.clause:
dep = select.construct_node(self.subject_key, self.variants)
dep = select.construct_node(self.subject, self.variants)
if dep is None:
return Noop('Dependency {} is not satisfiable.'.format(select))
dependencies.append(dep)
@@ -361,7 +358,7 @@ def step(self, subject, dependency_states, step_context):
return Throw(e)


class FilesystemNode(datatype('FilesystemNode', ['subject_key', 'product', 'variants']), Node):
class FilesystemNode(datatype('FilesystemNode', ['subject', 'product', 'variants']), Node):
"""A native node type for filesystem operations."""

_FS_PRODUCT_TYPES = {
@@ -383,9 +380,9 @@ def _input_type(self):
return self._FS_PRODUCT_TYPES[self.product]

def _input_node(self):
return SelectNode(self.subject_key, self._input_type(), self.variants, None)
return SelectNode(self.subject, self._input_type(), self.variants, None)

def step(self, subject, dependency_states, step_context):
def step(self, dependency_states, step_context):
# Request the relevant input product for the output product.
input_node = self._input_node()
input_state = dependency_states.get(input_node, None)
@@ -424,10 +421,6 @@ def __init__(self, node_builder, storage, project_tree):
self._storage = storage
self.project_tree = project_tree

def introduce_subject(self, subject):
"""Introduces a potentially new Subject, and returns a subject Key."""
return self._storage.put(subject)

def gen_nodes(self, subject_key, product, variants):
def gen_nodes(self, subject, product, variants):
"""Yields Node instances which might be able to provide a value for the given inputs."""
return self._node_builder.gen_nodes(subject_key, product, variants)
return self._node_builder.gen_nodes(subject, product, variants)
6 changes: 3 additions & 3 deletions src/python/pants/engine/exp/register.py
@@ -47,7 +47,7 @@ def create_fs_tasks():
]


def create_graph_tasks(address_mapper_key, symbol_table_cls):
def create_graph_tasks(address_mapper, symbol_table_cls):
"""Creates tasks used to parse Structs from BUILD files.
:param address_mapper_key: The subject key for an AddressMapper instance.
@@ -66,12 +66,12 @@ def create_graph_tasks(address_mapper_key, symbol_table_cls):
] + [
# BUILD file parsing.
(AddressFamily,
[SelectLiteral(address_mapper_key, AddressMapper),
[SelectLiteral(address_mapper, AddressMapper),
Select(Path),
SelectProjection(FilesContent, Paths, ('paths',), BuildFilePaths)],
parse_address_family),
(BuildFilePaths,
[SelectLiteral(address_mapper_key, AddressMapper),
[SelectLiteral(address_mapper, AddressMapper),
Select(DirectoryListing)],
filter_buildfile_paths),
] + [