Skip to content

Commit

Permalink
Polish the metric processor
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Sep 12, 2020
1 parent 60a3a6a commit 5641654
Show file tree
Hide file tree
Showing 12 changed files with 938 additions and 581 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ All notable changes to this project will be documented in this file.
the new `cases` field).
- The `workflow` processor can now reference resource configured `branch`
processors.
- The `metric` processor now has a field `name` that replaces the now deprecated
field `path`. When used the processor now applies to all messages of a batch
and the name of the metric is now absolute, without being prefixed by a path
generated based on its position within the config.

### Changed

Expand Down
1 change: 1 addition & 0 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ PROCESSOR_MERGE_JSON_RETAIN_PARTS = false
PROCESSOR_METADATA_KEY = example
PROCESSOR_METADATA_OPERATOR = set
PROCESSOR_METADATA_VALUE = ${!hostname()}
PROCESSOR_METRIC_NAME
PROCESSOR_METRIC_PATH
PROCESSOR_METRIC_TYPE = counter
PROCESSOR_METRIC_VALUE
Expand Down
1 change: 1 addition & 0 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,7 @@ pipeline:
operator: ${PROCESSOR_METADATA_OPERATOR:set}
value: ${PROCESSOR_METADATA_VALUE:${!hostname()}}
metric:
name: ${PROCESSOR_METRIC_NAME}
path: ${PROCESSOR_METRIC_PATH}
type: ${PROCESSOR_METRIC_TYPE:counter}
value: ${PROCESSOR_METRIC_VALUE}
Expand Down
143 changes: 62 additions & 81 deletions config/examples/track_benthos_downloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,100 +4,81 @@ pipeline:
- bloblang: 'root = {}'
- workflow:
meta_path: results
branches:
dockerhub:
request_map: 'root = ""'
processors:
- try:
# Grab docker dl count
- http:
url: https://hub.docker.com/v2/repositories/jeffail/benthos/
verb: GET
retries: 0
- bloblang: |
type = "gauge"
source = "docker"
dist = "docker"
download_count = pull_count
- resource: set_metrics

github:
request_map: 'root = ""'
processors:
- try:
# Grab github latest release dl count
- http:
url: https://api.github.com/repos/Jeffail/benthos/releases/latest
verb: GET
retries: 0
- bloblang: |
root = assets.map_each(match {
"type":"gauge",
"source":"github",
"dist": name.re_replace("^benthos-?((lambda_)|_)[0-9\\.]+_([^\\.]+).*", "$2$3"),
"download_count": download_count
} {
dist != "checksums" => this
_ => deleted()
})
- unarchive:
format: json_array
- resource: set_metrics
- bloblang: 'root = if batch_index() != 0 { deleted() }'

homebrew:
request_map: 'root = ""'
processors:
- try:
- http:
url: https://formulae.brew.sh/api/formula/benthos.json
verb: GET
retries: 0
- bloblang: |
type = "gauge"
source = "homebrew"
dist = "brew"
download_count = analytics.install.30d.benthos
- resource: set_metrics
order: [ [ dockerhub, github, homebrew ] ]

resources:
processors:
set_metrics:
try:
- switch:
- condition:
bloblang: type == "gauge"
processors:
- resource: metric.gauge
- processors:
- resource: metric.counter
dockerhub:
branch:
request_map: 'root = ""'
processors:
- try:
# Grab docker dl count
- http:
url: https://hub.docker.com/v2/repositories/jeffail/benthos/
verb: GET
retries: 0
- bloblang: |
root.source = "docker"
root.dist = "docker"
root.download_count = this.pull_count
- resource: metric.gauge

github:
branch:
request_map: 'root = ""'
processors:
- try:
# Grab github latest release dl count
- http:
url: https://api.github.com/repos/Jeffail/benthos/releases/latest
verb: GET
retries: 0
- bloblang: |
root = this.assets.map_each(match {
"source":"github",
"dist": this.name.re_replace("^benthos-?((lambda_)|_)[0-9\\.]+_([^\\.]+).*", "$2$3"),
"download_count": this.download_count
} {
this.dist != "checksums" => this
_ => deleted()
})
- unarchive:
format: json_array
- resource: metric.gauge
- bloblang: 'root = if batch_index() != 0 { deleted() }'

homebrew:
branch:
request_map: 'root = ""'
processors:
- try:
- http:
url: https://formulae.brew.sh/api/formula/benthos.json
verb: GET
retries: 0
- bloblang: |
root.source = "homebrew"
root.dist = "brew"
root.download_count = this.analytics.install.30d.benthos
- resource: metric.gauge

metric.gauge:
metric:
labels:
dist: ${!json("dist")}
source: ${!json("source")}
path: BenthosDownloadGauge
type: gauge
value: ${!json("download_count")}

metric.counter:
metric:
name: BenthosDownloadGauge
labels:
version: ${!json("version")}
dist: ${!json("dist")}
source: ${!json("source")}
path: BenthosDownload
type: counter
dist: ${! json("dist") }
source: ${! json("source") }
value: ${! json("download_count") }

metrics:
cloudwatch:
namespace: BenthosAnalyticsStaging
flush_period: 500ms
region: eu-west-1
path_mapping: |
let name = this.re_replace("^resource\\.processor\\.metric\\.(gauge|counter)\\.(.*)$", "$2")
# Only emit our custom metric, and no internal Benthos metrics.
root = if [
"BenthosDownloadGauge",
"BenthosDownload"
].contains($name) { $name } else { deleted() }
"BenthosDownloadGauge"
].contains(this) { this } else { deleted() }
3 changes: 2 additions & 1 deletion config/processors/metric.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ pipeline:
- type: metric
metric:
labels: {}
path: ""
name: ""
parts: []
type: counter
value: ""
threads: 1
Expand Down
22 changes: 22 additions & 0 deletions lib/metrics/combine.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,28 @@ func Combine(t1, t2 Type) Type {
}
}

func unwrapMetric(t Type) Type {
for {
u, ok := t.(interface {
Unwrap() Type
})
if ok {
t = u.Unwrap()
} else {
break
}
}
return t
}

// Unwrap to the underlying metrics type.
func (c *combinedWrapper) Unwrap() Type {
return &combinedWrapper{
t1: unwrapMetric(c.t1),
t2: unwrapMetric(c.t2),
}
}

//------------------------------------------------------------------------------

type combinedCounter struct {
Expand Down
6 changes: 6 additions & 0 deletions lib/metrics/namespaced.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ func Namespaced(t Type, ns string) Type {
}
}

// Unwrap to the underlying metrics type.
// TODO: V4 make this standard for Type
func (d namespacedWrapper) Unwrap() Type {
return d.t
}

//------------------------------------------------------------------------------

func (d namespacedWrapper) GetCounter(path string) StatCounter {
Expand Down
Loading

0 comments on commit 5641654

Please sign in to comment.