Skip to content

Commit

Permalink
Add support for prometheus metrics config (cadence-workflow#1770)
Browse files Browse the repository at this point in the history
* Cleanup metrics incompatible with prometheus
* Add prometheus metrics config parsing and initialization
* Add a test config file for prometheus metrics
  • Loading branch information
shreyassrivatsan authored May 2, 2019
1 parent 00f07d6 commit 3fcd5f4
Show file tree
Hide file tree
Showing 9 changed files with 312 additions and 120 deletions.
13 changes: 13 additions & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (s *server) startService() common.Daemon {
dc := dynamicconfig.NewCollection(params.DynamicConfig, params.Logger)

svcCfg := s.cfg.Services[s.name]
params.MetricScope = svcCfg.Metrics.NewScope()
params.MetricScope = svcCfg.Metrics.NewScope(params.Logger)
params.RPCFactory = svcCfg.RPC.NewFactory(params.Name, params.Logger)
params.PProfInitializer = svcCfg.PProf.NewInitializer(params.Logger)
enableGlobalDomain := dc.GetBoolProperty(dynamicconfig.EnableGlobalDomain, s.cfg.ClustersInfo.EnableGlobalDomain)
Expand Down
68 changes: 32 additions & 36 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,16 @@ const (

// Common tags for all services
const (
HostnameTagName = "hostname"
OperationTagName = "operation"
// ShardTagName is temporary until we can get all metric data removed for the service
ShardTagName = "shard"
CadenceRoleTagName = "cadence-role"
StatsTypeTagName = "stats-type"
CacheTypeTagName = "cache-type"
HostnameTagName = "hostname"
OperationTagName = "operation"
CadenceRoleTagName = "cadence_role"
StatsTypeTagName = "stats_type"
CacheTypeTagName = "cache_type"
)

// This package should hold all the metrics and tags for cadence
const (
UnknownDirectoryTagValue = "Unknown"
AllShardsTagValue = "ALL"
NoneShardsTagValue = "NONE"

HistoryRoleTagValue = "history"
MatchingRoleTagValue = "matching"
Expand Down Expand Up @@ -776,9 +772,9 @@ const (
var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
// common scope Names
Common: {
PersistenceCreateShardScope: {operation: "CreateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetShardScope: {operation: "GetShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateShardScope: {operation: "UpdateShard", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateShardScope: {operation: "CreateShard"},
PersistenceGetShardScope: {operation: "GetShard"},
PersistenceUpdateShardScope: {operation: "UpdateShard"},
PersistenceCreateWorkflowExecutionScope: {operation: "CreateWorkflowExecution"},
PersistenceGetWorkflowExecutionScope: {operation: "GetWorkflowExecution"},
PersistenceUpdateWorkflowExecutionScope: {operation: "UpdateWorkflowExecution"},
Expand All @@ -794,24 +790,24 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceGetTimerIndexTasksScope: {operation: "GetTimerIndexTasks"},
PersistenceCompleteTimerTaskScope: {operation: "CompleteTimerTask"},
PersistenceRangeCompleteTimerTaskScope: {operation: "RangeCompleteTimerTask"},
PersistenceCreateTaskScope: {operation: "CreateTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetTasksScope: {operation: "GetTasks", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTaskScope: {operation: "CompleteTask", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceListTaskListScope: {operation: "ListTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteTaskListScope: {operation: "DeleteTaskList", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateDomainScope: {operation: "CreateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetDomainScope: {operation: "GetDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceUpdateDomainScope: {operation: "UpdateDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainScope: {operation: "DeleteDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteDomainByNameScope: {operation: "DeleteDomainByName", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceListDomainScope: {operation: "ListDomain", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetMetadataScope: {operation: "GetMetadata", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCreateTaskScope: {operation: "CreateTask"},
PersistenceGetTasksScope: {operation: "GetTasks"},
PersistenceCompleteTaskScope: {operation: "CompleteTask"},
PersistenceCompleteTasksLessThanScope: {operation: "CompleteTasksLessThan"},
PersistenceLeaseTaskListScope: {operation: "LeaseTaskList"},
PersistenceUpdateTaskListScope: {operation: "UpdateTaskList"},
PersistenceListTaskListScope: {operation: "ListTaskList"},
PersistenceDeleteTaskListScope: {operation: "DeleteTaskList"},
PersistenceAppendHistoryEventsScope: {operation: "AppendHistoryEvents"},
PersistenceGetWorkflowExecutionHistoryScope: {operation: "GetWorkflowExecutionHistory"},
PersistenceDeleteWorkflowExecutionHistoryScope: {operation: "DeleteWorkflowExecutionHistory"},
PersistenceCreateDomainScope: {operation: "CreateDomain"},
PersistenceGetDomainScope: {operation: "GetDomain"},
PersistenceUpdateDomainScope: {operation: "UpdateDomain"},
PersistenceDeleteDomainScope: {operation: "DeleteDomain"},
PersistenceDeleteDomainByNameScope: {operation: "DeleteDomainByName"},
PersistenceListDomainScope: {operation: "ListDomain"},
PersistenceGetMetadataScope: {operation: "GetMetadata"},
PersistenceRecordWorkflowExecutionStartedScope: {operation: "RecordWorkflowExecutionStarted"},
PersistenceRecordWorkflowExecutionClosedScope: {operation: "RecordWorkflowExecutionClosed"},
PersistenceListOpenWorkflowExecutionsScope: {operation: "ListOpenWorkflowExecutions"},
Expand All @@ -825,12 +821,12 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
PersistenceVisibilityDeleteWorkflowExecutionScope: {operation: "VisibilityDeleteWorkflowExecution"},
PersistenceListWorkflowExecutionsScope: {operation: "ListWorkflowExecutions"},
PersistenceScanWorkflowExecutionsScope: {operation: "ScanWorkflowExecutions"},
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceForkHistoryBranchScope: {operation: "ForkHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceCompleteForkBranchScope: {operation: "CompleteForkBranch", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceGetHistoryTreeScope: {operation: "GetHistoryTree", tags: map[string]string{ShardTagName: NoneShardsTagValue}},
PersistenceAppendHistoryNodesScope: {operation: "AppendHistoryNodes"},
PersistenceReadHistoryBranchScope: {operation: "ReadHistoryBranch"},
PersistenceForkHistoryBranchScope: {operation: "ForkHistoryBranch"},
PersistenceDeleteHistoryBranchScope: {operation: "DeleteHistoryBranch"},
PersistenceCompleteForkBranchScope: {operation: "CompleteForkBranch"},
PersistenceGetHistoryTreeScope: {operation: "GetHistoryTree"},

BlobstoreClientUploadScope: {operation: "BlobstoreClientUpload", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},
BlobstoreClientDownloadScope: {operation: "BlobstoreClientDownload", tags: map[string]string{CadenceRoleTagName: BlobstoreRoleTagValue}},
Expand Down
18 changes: 18 additions & 0 deletions common/metrics/defs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,31 +36,49 @@ func TestScopeDefsMapped(t *testing.T) {
key, ok := ScopeDefs[Common][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
for i := FrontendStartWorkflowExecutionScope; i < NumFrontendScopes; i++ {
key, ok := ScopeDefs[Frontend][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
for i := HistoryStartWorkflowExecutionScope; i < NumHistoryScopes; i++ {
key, ok := ScopeDefs[History][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
for i := MatchingPollForDecisionTaskScope; i < NumMatchingScopes; i++ {
key, ok := ScopeDefs[Matching][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
for i := ReplicatorScope; i < NumWorkerScopes; i++ {
key, ok := ScopeDefs[Worker][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
for i := NumWorkerScopes; i < NumBlobstoreScopes; i++ {
key, ok := ScopeDefs[Blobstore][i]
require.True(t, ok)
require.NotEmpty(t, key)
for tag := range key.tags {
assert.True(t, IsMetric(tag), "metric tags should conform to regex")
}
}
}

Expand Down
3 changes: 3 additions & 0 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/uber/cadence/common/blobstore/filestore"

"github.com/uber-go/tally/m3"
"github.com/uber-go/tally/prometheus"
"github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/service/dynamicconfig"
Expand Down Expand Up @@ -252,6 +253,8 @@ type (
M3 *m3.Configuration `yaml:"m3"`
// Statsd is the configuration for statsd reporter
Statsd *Statsd `yaml:"statsd"`
// Prometheus is the configuration for prometheus reporter
Prometheus *prometheus.Configuration `yaml:"prometheus"`
// Tags is the set of key-value pairs to be reported
// as part of every metric
Tags map[string]string `yaml:"tags"`
Expand Down
108 changes: 99 additions & 9 deletions common/service/config/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
package config

import (
"net/http"
"strings"
"time"

"github.com/cactus/go-statsd-client/statsd"
prom "github.com/m3db/prometheus_client_golang/prometheus"
"github.com/uber-go/tally"
"github.com/uber-go/tally/prometheus"
tallystatsdreporter "github.com/uber-go/tally/statsd"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
statsdreporter "github.com/uber/cadence/common/metrics/tally/statsd"
"log"
"time"
)

// NewScope builds a new tally scope
Expand All @@ -37,22 +43,25 @@ import (
// only one of them will be used for
// reporting. Currently, m3 is preferred
// over statsd
func (c *Metrics) NewScope() tally.Scope {
func (c *Metrics) NewScope(logger log.Logger) tally.Scope {
if c.M3 != nil {
return c.newM3Scope()
return c.newM3Scope(logger)
}
if c.Statsd != nil {
return c.newStatsdScope()
return c.newStatsdScope(logger)
}
if c.Prometheus != nil {
return c.newPrometheusScope(logger)
}
return tally.NoopScope
}

// newM3Scope returns a new m3 scope with
// a default reporting interval of a second
func (c *Metrics) newM3Scope() tally.Scope {
func (c *Metrics) newM3Scope(logger log.Logger) tally.Scope {
reporter, err := c.M3.NewReporter()
if err != nil {
log.Fatalf("error creating m3 reporter, err=%v", err)
logger.Fatal("error creating m3 reporter", tag.Error(err))
}
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
Expand All @@ -64,14 +73,14 @@ func (c *Metrics) newM3Scope() tally.Scope {

// newM3Scope returns a new statsd scope with
// a default reporting interval of a second
func (c *Metrics) newStatsdScope() tally.Scope {
func (c *Metrics) newStatsdScope(logger log.Logger) tally.Scope {
config := c.Statsd
if len(config.HostPort) == 0 {
return tally.NoopScope
}
statter, err := statsd.NewBufferedClient(config.HostPort, config.Prefix, config.FlushInterval, config.FlushBytes)
if err != nil {
log.Fatalf("error creating statsd client, err=%v", err)
logger.Fatal("error creating statsd client", tag.Error(err))
}
//NOTE: according to ( https://github.com/uber-go/tally )Tally's statsd implementation doesn't support tagging.
// Therefore, we implement Tally interface to have a statsd reporter that can support tagging
Expand All @@ -83,3 +92,84 @@ func (c *Metrics) newStatsdScope() tally.Scope {
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

// newPrometheusScope returns a new prometheus scope with
// a default reporting interval of a second
func (c *Metrics) newPrometheusScope(logger log.Logger) tally.Scope {
reporter, err := NewPrometheusReporter(
c.Prometheus,
prometheus.ConfigurationOptions{
OnError: func(err error) {
logger.Warn("error in prometheus reporter", tag.Error(err))
},
},
)
if err != nil {
logger.Fatal("error creating prometheus reporter", tag.Error(err))
}
scopeOpts := tally.ScopeOptions{
Tags: c.Tags,
CachedReporter: reporter,
Separator: prometheus.DefaultSeparator,
}
scope, _ := tally.NewRootScope(scopeOpts, time.Second)
return scope
}

// NewPrometheusReporter - creates a prometheus reporter
// N.B - copy of the NewReporter method in tally - https://github.com/uber-go/tally/blob/master/prometheus/config.go#L77
// as the above method does not allow setting a separate registry per root
// which is necessary when we are running multiple roles within a same process
func NewPrometheusReporter(
config *prometheus.Configuration,
configOpts prometheus.ConfigurationOptions,
) (prometheus.Reporter, error) {
var opts prometheus.Options
opts.OnRegisterError = configOpts.OnError

switch config.TimerType {
case "summary":
opts.DefaultTimerType = prometheus.SummaryTimerType
case "histogram":
opts.DefaultTimerType = prometheus.HistogramTimerType
}

if len(config.DefaultHistogramBuckets) > 0 {
var values []float64
for _, value := range config.DefaultHistogramBuckets {
values = append(values, value.Upper)
}
opts.DefaultHistogramBuckets = values
}

if len(config.DefaultSummaryObjectives) > 0 {
values := make(map[float64]float64)
for _, value := range config.DefaultSummaryObjectives {
values[value.Percentile] = value.AllowedError
}
opts.DefaultSummaryObjectives = values
}

opts.Registerer = prom.NewRegistry()

reporter := prometheus.NewReporter(opts)

path := "/metrics"
if handlerPath := strings.TrimSpace(config.HandlerPath); handlerPath != "" {
path = handlerPath
}

if addr := strings.TrimSpace(config.ListenAddress); addr == "" {
http.Handle(path, reporter.HTTPHandler())
} else {
mux := http.NewServeMux()
mux.Handle(path, reporter.HTTPHandler())
go func() {
if err := http.ListenAndServe(addr, mux); err != nil {
configOpts.OnError(err)
}
}()
}

return reporter, nil
}
Loading

0 comments on commit 3fcd5f4

Please sign in to comment.