Skip to content

Commit

Permalink
enhancement(route transform): add _unmatched route (vectordotdev#11875)
Browse files Browse the repository at this point in the history
* route: add else route

Signed-off-by: Jérémie Drouet <[email protected]>

* route: update documentation

Signed-off-by: Jérémie Drouet <[email protected]>

* route: fix config test

Signed-off-by: Jérémie Drouet <[email protected]>

* route: rename _else to _unmatched

Signed-off-by: Jérémie Drouet <[email protected]>

* route: rename variable

Signed-off-by: Jérémie Drouet <[email protected]>

* route: remove event

Signed-off-by: Jérémie Drouet <[email protected]>

* route: add function to validate routes

Signed-off-by: Jérémie Drouet <[email protected]>

* route: update changelog

Signed-off-by: Jérémie Drouet <[email protected]>

* route: update changelog

Signed-off-by: Jérémie Drouet <[email protected]>

* Update website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md

Co-authored-by: Jesse Szwedko <[email protected]>

Co-authored-by: Jesse Szwedko <[email protected]>
  • Loading branch information
jdrouet and jszwedko authored Mar 22, 2022
1 parent 0575a3a commit 816fa48
Show file tree
Hide file tree
Showing 8 changed files with 91 additions and 43 deletions.
5 changes: 5 additions & 0 deletions lib/vector-core/src/transform/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ pub trait TransformConfig: core::fmt::Debug + Send + Sync + dyn_clone::DynClone
/// of events flowing through the transform.
fn outputs(&self, merged_definition: &schema::Definition) -> Vec<Output>;

/// Verifies that the provided outputs and the inner plumbing of the transform are valid.
fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
Ok(())
}

fn transform_type(&self) -> &'static str;

/// Return true if the transform is able to be run across multiple tasks simultaneously with no
Expand Down
6 changes: 5 additions & 1 deletion src/config/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ pub fn check_outputs(config: &ConfigBuilder) -> Result<(), Vec<String>> {
}

for (key, transform) in config.transforms.iter() {
let outputs = transform.inner.outputs(&schema::Definition::empty());
let definition = schema::Definition::empty();
if let Err(errs) = transform.inner.validate(&definition) {
errors.extend(errs.into_iter().map(|msg| format!("Transform {key} {msg}")));
}
let outputs = transform.inner.outputs(&definition);
if outputs
.iter()
.map(|output| output.port.as_deref().unwrap_or(""))
Expand Down
4 changes: 0 additions & 4 deletions src/internal_events/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,6 @@ mod remap;
mod remove_fields;
#[cfg(feature = "transforms-rename_fields")]
mod rename_fields;
#[cfg(feature = "transforms-route")]
mod route;
mod sample;
#[cfg(feature = "sinks-sematext")]
mod sematext_metrics;
Expand Down Expand Up @@ -289,8 +287,6 @@ pub(crate) use self::remap::*;
pub(crate) use self::remove_fields::*;
#[cfg(feature = "transforms-rename_fields")]
pub(crate) use self::rename_fields::*;
#[cfg(feature = "transforms-route")]
pub(crate) use self::route::*;
#[cfg(feature = "transforms-sample")]
pub(crate) use self::sample::*;
#[cfg(feature = "sinks-sematext")]
Expand Down
13 changes: 0 additions & 13 deletions src/internal_events/route.rs

This file was deleted.

78 changes: 68 additions & 10 deletions src/transforms/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ use crate::{
TransformDescription,
},
event::Event,
internal_events::RouteEventDiscarded,
schema,
transforms::Transform,
};

//------------------------------------------------------------------------------

pub(crate) const UNMATCHED_ROUTE: &str = "_unmatched";

#[derive(Clone)]
pub struct Route {
conditions: Vec<(String, Condition)>,
Expand All @@ -38,15 +39,17 @@ impl SyncTransform for Route {
event: Event,
output: &mut vector_core::transform::TransformOutputsBuf,
) {
let mut check_failed: usize = 0;
for (output_name, condition) in &self.conditions {
if condition.check(&event) {
output.push_named(output_name, event.clone());
} else {
emit!(RouteEventDiscarded {
output: output_name.as_ref()
})
check_failed += 1;
}
}
if check_failed == self.conditions.len() {
output.push_named(UNMATCHED_ROUTE, event);
}
}
}

Expand Down Expand Up @@ -89,11 +92,24 @@ impl TransformConfig for RouteConfig {
Input::all()
}

fn validate(&self, _: &schema::Definition) -> Result<(), Vec<String>> {
if self.route.contains_key(UNMATCHED_ROUTE) {
Err(vec![format!(
"cannot have a named output with reserved name: `{UNMATCHED_ROUTE}`"
)])
} else {
Ok(())
}
}

fn outputs(&self, _: &schema::Definition) -> Vec<Output> {
self.route
let mut result: Vec<Output> = self
.route
.keys()
.map(|output_name| Output::from((output_name, DataType::all())))
.collect()
.collect();
result.push(Output::from((UNMATCHED_ROUTE, DataType::all())));
result
}

fn transform_type(&self) -> &'static str {
Expand Down Expand Up @@ -193,7 +209,7 @@ mod test {

#[test]
fn route_pass_all_route_conditions() {
let output_names = vec!["first", "second", "third"];
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(
serde_json::json!({"message": "hello world", "second": "second", "third": "third"}),
)
Expand Down Expand Up @@ -224,14 +240,18 @@ mod test {
transform.transform(event.clone(), &mut outputs);
for output_name in output_names {
let mut events: Vec<_> = outputs.drain_named(output_name).collect();
assert_eq!(events.len(), 1);
assert_eq!(events.pop().unwrap(), event);
if output_name == UNMATCHED_ROUTE {
assert!(events.is_empty());
} else {
assert_eq!(events.len(), 1);
assert_eq!(events.pop().unwrap(), event);
}
}
}

#[test]
fn route_pass_one_route_condition() {
let output_names = vec!["first", "second", "third"];
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "hello world"})).unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
Expand Down Expand Up @@ -267,6 +287,44 @@ mod test {
}
}

#[test]
fn route_pass_no_route_condition() {
let output_names = vec!["first", "second", "third", UNMATCHED_ROUTE];
let event = Event::try_from(serde_json::json!({"message": "NOPE"})).unwrap();
let config = toml::from_str::<RouteConfig>(
r#"
route.first.type = "vrl"
route.first.source = '.message == "hello world"'
route.second.type = "vrl"
route.second.source = '.second == "second"'
route.third.type = "vrl"
route.third.source = '.third == "third"'
"#,
)
.unwrap();

let mut transform = Route::new(&config, &Default::default()).unwrap();
let mut outputs = TransformOutputsBuf::new_with_capacity(
output_names
.iter()
.map(|output_name| Output::from((output_name.to_owned(), DataType::all())))
.collect(),
1,
);

transform.transform(event.clone(), &mut outputs);
for output_name in output_names {
let mut events: Vec<_> = outputs.drain_named(output_name).collect();
if output_name == UNMATCHED_ROUTE {
assert_eq!(events.len(), 1);
assert_eq!(events.pop().unwrap(), event);
}
assert_eq!(events.len(), 0);
}
}

#[tokio::test]
async fn route_metrics_with_output_tag() {
init_test();
Expand Down
6 changes: 1 addition & 5 deletions tests/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -808,14 +808,10 @@ async fn route() {
type = "check_fields"
"host.eq" = "gerry"
[transforms.splitting_gerrys.route.no_gerrys]
type = "check_fields"
"host.neq" = "gerry"
[sinks.out]
type = "socket"
mode = "tcp"
inputs = ["splitting_gerrys.only_gerrys", "splitting_gerrys.no_gerrys"]
inputs = ["splitting_gerrys.only_gerrys", "splitting_gerrys._unmatched"]
encoding = "text"
address = "127.0.0.1:9999"
"#,
Expand Down
11 changes: 10 additions & 1 deletion website/content/en/highlights/2022-03-22-0-21-0-upgrade-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ Vector's 0.21.0 release includes **breaking changes**:
6. [The `vector top` human_metrics flag `-h` is now `-H`](#vector-top-human-metrics)
7. [Remainder operator (%) in VRL is fallible](#remainder-fallible)
8. [AWS SDK Migration](#aws-sdk-migration)

9. [Route transform metric `event_discarded_total` removed](#transform-route-metric)

And **deprecations**:

Expand Down Expand Up @@ -252,6 +252,15 @@ For more details on configuring auth, you can visit these links:
- https://docs.aws.amazon.com/sdk-for-rust/latest/dg/credentials.html
- https://docs.aws.amazon.com/sdk-for-rust/latest/dg/environment-variables.html

#### Route transform metric `event_discarded_total` removed {#transform-route-metric}

Until now, when using the `route` transform, if an event didn't match any configured route, this event would be
discarded and lost for the following transforms and sinks.

A new `_unmatched` route has now been introduced and the events are no longer discarded, making the `event_discarded_total` metric irrelevant so it has been dropped.

You can still get the total number of events that match no routes via `component_events_sent_total` with a tag of `output=_unmatched`.

### Deprecations

#### `receivedEventsTotal`, `sentEventsTotal`, `sentEventsThroughput`, `receivedEventsThroughput` subscriptions have been deprecated {#deprecate-aggregate-subscriptions}
Expand Down
11 changes: 2 additions & 9 deletions website/cue/reference/components/transforms/route.cue
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ components: transforms: route: {
description: """
A table of route identifiers to logical conditions representing the filter of the route. Each route
can then be referenced as an input by other components with the name `<transform_name>.<route_id>`.
Note, `_default` is a reserved output name and cannot be used as a route name.
If an event doesn't match any route, it will be sent to the `<transform_name>._unmatched` output.
Note, `_default` and `_unmatched` are reserved output names and cannot be used as route names.
"""
required: true
type: object: {
Expand Down Expand Up @@ -117,12 +118,4 @@ components: transforms: route: {
description: "Each route can be referenced as an input by other components with the name `<transform_name>.<route_id>`."
},
]

telemetry: metrics: {
events_discarded_total: components.sources.internal_metrics.output.metrics.events_discarded_total & {
tags: {
output: components.sources.internal_metrics.output.metrics._output
}
}
}
}

0 comments on commit 816fa48

Please sign in to comment.