Skip to content

Commit

Permalink
Insert Hostname tag into metrics and other services (cadence-workflow…
Browse files Browse the repository at this point in the history
…#5245)

* add hostinfo log

* Revert "Switch to thin, versioned ES clients (cadence-workflow#5217)"

This reverts commit 28ac3ca.

* add tasklist name to see if it can distinguish partitioned tasklists

* add hostname field in params and matchingEngine, and emit that field through the logger

* Add GetHostName() in resourceTest

* add hostname and oss logger

* pass hostname into resourceImpl.go

* add hostname tag to CadenceTasklistRequests

* remove redundant logger

* Remove unnecessary changes to dynamic config and clean up logging

* Revert "Revert "Switch to thin, versioned ES clients (cadence-workflow#5217)""

This reverts commit 26d9baf.

* add hostname injection into server.go and add hostname field into config.go

* Revert "add hostname injection into server.go and add hostname field into config.go"

This reverts commit 5b7cfd4.

* add hostname into matching's NewConfig method parameters

* add hostname injection into cadence history

* add hostname injection into cadence frontend

* add hostname parameter into frontend test

* add hostname parameter into additional tests

* add hostname parameter into tasklist config and additional tests

* add hostname parameter into cadence worker

* add test to verify that the newly added feature doesn't break anything

* dummy commit

* describe HostName in comments
  • Loading branch information
timl3136 authored May 4, 2023
1 parent 8249f90 commit 0ff904e
Show file tree
Hide file tree
Showing 18 changed files with 79 additions and 13 deletions.
5 changes: 5 additions & 0 deletions common/metrics/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ const (
signalName = "signalName"
workflowVersion = "workflow_version"
shardID = "shard_id"
matchingHost = "matching_host"

allValue = "all"
unknownValue = "_unknown_"
Expand Down Expand Up @@ -211,3 +212,7 @@ func SignalNameAllTag() Tag {
// WorkflowVersionTag returns a tag keyed by workflowVersion ("workflow_version")
// for the given value. The tag is built by metricWithUnknown.
// NOTE(review): metricWithUnknown is defined outside this view; presumably it
// substitutes the unknown placeholder for an empty value — confirm.
func WorkflowVersionTag(value string) Tag {
return metricWithUnknown(workflowVersion, value)
}

// MatchingHostTag returns a tag keyed by matchingHost ("matching_host") for the
// given host name value. The tag is built by metricWithUnknown.
// NOTE(review): metricWithUnknown is defined outside this view; presumably it
// substitutes the unknown placeholder for an empty value — confirm.
func MatchingHostTag(value string) Tag {
return metricWithUnknown(matchingHost, value)
}
1 change: 1 addition & 0 deletions common/resource/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type (
InstanceID string
Logger log.Logger
ThrottledLogger log.Logger
HostName string

MetricScope tally.Scope
MembershipResolver membership.Resolver
Expand Down
3 changes: 3 additions & 0 deletions common/resource/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ type (
GetExecutionManager(int) (persistence.ExecutionManager, error)
GetPersistenceBean() persistenceClient.Bean

// GetHostName returns the host name associated with this resource
GetHostName() string

// loggers

GetLogger() log.Logger
Expand Down
12 changes: 12 additions & 0 deletions common/resource/resourceImpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ type (
// persistence clients
persistenceBean persistenceClient.Bean

// hostName
hostName string

// loggers
logger log.Logger
throttledLogger log.Logger
Expand All @@ -130,6 +133,8 @@ func New(
serviceConfig *service.Config,
) (impl *Impl, retError error) {

hostname := params.HostName

logger := params.Logger
throttledLogger := loggerimpl.NewThrottledLogger(logger, serviceConfig.ThrottledLoggerMaxRPS)

Expand Down Expand Up @@ -281,6 +286,9 @@ func New(
// persistence clients
persistenceBean: persistenceBean,

// hostname
hostName: hostname,

// loggers

logger: logger,
Expand Down Expand Up @@ -529,6 +537,10 @@ func (h *Impl) GetPersistenceBean() persistenceClient.Bean {
return h.persistenceBean
}

// GetHostName returns the host name stored on this resource, which New
// populates from params.HostName.
func (h *Impl) GetHostName() string {
return h.hostName
}

// loggers

// GetLogger return logger
Expand Down
7 changes: 7 additions & 0 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ type (
ExecutionMgr *mocks.ExecutionManager
PersistenceBean *persistenceClient.MockBean

HostName string

Logger log.Logger
}
)
Expand Down Expand Up @@ -380,6 +382,11 @@ func (s *Test) GetPersistenceBean() persistenceClient.Bean {
return s.PersistenceBean
}

// GetHostName returns the HostName field of the test resource; for testing.
func (s *Test) GetHostName() string {
return s.HostName
}

// loggers

// GetLogger for testing
Expand Down
1 change: 1 addition & 0 deletions service/frontend/clusterRedirectionHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *clusterRedirectionHandlerSuite) SetupTest() {
),
0,
false,
"hostname",
)
frontendHandler := NewWorkflowHandler(s.mockResource, s.config, nil, client.NewVersionChecker())

Expand Down
1 change: 1 addition & 0 deletions service/frontend/clusterRedirectionPolicy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (s *selectedAPIsForwardingRedirectionPolicySuite) SetupTest() {
),
0,
false,
"hostname",
)
s.policy = newSelectedOrAllAPIsForwardingPolicy(
s.currentClusterName,
Expand Down
7 changes: 6 additions & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,13 @@ type Config struct {

// Emit signal related metrics with signal name tag. Be aware of cardinality.
EmitSignalNameMetricsTag dynamicconfig.BoolPropertyFnWithDomainFilter

// HostName for machine running the service
HostName string
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVisConfigExist bool) *Config {
func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVisConfigExist bool, hostName string) *Config {
return &Config{
NumHistoryShards: numHistoryShards,
IsAdvancedVisConfigExist: isAdvancedVisConfigExist,
Expand Down Expand Up @@ -179,6 +182,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, isAdvancedVis
FailoverCoolDown: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.FrontendFailoverCoolDown),
RequiredDomainDataKeys: dc.GetMapProperty(dynamicconfig.RequiredDomainDataKeys),
},
HostName: hostName,
}
}

Expand Down Expand Up @@ -219,6 +223,7 @@ func NewService(
),
params.PersistenceConfig.NumHistoryShards,
isAdvancedVisExistInConfig,
params.HostName,
)
params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()

Expand Down
3 changes: 3 additions & 0 deletions service/frontend/workflowHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,7 @@ func (s *workflowHandlerSuite) TestRestartWorkflowExecution__Success() {
s.mockResource.GetLogger()),
numHistoryShards,
false,
"hostname",
),
)
ctx := context.Background()
Expand Down Expand Up @@ -1262,6 +1263,7 @@ func (s *workflowHandlerSuite) getWorkflowExecutionHistory(nextEventID int64, tr
s.mockResource.GetLogger()),
numHistoryShards,
false,
"hostname",
),
)
ctx := context.Background()
Expand Down Expand Up @@ -1559,6 +1561,7 @@ func (s *workflowHandlerSuite) newConfig(dynamicClient dc.Client) *Config {
),
numHistoryShards,
false,
"hostname",
)
config.EmitSignalNameMetricsTag = dc.GetBoolPropertyFnFilteredByDomain(true)
return config
Expand Down
9 changes: 7 additions & 2 deletions service/history/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,10 +323,13 @@ type Config struct {
LargeShardHistorySizeMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryEventMetricThreshold dynamicconfig.IntPropertyFn
LargeShardHistoryBlobMetricThreshold dynamicconfig.IntPropertyFn

// HostName for machine running the service
HostName string
}

// New returns new service config with default values
func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, storeType string, isAdvancedVisConfigExist bool) *Config {
func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, storeType string, isAdvancedVisConfigExist bool, hostname string) *Config {
cfg := &Config{
NumberOfShards: numberOfShards,
IsAdvancedVisConfigExist: isAdvancedVisConfigExist,
Expand Down Expand Up @@ -565,6 +568,8 @@ func New(dc *dynamicconfig.Collection, numberOfShards int, maxMessageSize int, s
LargeShardHistorySizeMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistorySizeMetricThreshold),
LargeShardHistoryEventMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryEventMetricThreshold),
LargeShardHistoryBlobMetricThreshold: dc.GetIntProperty(dynamicconfig.LargeShardHistoryBlobMetricThreshold),

HostName: hostname,
}

return cfg
Expand Down Expand Up @@ -594,7 +599,7 @@ func NewForTestByShardNumber(shardNumber int) *Config {
panicIfErr(inMem.UpdateValue(dynamicconfig.NormalDecisionScheduleToStartMaxAttempts, 3))
panicIfErr(inMem.UpdateValue(dynamicconfig.EnablePendingActivityValidation, true))
dc := dynamicconfig.NewCollection(inMem, log.NewNoop())
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false)
config := New(dc, shardNumber, 1024*1024, config.StoreTypeCassandra, false, "")
// reduce the duration of long poll to increase test speed
config.LongPollExpirationInterval = dc.GetDurationPropertyFilteredByDomain(dynamicconfig.HistoryLongPollExpirationInterval)
config.EnableConsistentQueryByDomain = dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableConsistentQueryByDomain)
Expand Down
3 changes: 2 additions & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ func NewService(
params.PersistenceConfig.NumHistoryShards,
params.RPCFactory.GetMaxMessageSize(),
params.PersistenceConfig.DefaultStoreType(),
params.PersistenceConfig.IsAdvancedVisibilityConfigExist())
params.PersistenceConfig.IsAdvancedVisibilityConfigExist(),
params.HostName)

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()

Expand Down
9 changes: 8 additions & 1 deletion service/matching/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ type (
EnableTaskInfoLogByDomainID dynamicconfig.BoolPropertyFnWithDomainIDFilter

ActivityTaskSyncMatchWaitTime dynamicconfig.DurationPropertyFnWithDomainFilter

// hostname info
HostName string
}

forwarderConfig struct {
Expand Down Expand Up @@ -96,11 +99,13 @@ type (
MaxTaskBatchSize func() int
NumWritePartitions func() int
NumReadPartitions func() int
// hostname
HostName string
}
)

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection) *Config {
func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
return &Config{
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceMaxQPS),
PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.MatchingPersistenceGlobalMaxQPS),
Expand Down Expand Up @@ -130,6 +135,7 @@ func NewConfig(dc *dynamicconfig.Collection) *Config {
EnableDebugMode: dc.GetBoolProperty(dynamicconfig.EnableDebugMode)(),
EnableTaskInfoLogByDomainID: dc.GetBoolPropertyFilteredByDomainID(dynamicconfig.MatchingEnableTaskInfoLogByDomainID),
ActivityTaskSyncMatchWaitTime: dc.GetDurationPropertyFilteredByDomain(dynamicconfig.MatchingActivityTaskSyncMatchWaitTime),
HostName: hostName,
}
}

Expand Down Expand Up @@ -194,5 +200,6 @@ func newTaskListConfig(id *taskListID, config *Config, domainCache cache.DomainC
return common.MaxInt(1, config.ForwarderMaxChildrenPerNode(domainName, taskListName, taskType))
},
},
HostName: config.HostName,
}, nil
}
2 changes: 1 addition & 1 deletion service/matching/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestMatcherSuite(t *testing.T) {
func (t *MatcherTestSuite) SetupTest() {
t.controller = gomock.NewController(t.T())
t.client = matching.NewMockClient(t.controller)
cfg := NewConfig(dynamicconfig.NewNopCollection())
cfg := NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
t.taskList = newTestTaskListID(uuid.New(), common.ReservedTaskListPrefix+"tl0/1", persistence.TaskListTypeDecision)
tlCfg, err := newTaskListConfig(t.taskList, cfg, t.newDomainCache())
t.NoError(err)
Expand Down
14 changes: 10 additions & 4 deletions service/matching/matchingEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,10 @@ func (e *matchingEngineImpl) AddDecisionTask(
// Only emit traffic metrics if the tasklist is not sticky and is not forwarded
if int32(request.GetTaskList().GetKind()) == 0 && request.ForwardedFrom == "" {
e.metricsClient.Scope(metrics.MatchingAddTaskScope).Tagged(metrics.DomainTag(domainName),
metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("decision_task")).IncCounter(metrics.CadenceTasklistRequests)
e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on decision task", tag.Dynamic("tasklistName", taskListName),
metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("decision_task"),
metrics.MatchingHostTag(e.config.HostName)).IncCounter(metrics.CadenceTasklistRequests)
e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on decision task",
tag.Dynamic("tasklistName", taskListName),
tag.Dynamic("taskListBaseName", taskList.baseName))
}

Expand All @@ -330,6 +332,7 @@ func (e *matchingEngineImpl) AddDecisionTask(
ScheduleToStartTimeout: request.GetScheduleToStartTimeoutSeconds(),
CreatedTime: time.Now(),
}

return tlMgr.AddTask(hCtx.Context, addTaskParams{
execution: request.Execution,
taskInfo: taskInfo,
Expand Down Expand Up @@ -374,8 +377,10 @@ func (e *matchingEngineImpl) AddActivityTask(
// Only emit traffic metrics if the tasklist is not sticky and is not forwarded
if int32(request.GetTaskList().GetKind()) == 0 && request.ForwardedFrom == "" {
e.metricsClient.Scope(metrics.MatchingAddTaskScope).Tagged(metrics.DomainTag(domainName),
metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("activity_task")).IncCounter(metrics.CadenceTasklistRequests)
e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on activity task", tag.Dynamic("tasklistName", taskListName),
metrics.TaskListTag(taskListName), metrics.TaskListTypeTag("activity_task"),
metrics.MatchingHostTag(e.config.HostName)).IncCounter(metrics.CadenceTasklistRequests)
e.emitInfoOrDebugLog(domainID, "Emitting tasklist counter on activity task",
tag.Dynamic("tasklistName", taskListName),
tag.Dynamic("taskListBaseName", taskList.baseName))
}

Expand All @@ -392,6 +397,7 @@ func (e *matchingEngineImpl) AddActivityTask(
ScheduleToStartTimeout: request.GetScheduleToStartTimeoutSeconds(),
CreatedTime: time.Now(),
}

return tlMgr.AddTask(hCtx.Context, addTaskParams{
execution: request.Execution,
taskInfo: taskInfo,
Expand Down
8 changes: 7 additions & 1 deletion service/matching/matchingEngine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1249,6 +1249,12 @@ func (s *matchingEngineSuite) assertPollTaskResponse(taskType int, param *testPa
}
}

// TestConfigDefaultHostName checks that a zero-value Config has an empty
// HostName, and that the suite's matching engine config carries a HostName
// that differs from that zero value.
func (s *matchingEngineSuite) TestConfigDefaultHostName() {
	configEmpty := Config{}
	// testify assertions take (expected, actual); keeping that order makes
	// failure messages label the two values correctly.
	s.EqualValues("", configEmpty.HostName)
	s.NotEqualValues(configEmpty.HostName, s.matchingEngine.config.HostName)
}

func newActivityTaskScheduledEvent(eventID int64, decisionTaskCompletedEventID int64,
scheduleAttributes *types.ScheduleActivityTaskDecisionAttributes) *types.HistoryEvent {
historyEvent := newHistoryEvent(eventID, types.EventTypeActivityTaskScheduled)
Expand Down Expand Up @@ -1588,7 +1594,7 @@ func validateTimeRange(t time.Time, expectedDuration time.Duration) bool {
}

func defaultTestConfig() *Config {
config := NewConfig(dynamicconfig.NewNopCollection())
config := NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
config.LongPollExpirationInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(100 * time.Millisecond)
config.MaxTaskDeleteBatchSize = dynamicconfig.GetIntPropertyFilteredByTaskListInfo(1)
return config
Expand Down
1 change: 1 addition & 0 deletions service/matching/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func NewService(
params.Logger,
dynamicconfig.ClusterNameFilter(params.ClusterMetadata.GetCurrentClusterName()),
),
params.HostName,
)

serviceResource, err := resource.New(
Expand Down
4 changes: 2 additions & 2 deletions service/matching/taskListManager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func TestCheckIdleTaskList(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

cfg := NewConfig(dynamicconfig.NewNopCollection())
cfg := NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

// Idle
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestAddTaskStandby(t *testing.T) {
controller := gomock.NewController(t)
defer controller.Finish()

cfg := NewConfig(dynamicconfig.NewNopCollection())
cfg := NewConfig(dynamicconfig.NewNopCollection(), "some random hostname")
cfg.IdleTasklistCheckInterval = dynamicconfig.GetDurationPropertyFnFilteredByTaskListInfo(10 * time.Millisecond)

tlm := createTestTaskListManagerWithConfig(controller, cfg)
Expand Down
2 changes: 2 additions & 0 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type (
DomainReplicationMaxRetryDuration dynamicconfig.DurationPropertyFn
EnableESAnalyzer dynamicconfig.BoolPropertyFn
EnableWatchDog dynamicconfig.BoolPropertyFn
HostName string
}
)

Expand Down Expand Up @@ -189,6 +190,7 @@ func NewConfig(params *resource.Params) *Config {
PersistenceGlobalMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceGlobalMaxQPS),
PersistenceMaxQPS: dc.GetIntProperty(dynamicconfig.WorkerPersistenceMaxQPS),
DomainReplicationMaxRetryDuration: dc.GetDurationProperty(dynamicconfig.WorkerReplicationTaskMaxRetryDuration),
HostName: params.HostName,
}
advancedVisWritingMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
Expand Down

0 comments on commit 0ff904e

Please sign in to comment.