Skip to content

Commit

Permalink
Update opentelemetry version and add gauge cache. (temporalio#2161)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ardagan authored Nov 12, 2021
1 parent 59c5715 commit 5d6bb62
Show file tree
Hide file tree
Showing 10 changed files with 272 additions and 69 deletions.
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,6 @@ ALL_SCRIPTS := $(shell find . -name "*.sh")

PINNED_DEPENDENCIES := \
github.com/DataDog/[email protected] \
go.opentelemetry.io/[email protected] \
go.opentelemetry.io/otel/exporters/metric/[email protected] \
github.com/apache/[email protected] \
github.com/go-sql-driver/[email protected]

Expand Down
7 changes: 5 additions & 2 deletions common/metrics/opentelemetry_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,20 @@ type (
metricDefs map[int]metricDefinition
serviceIdx ServiceIdx
scopeWrapper func(impl internalScope) internalScope
gaugeCache OtelGaugeCache
}
)

// NewOpentelemeteryClientByReporter creates and returns a new instance of Client implementation
// serviceIdx indicates the service type in (InputhostIndex, ... StorageIndex)
func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter *OpentelemetryReporter, logger log.Logger) (Client, error) {
func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx, reporter *OpentelemetryReporter, logger log.Logger, gaugeCache OtelGaugeCache) (Client, error) {
tagsFilterConfig := NewTagFilteringScopeConfig(clientConfig.ExcludeTags)

scopeWrapper := func(impl internalScope) internalScope {
return NewTagFilteringScope(tagsFilterConfig, impl)
}

rootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false)
rootScope := newOpentelemetryScope(serviceIdx, reporter, nil, clientConfig.Tags, getMetricDefs(serviceIdx), false, gaugeCache)

totalScopes := len(ScopeDefs[Common]) + len(ScopeDefs[serviceIdx])
metricsClient := &opentelemetryClient{
Expand All @@ -59,6 +61,7 @@ func newOpentelemeteryClient(clientConfig *ClientConfig, serviceIdx ServiceIdx,
metricDefs: getMetricDefs(serviceIdx),
serviceIdx: serviceIdx,
scopeWrapper: scopeWrapper,
gaugeCache: gaugeCache,
}

for idx, def := range ScopeDefs[Common] {
Expand Down
8 changes: 4 additions & 4 deletions common/metrics/opentelemetry_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@
package metrics

import (
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
)

func tagMapToLabelArray(tags map[string]string) []label.KeyValue {
result := make([]label.KeyValue, len(tags))
func tagMapToLabelArray(tags map[string]string) []attribute.KeyValue {
result := make([]attribute.KeyValue, len(tags))
idx := 0
for k, v := range tags {
result[idx] = label.String(k, v)
result[idx] = attribute.String(k, v)
idx++
}
return result
Expand Down
32 changes: 23 additions & 9 deletions common/metrics/opentelemetry_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ import (
"net/http"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/metric"
export "go.opentelemetry.io/otel/sdk/export/metric"
"go.opentelemetry.io/otel/sdk/metric/aggregator/histogram"
controller "go.opentelemetry.io/otel/sdk/metric/controller/basic"
processor "go.opentelemetry.io/otel/sdk/metric/processor/basic"
selector "go.opentelemetry.io/otel/sdk/metric/selector/simple"

"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
Expand All @@ -46,6 +50,7 @@ type (
config *PrometheusConfig
server *http.Server
clientConfig *ClientConfig
gaugeCache OtelGaugeCache
}

OpentelemetryListener struct {
Expand All @@ -61,12 +66,18 @@ func newOpentelemeteryReporter(
if len(histogramBoundaries) == 0 {
histogramBoundaries = defaultHistogramBoundaries
}
exporter, err := prometheus.InstallNewPipeline(
prometheus.Config{
DefaultSummaryQuantiles: defaultQuantiles,
DefaultHistogramBoundaries: histogramBoundaries,
},

c := controller.New(
processor.NewFactory(
selector.NewWithHistogramDistribution(
histogram.WithExplicitBoundaries(histogramBoundaries),
),
export.CumulativeExportKindSelector(),
processor.WithMemory(true),
),
)
exporter, err := prometheus.New(
prometheus.Config{DefaultHistogramBoundaries: histogramBoundaries}, c)

if err != nil {
logger.Error("Failed to initialize prometheus exporter.", tag.Error(err))
Expand All @@ -75,7 +86,7 @@ func newOpentelemeteryReporter(

metricServer := initPrometheusListener(prometheusConfig, logger, exporter)

meter := otel.Meter("temporal")
meter := c.Meter("temporal")
reporter := &OpentelemetryReporter{
exporter: exporter,
meter: meter,
Expand Down Expand Up @@ -120,7 +131,10 @@ func (r *OpentelemetryReporter) GetMeterMust() metric.MeterMust {
}

func (r *OpentelemetryReporter) NewClient(logger log.Logger, serviceIdx ServiceIdx) (Client, error) {
return newOpentelemeteryClient(r.clientConfig, serviceIdx, r, logger)
if r.gaugeCache == nil {
r.gaugeCache = NewOtelGaugeCache(r)
}
return newOpentelemeteryClient(r.clientConfig, serviceIdx, r, logger, r.gaugeCache)
}

func (r *OpentelemetryReporter) Stop(logger log.Logger) {
Expand Down
42 changes: 21 additions & 21 deletions common/metrics/opentelemetry_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,18 +28,20 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
)

type (
opentelemetryScope struct {
serviceIdx ServiceIdx
reporter *OpentelemetryReporter
labels []label.KeyValue
labels []attribute.KeyValue
tags map[string]string
rootScope *opentelemetryScope
defs map[int]metricDefinition
isNamespaceTagged bool

gaugeCache OtelGaugeCache
}
)

Expand All @@ -50,6 +52,7 @@ func newOpentelemetryScope(
tags map[string]string,
defs map[int]metricDefinition,
isNamespace bool,
gaugeCache OtelGaugeCache,
) *opentelemetryScope {
result := &opentelemetryScope{
serviceIdx: serviceIdx,
Expand All @@ -58,6 +61,7 @@ func newOpentelemetryScope(
rootScope: rootScope,
defs: defs,
isNamespaceTagged: isNamespace,
gaugeCache: gaugeCache,
}
result.labels = tagMapToLabelArray(tags)
return result
Expand All @@ -81,32 +85,28 @@ func (m *opentelemetryScope) AddCounter(id int, delta int64) {

func (m *opentelemetryScope) UpdateGauge(id int, value float64) {
def := m.defs[id]
ctx := context.Background()
m.reporter.GetMeterMust().NewFloat64ValueRecorder(def.metricName.String()).Record(ctx, value, m.labels...)

m.gaugeCache.Set(def.metricName.String(), m.tags, value)
if !def.metricRollupName.Empty() && (m.rootScope != nil) {
m.rootScope.reporter.GetMeterMust().NewFloat64ValueRecorder(def.metricRollupName.String()).Record(
ctx, value, m.rootScope.labels...,
)
m.gaugeCache.Set(def.metricRollupName.String(), m.rootScope.tags, value)
}
}

func (m *opentelemetryScope) StartTimer(id int) Stopwatch {
def := m.defs[id]

timer := newOpenTelemetryStopwatchMetric(
m.reporter.GetMeterMust().NewFloat64ValueRecorder(def.metricName.String()),
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()),
m.labels)
switch {
case !def.metricRollupName.Empty():
timerRollup := newOpenTelemetryStopwatchMetric(
m.rootScope.reporter.GetMeterMust().NewFloat64ValueRecorder(def.metricName.String()),
m.rootScope.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()),
m.rootScope.labels)
return newOpenTelemetryStopwatch([]openTelemetryStopwatchMetric{timer, timerRollup})
case m.isNamespaceTagged:
allScope := m.taggedString(map[string]string{namespace: namespaceAllValue})
timerAll := newOpenTelemetryStopwatchMetric(
allScope.reporter.GetMeterMust().NewFloat64ValueRecorder(def.metricName.String()),
allScope.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()),
allScope.labels)
return newOpenTelemetryStopwatch([]openTelemetryStopwatchMetric{timer, timerAll})
default:
Expand All @@ -117,21 +117,21 @@ func (m *opentelemetryScope) StartTimer(id int) Stopwatch {
func (m *opentelemetryScope) RecordTimer(id int, d time.Duration) {
def := m.defs[id]
ctx := context.Background()
m.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricName.String()).Record(ctx, d.Nanoseconds(), m.labels...)
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()).Record(ctx, d.Nanoseconds(), m.labels...)

if !def.metricRollupName.Empty() && (m.rootScope != nil) {
m.rootScope.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricRollupName.String()).Record(
m.rootScope.reporter.GetMeterMust().NewInt64Histogram(def.metricRollupName.String()).Record(
ctx, d.Nanoseconds(), m.rootScope.labels...,
)
}

switch {
case !def.metricRollupName.Empty() && (m.rootScope != nil):
m.rootScope.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricRollupName.String()).Record(
m.rootScope.reporter.GetMeterMust().NewInt64Histogram(def.metricRollupName.String()).Record(
ctx, d.Nanoseconds(), m.rootScope.labels...,
)
case m.isNamespaceTagged:
m.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricName.String()).Record(
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()).Record(
ctx,
d.Nanoseconds(),
m.taggedString(map[string]string{namespace: namespaceAllValue}).labels...,
Expand All @@ -144,21 +144,21 @@ func (m *opentelemetryScope) RecordDistribution(id int, d int) {
def := m.defs[id]

ctx := context.Background()
m.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricName.String()).Record(ctx, value, m.labels...)
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()).Record(ctx, value, m.labels...)

if !def.metricRollupName.Empty() && (m.rootScope != nil) {
m.rootScope.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricRollupName.String()).Record(
m.rootScope.reporter.GetMeterMust().NewInt64Histogram(def.metricRollupName.String()).Record(
ctx, value, m.rootScope.labels...,
)
}

switch {
case !def.metricRollupName.Empty() && (m.rootScope != nil):
m.rootScope.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricRollupName.String()).Record(
m.rootScope.reporter.GetMeterMust().NewInt64Histogram(def.metricRollupName.String()).Record(
ctx, value, m.rootScope.labels...,
)
case m.isNamespaceTagged:
m.reporter.GetMeterMust().NewInt64ValueRecorder(def.metricName.String()).Record(
m.reporter.GetMeterMust().NewInt64Histogram(def.metricName.String()).Record(
ctx,
value,
m.taggedString(map[string]string{namespace: namespaceAllValue}).labels...,
Expand All @@ -179,7 +179,7 @@ func (m *opentelemetryScope) taggedString(tags map[string]string) *opentelemetry
}
tagMap[k] = v
}
return newOpentelemetryScope(m.serviceIdx, m.reporter, m.rootScope, tagMap, m.defs, namespaceTagged)
return newOpentelemetryScope(m.serviceIdx, m.reporter, m.rootScope, tagMap, m.defs, namespaceTagged, m.gaugeCache)
}

func (m *opentelemetryScope) Tagged(tags ...Tag) Scope {
Expand All @@ -196,7 +196,7 @@ func (m *opentelemetryScope) namespaceTagged(key string, value string) bool {
}

func (m *opentelemetryScope) userScope() UserScope {
return newOpentelemetryUserScope(m.reporter, m.tags)
return newOpentelemetryUserScope(m.reporter, m.tags, m.gaugeCache)
}

func (m *opentelemetryScope) AddCounterInternal(name string, delta int64) {
Expand Down
12 changes: 6 additions & 6 deletions common/metrics/opentelemetry_stopwatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"go.temporal.io/server/common/clock"
Expand All @@ -49,14 +49,14 @@ type (
}

openTelemetryStopwatchMetricImpl struct {
timer metric.Float64ValueRecorder
labels []label.KeyValue
timer metric.Int64Histogram
labels []attribute.KeyValue
}
)

func newOpenTelemetryStopwatchMetric(
timer metric.Float64ValueRecorder,
labels []label.KeyValue,
timer metric.Int64Histogram,
labels []attribute.KeyValue,
) *openTelemetryStopwatchMetricImpl {
return &openTelemetryStopwatchMetricImpl{
timer: timer,
Expand Down Expand Up @@ -89,5 +89,5 @@ func (o *opentelemetryStopwatch) Subtract(toSubstract time.Duration) {
}

func (om *openTelemetryStopwatchMetricImpl) Record(ctx context.Context, d time.Duration) {
om.timer.Record(ctx, float64(d.Nanoseconds()), om.labels...)
om.timer.Record(ctx, d.Nanoseconds(), om.labels...)
}
23 changes: 13 additions & 10 deletions common/metrics/opentelemetry_user_scope.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,26 @@ import (
"context"
"time"

"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/attribute"
)

type opentelemetryUserScope struct {
reporter *OpentelemetryReporter
labels []label.KeyValue
labels []attribute.KeyValue
tags map[string]string

gaugeCache OtelGaugeCache
}

func newOpentelemetryUserScope(
reporter *OpentelemetryReporter,
tags map[string]string,
gaugeCache OtelGaugeCache,
) *opentelemetryUserScope {
result := &opentelemetryUserScope{
reporter: reporter,
tags: tags,
reporter: reporter,
tags: tags,
gaugeCache: gaugeCache,
}
result.labels = tagMapToLabelArray(tags)
return result
Expand All @@ -60,25 +64,24 @@ func (o opentelemetryUserScope) AddCounter(counter string, delta int64) {

func (o opentelemetryUserScope) StartTimer(timer string) Stopwatch {
metric := newOpenTelemetryStopwatchMetric(
o.reporter.GetMeterMust().NewFloat64ValueRecorder(timer),
o.reporter.GetMeterMust().NewInt64Histogram(timer),
o.labels)
return newOpenTelemetryStopwatch([]openTelemetryStopwatchMetric{metric})
}

func (o opentelemetryUserScope) RecordTimer(timer string, d time.Duration) {
ctx := context.Background()
o.reporter.GetMeterMust().NewInt64ValueRecorder(timer).Record(ctx, d.Nanoseconds(), o.labels...)
o.reporter.GetMeterMust().NewInt64Histogram(timer).Record(ctx, d.Nanoseconds(), o.labels...)
}

func (o opentelemetryUserScope) RecordDistribution(id string, d int) {
value := int64(d)
ctx := context.Background()
o.reporter.GetMeterMust().NewInt64ValueRecorder(id).Record(ctx, value, o.labels...)
o.reporter.GetMeterMust().NewInt64Histogram(id).Record(ctx, value, o.labels...)
}

func (o opentelemetryUserScope) UpdateGauge(gauge string, value float64) {
ctx := context.Background()
o.reporter.GetMeterMust().NewFloat64ValueRecorder(gauge).Record(ctx, value, o.labels...)
o.gaugeCache.Set(gauge, o.tags, value)
}

// Tagged provides new scope with added and/or overriden tags values.
Expand All @@ -91,5 +94,5 @@ func (o opentelemetryUserScope) Tagged(tags map[string]string) UserScope {
for key, value := range tags {
tagMap[key] = value
}
return newOpentelemetryUserScope(o.reporter, tagMap)
return newOpentelemetryUserScope(o.reporter, tagMap, o.gaugeCache)
}
Loading

0 comments on commit 5d6bb62

Please sign in to comment.