diff --git a/lib/vector-core/src/transform/config.rs b/lib/vector-core/src/transform/config.rs index a253bd681bcf6..e1e06e22611b1 100644 --- a/lib/vector-core/src/transform/config.rs +++ b/lib/vector-core/src/transform/config.rs @@ -93,6 +93,11 @@ pub trait TransformConfig: core::fmt::Debug + Send + Sync + dyn_clone::DynClone /// of events flowing through the transform. fn outputs(&self, merged_definition: &schema::Definition) -> Vec; + /// Verifies that the provided outputs and the inner plumbing of the transform are valid. + fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec> { + Ok(()) + } + fn transform_type(&self) -> &'static str; /// Return true if the transform is able to be run across multiple tasks simultaneously with no diff --git a/src/config/validation.rs b/src/config/validation.rs index e5642ce17f887..6e117505eee96 100644 --- a/src/config/validation.rs +++ b/src/config/validation.rs @@ -159,7 +159,11 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec> { } for (key, transform) in config.transforms.iter() { - let outputs = transform.inner.outputs(&schema::Definition::empty()); + let definition = schema::Definition::empty(); + if let Err(errs) = transform.inner.validate(&definition) { + errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}"))); + } + let outputs = transform.inner.outputs(&definition); if outputs .iter() .map(|output| output.port.as_deref().unwrap_or("")) diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs index 57de568304c84..0f01538953687 100644 --- a/src/internal_events/mod.rs +++ b/src/internal_events/mod.rs @@ -121,8 +121,6 @@ mod remap; mod remove_fields; #[cfg(feature = "transforms-rename_fields")] mod rename_fields; -#[cfg(feature = "transforms-route")] -mod route; mod sample; #[cfg(feature = "sinks-sematext")] mod sematext_metrics; @@ -289,8 +287,6 @@ pub(crate) use self::remap::*; pub(crate) use self::remove_fields::*; #[cfg(feature = "transforms-rename_fields")] pub(crate) use self::rename_fields::*; -#[cfg(feature = "transforms-route")] -pub(crate) use self::route::*; #[cfg(feature = "transforms-sample")] pub(crate) use self::sample::*; #[cfg(feature = "sinks-sematext")] diff --git a/src/internal_events/route.rs b/src/internal_events/route.rs deleted file mode 100644 index cd1e6bc5277b0..0000000000000 --- a/src/internal_events/route.rs +++ /dev/null @@ -1,13 +0,0 @@ -use metrics::counter; -use vector_core::internal_event::InternalEvent; - -#[derive(Debug)] -pub struct RouteEventDiscarded<'a> { - pub output: &'a str, -} - -impl<'a> InternalEvent for RouteEventDiscarded<'a> { - fn emit(self) { - counter!("events_discarded_total", 1, "output" => self.output.to_string()); - } -} diff --git a/src/transforms/route.rs b/src/transforms/route.rs index 3bdc0808ea181..ca84796a21326 100644 --- a/src/transforms/route.rs +++ b/src/transforms/route.rs @@ -9,13 +9,14 @@ use crate::{ TransformDescription, }, event::Event, - internal_events::RouteEventDiscarded, schema, transforms::Transform, }; //------------------------------------------------------------------------------ +pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched"; + #[derive(Clone)] pub struct Route { conditions: Vec<(String, Condition)>, @@ -38,15 +39,17 @@ impl SyncTransform for Route { event: Event, output: &mut vector_core::transform::TransformOutputsBuf, ) { + let mut check_failed: usize = 0; for (output_name, condition) in &self.conditions { if condition.check(&event) { output.push_named(output_name, event.clone()); } else { - emit!(RouteEventDiscarded { - output: output_name.as_ref() - }) + check_failed += 1; } } + if check_failed == self.conditions.len() { + output.push_named(UNMATCHED_ROUTE, event); + } } } @@ -89,11 +92,24 @@ impl TransformConfig for RouteConfig { Input::all() } + fn validate(&self, _: &schema::Definition) -> Result<(), Vec> { + if self.route.contains_key(UNMATCHED_ROUTE) { + Err(vec![format!( + "cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`" + )]) + } else { + Ok(()) + } + } + fn outputs(&self, _: &schema::Definition) -> Vec { - self.route + let mut result: Vec = self + .route .keys() .map(|output_name| Output::from((output_name, DataType::all()))) - .collect() + .collect(); + result.push(Output::from((UNMATCHED_ROUTE, DataType::all()))); + result } fn transform_type(&self) -> &'static str { @@ -193,7 +209,7 @@ mod test { #[test] fn route_pass_all_route_conditions() { - let output_names = vec!["first", "second", "third"]; + let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; let event = Event::try_from( serde_json::json!({"message": "hello world", "second": "second", "third": "third"}), ) @@ -224,14 +240,18 @@ mod test { transform.transform(event.clone(), &mut outputs); for output_name in output_names { let mut events: Vec<_> = outputs.drain_named(output_name).collect(); - assert_eq!(events.len(), 1); - assert_eq!(events.pop().unwrap(), event); + if output_name == UNMATCHED_ROUTE { + assert!(events.is_empty()); + } else { + assert_eq!(events.len(), 1); + assert_eq!(events.pop().unwrap(), event); + } } } #[test] fn route_pass_one_route_condition() { - let output_names = vec!["first", "second", "third"]; + let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; let event = Event::try_from(serde_json::json!({"message": "hello world"})).unwrap(); let config = toml::from_str::( r#" @@ -267,6 +287,44 @@ mod test { } } + #[test] + fn route_pass_no_route_condition() { + let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE]; + let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap(); + let config = toml::from_str::( + r#" + route.first.type = "vrl" + route.first.source = '.message == "hello world"' + + route.second.type = "vrl" + route.second.source = '.second == "second"' + + route.third.type = "vrl" + route.third.source = '.third == "third"' + "#, + ) + .unwrap(); + + let mut transform = Route::new(&config, &Default::default()).unwrap(); + let mut outputs = TransformOutputsBuf::new_with_capacity( + output_names + .iter() + .map(|output_name| Output::from((output_name.to_owned(), DataType::all()))) + .collect(), + 1, + ); + + transform.transform(event.clone(), &mut outputs); + for output_name in output_names { + let mut events: Vec<_> = outputs.drain_named(output_name).collect(); + if output_name == UNMATCHED_ROUTE { + assert_eq!(events.len(), 1); + assert_eq!(events.pop().unwrap(), event); + } + assert_eq!(events.len(), 0); + } + } + #[tokio::test] async fn route_metrics_with_output_tag() { init_test(); diff --git a/tests/config.rs b/tests/config.rs index 7d55d3cce2626..4f6c3a686d8e6 100644 --- a/tests/config.rs +++ b/tests/config.rs @@ -808,14 +808,10 @@ async fn route() { type = "check_fields" "host.eq" = "gerry" - [transforms.splitting_gerrys.route.no_gerrys] - type = "check_fields" - "host.neq" = "gerry" - [sinks.out] type = "socket" mode = "tcp" - inputs = ["splitting_gerrys.only_gerrys", "splitting_gerrys.no_gerrys"] + inputs = ["splitting_gerrys.only_gerrys", "splitting_gerrys._unmatched"] encoding = "text" address = "127.0.0.1:9999" "#, diff --git a/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md b/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md index 1aaa8aaf02b31..ebcb25f38bc6a 100644 --- a/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md +++ b/website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md @@ -21,7 +21,7 @@ Vector's 0.21.0 release includes **breaking changes**: 6. [The `vector top` human_metrics flag `-h` is now `-H`](#vector-top-human-metrics) 7. [Remainder operator (%) in VRL is fallible](#remainder-fallible) 8. [AWS SDK Migration](#aws-sdk-migration) - +9. [Route transform metric `event_discarded_total` removed](#transform-route-metric) And **deprecations**: @@ -252,6 +252,15 @@ For more details on configuring auth, you can visit these links: - https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credentials.html - https://docs.aws.amazon.com/sdk-for-rust/latest/dg/environment-variables.html +#### Route transform metric `event_discarded_total` removed {#transform-route-metric} + +Until now, when using the `route` transform, if an event didn't match any configured route, this event would be +discarded and lost for the following transforms and sinks. + +A new `_unmatched` route has now been introduced and the events are no longer discarded, making the `event_discarded_total` metric irrelevant so it has been dropped. + +You can still get the total number of events that match no routes via `component_events_sent_total` with a tag of `output=_unmatched`. + ### Deprecations #### `receivedEventsTotal`, `sentEventsTotal`, `sentEventsThroughput`, `receivedEventsThroughput` subscriptions have been deprecated {#deprecate-aggregate-subscriptions} diff --git a/website/cue/reference/components/transforms/route.cue b/website/cue/reference/components/transforms/route.cue index a34a34ebb1bff..2c145ce169179 100644 --- a/website/cue/reference/components/transforms/route.cue +++ b/website/cue/reference/components/transforms/route.cue @@ -32,7 +32,8 @@ components: transforms: route: { description: """ A table of route identifiers to logical conditions representing the filter of the route. Each route can then be referenced as an input by other components with the name `.`. - Note, `_default` is a reserved output name and cannot be used as a route name. + If an event doesn't match any route, it will be sent to the `._unmatched` output. + Note, `_default` and `_unmatched` are reserved output names and cannot be used as route names. """ required: true type: object: { @@ -117,12 +118,4 @@ components: transforms: route: { description: "Each route can be referenced as an input by other components with the name `.`." }, ] - - telemetry: metrics: { - events_discarded_total: components.sources.internal_metrics.output.metrics.events_discarded_total & { - tags: { - output: components.sources.internal_metrics.output.metrics._output - } - } - } }