Skip to content

Commit

Permalink
Allow @rule-authors to give rules names (pantsbuild#8592)
Browse files Browse the repository at this point in the history
## Problem

We want rule authors to be able to control what @rules are and are not important enough to be reported, to avoid creating gigantic numbers of workunits that would clog up any reporting infrastructure built on top of workunits.

## Solution

This commit adds the ability to specify a `name: str` keyword argument to @rule and @console_rule (e.g. @rule(name='Some human-readable name') def a_rule_fn(a: A) -> B). @console_rule names are automatically filled in from the name of their associated goal, although this can be overridden with an explicit argument.

When a workunit is about to be created in nodes.rs as part of the NodeKey run function, we check to see whether this Node has a defined display_info - a string that right now is only provided by the name parameter to an @rule, although we can extend this in the future. If there is such a string, we add a workunit whose name is that string to the workunit store. If not, we ignore it. (Note that we also add Workunits to the store with hardcoded names in a few places pertaining to filesystem and remote operations, which this commit doesn't change).

## Result

With this commit, the number of workunits recorded in the workunit store (and thus reported to zipkin) will be drastically decreased, since now only @console_rules have a name by default. A rule author can restore workunits associated with any @rule by giving that @rule a name.
  • Loading branch information
gshuflin authored Nov 22, 2019
1 parent fe2a09d commit c5b8683
Show file tree
Hide file tree
Showing 18 changed files with 223 additions and 97 deletions.
16 changes: 10 additions & 6 deletions src/python/pants/bin/local_pants_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from pants.option.arg_splitter import UnknownGoalHelp
from pants.option.options_bootstrapper import OptionsBootstrapper
from pants.reporting.reporting import Reporting
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.util.contextutil import maybe_profiled


Expand Down Expand Up @@ -106,13 +107,13 @@ def _maybe_init_graph_session(graph_session, options_bootstrapper,build_config,

v2_ui = options.for_global_scope().v2_ui
zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
#TODO(gregorys) This should_report_workunits flag must be set to True for
# AsyncWorkunitHandler to receive WorkUnits. It should eventually
#TODO(#8658) This should_report_workunits flag must be set to True for
# StreamingWorkunitHandler to receive WorkUnits. It should eventually
# be merged with the zipkin_trace_v2 flag, since they both involve most
# of the same engine functionality, but for now is separate to avoid
# breaking functionality associated with zipkin tracing while iterating on async workunit reporting.
should_report_workunits = False
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits)
# breaking functionality associated with zipkin tracing while iterating on streaming workunit reporting.
stream_workunits = options.for_scope('reporting').stream_workunits
graph_session = graph_scheduler_helper.new_session(zipkin_trace_v2, RunTracker.global_instance().run_id, v2_ui, should_report_workunits=stream_workunits)
return graph_session, graph_session.scheduler_session

@staticmethod
Expand Down Expand Up @@ -319,7 +320,10 @@ def _run(self):
try:
self._maybe_handle_help()

engine_result = self._maybe_run_v2()
streaming_reporter = StreamingWorkunitHandler(self._scheduler_session, callback=None)
with streaming_reporter.session():
engine_result = self._maybe_run_v2()

goal_runner_result = self._maybe_run_v1()
finally:
try:
Expand Down
66 changes: 58 additions & 8 deletions src/python/pants/engine/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,8 @@ def _get_starting_indent(source):


def _make_rule(
return_type: type, parameter_types: typing.Iterable[type], cacheable: bool = True
return_type: type, parameter_types: typing.Iterable[type], cacheable: bool = True,
name: Optional[bool] = None
) -> Callable[[Callable], Callable]:
"""A @decorator that declares that a particular static function may be used as a TaskRule.
Expand Down Expand Up @@ -285,13 +286,20 @@ def resolve_type(name):
# Register dependencies for @console_rule/Goal.
dependency_rules = (optionable_rule(return_type.Options),) if is_goal_cls else None

# Set a default name for Goal classes if one is not explicitly provided
if is_goal_cls and name is None:
effective_name = return_type.name
else:
effective_name = name

func.rule = TaskRule(
return_type,
tuple(parameter_types),
func,
input_gets=tuple(gets),
dependency_rules=dependency_rules,
cacheable=cacheable,
name=effective_name,
)

return func
Expand All @@ -302,6 +310,14 @@ class InvalidTypeAnnotation(TypeError):
"""Indicates an incorrect type annotation for an `@rule`."""


class UnrecognizedRuleArgument(TypeError):
  """Raised when a `@rule` decorator is passed a keyword argument it does not support."""


class MissingTypeAnnotation(TypeError):
  """Raised when an `@rule` function lacks a required type annotation."""


class MissingReturnTypeAnnotation(InvalidTypeAnnotation):
  """Raised when an `@rule` function lacks a return-type annotation."""

Expand All @@ -321,14 +337,30 @@ def _ensure_type_annotation(
return annotation


def rule(*args, cacheable=True) -> Callable:
PUBLIC_RULE_DECORATOR_ARGUMENTS = {'name'}
# We don't want @rule-writers to use 'cacheable' as a kwarg directly, but rather
# set it implicitly based on whether the rule annotation is @rule or @console_rule.
# So we leave it out of PUBLIC_RULE_DECORATOR_ARGUMENTS.
IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS = {'cacheable'}


def rule_decorator(*args, **kwargs) -> Callable:
if len(args) != 1 and not inspect.isfunction(args[0]):
raise ValueError(
'The @rule decorator expects no arguments and for the function it decorates to be '
f'type-annotated. Given {args}.'
)

if len(set(kwargs) - PUBLIC_RULE_DECORATOR_ARGUMENTS - IMPLICIT_PRIVATE_RULE_DECORATOR_ARGUMENTS) != 0:
raise UnrecognizedRuleArgument(
f"`@rule`s and `@console_rule`s only accept the following keyword arguments: {PUBLIC_RULE_DECORATOR_ARGUMENTS}"
)

func = args[0]

cacheable: bool = kwargs['cacheable']
name = kwargs.get('name')

signature = inspect.signature(func)
func_id = f'@rule {func.__module__}:{func.__name__}'
return_type = _ensure_type_annotation(
Expand All @@ -346,11 +378,24 @@ def rule(*args, cacheable=True) -> Callable:
)
for name, parameter in signature.parameters.items()
)
return _make_rule(return_type, parameter_types, cacheable=cacheable)(func)
return _make_rule(return_type, parameter_types, cacheable=cacheable, name=name)(func)


def inner_rule(*args, **kwargs) -> Callable:
  """Shared implementation behind `@rule` and `@console_rule`.

  Supports both decorator forms: applied bare to a function
  (``@rule`` — a single function positional argument), in which case it
  delegates to `rule_decorator` immediately; or called with keyword
  arguments (``@rule(name=...)``), in which case it returns a decorator
  that captures those keyword arguments for a later `rule_decorator` call.
  """
  used_as_bare_decorator = len(args) == 1 and inspect.isfunction(args[0])
  if used_as_bare_decorator:
    return rule_decorator(*args, **kwargs)

  def decorator(*decorated):
    return rule_decorator(*decorated, **kwargs)

  return decorator


def rule(*args, **kwargs) -> Callable:
  """Decorator declaring a cacheable engine rule; see `inner_rule` for the accepted forms."""
  # @rule bodies are pure with respect to their inputs, so results are cacheable.
  return inner_rule(*args, cacheable=True, **kwargs)


def console_rule(*args) -> Callable:
return rule(*args, cacheable=False)
def console_rule(*args, **kwargs) -> Callable:
  """Decorator declaring a non-cacheable engine rule backing a Goal; see `inner_rule`."""
  # Console rules perform side effects (console output), so they must re-run every session.
  return inner_rule(*args, cacheable=False, **kwargs)


def union(cls):
Expand Down Expand Up @@ -460,6 +505,7 @@ class TaskRule(Rule):
_dependency_rules: Tuple
_dependency_optionables: Tuple
cacheable: bool
name: Optional[str]

def __init__(
self,
Expand All @@ -470,6 +516,7 @@ def __init__(
dependency_rules: Optional[Tuple] = None,
dependency_optionables: Optional[Tuple] = None,
cacheable: bool = True,
name: Optional[str] = None,
):
self._output_type = output_type
self.input_selectors = input_selectors
Expand All @@ -478,14 +525,17 @@ def __init__(
self._dependency_rules = dependency_rules or ()
self._dependency_optionables = dependency_optionables or ()
self.cacheable = cacheable
self.name = name

def __str__(self):
return ('({}, {!r}, {}, gets={}, opts={})'
.format(self.output_type.__name__,
return ('(name={}, {!r}, {}, gets={}, opts={})'
.format(self.name or '<not defined>',
self.output_type.__name__,
self.input_selectors,
self.func.__name__,
self.input_gets,
self.dependency_optionables))
self.dependency_optionables,
))

@property
def output_type(self):
Expand Down
5 changes: 4 additions & 1 deletion src/python/pants/engine/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,13 +167,16 @@ def _register_rules(self, rule_index):
else:
raise ValueError('Unexpected Rule type: {}'.format(rule))

def _register_task(self, output_type, rule, union_rules):
def _register_task(self, output_type, rule: TaskRule, union_rules):
"""Register the given TaskRule with the native scheduler."""
func = Function(self._to_key(rule.func))
self._native.lib.tasks_task_begin(self._tasks, func, self._to_type(output_type), rule.cacheable)
for selector in rule.input_selectors:
self._native.lib.tasks_add_select(self._tasks, self._to_type(selector))

if rule.name:
self._native.lib.tasks_add_display_info(self._tasks, rule.name.encode())

def add_get_edge(product, subject):
self._native.lib.tasks_add_get(self._tasks, self._to_type(product), self._to_type(subject))

Expand Down
27 changes: 12 additions & 15 deletions src/python/pants/init/engine_initializer.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
ExecutionOptions,
GlobMatchErrorBehavior,
)
from pants.reporting.async_workunit_handler import AsyncWorkunitHandler


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -172,7 +171,7 @@ class LegacyGraphScheduler:
build_file_aliases: Any
goal_map: Any

def new_session(self, zipkin_trace_v2, build_id, v2_ui=False, should_report_workunits=True):
def new_session(self, zipkin_trace_v2, build_id, v2_ui=False, should_report_workunits=False):
session = self.scheduler.new_session(zipkin_trace_v2, build_id, v2_ui, should_report_workunits)
return LegacyGraphSession(session, self.build_file_aliases, self.goal_map)

Expand Down Expand Up @@ -205,26 +204,24 @@ def run_console_rules(self, options_bootstrapper, goals, target_roots):
:returns: An exit code.
"""

async_reporter = AsyncWorkunitHandler(self.scheduler_session, callback=None)
subject = target_roots.specs
console = Console(
use_colors=options_bootstrapper.bootstrap_options.for_global_scope().colors
)
workspace = Workspace(self.scheduler_session)
interactive_runner = InteractiveRunner(self.scheduler_session)

with async_reporter.session():
for goal in goals:
goal_product = self.goal_map[goal]
params = Params(subject, options_bootstrapper, console, workspace, interactive_runner)
logger.debug(f'requesting {goal_product} to satisfy execution of `{goal}` goal')
try:
exit_code = self.scheduler_session.run_console_rule(goal_product, params)
finally:
console.flush()

if exit_code != PANTS_SUCCEEDED_EXIT_CODE:
return exit_code
for goal in goals:
goal_product = self.goal_map[goal]
params = Params(subject, options_bootstrapper, console, workspace, interactive_runner)
logger.debug(f'requesting {goal_product} to satisfy execution of `{goal}` goal')
try:
exit_code = self.scheduler_session.run_console_rule(goal_product, params)
finally:
console.flush()

if exit_code != PANTS_SUCCEEDED_EXIT_CODE:
return exit_code

return PANTS_SUCCEEDED_EXIT_CODE

Expand Down
3 changes: 2 additions & 1 deletion src/python/pants/pantsd/service/scheduler_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ def prepare_v1_graph_run_v2(self, options, options_bootstrapper):
build_id = RunTracker.global_instance().run_id
v2_ui = options.for_global_scope().v2_ui
zipkin_trace_v2 = options.for_scope('reporting').zipkin_trace_v2
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui)
stream_workunits = options.for_scope('reporting').stream_workunits
session = self._graph_helper.new_session(zipkin_trace_v2, build_id, v2_ui, should_report_workunits=stream_workunits)

if options.for_global_scope().loop:
fn = self._loop
Expand Down
2 changes: 2 additions & 0 deletions src/python/pants/reporting/reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def register_options(cls, register):
register('--zipkin-max-span-batch-size', advanced=True, type=int, default=100,
help='Spans in a Zipkin trace are sent to the Zipkin server in batches.'
'zipkin-max-span-batch-size sets the max size of one batch.')
register('--stream-workunits', advanced=True, type=bool, default=False,
help="If set to true, report workunit information while pants is running")

def initialize(self, run_tracker, all_options, start_time=None):
"""Initialize with the given RunTracker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
DEFAULT_REPORT_INTERVAL_SECONDS = 10


class AsyncWorkunitHandler:
class StreamingWorkunitHandler:
def __init__(self, scheduler: Any, callback: Optional[Callable], report_interval_seconds: float = DEFAULT_REPORT_INTERVAL_SECONDS):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
Expand Down
10 changes: 10 additions & 0 deletions src/rust/engine/engine_cffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -551,6 +551,16 @@ pub extern "C" fn tasks_add_select(tasks_ptr: *mut Tasks, product: TypeId) {
})
}

/// FFI entry point: attach a human-readable display name to the task
/// currently being defined on `tasks_ptr` (between `tasks_task_begin`
/// and `tasks_task_end`).
///
/// Safety: `name_ptr` must point to a valid, NUL-terminated C string for
/// the duration of this call; invalid UTF-8 is replaced lossily rather
/// than rejected.
#[no_mangle]
pub extern "C" fn tasks_add_display_info(tasks_ptr: *mut Tasks, name_ptr: *const raw::c_char) {
  // Copy the borrowed C string into an owned String before handing it to
  // the Tasks builder, so no FFI pointer outlives this call.
  let name: String = unsafe { CStr::from_ptr(name_ptr) }
    .to_string_lossy()
    .into_owned();
  with_tasks(tasks_ptr, |tasks| {
    tasks.add_display_info(name);
  })
}

#[no_mangle]
pub extern "C" fn tasks_task_end(tasks_ptr: *mut Tasks) {
with_tasks(tasks_ptr, |tasks| {
Expand Down
35 changes: 16 additions & 19 deletions src/rust/engine/fs/store/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use uuid;
use workunit_store::WorkUnitStore;
use workunit_store::{WorkUnit, WorkUnitStore};

#[derive(Clone)]
pub struct ByteStore {
Expand Down Expand Up @@ -218,12 +218,11 @@ impl ByteStore {
}
})
.then(move |future| {
let workunit = workunit_store::WorkUnit {
name: workunit_name.clone(),
time_span: TimeSpan::since(&start_time),
span_id: workunit_store::generate_random_64bit_string(),
parent_id: workunit_store::get_parent_id(),
};
let workunit = WorkUnit::new(
workunit_name.clone(),
TimeSpan::since(&start_time),
workunit_store::get_parent_id(),
);
workunit_store.add_workunit(workunit);
future
})
Expand Down Expand Up @@ -297,12 +296,11 @@ impl ByteStore {
}
})
.then(move |future| {
let workunit = workunit_store::WorkUnit {
name: workunit_name.clone(),
time_span: TimeSpan::since(&start_time),
span_id: workunit_store::generate_random_64bit_string(),
parent_id: workunit_store::get_parent_id(),
};
let workunit = WorkUnit::new(
workunit_name.clone(),
TimeSpan::since(&start_time),
workunit_store::get_parent_id(),
);
workunit_store.add_workunit(workunit);
future
})
Expand Down Expand Up @@ -345,12 +343,11 @@ impl ByteStore {
})
})
.then(move |future| {
let workunit = workunit_store::WorkUnit {
name: workunit_name.clone(),
time_span: TimeSpan::since(&start_time),
span_id: workunit_store::generate_random_64bit_string(),
parent_id: workunit_store::get_parent_id(),
};
let workunit = WorkUnit::new(
workunit_name.clone(),
TimeSpan::since(&start_time),
workunit_store::get_parent_id(),
);
workunit_store.add_workunit(workunit);
future
})
Expand Down
9 changes: 2 additions & 7 deletions src/rust/engine/process_execution/src/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
};
use std;
use std::cmp::min;
use workunit_store::{generate_random_64bit_string, get_parent_id, WorkUnit, WorkUnitStore};
use workunit_store::{get_parent_id, WorkUnit, WorkUnitStore};

// Environment variable which is exclusively used for cache key invalidation.
// This may be not specified in an ExecuteProcessRequest, and may be populated only by the
Expand Down Expand Up @@ -825,12 +825,7 @@ fn maybe_add_workunit(
// TODO: workunits for scheduling, fetching, executing and uploading should be recorded
// only if '--reporting-zipkin-trace-v2' is set
if !result_cached {
let workunit = WorkUnit {
name: String::from(name),
time_span,
span_id: generate_random_64bit_string(),
parent_id,
};
let workunit = WorkUnit::new(name.to_string(), time_span, parent_id);
workunit_store.add_workunit(workunit);
}
}
Expand Down
Loading

0 comments on commit c5b8683

Please sign in to comment.