Filter streaming workunits by LogLevel (pantsbuild#9854)
### Problem

Now that we have the ability to assign a LogLevel to workunits, we want to use that log level to filter which workunits we stream to Python plugins.

### Solution

Remove the `display` bool on `WorkunitMetadata` and replace it with logic that accepts a maximum log verbosity `LogLevel` from Python code and filters streaming workunits against that maximum verbosity.
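
As an illustration of the new API, here is a minimal sketch of how a plugin might register a callback at a non-default verbosity. The `scheduler` session is assumed to be constructed elsewhere; `StreamingWorkunitHandler` and `LogLevel` are the names introduced in this diff.

```python
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.util.logging import LogLevel

def on_workunits(workunits, **kwargs):
    # Each workunit is a dict carrying "name", "span_id", "level", and,
    # when its parent survived filtering, "parent_id".
    for workunit in workunits:
        print(workunit["name"], workunit["level"])

handler = StreamingWorkunitHandler(
    scheduler,  # assumed: a SchedulerSession created by the caller
    callbacks=[on_workunits],
    report_interval_seconds=0.01,
    # Workunits more verbose than this level are filtered out; defaults to INFO.
    max_workunit_verbosity=LogLevel.DEBUG,
)
with handler.session():
    ...  # execute requests against the scheduler
```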
gshuflin authored May 23, 2020
1 parent 93a66ca commit f9853d9
Showing 11 changed files with 194 additions and 54 deletions.
115 changes: 112 additions & 3 deletions src/python/pants/engine/internals/engine_test.py
@@ -18,6 +18,7 @@
fmt_rule,
remove_locations_from_traceback,
)
from pants.util.logging import LogLevel


class A:
@@ -50,7 +51,7 @@ class Fib:
val: int


@named_rule
@named_rule(desc="Fibonacci")
async def fib(n: int) -> Fib:
if n < 2:
return Fib(n)
@@ -97,7 +98,11 @@ class Omega:
pass


@named_rule(canonical_name="rule_one")
class Epsilon:
pass


@named_rule(canonical_name="rule_one", desc="Rule number 1")
async def rule_one_function(i: Input) -> Beta:
"""This rule should be the first one executed by the engine, and thus have no parent."""
a = Alpha()
@@ -107,7 +112,7 @@ async def rule_one_function(i: Input) -> Beta:
return b


@named_rule
@named_rule(desc="Rule number 2")
async def rule_two(a: Alpha) -> Omega:
"""This rule should be invoked in the body of `rule_one` and therefore its workunit should be a
child of `rule_one`'s workunit."""
@@ -129,6 +134,25 @@ def rule_four(a: Alpha) -> Gamma:
return Gamma()


@named_rule(desc="Rule A")
async def rule_A(i: Input) -> Alpha:
o = Omega()
a = await Get[Alpha](Omega, o)
return a


@rule
async def rule_B(o: Omega) -> Alpha:
e = Epsilon()
a = await Get[Alpha](Epsilon, e)
return a


@named_rule(desc="Rule C")
def rule_C(e: Epsilon) -> Alpha:
return Alpha()


class EngineTest(unittest.TestCase, SchedulerTestBase):

assert_equal_with_printing = assert_equal_with_printing
@@ -392,10 +416,95 @@ def test_streaming_workunits_parent_id_and_rule_metadata(self):
r3 = next(item for item in finished if item["name"] == "rule_three")
r4 = next(item for item in finished if item["name"] == "rule_four")

# rule_one should have no parent_id because its actual parent workunit was filtered out based on its level
assert r1.get("parent_id", None) is None

assert r2["parent_id"] == r1["span_id"]
assert r3["parent_id"] == r1["span_id"]
assert r4["parent_id"] == r2["span_id"]

assert r3["description"] == "Rule number 3"
assert r4["description"] == "Rule number 4"
assert r4["level"] == "INFO"

def test_streaming_workunit_log_levels(self) -> None:
rules = [RootRule(Input), rule_one_function, rule_two, rule_three, rule_four]
scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)
tracker = self.WorkunitTracker()
handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.TRACE,
)

with handler.session():
i = Input()
scheduler.product_request(Beta, subjects=[i])

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

# With the max_workunit_verbosity set to TRACE, we should see the workunit corresponding to the Select node.
select = next(
item
for item in finished
if item["name"] not in {"rule_one", "rule_two", "rule_three", "rule_four"}
)
assert select["name"] == "select"
assert select["level"] == "DEBUG"

r1 = next(item for item in finished if item["name"] == "rule_one")
assert r1["parent_id"] == select["span_id"]

def test_streaming_workunit_log_level_parent_rewrite(self) -> None:
rules = [RootRule(Input), rule_A, rule_B, rule_C]
scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)
tracker = self.WorkunitTracker()
info_level_handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.INFO,
)

with info_level_handler.session():
i = Input()
scheduler.product_request(Alpha, subjects=[i])

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

assert len(finished) == 2
r_A = next(item for item in finished if item["name"] == "rule_A")
r_C = next(item for item in finished if item["name"] == "rule_C")
assert "parent_id" not in r_A
assert r_C["parent_id"] == r_A["span_id"]

scheduler = self.mk_scheduler(
rules, include_trace_on_error=False, should_report_workunits=True
)
tracker = self.WorkunitTracker()
debug_level_handler = StreamingWorkunitHandler(
scheduler,
callbacks=[tracker.add],
report_interval_seconds=0.01,
max_workunit_verbosity=LogLevel.DEBUG,
)

with debug_level_handler.session():
i = Input()
scheduler.product_request(Alpha, subjects=[i])

assert tracker.finished
finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))

r_A = next(item for item in finished if item["name"] == "rule_A")
r_B = next(item for item in finished if item["name"] == "rule_B")
r_C = next(item for item in finished if item["name"] == "rule_C")
assert r_B["parent_id"] == r_A["span_id"]
assert r_C["parent_id"] == r_B["span_id"]
1 change: 0 additions & 1 deletion src/python/pants/engine/internals/native.py
@@ -827,7 +827,6 @@ def init_externs():
none = self.ffi.from_handle(context._handle).to_value(None)
self.lib.externs_set(
context._handle,
logger.getEffectiveLevel(),
none,
self.ffi_lib.extern_call,
self.ffi_lib.extern_generator_send,
12 changes: 8 additions & 4 deletions src/python/pants/engine/internals/scheduler.py
@@ -41,6 +41,7 @@
from pants.option.global_options import ExecutionOptions
from pants.util.contextutil import temporary_file_path
from pants.util.dirutil import check_no_overlapping_paths
from pants.util.logging import LogLevel
from pants.util.strutil import pluralize

if TYPE_CHECKING:
@@ -307,9 +308,10 @@ def visualize_to_dir(self):
def _metrics(self, session):
return self._from_value(self._native.lib.scheduler_metrics(self._scheduler, session))

def poll_workunits(self, session) -> PolledWorkunits:
def poll_workunits(self, session, max_log_verbosity: LogLevel) -> PolledWorkunits:
max_verbosity = max_log_verbosity.level
result: Tuple[Tuple[Workunit], Tuple[Workunit]] = self._from_value(
self._native.lib.poll_session_workunits(self._scheduler, session)
self._native.lib.poll_session_workunits(self._scheduler, session, max_verbosity)
)
return {"started": result[0], "completed": result[1]}

@@ -433,8 +435,10 @@ def scheduler(self):
def session(self):
return self._session

def poll_workunits(self) -> PolledWorkunits:
return cast(PolledWorkunits, self._scheduler.poll_workunits(self._session))
def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
return cast(
PolledWorkunits, self._scheduler.poll_workunits(self._session, max_log_verbosity)
)

def graph_len(self):
return self._scheduler.graph_len()
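
For context, a hedged sketch of what a direct caller of the updated `poll_workunits` sees — the two-key dict whose shape comes straight from the return statement above (the `session` object itself is assumed):

```python
from pants.util.logging import LogLevel

# Assumed: `session` is the SchedulerSession wrapper shown above.
polled = session.poll_workunits(max_log_verbosity=LogLevel.DEBUG)
for workunit in polled["started"]:
    print("started:", workunit["name"])
for workunit in polled["completed"]:
    print("completed:", workunit["name"])
```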
24 changes: 19 additions & 5 deletions src/python/pants/reporting/streaming_workunit_handler.py
@@ -5,6 +5,8 @@
from contextlib import contextmanager
from typing import Any, Callable, Iterable, Iterator, Optional

from pants.util.logging import LogLevel


class StreamingWorkunitHandler:
"""StreamingWorkunitHandler's job is to periodically call each registered callback function with
@@ -15,17 +17,22 @@ class StreamingWorkunitHandler:
"""

def __init__(
self, scheduler: Any, callbacks: Iterable[Callable], report_interval_seconds: float
self,
scheduler: Any,
callbacks: Iterable[Callable],
report_interval_seconds: float,
max_workunit_verbosity: LogLevel = LogLevel.INFO,
):
self.scheduler = scheduler
self.report_interval = report_interval_seconds
self.callbacks = callbacks
self._thread_runner: Optional[_InnerHandler] = None
self.max_workunit_verbosity = max_workunit_verbosity

def start(self) -> None:
if self.callbacks:
self._thread_runner = _InnerHandler(
self.scheduler, self.callbacks, self.report_interval
self.scheduler, self.callbacks, self.report_interval, self.max_workunit_verbosity
)
self._thread_runner.start()

@@ -35,7 +42,7 @@ def end(self) -> None:

# After stopping the thread, poll workunits one last time to make sure
# we report any workunits that were added after the last time the thread polled.
workunits = self.scheduler.poll_workunits()
workunits = self.scheduler.poll_workunits(self.max_workunit_verbosity)
for callback in self.callbacks:
callback(
workunits=workunits["completed"],
@@ -56,16 +63,23 @@ def session(self) -> Iterator[None]:


class _InnerHandler(threading.Thread):
def __init__(self, scheduler: Any, callbacks: Iterable[Callable], report_interval: float):
def __init__(
self,
scheduler: Any,
callbacks: Iterable[Callable],
report_interval: float,
max_workunit_verbosity: LogLevel,
):
super().__init__(daemon=True)
self.scheduler = scheduler
self.stop_request = threading.Event()
self.report_interval = report_interval
self.callbacks = callbacks
self.max_workunit_verbosity = max_workunit_verbosity

def run(self):
while not self.stop_request.isSet():
workunits = self.scheduler.poll_workunits()
workunits = self.scheduler.poll_workunits(self.max_workunit_verbosity)
for callback in self.callbacks:
callback(
workunits=workunits["completed"],
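
The shutdown ordering in `end()` matters: the polling thread is stopped first, then one final poll from the stopping thread catches workunits that completed after the thread's last iteration. A stripped-down sketch of that pattern, independent of Pants:

```python
import threading

class Poller(threading.Thread):
    """A minimal version of the _InnerHandler pattern: poll on an interval
    until a stop is requested, then poll once more so late results are
    not lost."""

    def __init__(self, poll, interval: float):
        super().__init__(daemon=True)
        self.poll = poll
        self.interval = interval
        self.stop_request = threading.Event()

    def run(self):
        while not self.stop_request.is_set():
            self.poll()
            self.stop_request.wait(timeout=self.interval)

    def stop(self):
        self.stop_request.set()
        self.join()
        self.poll()  # final poll, as in StreamingWorkunitHandler.end()

poller = Poller(poll=lambda: print("polling"), interval=0.01)
poller.start()
poller.stop()
```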
47 changes: 32 additions & 15 deletions src/rust/engine/engine_cffi/src/lib.rs
@@ -48,12 +48,13 @@ use futures::future::FutureExt;
use futures::future::{self as future03, TryFutureExt};
use futures01::{future, Future};
use hashing::{Digest, EMPTY_DIGEST};
use log::{error, warn, Log};
use log::{self, error, warn, Log};
use logging::logger::LOGGER;
use logging::{Destination, Logger};
use logging::{Destination, Logger, PythonLogLevel};
use rule_graph::RuleGraph;
use std::any::Any;
use std::borrow::Borrow;
use std::convert::TryInto;
use std::ffi::CStr;
use std::fs::File;
use std::io;
@@ -127,7 +128,6 @@
#[no_mangle]
pub extern "C" fn externs_set(
context: *const ExternContext,
log_level: u8,
none: Handle,
call: CallExtern,
generator_send: GeneratorSendExtern,
@@ -157,7 +157,6 @@
) {
externs::set_externs(Externs {
context,
log_level,
none,
call,
generator_send,
@@ -534,6 +533,10 @@ fn workunit_to_py_value(workunit: &Workunit) -> Option<Value> {
externs::store_utf8("span_id"),
externs::store_utf8(&workunit.span_id),
),
(
externs::store_utf8("level"),
externs::store_utf8(&workunit.metadata.level.to_string()),
),
];
if let Some(parent_id) = &workunit.parent_id {
dict_entries.push((
@@ -601,20 +604,34 @@
pub extern "C" fn poll_session_workunits(
scheduler_ptr: *mut Scheduler,
session_ptr: *mut Session,
max_log_verbosity_level: u64,
) -> Handle {
let py_level: Result<PythonLogLevel, _> = max_log_verbosity_level.try_into();
let max_log_verbosity: log::Level = match py_level {
Ok(level) => level.into(),
Err(e) => {
warn!(
"Error setting streaming workunit log level: {}. Defaulting to 'Info'.",
e
);
log::Level::Info
}
};

with_scheduler(scheduler_ptr, |_scheduler| {
with_session(session_ptr, |session| {
let value = session
.workunit_store()
.with_latest_workunits(|started, completed| {
let mut started_iter = started.iter();
let started = workunits_to_py_tuple_value(&mut started_iter);

let mut completed_iter = completed.iter();
let completed = workunits_to_py_tuple_value(&mut completed_iter);

externs::store_tuple(&[started, completed])
});
let value =
session
.workunit_store()
.with_latest_workunits(max_log_verbosity, |started, completed| {
let mut started_iter = started.iter();
let started = workunits_to_py_tuple_value(&mut started_iter);

let mut completed_iter = completed.iter();
let completed = workunits_to_py_tuple_value(&mut completed_iter);

externs::store_tuple(&[started, completed])
});
value.into()
})
})
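
The conversion above is deliberately forgiving: an out-of-range verbosity number is logged and coerced to Info rather than failing the FFI call. A Python sketch of the same fallback, with the full numeric mapping assumed from the `PythonLogLevel` enum in the next file:

```python
import logging

# Assumed numeric values, mirroring the Rust PythonLogLevel enum.
PYTHON_LOG_LEVELS = {0: "NOTSET", 5: "TRACE", 10: "DEBUG", 20: "INFO", 30: "WARN", 40: "ERROR"}

def coerce_verbosity(raw: int) -> int:
    """Unknown values fall back to INFO with a warning, as in poll_session_workunits."""
    if raw in PYTHON_LOG_LEVELS:
        return raw
    logging.warning(
        "Error setting streaming workunit log level: %s. Defaulting to 'Info'.", raw
    )
    return logging.INFO

assert coerce_verbosity(5) == 5              # TRACE passes through
assert coerce_verbosity(42) == logging.INFO  # unknown -> INFO
```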
2 changes: 1 addition & 1 deletion src/rust/engine/logging/src/lib.rs
@@ -56,7 +56,7 @@
// This is a hard-coding of constants in the standard logging python package.
#[derive(Debug, Eq, PartialEq, TryFromPrimitive, Clone, Copy)]
#[repr(u64)]
enum PythonLogLevel {
pub enum PythonLogLevel {
NotSet = 0,
// Trace doesn't exist in a Python world, so set it to "a bit lower than Debug".
Trace = 5,
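
Since TRACE has no stdlib counterpart, the Python side has to register the numeric level before it renders by name. A sketch of how such a custom level is typically wired up with the stdlib logging module (Pants's own registration may differ):

```python
import logging

TRACE = 5  # matches PythonLogLevel::Trace above
logging.addLevelName(TRACE, "TRACE")
logging.basicConfig(level=TRACE, format="%(levelname)s %(message)s")
logging.log(TRACE, "visible because the root logger is set to TRACE")
```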
4 changes: 1 addition & 3 deletions src/rust/engine/process_execution/src/lib.rs
@@ -442,8 +442,7 @@ impl CommandRunner for BoundedCommandRunner {
.unwrap_or_else(|| "<Unnamed process>".to_string());
let outer_metadata = WorkunitMetadata {
desc: Some(desc.clone()),
level: Level::Info,
display: false,
level: Level::Debug,
blocked: true,
};
let bounded_fut = {
@@ -456,7 +455,6 @@
let metadata = WorkunitMetadata {
desc: Some(desc),
level: Level::Info,
display: false,
blocked: false,
};
with_workunit(context.workunit_store.clone(), name, metadata, async move {
2 changes: 1 addition & 1 deletion src/rust/engine/process_execution/src/remote_tests.rs
@@ -2235,7 +2235,7 @@ async fn extract_output_files_from_response_no_prefix() {
}

fn workunits_with_constant_span_id(workunit_store: &mut WorkunitStore) -> HashSet<Workunit> {
workunit_store.with_latest_workunits(|_, completed_workunits| {
workunit_store.with_latest_workunits(log::Level::Trace, |_, completed_workunits| {
completed_workunits
.iter()
.map(|workunit| Workunit {
(Diff for the remaining 3 changed files not shown.)
