From f9853d93e6343cd22cc9a3eaf671e00ca603dd5a Mon Sep 17 00:00:00 2001
From: gshuflin
Date: Fri, 22 May 2020 18:16:28 -0700
Subject: [PATCH] Filter streaming workunits by LogLevel (#9854)

### Problem

Now that we have the ability to assign a LogLevel to workunits, we want to use that log level to filter which workunits we stream to Python plugins.

### Solution

Remove the `display` bool on `WorkunitMetadata` and replace it with logic that accepts a maximum log verbosity `LogLevel` from Python code, and filters streaming workunits based on this maximum verbosity.
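For a plugin consuming the stream, usage looks roughly like this (a sketch, not part of this diff; the scheduler session, product, and subject below are placeholders):

```python
from pants.reporting.streaming_workunit_handler import StreamingWorkunitHandler
from pants.util.logging import LogLevel

completed = []

def on_workunits(workunits, **kwargs):
    # `workunits` holds the workunits completed since the last poll, already
    # filtered down to the handler's max_workunit_verbosity.
    completed.extend(workunits)

handler = StreamingWorkunitHandler(
    scheduler,  # placeholder: an initialized scheduler session
    callbacks=[on_workunits],
    report_interval_seconds=0.5,
    # Workunits at a level more verbose than DEBUG (i.e. TRACE) are dropped.
    max_workunit_verbosity=LogLevel.DEBUG,
)
with handler.session():
    scheduler.product_request(SomeProduct, subjects=[subject])  # placeholders
```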
---
 .../pants/engine/internals/engine_test.py    | 115 +++++++++++++++++-
 src/python/pants/engine/internals/native.py  |   1 -
 .../pants/engine/internals/scheduler.py      |  12 +-
 .../reporting/streaming_workunit_handler.py  |  24 +++-
 src/rust/engine/engine_cffi/src/lib.rs       |  47 ++++---
 src/rust/engine/logging/src/lib.rs           |   2 +-
 src/rust/engine/process_execution/src/lib.rs |   4 +-
 .../process_execution/src/remote_tests.rs    |   2 +-
 src/rust/engine/src/externs.rs               |   1 -
 src/rust/engine/src/nodes.rs                 |  11 +-
 src/rust/engine/workunit_store/src/lib.rs    |  29 ++---
 11 files changed, 194 insertions(+), 54 deletions(-)

diff --git a/src/python/pants/engine/internals/engine_test.py b/src/python/pants/engine/internals/engine_test.py
index 8a60ac82c80..67600caed76 100644
--- a/src/python/pants/engine/internals/engine_test.py
+++ b/src/python/pants/engine/internals/engine_test.py
@@ -18,6 +18,7 @@
     fmt_rule,
     remove_locations_from_traceback,
 )
+from pants.util.logging import LogLevel
 
 
 class A:
@@ -50,7 +51,7 @@ class Fib:
     val: int
 
 
-@named_rule
+@named_rule(desc="Fibonacci")
 async def fib(n: int) -> Fib:
     if n < 2:
         return Fib(n)
@@ -97,7 +98,11 @@ class Omega:
     pass
 
 
-@named_rule(canonical_name="rule_one")
+class Epsilon:
+    pass
+
+
+@named_rule(canonical_name="rule_one", desc="Rule number 1")
 async def rule_one_function(i: Input) -> Beta:
     """This rule should be the first one executed by the engine, and thus have no parent."""
     a = Alpha()
@@ -107,7 +112,7 @@ async def rule_one_function(i: Input) -> Beta:
     return b
 
 
-@named_rule
+@named_rule(desc="Rule number 2")
 async def rule_two(a: Alpha) -> Omega:
     """This rule should be invoked in the body of `rule_one` and therefore its workunit should
     be a child of `rule_one`'s workunit."""
@@ -129,6 +134,25 @@ def rule_four(a: Alpha) -> Gamma:
     return Gamma()
 
 
+@named_rule(desc="Rule A")
+async def rule_A(i: Input) -> Alpha:
+    o = Omega()
+    a = await Get[Alpha](Omega, o)
+    return a
+
+
+@rule
+async def rule_B(o: Omega) -> Alpha:
+    e = Epsilon()
+    a = await Get[Alpha](Epsilon, e)
+    return a
+
+
+@named_rule(desc="Rule C")
+def rule_C(e: Epsilon) -> Alpha:
+    return Alpha()
+
+
 class EngineTest(unittest.TestCase, SchedulerTestBase):
 
     assert_equal_with_printing = assert_equal_with_printing
@@ -392,10 +416,95 @@ def test_streaming_workunits_parent_id_and_rule_metadata(self):
         r3 = next(item for item in finished if item["name"] == "rule_three")
         r4 = next(item for item in finished if item["name"] == "rule_four")
 
+        # rule_one should have no parent_id because its actual parent workunit was filtered out based on its level
         assert r1.get("parent_id", None) is None
+
         assert r2["parent_id"] == r1["span_id"]
         assert r3["parent_id"] == r1["span_id"]
         assert r4["parent_id"] == r2["span_id"]
 
         assert r3["description"] == "Rule number 3"
         assert r4["description"] == "Rule number 4"
+        assert r4["level"] == "INFO"
+
+    def test_streaming_workunit_log_levels(self) -> None:
+        rules = [RootRule(Input), rule_one_function, rule_two, rule_three, rule_four]
+        scheduler = self.mk_scheduler(
+            rules, include_trace_on_error=False, should_report_workunits=True
+        )
+        tracker = self.WorkunitTracker()
+        handler = StreamingWorkunitHandler(
+            scheduler,
+            callbacks=[tracker.add],
+            report_interval_seconds=0.01,
+            max_workunit_verbosity=LogLevel.TRACE,
+        )
+
+        with handler.session():
+            i = Input()
+            scheduler.product_request(Beta, subjects=[i])
+
+        assert tracker.finished
+        finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
+
+        # With the max_workunit_verbosity set to TRACE, we should see the workunit corresponding to the Select node.
+        select = next(
+            item
+            for item in finished
+            if item["name"] not in {"rule_one", "rule_two", "rule_three", "rule_four"}
+        )
+        assert select["name"] == "select"
+        assert select["level"] == "DEBUG"
+
+        r1 = next(item for item in finished if item["name"] == "rule_one")
+        assert r1["parent_id"] == select["span_id"]
+
+    def test_streaming_workunit_log_level_parent_rewrite(self) -> None:
+        rules = [RootRule(Input), rule_A, rule_B, rule_C]
+        scheduler = self.mk_scheduler(
+            rules, include_trace_on_error=False, should_report_workunits=True
+        )
+        tracker = self.WorkunitTracker()
+        info_level_handler = StreamingWorkunitHandler(
+            scheduler,
+            callbacks=[tracker.add],
+            report_interval_seconds=0.01,
+            max_workunit_verbosity=LogLevel.INFO,
+        )
+
+        with info_level_handler.session():
+            i = Input()
+            scheduler.product_request(Alpha, subjects=[i])
+
+        assert tracker.finished
+        finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
+
+        assert len(finished) == 2
+        r_A = next(item for item in finished if item["name"] == "rule_A")
+        r_C = next(item for item in finished if item["name"] == "rule_C")
+        assert "parent_id" not in r_A
+        assert r_C["parent_id"] == r_A["span_id"]
+
+        scheduler = self.mk_scheduler(
+            rules, include_trace_on_error=False, should_report_workunits=True
+        )
+        tracker = self.WorkunitTracker()
+        debug_level_handler = StreamingWorkunitHandler(
+            scheduler,
+            callbacks=[tracker.add],
+            report_interval_seconds=0.01,
+            max_workunit_verbosity=LogLevel.DEBUG,
+        )
+
+        with debug_level_handler.session():
+            i = Input()
+            scheduler.product_request(Alpha, subjects=[i])
+
+        assert tracker.finished
+        finished = list(itertools.chain.from_iterable(tracker.finished_workunit_chunks))
+
+        r_A = next(item for item in finished if item["name"] == "rule_A")
+        r_B = next(item for item in finished if item["name"] == "rule_B")
+        r_C = next(item for item in finished if item["name"] == "rule_C")
+        assert r_B["parent_id"] == r_A["span_id"]
+        assert r_C["parent_id"] == r_B["span_id"]
diff --git a/src/python/pants/engine/internals/native.py b/src/python/pants/engine/internals/native.py
index 6e6306857fd..9382818ef2b 100644
--- a/src/python/pants/engine/internals/native.py
+++ b/src/python/pants/engine/internals/native.py
@@ -827,7 +827,6 @@ def init_externs():
             none = self.ffi.from_handle(context._handle).to_value(None)
             self.lib.externs_set(
                 context._handle,
-                logger.getEffectiveLevel(),
                 none,
                 self.ffi_lib.extern_call,
                 self.ffi_lib.extern_generator_send,
diff --git a/src/python/pants/engine/internals/scheduler.py b/src/python/pants/engine/internals/scheduler.py
index 2ac9baf1439..d1c3ba7d231 100644
--- a/src/python/pants/engine/internals/scheduler.py
+++ b/src/python/pants/engine/internals/scheduler.py
@@ -41,6 +41,7 @@
 from pants.option.global_options import ExecutionOptions
 from pants.util.contextutil import temporary_file_path
 from pants.util.dirutil import check_no_overlapping_paths
+from pants.util.logging import LogLevel
 from pants.util.strutil import pluralize
 
 if TYPE_CHECKING:
@@ -307,9 +308,10 @@ def visualize_to_dir(self):
     def _metrics(self, session):
         return self._from_value(self._native.lib.scheduler_metrics(self._scheduler, session))
 
-    def poll_workunits(self, session) -> PolledWorkunits:
+    def poll_workunits(self, session, max_log_verbosity: LogLevel) -> PolledWorkunits:
+        max_verbosity = max_log_verbosity.level
         result: Tuple[Tuple[Workunit], Tuple[Workunit]] = self._from_value(
-            self._native.lib.poll_session_workunits(self._scheduler, session)
+            self._native.lib.poll_session_workunits(self._scheduler, session, max_verbosity)
         )
         return {"started": result[0], "completed": result[1]}
 
@@ -433,8 +435,10 @@ def scheduler(self):
     def session(self):
         return self._session
 
-    def poll_workunits(self) -> PolledWorkunits:
-        return cast(PolledWorkunits, self._scheduler.poll_workunits(self._session))
+    def poll_workunits(self, max_log_verbosity: LogLevel) -> PolledWorkunits:
+        return cast(
+            PolledWorkunits, self._scheduler.poll_workunits(self._session, max_log_verbosity)
+        )
 
     def graph_len(self):
         return self._scheduler.graph_len()
diff --git a/src/python/pants/reporting/streaming_workunit_handler.py b/src/python/pants/reporting/streaming_workunit_handler.py
index e00ceb7e36c..e375793b3fd 100644
--- a/src/python/pants/reporting/streaming_workunit_handler.py
+++ b/src/python/pants/reporting/streaming_workunit_handler.py
@@ -5,6 +5,8 @@
 from contextlib import contextmanager
 from typing import Any, Callable, Iterable, Iterator, Optional
 
+from pants.util.logging import LogLevel
+
 
 class StreamingWorkunitHandler:
     """StreamingWorkunitHandler's job is to periodically call each registered callback function with
@@ -15,17 +17,22 @@ class StreamingWorkunitHandler:
     """
 
     def __init__(
-        self, scheduler: Any, callbacks: Iterable[Callable], report_interval_seconds: float
+        self,
+        scheduler: Any,
+        callbacks: Iterable[Callable],
+        report_interval_seconds: float,
+        max_workunit_verbosity: LogLevel = LogLevel.INFO,
     ):
         self.scheduler = scheduler
         self.report_interval = report_interval_seconds
         self.callbacks = callbacks
         self._thread_runner: Optional[_InnerHandler] = None
+        self.max_workunit_verbosity = max_workunit_verbosity
 
     def start(self) -> None:
         if self.callbacks:
             self._thread_runner = _InnerHandler(
-                self.scheduler, self.callbacks, self.report_interval
+                self.scheduler, self.callbacks, self.report_interval, self.max_workunit_verbosity
             )
             self._thread_runner.start()
 
@@ -35,7 +42,7 @@ def end(self) -> None:
 
         # After stopping the thread, poll workunits one last time to make sure
         # we report any workunits that were added after the last time the thread polled.
-        workunits = self.scheduler.poll_workunits()
+        workunits = self.scheduler.poll_workunits(self.max_workunit_verbosity)
         for callback in self.callbacks:
             callback(
                 workunits=workunits["completed"],
@@ -56,16 +63,23 @@ def session(self) -> Iterator[None]:
 
 
 class _InnerHandler(threading.Thread):
-    def __init__(self, scheduler: Any, callbacks: Iterable[Callable], report_interval: float):
+    def __init__(
+        self,
+        scheduler: Any,
+        callbacks: Iterable[Callable],
+        report_interval: float,
+        max_workunit_verbosity: LogLevel,
+    ):
         super().__init__(daemon=True)
         self.scheduler = scheduler
         self.stop_request = threading.Event()
         self.report_interval = report_interval
         self.callbacks = callbacks
+        self.max_workunit_verbosity = max_workunit_verbosity
 
     def run(self):
         while not self.stop_request.isSet():
-            workunits = self.scheduler.poll_workunits()
+            workunits = self.scheduler.poll_workunits(self.max_workunit_verbosity)
             for callback in self.callbacks:
                 callback(
                     workunits=workunits["completed"],
diff --git a/src/rust/engine/engine_cffi/src/lib.rs b/src/rust/engine/engine_cffi/src/lib.rs
index 6823ca52023..5c130bc4c83 100644
--- a/src/rust/engine/engine_cffi/src/lib.rs
+++ b/src/rust/engine/engine_cffi/src/lib.rs
@@ -48,12 +48,13 @@ use futures::future::FutureExt;
 use futures::future::{self as future03, TryFutureExt};
 use futures01::{future, Future};
 use hashing::{Digest, EMPTY_DIGEST};
-use log::{error, warn, Log};
+use log::{self, error, warn, Log};
 use logging::logger::LOGGER;
-use logging::{Destination, Logger};
+use logging::{Destination, Logger, PythonLogLevel};
 use rule_graph::RuleGraph;
 use std::any::Any;
 use std::borrow::Borrow;
+use std::convert::TryInto;
 use std::ffi::CStr;
 use std::fs::File;
 use std::io;
@@ -127,7 +128,6 @@ impl RawNodes {
 #[no_mangle]
 pub extern "C" fn externs_set(
   context: *const ExternContext,
-  log_level: u8,
   none: Handle,
   call: CallExtern,
   generator_send: GeneratorSendExtern,
@@ -157,7 +157,6 @@ pub extern "C" fn externs_set(
 ) {
   externs::set_externs(Externs {
     context,
-    log_level,
     none,
     call,
     generator_send,
@@ -534,6 +533,10 @@ fn workunit_to_py_value(workunit: &Workunit) -> Option<Value> {
       externs::store_utf8("span_id"),
       externs::store_utf8(&workunit.span_id),
     ),
+    (
+      externs::store_utf8("level"),
+      externs::store_utf8(&workunit.metadata.level.to_string()),
+    ),
   ];
   if let Some(parent_id) = &workunit.parent_id {
     dict_entries.push((
@@ -601,20 +604,34 @@ fn workunits_to_py_tuple_value<'a>(workunits: impl Iterator<Item = &'a Workunit>) -> Value {
 pub extern "C" fn poll_session_workunits(
   scheduler_ptr: *mut Scheduler,
   session_ptr: *mut Session,
+  max_log_verbosity_level: u64,
 ) -> Handle {
+  let py_level: Result<PythonLogLevel, _> = max_log_verbosity_level.try_into();
+  let max_log_verbosity: log::Level = match py_level {
+    Ok(level) => level.into(),
+    Err(e) => {
+      warn!(
+        "Error setting streaming workunit log level: {}. Defaulting to 'Info'.",
Defaulting to 'Info'.", + e + ); + log::Level::Info + } + }; + with_scheduler(scheduler_ptr, |_scheduler| { with_session(session_ptr, |session| { - let value = session - .workunit_store() - .with_latest_workunits(|started, completed| { - let mut started_iter = started.iter(); - let started = workunits_to_py_tuple_value(&mut started_iter); - - let mut completed_iter = completed.iter(); - let completed = workunits_to_py_tuple_value(&mut completed_iter); - - externs::store_tuple(&[started, completed]) - }); + let value = + session + .workunit_store() + .with_latest_workunits(max_log_verbosity, |started, completed| { + let mut started_iter = started.iter(); + let started = workunits_to_py_tuple_value(&mut started_iter); + + let mut completed_iter = completed.iter(); + let completed = workunits_to_py_tuple_value(&mut completed_iter); + + externs::store_tuple(&[started, completed]) + }); value.into() }) }) diff --git a/src/rust/engine/logging/src/lib.rs b/src/rust/engine/logging/src/lib.rs index 4db2c7649a4..64387d5f395 100644 --- a/src/rust/engine/logging/src/lib.rs +++ b/src/rust/engine/logging/src/lib.rs @@ -56,7 +56,7 @@ use num_enum::TryFromPrimitive; // This is a hard-coding of constants in the standard logging python package. #[derive(Debug, Eq, PartialEq, TryFromPrimitive, Clone, Copy)] #[repr(u64)] -enum PythonLogLevel { +pub enum PythonLogLevel { NotSet = 0, // Trace doesn't exist in a Python world, so set it to "a bit lower than Debug". Trace = 5, diff --git a/src/rust/engine/process_execution/src/lib.rs b/src/rust/engine/process_execution/src/lib.rs index 813efb37d9d..e780b86da5e 100644 --- a/src/rust/engine/process_execution/src/lib.rs +++ b/src/rust/engine/process_execution/src/lib.rs @@ -442,8 +442,7 @@ impl CommandRunner for BoundedCommandRunner { .unwrap_or_else(|| "".to_string()); let outer_metadata = WorkunitMetadata { desc: Some(desc.clone()), - level: Level::Info, - display: false, + level: Level::Debug, blocked: true, }; let bounded_fut = { @@ -456,7 +455,6 @@ impl CommandRunner for BoundedCommandRunner { let metadata = WorkunitMetadata { desc: Some(desc), level: Level::Info, - display: false, blocked: false, }; with_workunit(context.workunit_store.clone(), name, metadata, async move { diff --git a/src/rust/engine/process_execution/src/remote_tests.rs b/src/rust/engine/process_execution/src/remote_tests.rs index d6786ed7b19..cb42a059548 100644 --- a/src/rust/engine/process_execution/src/remote_tests.rs +++ b/src/rust/engine/process_execution/src/remote_tests.rs @@ -2235,7 +2235,7 @@ async fn extract_output_files_from_response_no_prefix() { } fn workunits_with_constant_span_id(workunit_store: &mut WorkunitStore) -> HashSet { - workunit_store.with_latest_workunits(|_, completed_workunits| { + workunit_store.with_latest_workunits(log::Level::Trace, |_, completed_workunits| { completed_workunits .iter() .map(|workunit| Workunit { diff --git a/src/rust/engine/src/externs.rs b/src/rust/engine/src/externs.rs index 9038f04a661..d1b89a40c52 100644 --- a/src/rust/engine/src/externs.rs +++ b/src/rust/engine/src/externs.rs @@ -344,7 +344,6 @@ pub type ExternContext = raw::c_void; pub struct Externs { pub context: *const ExternContext, - pub log_level: u8, pub none: Handle, pub call: CallExtern, pub generator_send: GeneratorSendExtern, diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs index 3dc7f3a1559..ef7f4d2e7a9 100644 --- a/src/rust/engine/src/nodes.rs +++ b/src/rust/engine/src/nodes.rs @@ -1020,8 +1020,12 @@ impl Node for NodeKey { let (started_workunit_id, 
diff --git a/src/rust/engine/src/nodes.rs b/src/rust/engine/src/nodes.rs
index 3dc7f3a1559..ef7f4d2e7a9 100644
--- a/src/rust/engine/src/nodes.rs
+++ b/src/rust/engine/src/nodes.rs
@@ -1020,8 +1020,12 @@ impl Node for NodeKey {
     let (started_workunit_id, user_facing_name) = {
       let user_facing_name = self.user_facing_name();
-      let display = context.session.should_handle_workunits()
-        && (user_facing_name.is_some() || self.display_info().is_some());
+      let higher_priority = context.session.should_handle_workunits() && user_facing_name.is_some();
+      let level = if higher_priority {
+        Level::Info
+      } else {
+        Level::Debug
+      };
       let name = self.workunit_name();
       let span_id = new_span_id();
@@ -1029,8 +1033,7 @@ impl Node for NodeKey {
       let parent_id = std::mem::replace(&mut workunit_state.parent_id, Some(span_id.clone()));
       let metadata = WorkunitMetadata {
         desc: user_facing_name.clone(),
-        level: Level::Info,
-        display,
+        level,
         blocked: false,
       };
diff --git a/src/rust/engine/workunit_store/src/lib.rs b/src/rust/engine/workunit_store/src/lib.rs
index 5245272a607..dfc396ef66d 100644
--- a/src/rust/engine/workunit_store/src/lib.rs
+++ b/src/rust/engine/workunit_store/src/lib.rs
@@ -62,14 +62,12 @@ pub enum WorkunitState {
 pub struct WorkunitMetadata {
   pub desc: Option<String>,
   pub level: Level,
-  pub display: bool,
   pub blocked: bool,
 }
 
 impl WorkunitMetadata {
   pub fn new() -> WorkunitMetadata {
     WorkunitMetadata {
-      display: true,
       level: Level::Info,
       desc: None,
       blocked: false,
@@ -93,13 +91,6 @@ pub struct WorkUnitInnerStore {
   last_seen_completed_idx: usize,
 }
 
-fn should_display_workunit(workunit_records: &HashMap<String, Workunit>, id: &str) -> bool {
-  match workunit_records.get(id) {
-    None => false,
-    Some(record) => record.metadata.display,
-  }
-}
-
 impl WorkunitStore {
   pub fn new() -> WorkunitStore {
     WorkunitStore {
@@ -260,7 +251,7 @@ impl WorkunitStore {
     inner.completed_ids.push(span_id);
   }
 
-  pub fn with_latest_workunits<F, T>(&mut self, f: F) -> T
+  pub fn with_latest_workunits<F, T>(&mut self, max_verbosity: log::Level, f: F) -> T
   where
     F: FnOnce(&[Workunit], &[Workunit]) -> T,
   {
@@ -268,11 +259,17 @@
     let inner_store: &mut WorkUnitInnerStore = &mut *inner_guard;
     let workunit_records = &inner_store.workunit_records;
 
-    let compute_should_display = |workunit: Workunit| -> Workunit {
+    let should_emit = |workunit: &Workunit| -> bool { workunit.metadata.level <= max_verbosity };
+
+    let compute_adjusted_parent_id = |workunit: Workunit| -> Workunit {
       let mut parent_id: Option<String> = workunit.parent_id;
       loop {
         if let Some(current_parent_id) = parent_id {
-          if should_display_workunit(workunit_records, &current_parent_id) {
+          let should_emit = match workunit_records.get(&current_parent_id) {
+            None => false,
+            Some(workunit) => should_emit(&workunit),
+          };
+          if should_emit {
             return Workunit {
               parent_id: Some(current_parent_id),
               ..workunit
@@ -301,11 +298,11 @@
       .iter()
       .flat_map(|id| workunit_records.get(id))
       .flat_map(|workunit| match workunit.state {
-        WorkunitState::Started { .. } if workunit.metadata.display => Some(workunit.clone()),
+        WorkunitState::Started { .. } if should_emit(&workunit) => Some(workunit.clone()),
         WorkunitState::Started { .. } => None,
         WorkunitState::Completed { .. } => None,
       })
-      .map(compute_should_display)
+      .map(compute_adjusted_parent_id)
      .collect();
 
     inner_store.last_seen_started_idx = cur_len;
@@ -316,11 +313,11 @@
      .iter()
      .flat_map(|id| workunit_records.get(id))
      .flat_map(|workunit| match workunit.state {
-        WorkunitState::Completed { .. } if workunit.metadata.display => Some(workunit.clone()),
+        WorkunitState::Completed { .. } if should_emit(&workunit) => Some(workunit.clone()),
         WorkunitState::Completed { .. } => None,
         WorkunitState::Started { .. } => None,
       })
-      .map(compute_should_display)
+      .map(compute_adjusted_parent_id)
       .collect();
 
     inner_store.last_seen_completed_idx = cur_len;
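Reviewer note: the parent-id rewriting that replaces the old `display` flag can be summarized in Python pseudocode as follows (an illustrative sketch only; the dict shapes, helper names, and numeric verbosity table are not the engine's real types):

```python
# Mirrors Rust's log::Level ordering (Error < Warn < Info < Debug < Trace),
# where a larger rank means "more verbose".
VERBOSITY = {"ERROR": 1, "WARN": 2, "INFO": 3, "DEBUG": 4, "TRACE": 5}

def adjust_parent_ids(workunits, records, max_verbosity):
    """Keep only workunits at or below max_verbosity, re-pointing each survivor's
    parent_id at its nearest ancestor that also survives (or None if none does)."""

    def should_emit(workunit):
        return VERBOSITY[workunit["level"]] <= VERBOSITY[max_verbosity]

    emitted = []
    for workunit in workunits:
        if not should_emit(workunit):
            continue
        parent_id = workunit.get("parent_id")
        # Walk up the ancestry until an emitted ancestor is found or the root is passed.
        while parent_id is not None and not (
            parent_id in records and should_emit(records[parent_id])
        ):
            parent = records.get(parent_id)
            parent_id = parent.get("parent_id") if parent else None
        emitted.append({**workunit, "parent_id": parent_id})
    return emitted
```

This is why `test_streaming_workunit_log_level_parent_rewrite` sees `rule_C` reported with `rule_A` as its parent at INFO verbosity: the intermediate DEBUG-level `rule_B` workunit is filtered out, so `rule_C` is re-parented onto its nearest emitted ancestor.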