Skip to content

Commit

Permalink
Support refreshing long running workflows based on user config (caden…
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Mar 29, 2022
1 parent 7328473 commit ba4a5d9
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 81 deletions.
2 changes: 2 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,8 @@ const (
DefaultESAnalyzerMinNumWorkflowsForAvg = 100
// DefaultESAnalyzerLimitToTypes controls if we want to limit ESAnalyzer only to some workflow types
DefaultESAnalyzerLimitToTypes = ""
// DefaultESAnalyzerEnableAvgDurationBasedChecks controls if we want to enable avg duration based refreshes
DefaultESAnalyzerEnableAvgDurationBasedChecks = false
// DefaultESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
DefaultESAnalyzerLimitToDomains = ""
// DefaultESAnalyzerWorkflowDurationWarnThreshold defines warning threshold for a workflow duration
Expand Down
8 changes: 7 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2171,14 +2171,19 @@ const (
// Value type: Int
// Default value: "" => means no limitation
ESAnalyzerLimitToTypes
// ESAnalyzerEnableAvgDurationBasedChecks controls if we want to enable avg duration based task refreshes
// KeyName: worker.ESAnalyzerEnableAvgDurationBasedChecks
// Value type: Bool
// Default value: false
ESAnalyzerEnableAvgDurationBasedChecks
// ESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
// KeyName: worker.ESAnalyzerLimitToDomains
// Value type: Int
// Default value: "" => means no limitation
ESAnalyzerLimitToDomains
// ESAnalyzerWorkflowDurationWarnThresholds defines the warning execution thresholds for workflow types
// KeyName: worker.ESAnalyzerWorkflowDurationWarnThresholds
// Value type: string (json of a dictionary {"<domainName>/<workflowType>":<value>,...})
// Value type: string [{"DomainName":"<domain>", "WorkflowType":"<workflowType>", "Threshold":"<duration>", "Refresh":<shouldRefresh>, "MaxNumWorkflows":<maxNumber>}]
// Default value: ""
ESAnalyzerWorkflowDurationWarnThresholds

Expand Down Expand Up @@ -2581,6 +2586,7 @@ var Keys = map[Key]string{
ESAnalyzerBufferWaitTime: "worker.ESAnalyzerBufferWaitTime",
ESAnalyzerMinNumWorkflowsForAvg: "worker.ESAnalyzerMinNumWorkflowsForAvg",
ESAnalyzerLimitToTypes: "worker.ESAnalyzerLimitToTypes",
ESAnalyzerEnableAvgDurationBasedChecks: "worker.ESAnalyzerEnableAvgDurationBasedChecks",
ESAnalyzerLimitToDomains: "worker.ESAnalyzerLimitToDomains",
ESAnalyzerWorkflowDurationWarnThresholds: "worker.ESAnalyzerWorkflowDurationWarnThresholds",

Expand Down
1 change: 1 addition & 0 deletions service/worker/esanalyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ type (
ESAnalyzerMaxNumDomains dynamicconfig.IntPropertyFn
ESAnalyzerMaxNumWorkflowTypes dynamicconfig.IntPropertyFn
ESAnalyzerLimitToTypes dynamicconfig.StringPropertyFn
ESAnalyzerEnableAvgDurationBasedChecks dynamicconfig.BoolPropertyFn
ESAnalyzerLimitToDomains dynamicconfig.StringPropertyFn
ESAnalyzerNumWorkflowsToRefresh dynamicconfig.IntPropertyFnWithWorkflowTypeFilter
ESAnalyzerBufferWaitTime dynamicconfig.DurationPropertyFnWithWorkflowTypeFilter
Expand Down
101 changes: 91 additions & 10 deletions service/worker/esanalyzer/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
s.workflow.findLongRunningWorkflows,
activity.RegisterOptions{Name: findLongRunningWorkflowsActivity},
)
s.workflowEnv.RegisterActivityWithOptions(
s.workflow.getLongRunCheckEntries,
activity.RegisterOptions{Name: getLongRunCheckEntriesActivity},
)

s.activityEnv.RegisterActivityWithOptions(
s.workflow.getWorkflowTypes,
Expand All @@ -181,6 +185,10 @@ func (s *esanalyzerWorkflowTestSuite) SetupTest() {
s.workflow.findLongRunningWorkflows,
activity.RegisterOptions{Name: findLongRunningWorkflowsActivity},
)
s.activityEnv.RegisterActivityWithOptions(
s.workflow.getLongRunCheckEntries,
activity.RegisterOptions{Name: getLongRunCheckEntriesActivity},
)
}

func (s *esanalyzerWorkflowTestSuite) TearDownTest() {
Expand Down Expand Up @@ -210,10 +218,21 @@ func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflow() {
}
s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[0]).
Return(workflows, nil).Times(1)
s.workflowEnv.OnActivity(findLongRunningWorkflowsActivity, mock.Anything).
Return(nil).Times(1)

s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows).Return(nil).Times(1)
longRunningWorkflows := []LongRunCheckEntry{
{
DomainName: s.DomainName,
WorkflowType: s.WorkflowType,
Threshold: time.Hour,
Refresh: true,
},
}
s.workflowEnv.OnActivity(getLongRunCheckEntriesActivity, mock.Anything).
Return(longRunningWorkflows, nil).Times(1)
s.workflowEnv.OnActivity(findLongRunningWorkflowsActivity, mock.Anything, longRunningWorkflows[0]).
Return(workflows, nil).Times(1)

s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows).Return(nil).Times(2)

s.workflowEnv.ExecuteWorkflow(esanalyzerWFTypeName)
err := s.workflowEnv.GetWorkflowResult(nil)
Expand Down Expand Up @@ -254,8 +273,8 @@ func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflowMultipleWorkflowTypes()
Return(workflows1, nil).Times(1)
s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[1]).
Return(workflows2, nil).Times(1)
s.workflowEnv.OnActivity(findLongRunningWorkflowsActivity, mock.Anything).
Return(nil).Times(1)
s.workflowEnv.OnActivity(getLongRunCheckEntriesActivity, mock.Anything).
Return(nil, nil).Times(1)

s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows1).Return(nil).Times(1)
s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows2).Return(nil).Times(1)
Expand Down Expand Up @@ -366,6 +385,48 @@ func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowI
s.EqualError(err, "InternalServiceError{Message: Inconsistent worklow. Expected domainID: deadbeef-0123-4567-890a-bcdef0123460, actual: another-domain-id}")
}

func (s *esanalyzerWorkflowTestSuite) TestGetLongRunCheckEntriesSingleEntry() {
s.config.ESAnalyzerWorkflowDurationWarnThresholds = dynamicconfig.GetStringPropertyFn(
`[{"DomainName":"SomeDomain", "WorkflowType":"SomeWFType", "Threshold":"2m", "Refresh":true}]`,
)

actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.getLongRunCheckEntries)
s.NoError(err)
var longRunningWorkflows []LongRunCheckEntry
err = actFuture.Get(&longRunningWorkflows)
s.NoError(err)
s.Equal(1, len(longRunningWorkflows))
s.Equal("SomeDomain", longRunningWorkflows[0].DomainName)
s.Equal("SomeWFType", longRunningWorkflows[0].WorkflowType)
s.Equal(2*time.Minute, longRunningWorkflows[0].Threshold)
s.Equal(0, longRunningWorkflows[0].MaxNumWorkflows)
s.Equal(true, longRunningWorkflows[0].Refresh)
}

func (s *esanalyzerWorkflowTestSuite) TestGetLongRunCheckEntriesMultipleEntries() {
s.config.ESAnalyzerWorkflowDurationWarnThresholds = dynamicconfig.GetStringPropertyFn(
`[{"DomainName":"d1", "WorkflowType":"t1", "Threshold":"2m", "Refresh":true},{"DomainName":"d2", "WorkflowType":"t2", "Threshold":"3m", "Refresh":false, "MaxNumWorkflows":1000000}]`,
)

actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.getLongRunCheckEntries)
s.NoError(err)
var longRunningWorkflows []LongRunCheckEntry
err = actFuture.Get(&longRunningWorkflows)
s.NoError(err)
s.Equal(2, len(longRunningWorkflows))
s.Equal("d1", longRunningWorkflows[0].DomainName)
s.Equal("t1", longRunningWorkflows[0].WorkflowType)
s.Equal(2*time.Minute, longRunningWorkflows[0].Threshold)
s.Equal(0, longRunningWorkflows[0].MaxNumWorkflows)
s.Equal(true, longRunningWorkflows[0].Refresh)

s.Equal("d2", longRunningWorkflows[1].DomainName)
s.Equal("t2", longRunningWorkflows[1].WorkflowType)
s.Equal(3*time.Minute, longRunningWorkflows[1].Threshold)
s.Equal(1000000, longRunningWorkflows[1].MaxNumWorkflows)
s.Equal(false, longRunningWorkflows[1].Refresh)
}

func (s *esanalyzerWorkflowTestSuite) TestFindLongRunningWorkflows() {
s.mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
&elasticsearch.RawResponse{
Expand All @@ -382,8 +443,15 @@ func (s *esanalyzerWorkflowTestSuite) TestFindLongRunningWorkflows() {
int64(1234),
).Return().Times(1)

s.config.ESAnalyzerWorkflowDurationWarnThresholds = dynamicconfig.GetStringPropertyFn(`{"test-domain/workflow1":"1m"}`)
_, err := s.activityEnv.ExecuteActivity(s.workflow.findLongRunningWorkflows)
longRunningWorkflows := []LongRunCheckEntry{
{
DomainName: s.DomainName,
WorkflowType: s.WorkflowType,
Threshold: time.Hour,
Refresh: true,
},
}
_, err := s.activityEnv.ExecuteActivity(s.workflow.findLongRunningWorkflows, longRunningWorkflows[0])
s.NoError(err)
}

Expand Down Expand Up @@ -470,7 +538,15 @@ func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflowsMinNumWorkflowValida
s.Equal(0, len(results))
}

func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypes() {
func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypesEnabled() {
s.testGetWorkflowTypes(true)
}
func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypesDisabled() {
s.testGetWorkflowTypes(false)
}

func (s *esanalyzerWorkflowTestSuite) testGetWorkflowTypes(enabled bool) {
s.config.ESAnalyzerEnableAvgDurationBasedChecks = dynamicconfig.GetBoolPropertyFn(enabled)
esResult := struct {
Buckets []DomainInfo `json:"buckets"`
}{
Expand Down Expand Up @@ -512,11 +588,16 @@ func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypes() {
var results []WorkflowTypeInfo
err = actFuture.Get(&results)
s.NoError(err)
s.Equal(2, len(results))
s.Equal(normalizeDomainInfos(esResult.Buckets), results)
if enabled {
s.Equal(2, len(results))
s.Equal(normalizeDomainInfos(esResult.Buckets), results)
} else {
s.Equal(0, len(results))
}
}

func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypesFromConfig() {
s.config.ESAnalyzerEnableAvgDurationBasedChecks = dynamicconfig.GetBoolPropertyFn(true)
workflowTypes := []WorkflowTypeInfo{
{DomainID: s.DomainID, Name: "workflow1"},
{DomainID: s.DomainID, Name: "workflow2"},
Expand Down
Loading

0 comments on commit ba4a5d9

Please sign in to comment.