Skip to content

Commit

Permalink
[engine] replace py impl of validator with native validator (pantsbui…
Browse files Browse the repository at this point in the history
…ld#4259)

### Problem

Validation of the rule graph for the v2 engine still happens on the python side. That's a problem because we want to use the rule graph in the native scheduler and don't have access.

### Solution

 This ports the validator to rust, removing it from the python side. It also adjusts the associated tests so that the new validator implementation is covered by them. They've also been updated to account for some changes in display.

It also splits the LocalScheduler so that it defers to a WrappedNativeScheduler, which allows the validation tests to focus on just the validation portion.

### Follow on

Next is to port or ignore the GraphMaker tests, followed by making use of the graph within the native scheduler to remove statically determinable noop subgraphs.
  • Loading branch information
baroquebobcat authored Feb 18, 2017
1 parent 31a8f64 commit 0395c91
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 237 deletions.
49 changes: 0 additions & 49 deletions src/python/pants/engine/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,55 +61,6 @@ def __str__(self):
self.func.__name__)


class RuleValidationResult(datatype('RuleValidationResult', ['rule', 'errors', 'warnings'])):
"""Container for errors and warnings found during rule validation."""

def valid(self):
return len(self.errors) == 0 and len(self.warnings) == 0

def has_warnings(self):
return len(self.warnings) > 0

def has_errors(self):
return len(self.errors) > 0


class RulesetValidator(object):
"""Validates that the rule index has no missing tasks."""

def __init__(self, rule_index, goal_to_product, root_subject_types):
if not root_subject_types:
raise ValueError('root_subject_types must not be empty')
self._goal_to_product = goal_to_product


self._graph = GraphMaker(rule_index, root_subject_types).full_graph()

def validate(self):
""" Validates that all tasks can be executed based on the declared product types and selectors.
It checks
- all products selected by tasks are produced by some task or intrinsic, or come from a root
subject type
- all goal products are also produced
"""

# TODO cycles, because it should handle that.
error_message = self._graph.error_message()
if error_message:
raise ValueError(error_message)
task_and_intrinsic_product_types = tuple(r.output_product_type for r in self._graph.root_rules)
self._validate_goal_products(task_and_intrinsic_product_types)

def _validate_goal_products(self, task_and_intrinsic_product_types):
for goal, goal_product in self._goal_to_product.items():
if goal_product not in task_and_intrinsic_product_types:
# NB: We could also check goals of the Goal type to see if the products they request are
# also available.
raise ValueError(
'no task for product used by goal "{}": {}'.format(goal, goal_product.__name__))


class SingletonRule(datatype('SingletonRule', ['product_type', 'func']), Rule):
"""A default rule for a product, which is thus a singleton for that product."""

Expand Down
245 changes: 145 additions & 100 deletions src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from pants.engine.fs import PathGlobs, create_fs_intrinsics, generate_fs_subjects
from pants.engine.isolated_process import create_snapshot_intrinsics, create_snapshot_singletons
from pants.engine.nodes import Return, Throw
from pants.engine.rules import RuleIndex, RulesetValidator
from pants.engine.rules import RuleIndex
from pants.engine.selectors import (Select, SelectDependencies, SelectLiteral, SelectProjection,
SelectVariant, constraint_for)
from pants.engine.struct import HasProducts, Variants
Expand All @@ -41,70 +41,35 @@ class ExecutionRequest(datatype('ExecutionRequest', ['roots'])):
"""


class LocalScheduler(object):
"""A scheduler that expands a product Graph by executing user defined tasks."""

def __init__(self,
goals,
tasks,
project_tree,
native,
graph_lock=None):
"""
:param goals: A dict from a goal name to a product type. A goal is just an alias for a
particular (possibly synthetic) product.
:param tasks: A set of (output, input selection clause, task function) triples which
is used to compute values in the product graph.
:param project_tree: An instance of ProjectTree for the current build root.
:param native: An instance of engine.subsystem.native.Native.
:param graph_lock: A re-entrant lock to use for guarding access to the internal product Graph
instance. Defaults to creating a new threading.RLock().
"""
self._products_by_goal = goals
self._project_tree = project_tree
self._native = native
self._product_graph_lock = graph_lock or threading.RLock()
self._run_count = 0

class WrappedNativeScheduler(object):
def __init__(self, native, rule_index, root_subject_types):
# TODO: The only (?) case where we use inheritance rather than exact type unions.
has_products_constraint = SubclassesOf(HasProducts)

# Create the ExternContext, and the native Scheduler.
self._native = native
self._scheduler = native.new_scheduler(has_products_constraint,
constraint_for(Address),
constraint_for(Variants))
self._execution_request = None

# Validate and register all provided and intrinsic tasks.
# TODO: This bounding of input Subject types allows for closed-world validation, but is not
# strictly necessary for execution. We might eventually be able to remove it by only executing
# validation below the execution roots (and thus not considering paths that aren't in use).

root_subject_types = {
Address,
BuildFileAddress,
AscendantAddresses,
DescendantAddresses,
PathGlobs,
SiblingAddresses,
SingleAddress,
}
intrinsics = create_fs_intrinsics(project_tree) + create_snapshot_intrinsics(project_tree)
singletons = create_snapshot_singletons(project_tree)
rule_index = RuleIndex.create(tasks, intrinsics, singletons)

self._register_tasks(rule_index.tasks)
self._register_intrinsics(rule_index.intrinsics)
self._register_singletons(rule_index.singletons)
self.root_subject_types = root_subject_types

self._validate_ruleset(root_subject_types)
def graph_trace(self):
with temporary_file_path() as path:
self._native.lib.graph_trace(self._scheduler, bytes(path))
with open(path) as fd:
for line in fd.readlines():
yield line.rstrip()

RulesetValidator(rule_index, goals, root_subject_types).validate()
def assert_ruleset_valid(self):
listed = list(TypeId(self._to_id(t)) for t in self.root_subject_types)

def _validate_ruleset(self, root_subject_types):
listed = list(TypeId(self._to_id(t)) for t in root_subject_types)
raw_value = self._native.lib.validator_run(self._scheduler, listed, len(listed))
value = self._from_value(raw_value)

self._native.lib.validator_run(self._scheduler, listed, len(listed))
if isinstance(value, Exception):
raise ValueError(str(value))

def _to_value(self, obj):
return self._native.context.to_value(obj)
Expand Down Expand Up @@ -209,14 +174,124 @@ def _register_tasks(self, tasks):
raise ValueError('Unrecognized Selector type: {}'.format(selector))
self._native.lib.task_end(self._scheduler)

def visualize_graph_to_file(self, filename):
self._native.lib.graph_visualize(self._scheduler, bytes(filename))

def invalidate_via_keys(self, subject_keys):
return self._native.lib.graph_invalidate(self._scheduler,
subject_keys,
len(subject_keys))

def graph_len(self):
return self._native.lib.graph_len(self._scheduler)

def exec_reset(self):
self._native.lib.execution_reset(self._scheduler)

def add_root_selection(self, subject, selector):
if type(selector) is Select:
self._native.lib.execution_add_root_select(self._scheduler, self._to_key(subject),
self._to_constraint(selector.product))
elif type(selector) is SelectDependencies:
self._native.lib.execution_add_root_select_dependencies(self._scheduler,
self._to_key(subject),
self._to_constraint(selector.product),
self._to_constraint(
selector.dep_product),
self._to_utf8_buf(selector.field),
self._to_ids_buf(
selector.field_types),
selector.transitive)
else:
raise ValueError('Unsupported root selector type: {}'.format(selector))

def run_and_return_stat(self):
return self._native.lib.execution_execute(self._scheduler)

def visualize_to_dir(self):
return self._native.visualize_to_dir

def to_keys(self, subjects):
return list(self._to_key(subject) for subject in subjects)

def root_entries(self, execution_request):
raw_roots = self._native.lib.execution_roots(self._scheduler)
try:
roots = {}
for root, raw_root in zip(execution_request.roots,
self._native.unpack(raw_roots.nodes_ptr,
raw_roots.nodes_len)):
if raw_root.state_tag is 0:
state = None
elif raw_root.state_tag is 1:
state = Return(self._from_value(raw_root.state_value))
elif raw_root.state_tag is 2:
state = Throw(self._from_value(raw_root.state_value))
elif raw_root.state_tag is 3:
state = Throw(self._from_value(raw_root.state_value))
else:
raise ValueError(
'Unrecognized State type `{}` on: {}'.format(raw_root.state_tag, raw_root))
roots[root] = state
finally:
self._native.lib.nodes_destroy(raw_roots)
return roots


class LocalScheduler(object):
"""A scheduler that expands a product Graph by executing user defined tasks."""

def __init__(self,
goals,
tasks,
project_tree,
native,
graph_lock=None):
"""
:param goals: A dict from a goal name to a product type. A goal is just an alias for a
particular (possibly synthetic) product.
:param tasks: A set of (output, input selection clause, task function) triples which
is used to compute values in the product graph.
:param project_tree: An instance of ProjectTree for the current build root.
:param native: An instance of engine.subsystem.native.Native.
:param graph_lock: A re-entrant lock to use for guarding access to the internal product Graph
instance. Defaults to creating a new threading.RLock().
"""
self._products_by_goal = goals
self._project_tree = project_tree
self._product_graph_lock = graph_lock or threading.RLock()
self._run_count = 0

# Create the ExternContext, and the native Scheduler.
self._execution_request = None


# Validate and register all provided and intrinsic tasks.
# TODO: This bounding of input Subject types allows for closed-world validation, but is not
# strictly necessary for execution. We might eventually be able to remove it by only executing
# validation below the execution roots (and thus not considering paths that aren't in use).

root_subject_types = {
Address,
BuildFileAddress,
AscendantAddresses,
DescendantAddresses,
PathGlobs,
SiblingAddresses,
SingleAddress,
}
intrinsics = create_fs_intrinsics(project_tree) + create_snapshot_intrinsics(project_tree)
singletons = create_snapshot_singletons(project_tree)
rule_index = RuleIndex.create(tasks, intrinsics, singletons)
self._scheduler = WrappedNativeScheduler(native, rule_index, root_subject_types)

self._scheduler.assert_ruleset_valid()

def trace(self):
"""Yields a stringified 'stacktrace' starting from the scheduler's roots."""
with self._product_graph_lock:
with temporary_file_path() as path:
self._native.lib.graph_trace(self._scheduler, bytes(path))
with open(path) as fd:
for line in fd.readlines():
yield line.rstrip()
for line in self._scheduler.graph_trace():
yield line

def visualize_graph_to_file(self, filename):
"""Visualize a graph walk by writing graphviz `dot` output to a file.
Expand All @@ -225,7 +300,7 @@ def visualize_graph_to_file(self, filename):
:param str filename: The filename to output the graphviz output to.
"""
with self._product_graph_lock:
self._native.lib.graph_visualize(self._scheduler, bytes(filename))
self._scheduler.visualize_graph_to_file(filename)

def build_request(self, goals, subjects):
"""Translate the given goal names into product types, and return an ExecutionRequest.
Expand Down Expand Up @@ -280,59 +355,29 @@ def root_entries(self, execution_request):
with self._product_graph_lock:
if self._execution_request is not execution_request:
raise AssertionError(
"Multiple concurrent executions are not supported! {} vs {}".format(
self._execution_request, execution_request))
raw_roots = self._native.gc(self._native.lib.execution_roots(self._scheduler),
self._native.lib.nodes_destroy)
roots = {}
for root, raw_root in zip(execution_request.roots, self._native.unpack(raw_roots.nodes_ptr, raw_roots.nodes_len)):
if raw_root.state_tag is 0:
state = None
elif raw_root.state_tag is 1:
state = Return(self._from_value(raw_root.state_value))
elif raw_root.state_tag is 2:
state = Throw(self._from_value(raw_root.state_value))
elif raw_root.state_tag is 3:
state = Throw(self._from_value(raw_root.state_value))
else:
raise ValueError('Unrecognized State type `{}` on: {}'.format(raw_root.state_tag, raw_root))
roots[root] = state
return roots
"Multiple concurrent executions are not supported! {} vs {}".format(
self._execution_request, execution_request))
return self._scheduler.root_entries(execution_request)

def invalidate_files(self, filenames):
"""Calls `Graph.invalidate_files()` against an internal product Graph instance."""
subjects = set(generate_fs_subjects(filenames))
subject_keys = list(self._to_key(subject) for subject in subjects)
subject_keys = self._scheduler.to_keys(subjects)
with self._product_graph_lock:
invalidated = self._native.lib.graph_invalidate(self._scheduler,
subject_keys,
len(subject_keys))
invalidated = self._scheduler.invalidate_via_keys(subject_keys)
logger.debug('invalidated %d nodes for subjects: %s', invalidated, subjects)
return invalidated

def node_count(self):
with self._product_graph_lock:
return self._native.lib.graph_len(self._scheduler)
return self._scheduler.graph_len()

def _execution_add_roots(self, execution_request):
if self._execution_request is not None:
self._native.lib.execution_reset(self._scheduler)
self._scheduler.exec_reset()
self._execution_request = execution_request
for subject, selector in execution_request.roots:
if type(selector) is Select:
self._native.lib.execution_add_root_select(self._scheduler,
self._to_key(subject),
self._to_constraint(selector.product))
elif type(selector) is SelectDependencies:
self._native.lib.execution_add_root_select_dependencies(self._scheduler,
self._to_key(subject),
self._to_constraint(selector.product),
self._to_constraint(selector.dep_product),
self._to_utf8_buf(selector.field),
self._to_ids_buf(selector.field_types),
selector.transitive)
else:
raise ValueError('Unsupported root selector type: {}'.format(selector))
self._scheduler.add_root_selection(subject, selector)

def schedule(self, execution_request):
"""Yields batches of Steps until the roots specified by the request have been completed.
Expand All @@ -347,21 +392,21 @@ def schedule(self, execution_request):
# Reset execution, and add any roots from the request.
self._execution_add_roots(execution_request)
# Execute in native engine.
execution_stat = self._native.lib.execution_execute(self._scheduler)
execution_stat = self._scheduler.run_and_return_stat()
# Receive execution statistics.
runnable_count = execution_stat.runnable_count
scheduling_iterations = execution_stat.scheduling_iterations

if self._native.visualize_to_dir is not None:
if self._scheduler.visualize_to_dir() is not None:
name = 'run.{}.dot'.format(self._run_count)
self._run_count += 1
self.visualize_graph_to_file(os.path.join(self._native.visualize_to_dir, name))
self.visualize_graph_to_file(os.path.join(self._scheduler.visualize_to_dir(), name))

logger.debug(
'ran %s scheduling iterations and %s runnables in %f seconds. '
'there are %s total nodes.',
scheduling_iterations,
runnable_count,
time.time() - start_time,
self._native.lib.graph_len(self._scheduler)
self._scheduler.graph_len()
)
Loading

0 comments on commit 0395c91

Please sign in to comment.