Skip to content

Commit

Permalink
chore(metrics): Separate out iterating over single and multi-value ta…
Browse files Browse the repository at this point in the history
…gs (vectordotdev#15182)

* chore(metrics): Separate out iterating over single and multi-value tags

The `MetricTags` implementation has a single implementation of iteration over
its values that previously only had single-valued results, but now has
multi-valued results. These new semantics are unsuitable for most current uses
of metric tags, so separate these out into iterating over tags with a single
value and iterating over all values of all tags.

* Make the _all variants iterate over simulated nullable tag values
  • Loading branch information
bruceg authored Nov 14, 2022
1 parent 540d490 commit 5b47140
Show file tree
Hide file tree
Showing 14 changed files with 52 additions and 79 deletions.
7 changes: 5 additions & 2 deletions lib/vector-core/src/event/metric/series.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,11 @@ impl fmt::Display for MetricSeries {
write_word(fmt, &self.name.name)?;
write!(fmt, "{{")?;
if let Some(tags) = &self.tags {
write_list(fmt, ",", tags.iter(), |fmt, (tag, value)| {
write_word(fmt, tag).and_then(|()| write!(fmt, "={:?}", value))
write_list(fmt, ",", tags.iter_all(), |fmt, (tag, value)| {
write_word(fmt, tag).and_then(|()| match value {
Some(value) => write!(fmt, "={:?}", value),
None => Ok(()),
})
})?;
}
write!(fmt, "}}")
Expand Down
81 changes: 24 additions & 57 deletions lib/vector-core/src/event/metric/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,32 @@ impl MetricTags {
(!self.is_empty()).then_some(self)
}

pub fn iter(&self) -> impl Iterator<Item = (&str, &str)> {
/// Iterate over references to all values of each tag.
pub fn iter_all(&self) -> impl Iterator<Item = (&str, Option<&str>)> {
self.0
.iter()
.flat_map(|(name, tags)| tags.iter().map(|tag| (name.as_ref(), tag)))
.flat_map(|(name, tags)| tags.iter().map(|tag| (name.as_ref(), Some(tag))))
}

/// Iterate over `(name, value)` reference pairs, yielding at most one value per tag.
///
/// A tag is skipped entirely when `as_single()` on its value set returns `None`.
pub fn iter_single(&self) -> impl Iterator<Item = (&str, &str)> {
    self.0.iter().filter_map(|(key, values)| {
        let value = values.as_single()?;
        Some((key.as_ref(), value))
    })
}

/// Consume the tags and iterate over every value of every tag.
///
/// Each item pairs an owned copy of the tag name with one of its values. The
/// value is always wrapped in `Some` here, so this iterator never produces a
/// `None` value on its own.
pub fn into_iter_all(self) -> impl Iterator<Item = (String, Option<String>)> {
    self.0.into_iter().flat_map(|(key, values)| {
        // The name is cloned once per value because each yielded pair owns its key.
        values
            .into_iter()
            .map(move |value| (key.clone(), Some(value)))
    })
}

/// Consume the tags and iterate over owned `(name, value)` pairs, yielding at
/// most one value per tag.
///
/// A tag is dropped when `into_single()` on its value set returns `None`.
pub fn into_iter_single(self) -> impl Iterator<Item = (String, String)> {
    self.0.into_iter().filter_map(|(key, values)| {
        let value = values.into_single()?;
        Some((key, value))
    })
}

pub fn contains_key(&self, name: &str) -> bool {
Expand Down Expand Up @@ -330,18 +352,6 @@ impl MetricTags {
}
}

// Consuming iteration over a `MetricTags`, producing `(String, TagValue)` items.
impl IntoIterator for MetricTags {
    type Item = (String, TagValue);
    type IntoIter = IntoIter;

    /// Wrap the underlying map's owning iterator; no entry is in progress yet.
    fn into_iter(self) -> Self::IntoIter {
        let base = self.0.into_iter();
        IntoIter {
            base,
            current: None,
        }
    }
}

pub struct IntoIter {
base: btree_map::IntoIter<String, TagValueSet>,
current: Option<(String, <TagValueSet as IntoIterator>::IntoIter)>,
Expand Down Expand Up @@ -373,49 +383,6 @@ impl Iterator for IntoIter {
}
}

// Borrowing iteration over a `MetricTags`, producing `(&str, &str)` pairs.
impl<'a> IntoIterator for &'a MetricTags {
    type Item = (&'a str, &'a str);
    type IntoIter = Iter<'a>;

    /// Wrap the underlying map's borrowing iterator; no entry is in progress yet.
    fn into_iter(self) -> Self::IntoIter {
        let base = self.0.iter();
        Iter {
            base,
            current: None,
        }
    }
}

/// Borrowing iterator over the `(name, value)` pairs of a `MetricTags`.
pub struct Iter<'a> {
/// Iterator over the underlying map entries (tag name to value set).
base: btree_map::Iter<'a, String, TagValueSet>,
/// Name of the entry currently being visited together with the iterator over
/// its values; `None` when no entry is in progress.
current: Option<(&'a str, <&'a TagValueSet as IntoIterator>::IntoIter)>,
}

// Yields one `(name, value)` pair per value, visiting every value of a map
// entry before moving on to the next entry.
impl<'a> Iterator for Iter<'a> {
type Item = (&'a str, &'a str);

/// Return the next `(name, value)` pair, or `None` once the underlying map
/// iterator is exhausted.
fn next(&mut self) -> Option<Self::Item> {
loop {
match &mut self.current {
// An entry is in progress: emit its next value if it has one.
Some((key, tag_set)) => {
if let Some(value) = tag_set.next() {
break Some((key, value));
}
// This entry's values are used up; clear it so the next pass
// through the loop pulls a fresh entry from `base`.
self.current = None;
}
// No entry in progress: advance to the next map entry.
None => {
self.current = self
.base
.next()
.map(|(key, value)| (key.as_str(), value.iter()));
// `base` had nothing left, so iteration is finished.
if self.current.is_none() {
break None;
}
}
}
}
}
}

impl From<BTreeMap<String, TagValue>> for MetricTags {
fn from(tags: BTreeMap<String, TagValue>) -> Self {
tags.into_iter().collect()
Expand Down
6 changes: 3 additions & 3 deletions lib/vector-core/src/event/vrl_target.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ impl vrl_lib::Target for VrlTarget {
["namespace"] => metric.series.name.namespace.take().map(Into::into),
["timestamp"] => metric.data.time.timestamp.take().map(Into::into),
["tags"] => metric.series.tags.take().map(|map| {
map.into_iter()
map.into_iter_single()
.map(|(k, v)| (k, v.into()))
.collect::<::value::Value>()
}),
Expand Down Expand Up @@ -478,7 +478,7 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value {
if let Some(tags) = metric.tags().cloned() {
map.insert(
"tags".to_owned(),
tags.into_iter()
tags.into_iter_single()
.map(|(tag, value)| (tag, value.into()))
.collect::<BTreeMap<_, _>>()
.into(),
Expand Down Expand Up @@ -524,7 +524,7 @@ fn precompute_metric_value(metric: &Metric, info: &ProgramInfo) -> Value {
.tags()
.cloned()
.unwrap()
.into_iter()
.into_iter_single()
.map(|(tag, value)| (tag, value.into()))
.collect::<BTreeMap<_, _>>()
.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/api/schema/events/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ impl Metric {
/// Metric tags
async fn tags(&self) -> Option<Vec<MetricTag>> {
self.event.tags().map(|tags| {
tags.iter()
tags.iter_single()
.map(|(key, value)| MetricTag {
key: key.to_owned(),
value: value.to_owned(),
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/aws_cloudwatch_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl RetryLogic for CloudWatchMetricsRetryLogic {

fn tags_to_dimensions(tags: &MetricTags) -> Vec<Dimension> {
// according to the API, up to 10 dimensions per metric can be provided
tags.iter()
tags.iter_single()
.take(10)
.map(|(k, v)| Dimension::builder().name(k).value(v).build())
.collect()
Expand Down
7 changes: 5 additions & 2 deletions src/sinks/datadog/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,8 +393,11 @@ fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) ->

fn encode_tags(tags: &MetricTags) -> Vec<String> {
let mut pairs: Vec<_> = tags
.iter()
.map(|(name, value)| format!("{}:{}", name, value))
.iter_all()
.map(|(name, value)| match value {
Some(value) => format!("{}:{}", name, value),
None => name.into(),
})
.collect();
pairs.sort();
pairs
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/gcp/stackdriver_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl HttpSink for HttpEventSink {
let metric_labels = series
.tags
.unwrap_or_default()
.into_iter()
.into_iter_single()
.collect::<std::collections::HashMap<_, _>>();

let series = gcp::GcpSeries {
Expand Down
6 changes: 3 additions & 3 deletions src/sinks/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,13 +246,13 @@ pub(in crate::sinks) fn influx_line_protocol(
fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
let original_len = output.len();
// `tags` is already sorted
for (key, value) in tags {
for (key, value) in tags.iter_single() {
if key.is_empty() || value.is_empty() {
continue;
}
encode_string(&key, output);
encode_string(key, output);
output.put_u8(b'=');
encode_string(&value, output);
encode_string(value, output);
output.put_u8(b',');
}

Expand Down
4 changes: 2 additions & 2 deletions src/sinks/prometheus/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl StringCollector {
(None, Some(tag)) => write!(result, "{{{}}}", Self::format_tag(tag.0, &tag.1)),
(Some(tags), ref tag) => {
let mut parts = tags
.iter()
.iter_single()
.map(|(key, value)| Self::format_tag(key, value))
.collect::<Vec<_>>();

Expand Down Expand Up @@ -342,7 +342,7 @@ impl TimeSeries {
// Extract the labels into a vec and sort to produce a
// consistent key for the buffer.
let mut labels = labels
.into_iter()
.into_iter_single()
.map(|(name, value)| proto::Label { name, value })
.collect::<Labels>();
labels.sort();
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ mod integration_tests {
}
_ => panic!("Unhandled metric value, fix the test"),
}
for (tag, value) in metric.tags().unwrap() {
for (tag, value) in metric.tags().unwrap().iter_single() {
assert_eq!(output[tag], Value::String(value.to_string()));
}
let timestamp =
Expand Down
4 changes: 2 additions & 2 deletions src/sinks/s3_common/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ impl Service<S3Request> for S3Service {

let tagging = options.tags.map(|tags| {
let mut tagging = url::form_urlencoded::Serializer::new(String::new());
for (p, v) in tags {
tagging.append_pair(&p, &v);
for (p, v) in tags.iter_single() {
tagging.append_pair(p, v);
}
tagging.finish()
});
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/splunk_hec/metrics/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl HecMetricsEncoder {
let fields = metric
.tags()
.into_iter()
.flatten()
.flat_map(|tags| tags.iter_single())
// skip the metric tags used for templating
.filter(|(k, _)| !metadata.templated_field_keys.iter().any(|f| f == k))
.map(|(k, v)| (k, HecFieldValue::Str(v)))
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/statsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ impl SinkConfig for StatsdSinkConfig {

fn encode_tags(tags: &MetricTags) -> String {
let parts: Vec<_> = tags
.iter()
.iter_single()
.map(|(name, value)| {
if value == "true" {
name.to_string()
Expand Down
4 changes: 2 additions & 2 deletions src/transforms/tag_cardinality_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ impl TagCardinalityLimit {
LimitExceededAction::DropEvent => {
// This needs to check all the tags, to ensure that the ordering of tag names
// doesn't change the behavior of the check.
for (key, value) in &*tags_map {
for (key, value) in tags_map.iter_single() {
if self.tag_limit_exceeded(key, value) {
emit!(TagCardinalityLimitRejectingEvent {
tag_key: key,
Expand All @@ -261,7 +261,7 @@ impl TagCardinalityLimit {
return None;
}
}
for (key, value) in &*tags_map {
for (key, value) in tags_map.iter_single() {
self.record_tag_value(key, value);
}
}
Expand Down

0 comments on commit 5b47140

Please sign in to comment.