diff --git a/common/constants.go b/common/constants.go
index 3ee181c49c9..b978031e16c 100644
--- a/common/constants.go
+++ b/common/constants.go
@@ -184,6 +184,27 @@ const (
 	TaskTypeCrossCluster TaskType = 6
 )
 
+const (
+	// DefaultESAnalyzerPause controls if we want to dynamically pause the analyzer
+	DefaultESAnalyzerPause = false
+	// DefaultESAnalyzerTimeWindow controls how many days to go back for ElasticSearch Analyzer
+	DefaultESAnalyzerTimeWindow = time.Hour * 24 * 30
+	// DefaultESAnalyzerMaxNumDomains controls how many domains to check
+	DefaultESAnalyzerMaxNumDomains = 500
+	// DefaultESAnalyzerMaxNumWorkflowTypes controls how many workflow types per domain to check
+	DefaultESAnalyzerMaxNumWorkflowTypes = 100
+	// DefaultESAnalyzerNumWorkflowsToRefresh controls how many workflows per workflow type should be refreshed
+	DefaultESAnalyzerNumWorkflowsToRefresh = 100
+	// DefaultESAnalyzerBufferWaitTime controls the minimum time required to consider a workflow stuck
+	DefaultESAnalyzerBufferWaitTime = time.Minute * 30
+	// DefaultESAnalyzerMinNumWorkflowsForAvg controls the minimum number of workflows a type must have before relying on its average run time
+	DefaultESAnalyzerMinNumWorkflowsForAvg = 100
+	// DefaultESAnalyzerLimitToTypes controls if we want to limit ESAnalyzer only to some workflow types
+	DefaultESAnalyzerLimitToTypes = ""
+	// DefaultESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
+	DefaultESAnalyzerLimitToDomains = ""
+)
+
 // StickyTaskConditionFailedErrorMsg error msg for sticky task ConditionFailedError
 const StickyTaskConditionFailedErrorMsg = "StickyTaskConditionFailedError"
 
diff --git a/common/dynamicconfig/config.go b/common/dynamicconfig/config.go
index 6f19f205fdc..d422bb299af 100644
--- a/common/dynamicconfig/config.go
+++ b/common/dynamicconfig/config.go
@@ -164,6 +164,12 @@ type BoolPropertyFnWithDomainIDAndWorkflowIDFilter func(domainID string, workflo
 // BoolPropertyFnWithTaskListInfoFilters is a wrapper to get bool property from dynamic config with three filters: domain, taskList, taskType
 type BoolPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) bool
 
+// IntPropertyFnWithWorkflowTypeFilter is a wrapper to get int property from dynamic config with domain and workflow type as filters
+type IntPropertyFnWithWorkflowTypeFilter func(domainName string, workflowType string) int
+
+// DurationPropertyFnWithWorkflowTypeFilter is a wrapper to get duration property from dynamic config with domain and workflow type as filters
+type DurationPropertyFnWithWorkflowTypeFilter func(domainName string, workflowType string) time.Duration
+
 // GetProperty gets a interface property and returns defaultValue if property is not found
 func (c *Collection) GetProperty(key Key, defaultValue interface{}) PropertyFn {
 	return func() interface{} {
@@ -210,6 +216,46 @@ func (c *Collection) GetIntPropertyFilteredByDomain(key Key, defaultValue int) I
 	}
 }
 
+// GetIntPropertyFilteredByWorkflowType gets property with workflow type filter and asserts that it's an integer
+func (c *Collection) GetIntPropertyFilteredByWorkflowType(key Key, defaultValue int) IntPropertyFnWithWorkflowTypeFilter {
+	return func(domainName string, workflowType string) int {
+		filters := c.toFilterMap(
+			DomainFilter(domainName),
+			WorkflowTypeFilter(workflowType),
+		)
+		val, err := c.client.GetIntValue(
+			key,
+			filters,
+			defaultValue,
+		)
+		if err != nil {
+			c.logError(key, filters, err)
+		}
+		c.logValue(key, filters, val, defaultValue, intCompareEquals)
+		return val
+	}
+}
+
+// GetDurationPropertyFilteredByWorkflowType gets property with workflow type filter and asserts that it's a duration
+func (c *Collection) GetDurationPropertyFilteredByWorkflowType(key Key, defaultValue time.Duration) DurationPropertyFnWithWorkflowTypeFilter {
+	return func(domainName string, workflowType string) time.Duration {
+		filters := c.toFilterMap(
+			DomainFilter(domainName),
+			WorkflowTypeFilter(workflowType),
+		)
+		val, err := c.client.GetDurationValue(
+			key,
+			filters,
+			defaultValue,
+		)
+		if err != nil {
+			c.logError(key, filters, err)
+		}
+		c.logValue(key, filters, val, defaultValue, durationCompareEquals)
+		return val
+	}
+}
+
 // GetIntPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's an integer
 func (c *Collection) GetIntPropertyFilteredByTaskListInfo(key Key, defaultValue int) IntPropertyFnWithTaskListInfoFilters {
 	return func(domain string, taskList string, taskType int) int {
diff --git a/common/dynamicconfig/config_mock.go b/common/dynamicconfig/config_mock.go
index 64830ee50b5..78abc1eec4b 100644
--- a/common/dynamicconfig/config_mock.go
+++ b/common/dynamicconfig/config_mock.go
@@ -44,6 +44,16 @@ func GetIntPropertyFilteredByShardID(value int) func(shardID int) int {
 	return func(shardID int) int { return value }
 }
 
+// GetIntPropertyFilteredByWorkflowType returns values as IntPropertyFnWithWorkflowTypeFilter
+func GetIntPropertyFilteredByWorkflowType(value int) func(domainName string, workflowType string) int {
+	return func(domainName string, workflowType string) int { return value }
+}
+
+// GetDurationPropertyFilteredByWorkflowType returns values as DurationPropertyFnWithWorkflowTypeFilter
+func GetDurationPropertyFilteredByWorkflowType(value time.Duration) func(domainName string, workflowType string) time.Duration {
+	return func(domainName string, workflowType string) time.Duration { return value }
+}
+
 // GetFloatPropertyFn returns value as FloatPropertyFn
 func GetFloatPropertyFn(value float64) func(opts ...FilterOption) float64 {
 	return func(...FilterOption) float64 { return value }
diff --git a/common/dynamicconfig/constants.go b/common/dynamicconfig/constants.go
index b92c6db7c29..3ef28bb60ee 100644
--- a/common/dynamicconfig/constants.go
+++ b/common/dynamicconfig/constants.go
@@ -1886,12 +1886,18 @@ const (
 	// Default value: true
 	// Allowed filters: N/A
 	EnableBatcher
-	// EnableParentClosePolicyWorker is decides whether or not enable system workers for processing parent close policy task
+	// EnableParentClosePolicyWorker decides whether or not to enable system workers for processing parent close policy task
 	// KeyName: system.enableParentClosePolicyWorker
 	// Value type: Bool
 	// Default value: true
 	// Allowed filters: N/A
 	EnableParentClosePolicyWorker
+	// EnableESAnalyzer decides whether to enable system workers for processing ElasticSearch Analyzer
+	// KeyName: system.enableESAnalyzer
+	// Value type: Bool
+	// Default value: false
+	// Allowed filters: N/A
+	EnableESAnalyzer
 	// EnableStickyQuery is indicates if sticky query should be enabled per domain
 	// KeyName: system.enableStickyQuery
 	// Value type: Bool
@@ -2030,6 +2036,52 @@ const (
 	// TODO: https://github.com/uber/cadence/issues/3861
 	WorkerBlobIntegrityCheckProbability
 
+	// ESAnalyzerPause defines if we want to dynamically pause the analyzer workflow
+	// KeyName: worker.ESAnalyzerPause
+	// Value type: Bool
+	// Default value: false
+	ESAnalyzerPause
+	// ESAnalyzerTimeWindow defines the time window ElasticSearch Analyzer will consider while taking workflow averages
+	// KeyName: worker.ESAnalyzerTimeWindow
+	// Value type: Duration
+	// Default value: 30 days
+	ESAnalyzerTimeWindow
+	// ESAnalyzerMaxNumDomains defines how many domains to check
+	// KeyName: worker.ESAnalyzerMaxNumDomains
+	// Value type: Int
+	// Default value: 500
+	ESAnalyzerMaxNumDomains
+	// ESAnalyzerMaxNumWorkflowTypes defines how many workflow types to check per domain
+	// KeyName: worker.ESAnalyzerMaxNumWorkflowTypes
+	// Value type: Int
+	// Default value: 100
+	ESAnalyzerMaxNumWorkflowTypes
+	// ESAnalyzerNumWorkflowsToRefresh controls how many workflows per workflow type should be refreshed
+	// KeyName: worker.ESAnalyzerNumWorkflowsToRefresh
+	// Value type: Int
+	// Default value: 100
+	ESAnalyzerNumWorkflowsToRefresh
+	// ESAnalyzerBufferWaitTime controls the minimum time required to consider a workflow stuck
+	// KeyName: worker.ESAnalyzerBufferWaitTime
+	// Value type: Duration
+	// Default value: 30 minutes
+	ESAnalyzerBufferWaitTime
+	// ESAnalyzerMinNumWorkflowsForAvg controls the minimum number of workflows a type must have before relying on its average run time
+	// KeyName: worker.ESAnalyzerMinNumWorkflowsForAvg
+	// Value type: Int
+	// Default value: 100
+	ESAnalyzerMinNumWorkflowsForAvg
+	// ESAnalyzerLimitToTypes controls if we want to limit ESAnalyzer only to some workflow types
+	// KeyName: worker.ESAnalyzerLimitToTypes
+	// Value type: String
+	// Default value: "" => means no limitation
+	ESAnalyzerLimitToTypes
+	// ESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
+	// KeyName: worker.ESAnalyzerLimitToDomains
+	// Value type: String
+	// Default value: "" => means no limitation
+	ESAnalyzerLimitToDomains
+
 	// LastKeyForTest must be the last one in this const group for testing purpose
 	LastKeyForTest
 )
@@ -2073,6 +2125,7 @@ var Keys = map[Key]string{
 	DisallowQuery:                  "system.disallowQuery",
 	EnableBatcher:                  "worker.enableBatcher",
 	EnableParentClosePolicyWorker:  "system.enableParentClosePolicyWorker",
+	EnableESAnalyzer:               "system.enableESAnalyzer",
 	EnableFailoverManager:          "system.enableFailoverManager",
 	EnableWorkflowShadower:         "system.enableWorkflowShadower",
 	EnableStickyQuery:              "system.enableStickyQuery",
@@ -2400,6 +2453,16 @@ var Keys = map[Key]string{
 	EnableArchivalCompression:                       "worker.EnableArchivalCompression",
 	WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
 	WorkerBlobIntegrityCheckProbability:             "worker.BlobIntegrityCheckProbability",
+
+	ESAnalyzerPause:                 "worker.ESAnalyzerPause",
+	ESAnalyzerTimeWindow:            "worker.ESAnalyzerTimeWindow",
+	ESAnalyzerMaxNumDomains:         "worker.ESAnalyzerMaxNumDomains",
+	ESAnalyzerMaxNumWorkflowTypes:   "worker.ESAnalyzerMaxNumWorkflowTypes",
+	ESAnalyzerNumWorkflowsToRefresh: "worker.ESAnalyzerNumWorkflowsToRefresh",
+	ESAnalyzerBufferWaitTime:        "worker.ESAnalyzerBufferWaitTime",
+	ESAnalyzerMinNumWorkflowsForAvg: "worker.ESAnalyzerMinNumWorkflowsForAvg",
+	ESAnalyzerLimitToTypes:          "worker.ESAnalyzerLimitToTypes",
+	ESAnalyzerLimitToDomains:        "worker.ESAnalyzerLimitToDomains",
 }
 
 var KeyNames map[string]Key
diff --git a/common/dynamicconfig/filter.go b/common/dynamicconfig/filter.go
index d24e3311233..edf65b39c68 100644
--- a/common/dynamicconfig/filter.go
+++ b/common/dynamicconfig/filter.go
@@ -46,6 +46,8 @@ func ParseFilter(filterName string) Filter {
 		return ClusterName
 	case "workflowID":
 		return WorkflowID
+	case "workflowType":
+		return WorkflowType
 	default:
 		return UnknownFilter
 	}
@@ -60,6 +62,7 @@ var filters = []string{
 	"shardID",
 	"clusterName",
 	"workflowID",
+	"workflowType",
 }
 
 const (
@@ -78,6 +81,8 @@ const (
 	ClusterName
 	// WorkflowID is the workflow id
 	WorkflowID
+	// WorkflowType is the workflow type name
+	WorkflowType
 
 	// LastFilterTypeForTest must be the last one in this const group for testing purpose
 	LastFilterTypeForTest
@@ -134,3 +139,10 @@ func WorkflowIDFilter(workflowID string) FilterOption {
 		filterMap[WorkflowID] = workflowID
 	}
 }
+
+// WorkflowTypeFilter filters by workflow type name
+func WorkflowTypeFilter(name string) FilterOption {
+	return func(filterMap map[Filter]interface{}) {
+		filterMap[WorkflowType] = name
+	}
+}
diff --git a/common/elasticsearch/client_v6.go b/common/elasticsearch/client_v6.go
index 67c53cf5d6a..e791675db60 100644
--- a/common/elasticsearch/client_v6.go
+++ b/common/elasticsearch/client_v6.go
@@ -448,6 +448,49 @@ func (c *elasticV6) search(ctx context.Context, p *searchParametersV6) (*elastic
 	return searchService.Do(ctx)
 }
 
+func (c *elasticV6) SearchRaw(ctx context.Context, index string, query string) (*RawResponse, error) {
+	// There are slight differences between the v6 and v7 responses that prevent
+	// moving this to a common function
+	esResult, err := c.client.Search(index).Source(query).Do(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	if esResult.Error != nil {
+		return nil, types.InternalServiceError{
+			Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error),
+		}
+	} else if esResult.TimedOut {
+		return nil, types.InternalServiceError{
+			Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis),
+		}
+	}
+
+	result := RawResponse{
+		TookInMillis: esResult.TookInMillis,
+	}
+
+	result.Hits = SearchHits{
+		TotalHits: esResult.TotalHits(),
+	}
+	if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 {
+		result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits))
+		for _, hit := range esResult.Hits.Hits {
+			workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit)
+			result.Hits.Hits = append(result.Hits.Hits, workflowExecutionInfo)
+		}
+	}
+
+	if len(esResult.Aggregations) > 0 {
+		result.Aggregations = make(map[string]json.RawMessage, len(esResult.Aggregations))
+		for key, agg := range esResult.Aggregations {
+			result.Aggregations[key] = *agg
+		}
+	}
+
+	return &result, nil
+}
+
 func (c *elasticV6) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) {
 	return c.client.Search(index).Source(query).Do(ctx)
 }
@@ -609,6 +652,8 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
 	}
 
 	record := &p.InternalVisibilityWorkflowExecutionInfo{
+		DomainID:     source.DomainID,
+		WorkflowType: source.WorkflowType,
 		WorkflowID:   source.WorkflowID,
 		RunID:        source.RunID,
 		TypeName:     source.WorkflowType,
diff --git a/common/elasticsearch/client_v7.go b/common/elasticsearch/client_v7.go
index bb6da970dcc..dd4158e9856 100644
--- a/common/elasticsearch/client_v7.go
+++ b/common/elasticsearch/client_v7.go
@@ -447,6 +447,43 @@ func (c *elasticV7) search(ctx context.Context, p *searchParametersV7) (*elastic
 	return searchService.Do(ctx)
}
 
+func (c *elasticV7) SearchRaw(ctx context.Context, index string, query string) (*RawResponse, error) {
+	// There are slight differences between the v6 and v7 responses that prevent
+	// moving this to a common function
+	esResult, err := c.client.Search(index).Source(query).Do(ctx)
+	if err != nil {
+		return nil, err
+	}
+
+	if esResult.Error != nil {
+		return nil, types.InternalServiceError{
+			Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error),
+		}
+	} else if esResult.TimedOut {
+		return nil, types.InternalServiceError{
+			Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis),
+		}
+	}
+
+	result := RawResponse{
+		TookInMillis: esResult.TookInMillis,
+		Aggregations: esResult.Aggregations,
+	}
+
+	result.Hits = SearchHits{
+		TotalHits: esResult.TotalHits(),
+	}
+	if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 {
+		result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits))
+		for _, hit := range esResult.Hits.Hits {
+			workflowExecutionInfo := c.convertSearchResultToVisibilityRecord(hit)
+			result.Hits.Hits = append(result.Hits.Hits, workflowExecutionInfo)
+		}
+	}
+
+	return &result, nil
+}
+
 func (c *elasticV7) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) {
 	return c.client.Search(index).Source(query).Do(ctx)
 }
@@ -608,6 +645,8 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
 	}
 
 	record := &p.InternalVisibilityWorkflowExecutionInfo{
+		DomainID:     source.DomainID,
+		WorkflowType: source.WorkflowType,
 		WorkflowID:   source.WorkflowID,
 		RunID:        source.RunID,
 		TypeName:     source.WorkflowType,
diff --git a/common/elasticsearch/interfaces.go b/common/elasticsearch/interfaces.go
index 1d701a247e6..0419b299c2e 100644
--- a/common/elasticsearch/interfaces.go
+++ b/common/elasticsearch/interfaces.go
@@ -22,6 +22,7 @@ package elasticsearch
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"time"
 
@@ -58,6 +59,8 @@ type (
 		Search(ctx context.Context, request *SearchRequest) (*SearchResponse, error)
 		// SearchByQuery is the generic purpose searching
 		SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*SearchResponse, error)
+		// SearchRaw is for searching with raw json. Returns a RawResponse object, which is a subset of the ESv6 and ESv7 responses
+		SearchRaw(ctx context.Context, index, query string) (*RawResponse, error)
 		// ScanByQuery is also generic purpose searching, but implemented with ScrollService of ElasticSearch,
 		// which is more performant for pagination, but comes with some limitation of in-parallel requests.
 		ScanByQuery(ctx context.Context, request *ScanByQueryRequest) (*SearchResponse, error)
@@ -211,6 +214,7 @@ type (
 		WorkflowID    string
 		RunID         string
 		WorkflowType  string
+		DomainID      string
 		StartTime     int64
 		ExecutionTime int64
 		CloseTime     int64
@@ -223,4 +227,15 @@ type (
 		NumClusters   int16
 		Attr          map[string]interface{}
 	}
+
+	SearchHits struct {
+		TotalHits int64
+		Hits      []*p.InternalVisibilityWorkflowExecutionInfo
+	}
+
+	RawResponse struct {
+		TookInMillis int64
+		Hits         SearchHits
+		Aggregations map[string]json.RawMessage
+	}
 )
diff --git a/common/elasticsearch/mocks/GenericClient.go b/common/elasticsearch/mocks/GenericClient.go
index 7e6b2793050..51458af2dee 100644
--- a/common/elasticsearch/mocks/GenericClient.go
+++ b/common/elasticsearch/mocks/GenericClient.go
@@ -169,6 +169,28 @@ func (_m *GenericClient) Search(ctx context.Context, request *elasticsearch.Sear
 	return r0, r1
 }
 
+// SearchRaw provides a mock function with given fields: ctx, index, query
+func (_m *GenericClient) SearchRaw(ctx context.Context, index string, query string) (*elasticsearch.RawResponse, error) {
+	ret := _m.Called(ctx, index, query)
+
+	var r0 *elasticsearch.RawResponse
+	if rf, ok := ret.Get(0).(func(context.Context, string, string) *elasticsearch.RawResponse); ok {
+		r0 = rf(ctx, index, query)
+	} else {
+		if ret.Get(0) != nil {
+			r0 = ret.Get(0).(*elasticsearch.RawResponse)
+		}
+	}
+
+	var r1 error
+	if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok {
+		r1 = rf(ctx, index, query)
+	} else {
+		r1 = ret.Error(1)
+	}
+
+	return r0, r1
+}
+
 // SearchByQuery provides a mock function with given fields: ctx, request
 func (_m *GenericClient) SearchByQuery(ctx context.Context, request *elasticsearch.SearchByQueryRequest) (*persistence.InternalListWorkflowExecutionsResponse, error) {
 	ret := _m.Called(ctx, request)
diff --git a/common/log/tag/tags.go b/common/log/tag/tags.go
index c5751a6da7b..457787c5a50 100644
--- a/common/log/tag/tags.go
+++ b/common/log/tag/tags.go
@@ -607,6 +607,11 @@ func ESDocID(id string) Tag {
 	return newStringTag("es-doc-id", id)
 }
 
+// ESAggregationID returns tag for the ES aggregation ID
+func ESAggregationID(id string) Tag {
+	return newStringTag("es-agg-id", id)
+}
+
 // LoggingCallAtKey is reserved tag
 const LoggingCallAtKey = "logging-call-at"
 
@@ -809,3 +814,8 @@ func ArchivalBlobIntegrityCheckFailReason(blobIntegrityCheckFailReason string) T
 func ArchivalBlobstoreContextTimeout(blobstoreContextTimeout time.Duration) Tag {
 	return newDurationTag("archival-blobstore-context-timeout", blobstoreContextTimeout)
 }
+
+// VisibilityQuery returns tag for the query for getting visibility records
+func VisibilityQuery(query string) Tag {
+	return newStringTag("visibility-query", query)
+}
diff --git a/common/metrics/defs.go b/common/metrics/defs.go
index e3b16edf948..869407a8af2 100644
--- a/common/metrics/defs.go
+++ b/common/metrics/defs.go
@@ -1184,6 +1184,8 @@ const (
 	ShardScannerScope
 	// CheckDataCorruptionWorkflowScope is scope used by the data corruption workflow
 	CheckDataCorruptionWorkflowScope
+	// ESAnalyzerScope is scope used by ElasticSearch Analyzer (esanalyzer) workflow
+	ESAnalyzerScope
 
 	NumWorkerScopes
 )
@@ -1719,6 +1721,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
 		HistoryScavengerScope:           {operation: "historyscavenger"},
 		BatcherScope:                    {operation: "batcher"},
 		ParentClosePolicyProcessorScope: {operation: "ParentClosePolicyProcessor"},
+		ESAnalyzerScope:                 {operation: "ESAnalyzer"},
 	},
 }
 
@@ -2234,6 +2237,9 @@ const (
 	DataCorruptionWorkflowFailure
 	DataCorruptionWorkflowSuccessCount
 	DataCorruptionWorkflowSkipCount
+	ESAnalyzerNumStuckWorkflowsDiscovered
+ ESAnalyzerNumStuckWorkflowsRefreshed + ESAnalyzerNumStuckWorkflowsFailedToRefresh NumWorkerMetrics ) @@ -2757,6 +2763,9 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{ DataCorruptionWorkflowSuccessCount: {metricName: "data_corruption_workflow_success", metricType: Counter}, DataCorruptionWorkflowCount: {metricName: "data_corruption_workflow_count", metricType: Counter}, DataCorruptionWorkflowSkipCount: {metricName: "data_corruption_workflow_skips", metricType: Counter}, + ESAnalyzerNumStuckWorkflowsDiscovered: {metricName: "es_analyzer_num_stuck_workflows_discovered", metricType: Counter}, + ESAnalyzerNumStuckWorkflowsRefreshed: {metricName: "es_analyzer_num_stuck_workflows_refreshed", metricType: Counter}, + ESAnalyzerNumStuckWorkflowsFailedToRefresh: {metricName: "es_analyzer_num_stuck_workflows_failed_to_refresh", metricType: Counter}, }, } diff --git a/common/persistence/dataStoreInterfaces.go b/common/persistence/dataStoreInterfaces.go index 9e312b77310..4eeffa7fb48 100644 --- a/common/persistence/dataStoreInterfaces.go +++ b/common/persistence/dataStoreInterfaces.go @@ -629,6 +629,8 @@ type ( // InternalVisibilityWorkflowExecutionInfo is visibility info for internal response InternalVisibilityWorkflowExecutionInfo struct { + DomainID string + WorkflowType string WorkflowID string RunID string TypeName string diff --git a/common/persistence/elasticsearch/decodeBench_test.go b/common/persistence/elasticsearch/decodeBench_test.go index deb54b6f783..cebb3039d02 100644 --- a/common/persistence/elasticsearch/decodeBench_test.go +++ b/common/persistence/elasticsearch/decodeBench_test.go @@ -64,6 +64,8 @@ func BenchmarkJSONDecodeToType(b *testing.B) { var source *es.VisibilityRecord json.Unmarshal(*bytes, &source) record := &p.InternalVisibilityWorkflowExecutionInfo{ + DomainID: source.DomainID, + WorkflowType: source.WorkflowType, WorkflowID: source.WorkflowID, RunID: source.RunID, TypeName: source.WorkflowType, @@ -95,6 +97,8 @@ func BenchmarkJSONDecodeToMap(b *testing.B) { historyLen, _ := source[definition.HistoryLength].(json.Number).Int64() record := &p.InternalVisibilityWorkflowExecutionInfo{ + DomainID: source[definition.DomainID].(string), + WorkflowType: source[definition.WorkflowType].(string), WorkflowID: source[definition.WorkflowID].(string), RunID: source[definition.RunID].(string), TypeName: source[definition.WorkflowType].(string), diff --git a/service/history/config/config.go b/service/history/config/config.go index 791cbf5e6ab..d1ed56da1c0 100644 --- a/service/history/config/config.go +++ b/service/history/config/config.go @@ -219,6 +219,8 @@ type Config struct { EnableParentClosePolicy dynamicconfig.BoolPropertyFnWithDomainFilter // whether or not enable system workers for processing parent close policy task EnableParentClosePolicyWorker dynamicconfig.BoolPropertyFn + // whether to enable system workers for processing ElasticSearch Analyzer + EnableESAnalyzer dynamicconfig.BoolPropertyFn // parent close policy will be processed by sys workers(if enabled) if // the number of children greater than or equal to this threshold ParentClosePolicyThreshold dynamicconfig.IntPropertyFnWithDomainFilter diff --git a/service/worker/esanalyzer/analyzer.go b/service/worker/esanalyzer/analyzer.go new file mode 100644 index 00000000000..1dff26d5f2a --- /dev/null +++ b/service/worker/esanalyzer/analyzer.go @@ -0,0 +1,136 @@ +// Copyright (c) 2021 Uber Technologies, Inc. 
+// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package esanalyzer + +import ( + "context" + "time" + + "github.com/opentracing/opentracing-go" + "github.com/uber-go/tally" + "go.uber.org/cadence/.gen/go/cadence/workflowserviceclient" + "go.uber.org/cadence/.gen/go/shared" + cclient "go.uber.org/cadence/client" + "go.uber.org/cadence/worker" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/client/frontend" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/config" + "github.com/uber/cadence/common/dynamicconfig" + es "github.com/uber/cadence/common/elasticsearch" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/resource" + "github.com/uber/cadence/service/worker/workercommon" +) + +type ( + // Analyzer is the background sub-system to query ElasticSearch and execute mitigations + Analyzer struct { + svcClient workflowserviceclient.Interface + frontendClient frontend.Client + clientBean client.Bean + esClient es.GenericClient + logger log.Logger + metricsClient metrics.Client + tallyScope tally.Scope + visibilityIndexName string + resource resource.Resource + domainCache cache.DomainCache + config *Config + } + + // Config contains all configs for ElasticSearch Analyzer + Config struct { + ESAnalyzerPause dynamicconfig.BoolPropertyFn + ESAnalyzerTimeWindow dynamicconfig.DurationPropertyFn + ESAnalyzerMaxNumDomains dynamicconfig.IntPropertyFn + ESAnalyzerMaxNumWorkflowTypes dynamicconfig.IntPropertyFn + ESAnalyzerLimitToTypes dynamicconfig.StringPropertyFn + ESAnalyzerLimitToDomains dynamicconfig.StringPropertyFn + ESAnalyzerNumWorkflowsToRefresh dynamicconfig.IntPropertyFnWithWorkflowTypeFilter + ESAnalyzerBufferWaitTime dynamicconfig.DurationPropertyFnWithWorkflowTypeFilter + ESAnalyzerMinNumWorkflowsForAvg dynamicconfig.IntPropertyFnWithWorkflowTypeFilter + } +) + +const startUpDelay = time.Second * 10 + +// New returns a new instance as daemon +func New( + svcClient workflowserviceclient.Interface, + frontendClient frontend.Client, + clientBean client.Bean, + esClient es.GenericClient, + esConfig *config.ElasticSearchConfig, + logger log.Logger, + metricsClient metrics.Client, + tallyScope tally.Scope, + resource resource.Resource, + domainCache cache.DomainCache, + config *Config, +) *Analyzer { + return &Analyzer{ + svcClient: svcClient, + frontendClient: frontendClient, + 
clientBean:          clientBean,
+		esClient:            esClient,
+		logger:              logger,
+		metricsClient:       metricsClient,
+		tallyScope:          tallyScope,
+		visibilityIndexName: esConfig.Indices[common.VisibilityAppName],
+		resource:            resource,
+		domainCache:         domainCache,
+		config:              config,
+	}
+}
+
+// Start starts the analyzer
+func (a *Analyzer) Start() error {
+	ctx := context.Background()
+	a.StartWorkflow(ctx)
+
+	workerOpts := worker.Options{
+		MetricsScope:              a.tallyScope,
+		BackgroundActivityContext: ctx,
+		Tracer:                    opentracing.GlobalTracer(),
+	}
+	esWorker := worker.New(a.svcClient, common.SystemLocalDomainName, taskListName, workerOpts)
+	return esWorker.Start()
+}
+
+func (a *Analyzer) StartWorkflow(ctx context.Context) {
+	initWorkflow(a)
+	go workercommon.StartWorkflowWithRetry(esanalyzerWFTypeName, startUpDelay, a.resource, func(client cclient.Client) error {
+		_, err := client.StartWorkflow(ctx, wfOptions, esanalyzerWFTypeName)
+		switch err.(type) {
+		case nil, *shared.WorkflowExecutionAlreadyStartedError:
+			return nil
+		default:
+			a.logger.Error("Failed to start ElasticSearch Analyzer", tag.Error(err))
+			return err
+		}
+	})
+}
diff --git a/service/worker/esanalyzer/analyzer_test.go b/service/worker/esanalyzer/analyzer_test.go
new file mode 100644
index 00000000000..5b88e3b9889
--- /dev/null
+++ b/service/worker/esanalyzer/analyzer_test.go
@@ -0,0 +1,493 @@
+// Copyright (c) 2017-2021 Uber Technologies Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+ +package esanalyzer + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.uber.org/cadence/activity" + "go.uber.org/cadence/testsuite" + "go.uber.org/cadence/worker" + "go.uber.org/cadence/workflow" + + "github.com/uber/cadence/common/dynamicconfig" + "github.com/uber/cadence/common/elasticsearch" + esMocks "github.com/uber/cadence/common/elasticsearch/mocks" + + "github.com/uber/cadence/client" + "github.com/uber/cadence/client/admin" + "github.com/uber/cadence/common/cache" + "github.com/uber/cadence/common/cluster" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/metrics/mocks" + "github.com/uber/cadence/common/persistence" + "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/history/resource" +) + +type esanalyzerWorkflowTestSuite struct { + suite.Suite + testsuite.WorkflowTestSuite + activityEnv *testsuite.TestActivityEnvironment + workflowEnv *testsuite.TestWorkflowEnvironment + controller *gomock.Controller + resource *resource.Test + mockAdminClient *admin.MockClient + mockDomainCache *cache.MockDomainCache + clientBean *client.MockBean + logger *log.MockLogger + mockMetricClient *mocks.Client + mockESClient *esMocks.GenericClient + analyzer *Analyzer + workflow *Workflow + config Config + DomainID string + DomainName string + WorkflowType string + WorkflowID string + RunID string +} + +func TestESAnalyzerWorkflowTestSuite(t *testing.T) { + suite.Run(t, new(esanalyzerWorkflowTestSuite)) +} + +func (s *esanalyzerWorkflowTestSuite) SetupTest() { + s.DomainID = "deadbeef-0123-4567-890a-bcdef0123460" + s.DomainName = "test-domain" + s.WorkflowType = "test-workflow-type" + s.WorkflowID = "test-workflow_id" + s.RunID = "test-run_id" + + activeDomainCache := cache.NewGlobalDomainCacheEntryForTest( + &persistence.DomainInfo{ID: s.DomainID, Name: s.DomainName}, + &persistence.DomainConfig{Retention: 1}, + &persistence.DomainReplicationConfig{ + ActiveClusterName: cluster.TestCurrentClusterName, + Clusters: []*persistence.ClusterReplicationConfig{ + {ClusterName: cluster.TestCurrentClusterName}, + {ClusterName: cluster.TestAlternativeClusterName}, + }, + }, + 1234, + cluster.GetTestClusterMetadata(true, true), + ) + + s.config = Config{ + ESAnalyzerPause: dynamicconfig.GetBoolPropertyFn(false), + ESAnalyzerTimeWindow: dynamicconfig.GetDurationPropertyFn(time.Hour * 24 * 30), + ESAnalyzerMaxNumDomains: dynamicconfig.GetIntPropertyFn(500), + ESAnalyzerMaxNumWorkflowTypes: dynamicconfig.GetIntPropertyFn(100), + ESAnalyzerLimitToTypes: dynamicconfig.GetStringPropertyFn(""), + ESAnalyzerLimitToDomains: dynamicconfig.GetStringPropertyFn(""), + ESAnalyzerNumWorkflowsToRefresh: dynamicconfig.GetIntPropertyFilteredByWorkflowType(2), + ESAnalyzerBufferWaitTime: dynamicconfig.GetDurationPropertyFilteredByWorkflowType(time.Minute * 30), + ESAnalyzerMinNumWorkflowsForAvg: dynamicconfig.GetIntPropertyFilteredByWorkflowType(100), + } + + s.activityEnv = s.NewTestActivityEnvironment() + s.workflowEnv = s.NewTestWorkflowEnvironment() + s.controller = gomock.NewController(s.T()) + s.mockDomainCache = cache.NewMockDomainCache(s.controller) + s.resource = resource.NewTest(s.controller, metrics.Worker) + s.mockAdminClient = admin.NewMockClient(s.controller) + s.clientBean = client.NewMockBean(s.controller) + s.logger = &log.MockLogger{} + s.mockMetricClient = &mocks.Client{} + s.mockESClient = 
&esMocks.GenericClient{} + + s.mockDomainCache.EXPECT().GetDomainByID(s.DomainID).Return(activeDomainCache, nil).AnyTimes() + s.mockDomainCache.EXPECT().GetDomain(s.DomainName).Return(activeDomainCache, nil).AnyTimes() + s.clientBean.EXPECT().GetRemoteAdminClient(cluster.TestCurrentClusterName).Return(s.mockAdminClient).AnyTimes() + + // SET UP ANALYZER + s.analyzer = &Analyzer{ + svcClient: s.resource.GetSDKClient(), + clientBean: s.clientBean, + domainCache: s.mockDomainCache, + logger: s.logger, + metricsClient: s.mockMetricClient, + esClient: s.mockESClient, + config: &s.config, + } + s.activityEnv.SetTestTimeout(time.Second * 5) + s.activityEnv.SetWorkerOptions(worker.Options{BackgroundActivityContext: context.Background()}) + + // REGISTER WORKFLOWS AND ACTIVITIES + s.workflow = &Workflow{analyzer: s.analyzer} + s.workflowEnv.RegisterWorkflowWithOptions( + s.workflow.workflowFunc, + workflow.RegisterOptions{Name: esanalyzerWFTypeName}) + + s.workflowEnv.RegisterActivityWithOptions( + s.workflow.getWorkflowTypes, + activity.RegisterOptions{Name: getWorkflowTypesActivity}) + s.workflowEnv.RegisterActivityWithOptions( + s.workflow.findStuckWorkflows, + activity.RegisterOptions{Name: findStuckWorkflowsActivity}) + s.workflowEnv.RegisterActivityWithOptions( + s.workflow.refreshStuckWorkflowsFromSameWorkflowType, + activity.RegisterOptions{Name: refreshStuckWorkflowsActivity}, + ) + + s.activityEnv.RegisterActivityWithOptions( + s.workflow.getWorkflowTypes, + activity.RegisterOptions{Name: getWorkflowTypesActivity}) + s.activityEnv.RegisterActivityWithOptions( + s.workflow.findStuckWorkflows, + activity.RegisterOptions{Name: findStuckWorkflowsActivity}) + s.activityEnv.RegisterActivityWithOptions( + s.workflow.refreshStuckWorkflowsFromSameWorkflowType, + activity.RegisterOptions{Name: refreshStuckWorkflowsActivity}, + ) +} + +func (s *esanalyzerWorkflowTestSuite) TearDownTest() { + defer s.controller.Finish() + defer s.resource.Finish(s.T()) + + s.workflowEnv.AssertExpectations(s.T()) +} + +func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflow() { + workflowTypeInfos := []WorkflowTypeInfo{ + { + Name: s.WorkflowType, + NumWorkflows: 564, + Duration: Duration{AvgExecTimeNanoseconds: float64(123 * time.Second)}, + }, + } + s.workflowEnv.OnActivity(getWorkflowTypesActivity, mock.Anything). + Return(workflowTypeInfos, nil).Times(1) + + workflows := []WorkflowInfo{ + { + DomainID: s.DomainID, + WorkflowID: s.WorkflowID, + RunID: s.RunID, + }, + } + s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[0]). + Return(workflows, nil).Times(1) + + s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows).Return(nil).Times(1) + + s.workflowEnv.ExecuteWorkflow(esanalyzerWFTypeName) + err := s.workflowEnv.GetWorkflowResult(nil) + s.NoError(err) +} + +func (s *esanalyzerWorkflowTestSuite) TestExecuteWorkflowMultipleWorkflowTypes() { + workflowTypeInfos := []WorkflowTypeInfo{ + { + Name: s.WorkflowType, + NumWorkflows: 564, + Duration: Duration{AvgExecTimeNanoseconds: float64(123 * time.Second)}, + }, + { + Name: "another-workflow-type", + NumWorkflows: 778, + Duration: Duration{AvgExecTimeNanoseconds: float64(332 * time.Second)}, + }, + } + s.workflowEnv.OnActivity(getWorkflowTypesActivity, mock.Anything). 
+		Return(workflowTypeInfos, nil).Times(1)
+
+	workflows1 := []WorkflowInfo{
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+	}
+	workflows2 := []WorkflowInfo{
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+	}
+	s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[0]).
+		Return(workflows1, nil).Times(1)
+	s.workflowEnv.OnActivity(findStuckWorkflowsActivity, mock.Anything, workflowTypeInfos[1]).
+		Return(workflows2, nil).Times(1)
+
+	s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows1).Return(nil).Times(1)
+	s.workflowEnv.OnActivity(refreshStuckWorkflowsActivity, mock.Anything, workflows2).Return(nil).Times(1)
+
+	s.workflowEnv.ExecuteWorkflow(esanalyzerWFTypeName)
+	err := s.workflowEnv.GetWorkflowResult(nil)
+	s.NoError(err)
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowTypeSingleWorkflow() {
+	workflows := []WorkflowInfo{
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+	}
+
+	s.mockAdminClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), &types.RefreshWorkflowTasksRequest{
+		Domain: s.DomainName,
+		Execution: &types.WorkflowExecution{
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+	}).Return(nil).Times(1)
+	s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Once()
+	s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Once()
+
+	_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
+	s.NoError(err)
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowTypeMultipleWorkflows() {
+	anotherWorkflowID := "another-workflow-id"
+	anotherRunID := "another-run-id"
+
+	workflows := []WorkflowInfo{
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: anotherWorkflowID,
+			RunID:      anotherRunID,
+		},
+	}
+
+	expectedWorkflows := map[string]bool{s.WorkflowID: false, anotherWorkflowID: false}
+	expectedRunIDs := map[string]bool{s.RunID: false, anotherRunID: false}
+	s.mockAdminClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), gomock.Any()).Return(nil).Do(func(
+		ctx context.Context,
+		request *types.RefreshWorkflowTasksRequest,
+	) {
+		expectedWorkflows[request.Execution.WorkflowID] = true
+		expectedRunIDs[request.Execution.RunID] = true
+	}).Times(2)
+	s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Times(2)
+	s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)
+
+	_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
+	s.NoError(err)
+
+	s.Equal(2, len(expectedWorkflows))
+	s.True(expectedWorkflows[s.WorkflowID])
+	s.True(expectedWorkflows[anotherWorkflowID])
+
+	s.Equal(2, len(expectedRunIDs))
+	s.True(expectedRunIDs[s.RunID])
+	s.True(expectedRunIDs[anotherRunID])
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestRefreshStuckWorkflowsFromSameWorkflowInconsistentDomain() {
+	anotherDomainID := "another-domain-id"
+	anotherWorkflowID := "another-workflow-id"
+	anotherRunID := "another-run-id"
+
+	workflows := []WorkflowInfo{
+		{
+			DomainID:   s.DomainID,
+			WorkflowID: s.WorkflowID,
+			RunID:      s.RunID,
+		},
+		{
+			DomainID:   anotherDomainID,
+			WorkflowID: anotherWorkflowID,
+			RunID:      anotherRunID,
+		},
+	}
+
+	expectedWorkflows := map[string]bool{s.WorkflowID: false, anotherWorkflowID: false}
+	expectedRunIDs := map[string]bool{s.RunID: false, anotherRunID: false}
+	s.mockAdminClient.EXPECT().RefreshWorkflowTasks(gomock.Any(), gomock.Any()).Return(nil).Do(func(
+		ctx context.Context,
+		request *types.RefreshWorkflowTasksRequest,
+	) {
+		expectedWorkflows[request.Execution.WorkflowID] = true
+		expectedRunIDs[request.Execution.RunID] = true
+	}).Times(1)
+	s.logger.On("Info", "Refreshed stuck workflow", mock.Anything).Return().Times(2)
+	s.mockMetricClient.On("IncCounter", metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed).Return().Times(2)
+
+	_, err := s.activityEnv.ExecuteActivity(s.workflow.refreshStuckWorkflowsFromSameWorkflowType, workflows)
+	s.Error(err)
+	s.EqualError(err, "InternalServiceError{Message: Inconsistent workflow. Expected domainID: deadbeef-0123-4567-890a-bcdef0123460, actual: another-domain-id}")
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflows() {
+	info := WorkflowTypeInfo{
+		DomainID:     s.DomainID,
+		Name:         s.WorkflowType,
+		NumWorkflows: 123113,
+		Duration:     Duration{AvgExecTimeNanoseconds: float64(100 * time.Minute)},
+	}
+
+	s.mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(
+		&elasticsearch.RawResponse{
+			TookInMillis: 12,
+			Hits: elasticsearch.SearchHits{
+				TotalHits: 2,
+				Hits: []*persistence.InternalVisibilityWorkflowExecutionInfo{
+					{
+						DomainID:   s.DomainID,
+						WorkflowID: s.WorkflowID,
+						RunID:      s.RunID,
+					},
+					{
+						DomainID:   s.DomainID,
+						WorkflowID: "workflow2",
+						RunID:      "run2",
+					},
+				},
+			},
+		},
+		nil).Times(1)
+	s.mockMetricClient.On(
+		"AddCounter",
+		metrics.ESAnalyzerScope,
+		metrics.ESAnalyzerNumStuckWorkflowsDiscovered,
+		int64(2),
+	).Return().Times(1)
+
+	actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.findStuckWorkflows, info)
+	s.NoError(err)
+	var results []WorkflowInfo
+	err = actFuture.Get(&results)
+	s.NoError(err)
+	s.Equal(2, len(results))
+	s.Equal(WorkflowInfo{DomainID: s.DomainID, WorkflowID: s.WorkflowID, RunID: s.RunID}, results[0])
+	s.Equal(WorkflowInfo{DomainID: s.DomainID, WorkflowID: "workflow2", RunID: "run2"}, results[1])
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflowsNotEnoughWorkflows() {
+	info := WorkflowTypeInfo{
+		DomainID:     s.DomainID,
+		Name:         s.WorkflowType,
+		NumWorkflows: int64(s.config.ESAnalyzerMinNumWorkflowsForAvg(s.DomainID, s.WorkflowType) - 1),
+		Duration:     Duration{AvgExecTimeNanoseconds: float64(100 * time.Minute)},
+	}
+
+	s.logger.On("Warn", mock.Anything, mock.Anything).Return().Times(1)
+
+	actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.findStuckWorkflows, info)
+	s.NoError(err)
+	var results []WorkflowInfo
+	err = actFuture.Get(&results)
+	s.NoError(err)
+	s.Equal(0, len(results))
+}
+
+func (s *esanalyzerWorkflowTestSuite) TestFindStuckWorkflowsMinNumWorkflowValidationSkipped() {
+	info := WorkflowTypeInfo{
+		DomainID:     s.DomainID,
+		Name:         s.WorkflowType,
+		NumWorkflows: int64(s.config.ESAnalyzerMinNumWorkflowsForAvg(s.DomainID, s.WorkflowType) - 1),
+		Duration:     Duration{AvgExecTimeNanoseconds: float64(100 * time.Minute)},
+	}
+
+	s.config.ESAnalyzerLimitToTypes = dynamicconfig.GetStringPropertyFn(s.WorkflowType)
+	s.logger.On("Info", mock.Anything, mock.Anything).Return().Times(1)
+	s.mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).
+ Return(&elasticsearch.RawResponse{}, nil).Times(1) + + actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.findStuckWorkflows, info) + s.NoError(err) + var results []WorkflowInfo + err = actFuture.Get(&results) + s.NoError(err) + s.Equal(0, len(results)) +} + +func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypes() { + esResult := struct { + Buckets []DomainInfo `json:"buckets"` + }{ + Buckets: []DomainInfo{ + { + DomainID: "aaa-bbb-ccc", + WFTypeContainer: WorkflowTypeInfoContainer{ + WorkflowTypes: []WorkflowTypeInfo{ + { + Name: s.WorkflowType, + NumWorkflows: 564, + Duration: Duration{AvgExecTimeNanoseconds: float64(123 * time.Second)}, + }, + { + Name: "another-workflow-type", + NumWorkflows: 745, + Duration: Duration{AvgExecTimeNanoseconds: float64(987 * time.Second)}, + }, + }, + }, + }, + }, + } + encoded, err := json.Marshal(esResult) + s.NoError(err) + + s.mockESClient.On("SearchRaw", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return( + &elasticsearch.RawResponse{ + TookInMillis: 12, + Aggregations: map[string]json.RawMessage{ + "domains": json.RawMessage(encoded), + }, + }, + nil).Times(1) + s.logger.On("Info", mock.Anything, mock.Anything).Return().Once() + + actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.getWorkflowTypes) + s.NoError(err) + var results []WorkflowTypeInfo + err = actFuture.Get(&results) + s.NoError(err) + s.Equal(2, len(results)) + s.Equal(normalizeDomainInfos(esResult.Buckets), results) +} + +func (s *esanalyzerWorkflowTestSuite) TestGetWorkflowTypesFromConfig() { + workflowTypes := []WorkflowTypeInfo{ + {DomainID: s.DomainID, Name: "workflow1"}, + {DomainID: s.DomainID, Name: "workflow2"}, + } + + s.config.ESAnalyzerLimitToTypes = dynamicconfig.GetStringPropertyFn(`["test-domain/workflow1","test-domain/workflow2"]`) + s.logger.On("Info", mock.Anything, mock.Anything).Return().Once() + + actFuture, err := s.activityEnv.ExecuteActivity(s.workflow.getWorkflowTypes) + s.NoError(err) + var results []WorkflowTypeInfo + err = actFuture.Get(&results) + s.NoError(err) + s.Equal(2, len(results)) + s.Equal(workflowTypes, results) +} diff --git a/service/worker/esanalyzer/workflow.go b/service/worker/esanalyzer/workflow.go new file mode 100644 index 00000000000..265b46d8f20 --- /dev/null +++ b/service/worker/esanalyzer/workflow.go @@ -0,0 +1,547 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. 
+ +package esanalyzer + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "go.uber.org/cadence" + "go.uber.org/cadence/activity" + cclient "go.uber.org/cadence/client" + "go.uber.org/cadence/workflow" + "go.uber.org/zap" + + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/types" +) + +const ( + domainsAggKey = "domains" + wfTypesAggKey = "wfTypes" + + // workflow constants + esAnalyzerWFID = "cadence-sys-tl-esanalyzer" + taskListName = "cadence-sys-es-analyzer" + esanalyzerWFTypeName = "cadence-sys-es-analyzer-workflow" + getWorkflowTypesActivity = "cadence-sys-es-analyzer-get-workflow-types" + findStuckWorkflowsActivity = "cadence-sys-es-analyzer-find-stuck-workflows" + refreshStuckWorkflowsActivity = "cadence-sys-es-analyzer-refresh-stuck-workflows" +) + +type ( + Workflow struct { + analyzer *Analyzer + } + Duration struct { + AvgExecTimeNanoseconds float64 `json:"value"` + } + + WorkflowTypeInfo struct { + DomainID string // this won't come from ES result json + Name string `json:"key"` + NumWorkflows int64 `json:"doc_count"` + Duration Duration `json:"duration"` + } + + WorkflowTypeInfoContainer struct { + WorkflowTypes []WorkflowTypeInfo `json:"buckets"` + } + + DomainInfo struct { + DomainID string `json:"key"` + NumWorkflows int64 `json:"doc_count"` + WFTypeContainer WorkflowTypeInfoContainer `json:"wfTypes"` + } + + WorkflowInfo struct { + DomainID string `json:"DomainID"` + WorkflowID string `json:"WorkflowID"` + RunID string `json:"RunID"` + } +) + +var ( + retryPolicy = cadence.RetryPolicy{ + InitialInterval: 10 * time.Second, + BackoffCoefficient: 1.7, + MaximumInterval: 5 * time.Minute, + ExpirationInterval: time.Hour, + } + + getWorkflowTypesOptions = workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: 5 * time.Minute, + RetryPolicy: &retryPolicy, + } + findStuckWorkflowsOptions = workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: 3 * time.Minute, + RetryPolicy: &retryPolicy, + } + refreshStuckWorkflowsOptions = workflow.ActivityOptions{ + ScheduleToStartTimeout: time.Minute, + StartToCloseTimeout: 10 * time.Minute, + RetryPolicy: &retryPolicy, + } + + wfOptions = cclient.StartWorkflowOptions{ + ID: esAnalyzerWFID, + TaskList: taskListName, + ExecutionStartToCloseTimeout: 24 * time.Hour, + CronSchedule: "0 * * * *", // "At minute 0" => every hour + } +) + +func initWorkflow(a *Analyzer) { + w := Workflow{analyzer: a} + workflow.RegisterWithOptions(w.workflowFunc, workflow.RegisterOptions{Name: esanalyzerWFTypeName}) + activity.RegisterWithOptions(w.getWorkflowTypes, activity.RegisterOptions{Name: getWorkflowTypesActivity}) + activity.RegisterWithOptions(w.findStuckWorkflows, activity.RegisterOptions{Name: findStuckWorkflowsActivity}) + activity.RegisterWithOptions( + w.refreshStuckWorkflowsFromSameWorkflowType, + activity.RegisterOptions{Name: refreshStuckWorkflowsActivity}, + ) +} + +// workflowFunc queries ElasticSearch to detect issues and mitigates them +func (w *Workflow) workflowFunc(ctx workflow.Context) error { + if w.analyzer.config.ESAnalyzerPause() { + logger := workflow.GetLogger(ctx) + logger.Info("Skipping ESAnalyzer execution cycle since it was paused") + return nil + } + + // list of workflows with avg workflow duration + var wfTypes []WorkflowTypeInfo + err := workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, getWorkflowTypesOptions), + getWorkflowTypesActivity, + ).Get(ctx, &wfTypes) + if err != nil { + return err + } + + 
for _, info := range wfTypes { + var stuckWorkflows []WorkflowInfo + err := workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, findStuckWorkflowsOptions), + findStuckWorkflowsActivity, + info, + ).Get(ctx, &stuckWorkflows) + if err != nil { + return err + } + if len(stuckWorkflows) == 0 { + continue + } + + err = workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, refreshStuckWorkflowsOptions), + refreshStuckWorkflowsActivity, + stuckWorkflows, + ).Get(ctx, nil) + if err != nil { + return err + } + } + return nil +} + +// refreshStuckWorkflowsFromSameWorkflowType is activity to refresh stuck workflows from the same domain +func (w *Workflow) refreshStuckWorkflowsFromSameWorkflowType( + ctx context.Context, + workflows []WorkflowInfo, +) error { + logger := activity.GetLogger(ctx) + domainID := workflows[0].DomainID + domainEntry, err := w.analyzer.domainCache.GetDomainByID(domainID) + if err != nil { + logger.Error("Failed to get domain entry", zap.Error(err), zap.String("DomainID", domainID)) + return err + } + domainName := domainEntry.GetInfo().Name + clusterName := domainEntry.GetReplicationConfig().ActiveClusterName + + adminClient := w.analyzer.clientBean.GetRemoteAdminClient(clusterName) + for _, workflow := range workflows { + if workflow.DomainID != domainID { + return types.InternalServiceError{ + Message: fmt.Sprintf( + "Inconsistent worklow. Expected domainID: %v, actual: %v", + domainID, + workflow.DomainID), + } + } + + err = adminClient.RefreshWorkflowTasks(ctx, &types.RefreshWorkflowTasksRequest{ + Domain: domainName, + Execution: &types.WorkflowExecution{ + WorkflowID: workflow.WorkflowID, + RunID: workflow.RunID, + }, + }) + + if err != nil { + // Errors might happen if the workflow is already closed. Instead of failing the workflow + // log the error and continue + logger.Error("Failed to refresh stuck workflow", + zap.Error(err), + zap.String("domainName", domainName), + zap.String("workflowID", workflow.WorkflowID), + zap.String("runID", workflow.RunID), + ) + w.analyzer.metricsClient.IncCounter(metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsFailedToRefresh) + } else { + logger.Info("Refreshed stuck workflow", + zap.String("domainName", domainName), + zap.String("workflowID", workflow.WorkflowID), + zap.String("runID", workflow.RunID), + ) + w.analyzer.metricsClient.IncCounter(metrics.ESAnalyzerScope, metrics.ESAnalyzerNumStuckWorkflowsRefreshed) + } + } + + return nil +} + +func getFindStuckWorkflowsQuery( + startDateTime int64, + endTime int64, + domainID string, + workflowType string, + maxNumWorkflows int, +) (string, error) { + wfTypeMarshaled, err := json.Marshal(workflowType) + if err != nil { + return "", err + } + // No need to marshal domainID: it comes from domainEntry and its type is uuid + return fmt.Sprintf(` + { + "query": { + "bool": { + "must": [ + { + "range" : { + "StartTime" : { + "gte" : "%d", + "lte" : "%d" + } + } + }, + { + "match" : { + "DomainID" : "%s" + } + }, + { + "match" : { + "WorkflowType" : %s + } + } + ], + "must_not": { + "exists": { + "field": "CloseTime" + } + } + } + }, + "size": %d + } + `, startDateTime, endTime, domainID, string(wfTypeMarshaled), maxNumWorkflows), nil +} + +// findStuckWorkflows is activity to find open workflows that are live significantly longer than average +func (w *Workflow) findStuckWorkflows(ctx context.Context, info WorkflowTypeInfo) ([]WorkflowInfo, error) { + logger := activity.GetLogger(ctx) + domainEntry, err := w.analyzer.domainCache.GetDomainByID(info.DomainID) + 
if err != nil { + logger.Error("Failed to get domain entry", zap.Error(err), zap.String("DomainID", info.DomainID)) + return nil, err + } + domainName := domainEntry.GetInfo().Name + + minNumWorkflowsNeeded := int64(w.analyzer.config.ESAnalyzerMinNumWorkflowsForAvg(domainName, info.Name)) + if len(w.analyzer.config.ESAnalyzerLimitToTypes()) > 0 || len(w.analyzer.config.ESAnalyzerLimitToDomains()) > 0 { + logger.Info("Skipping minimum workflow count validation since workflow types were passed from config") + } else if info.NumWorkflows < minNumWorkflowsNeeded { + logger.Warn(fmt.Sprintf( + "Skipping workflow type '%s' because it doesn't have enough(%d) workflows to avg", + info.Name, + minNumWorkflowsNeeded, + )) + return nil, nil + } + + startDateTime := time.Now().Add(-w.analyzer.config.ESAnalyzerTimeWindow()).UnixNano() + + // allow some buffer time to any workflow + maxEndTimeAllowed := time.Now().Add( + -w.analyzer.config.ESAnalyzerBufferWaitTime(domainName, info.Name), + ).UnixNano() + + // if the workflow exec time takes longer than 3x avg time, we refresh + endTime := time.Now().Add( + -time.Duration((int64(info.Duration.AvgExecTimeNanoseconds) * 3)), + ).UnixNano() + if endTime > maxEndTimeAllowed { + endTime = maxEndTimeAllowed + } + + maxNumWorkflows := w.analyzer.config.ESAnalyzerNumWorkflowsToRefresh(domainName, info.Name) + query, err := getFindStuckWorkflowsQuery(startDateTime, endTime, info.DomainID, info.Name, maxNumWorkflows) + if err != nil { + logger.Error("Failed to create ElasticSearch query for stuck workflows", + zap.Error(err), + zap.Int64("startDateTime", startDateTime), + zap.Int64("endTime", endTime), + zap.String("workflowType", info.Name), + zap.Int("maxNumWorkflows", maxNumWorkflows), + ) + return nil, err + } + response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, query) + if err != nil { + logger.Error("Failed to query ElasticSearch for stuck workflows", + zap.Error(err), + zap.String("VisibilityQuery", query), + ) + return nil, err + } + + // Return a simpler structure to reduce activity output size + workflows := []WorkflowInfo{} + if response.Hits.Hits != nil { + for _, hit := range response.Hits.Hits { + workflows = append(workflows, WorkflowInfo{ + DomainID: hit.DomainID, + WorkflowID: hit.WorkflowID, + RunID: hit.RunID, + }) + } + } + + if len(workflows) > 0 { + w.analyzer.metricsClient.AddCounter( + metrics.ESAnalyzerScope, + metrics.ESAnalyzerNumStuckWorkflowsDiscovered, + int64(len(workflows))) + } + + return workflows, nil +} + +func (w *Workflow) getDomainsLimitQuery() (string, error) { + limitToDomains := w.analyzer.config.ESAnalyzerLimitToDomains() + + domainsLimitQuery := "" + if len(limitToDomains) > 0 { + var domainNames []string + err := json.Unmarshal([]byte(limitToDomains), &domainNames) + if err != nil { + return "", err + } + if len(domainNames) > 0 { + domainIDs := []string{} + for _, domainName := range domainNames { + domainEntry, err := w.analyzer.domainCache.GetDomain(domainName) + if err != nil { + return "", err + } + domainIDs = append(domainIDs, domainEntry.GetInfo().ID) + } + + marshaledDomains, err := json.Marshal(domainIDs) + if err != nil { + return "", err + } + domainsLimitQuery = fmt.Sprintf(`, + { + "terms" : { + "DomainID" : %s + } + } + `, string(marshaledDomains)) + } + } + return domainsLimitQuery, nil +} + +func (w *Workflow) getWorkflowTypesQuery() (string, error) { + domainsLimitQuery, err := w.getDomainsLimitQuery() + if err != nil { + return "", nil + } + startDateTime := 
+
+func (w *Workflow) getWorkflowTypesQuery() (string, error) {
+	domainsLimitQuery, err := w.getDomainsLimitQuery()
+	if err != nil {
+		return "", err
+	}
+	startDateTime := time.Now().Add(-w.analyzer.config.ESAnalyzerTimeWindow()).UnixNano()
+	maxNumDomains := w.analyzer.config.ESAnalyzerMaxNumDomains()
+	maxNumWorkflowTypes := w.analyzer.config.ESAnalyzerMaxNumWorkflowTypes()
+
+	return fmt.Sprintf(`
+    {
+      "query": {
+        "bool": {
+          "must": [
+            {
+              "range" : {
+                "StartTime" : {
+                  "gte" : "%d"
+                }
+              }
+            },
+            {
+              "exists": {
+                "field": "CloseTime"
+              }
+            }
+            %s
+          ]
+        }
+      },
+      "size": 0,
+      "aggs" : {
+        "%s" : {
+          "terms" : { "field" : "DomainID", "size": %d },
+          "aggs": {
+            "%s" : {
+              "terms" : { "field" : "WorkflowType", "size": %d },
+              "aggs": {
+                "duration" : {
+                  "avg" : {
+                    "script" : "(doc['CloseTime'].value - doc['StartTime'].value)"
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+    `, startDateTime, domainsLimitQuery, domainsAggKey, maxNumDomains, wfTypesAggKey, maxNumWorkflowTypes), nil
+}
+
+func (w *Workflow) getWorkflowTypesFromDynamicConfig(
+	ctx context.Context,
+	config string,
+	logger *zap.Logger,
+) ([]WorkflowTypeInfo, error) {
+	results := []WorkflowTypeInfo{}
+	var entries []string
+	err := json.Unmarshal([]byte(config), &entries)
+	if err != nil {
+		return nil, err
+	}
+
+	domainNameToID := map[string]string{}
+
+	for _, domainWFTypePair := range entries {
+		index := strings.Index(domainWFTypePair, "/")
+		// -1 means no delimiter; 0 means the entry starts with '/'
+		if index < 1 || len(domainWFTypePair) <= (index+1) {
+			return nil, types.InternalServiceError{
+				Message: fmt.Sprintf("Bad workflow type entry '%v'", domainWFTypePair),
+			}
+		}
+		domainName := domainWFTypePair[:index]
+		wfType := domainWFTypePair[index+1:]
+		if _, ok := domainNameToID[domainName]; !ok {
+			domainEntry, err := w.analyzer.domainCache.GetDomain(domainName)
+			if err != nil {
+				logger.Error("Failed to get domain entry",
+					zap.Error(err),
+					zap.String("DomainName", domainName))
+				return nil, err
+			}
+			domainNameToID[domainName] = domainEntry.GetInfo().ID
+		}
+
+		results = append(results, WorkflowTypeInfo{
+			DomainID: domainNameToID[domainName],
+			Name:     wfType,
+		})
+	}
+
+	return results, nil
+}
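
The accepted entry format for ESAnalyzerLimitToTypes is "domainName/workflowType", split on the first '/'. A runnable sketch of the same validation, with made-up entry values:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        for _, entry := range []string{
            "billing-domain/ChargeWorkflow", // valid
            "/ChargeWorkflow",               // invalid: empty domain name (index 0)
            "billing-domain/",               // invalid: empty workflow type
            "no-delimiter",                  // invalid: no '/' at all
        } {
            index := strings.Index(entry, "/")
            if index < 1 || len(entry) <= index+1 { // same check as the activity
                fmt.Printf("bad workflow type entry %q\n", entry)
                continue
            }
            fmt.Printf("domain=%q workflowType=%q\n", entry[:index], entry[index+1:])
        }
    }
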
+
+func normalizeDomainInfos(infos []DomainInfo) []WorkflowTypeInfo {
+	results := []WorkflowTypeInfo{}
+	for _, domainInfo := range infos {
+		for _, wfType := range domainInfo.WFTypeContainer.WorkflowTypes {
+			results = append(results, WorkflowTypeInfo{
+				DomainID: domainInfo.DomainID,
+				Name:     wfType.Name,
+			})
+		}
+	}
+	return results
+}
+
+// getWorkflowTypes is an activity that gets the list of workflow types to analyze from ElasticSearch
+func (w *Workflow) getWorkflowTypes(ctx context.Context) ([]WorkflowTypeInfo, error) {
+	logger := activity.GetLogger(ctx)
+
+	limitToTypes := w.analyzer.config.ESAnalyzerLimitToTypes()
+	if len(limitToTypes) > 0 {
+		return w.getWorkflowTypesFromDynamicConfig(ctx, limitToTypes, logger)
+	}
+
+	query, err := w.getWorkflowTypesQuery()
+	if err != nil {
+		return nil, err
+	}
+
+	response, err := w.analyzer.esClient.SearchRaw(ctx, w.analyzer.visibilityIndexName, query)
+	if err != nil {
+		logger.Error("Failed to query ElasticSearch to find workflow type info",
+			zap.Error(err),
+			zap.String("VisibilityQuery", query),
+		)
+		return nil, err
+	}
+	agg, foundAggregation := response.Aggregations[domainsAggKey]
+	if !foundAggregation {
+		return nil, types.InternalServiceError{
+			Message: fmt.Sprintf("ElasticSearch error: aggregation failed. Query: %v", query),
+		}
+	}
+
+	var domains struct {
+		Buckets []DomainInfo `json:"buckets"`
+	}
+	err = json.Unmarshal(agg, &domains)
+	if err != nil {
+		return nil, types.InternalServiceError{
+			Message: "ElasticSearch error parsing aggregation",
+		}
+	}
+
+	// This log fires at most once an hour, so it is not invasive, and the workflow type
+	// statistics it captures are useful. The output can be quite large, though; the size
+	// limit is unverified.
+	logger.Info(fmt.Sprintf("WorkflowType stats: %#v", domains.Buckets))
+
+	return normalizeDomainInfos(domains.Buckets), nil
+}
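
For reference, getWorkflowTypes expects an aggregation section of roughly the following shape. This is a hand-written sample based on standard ElasticSearch terms/avg aggregation responses, not captured output: "domains" and "wfTypes" stand in for the actual domainsAggKey/wfTypesAggKey values, and the exact field mapping lives in the DomainInfo/WorkflowTypeInfo types defined earlier in this change.

    // Hypothetical aggregation payload that SearchRaw would return for the query above.
    const sampleAggregations = `
    {
      "domains": {
        "buckets": [
          {
            "key": "uuid-of-some-domain",
            "doc_count": 12000,
            "wfTypes": {
              "buckets": [
                {
                  "key": "ExampleWorkflowType",
                  "doc_count": 11800,
                  "duration": { "value": 4.2e11 }
                }
              ]
            }
          }
        ]
      }
    }`
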
diff --git a/service/worker/scanner/scanner.go b/service/worker/scanner/scanner.go
index a6970e78d5a..6ca63a58eab 100644
--- a/service/worker/scanner/scanner.go
+++ b/service/worker/scanner/scanner.go
@@ -34,7 +34,6 @@ import (
 	"go.uber.org/cadence/worker"
 
 	"github.com/uber/cadence/common"
-	"github.com/uber/cadence/common/backoff"
 	"github.com/uber/cadence/common/cluster"
 	"github.com/uber/cadence/common/config"
 	"github.com/uber/cadence/common/dynamicconfig"
@@ -42,6 +41,7 @@ import (
 	"github.com/uber/cadence/common/resource"
 	"github.com/uber/cadence/service/worker/scanner/shardscanner"
 	"github.com/uber/cadence/service/worker/scanner/tasklist"
+	"github.com/uber/cadence/service/worker/workercommon"
 )
 
 const (
@@ -168,7 +168,9 @@ func (s *Scanner) Start() error {
 }
 
 func (s *Scanner) startScanner(ctx context.Context, options client.StartWorkflowOptions, workflowName string) context.Context {
-	go s.startWorkflowWithRetry(options, workflowName, nil)
+	go workercommon.StartWorkflowWithRetry(workflowName, scannerStartUpDelay, s.context.resource, func(client client.Client) error {
+		return s.startWorkflow(client, options, workflowName, nil)
+	})
 	return NewScannerContext(ctx, workflowName, s.context)
 }
 
@@ -183,16 +185,19 @@ func (s *Scanner) startShardScanner(
 		config.ScannerWFTypeName,
 		shardscanner.NewShardScannerContext(s.context.resource, config),
 	)
-	go s.startWorkflowWithRetry(
-		config.StartWorkflowOptions,
+	go workercommon.StartWorkflowWithRetry(
 		config.ScannerWFTypeName,
-		shardscanner.ScannerWorkflowParams{
-			Shards: shardscanner.Shards{
-				Range: &shardscanner.ShardRange{
-					Min: 0,
-					Max: s.context.cfg.Persistence.NumHistoryShards,
+		scannerStartUpDelay,
+		s.context.resource,
+		func(client client.Client) error {
+			return s.startWorkflow(client, config.StartWorkflowOptions, config.ScannerWFTypeName, shardscanner.ScannerWorkflowParams{
+				Shards: shardscanner.Shards{
+					Range: &shardscanner.ShardRange{
+						Min: 0,
+						Max: s.context.cfg.Persistence.NumHistoryShards,
+					},
 				},
-			},
+			})
 		})
 
 	workerTaskListNames = append(workerTaskListNames, config.StartWorkflowOptions.TaskList)
 
@@ -204,13 +209,16 @@
 		config.FixerWFTypeName,
 		shardscanner.NewShardFixerContext(s.context.resource, config),
 	)
-	go s.startWorkflowWithRetry(
-		config.StartFixerOptions,
+	go workercommon.StartWorkflowWithRetry(
 		config.FixerWFTypeName,
-		shardscanner.FixerWorkflowParams{
-			ScannerWorkflowWorkflowID: config.StartWorkflowOptions.ID,
-		},
-	)
+		scannerStartUpDelay,
+		s.context.resource,
+		func(client client.Client) error {
+			return s.startWorkflow(client, config.StartFixerOptions, config.FixerWFTypeName,
+				shardscanner.FixerWorkflowParams{
+					ScannerWorkflowWorkflowID: config.StartWorkflowOptions.ID,
+				})
+		})
 
 	workerTaskListNames = append(workerTaskListNames, config.StartFixerOptions.TaskList)
 }
 
@@ -218,36 +226,6 @@
 	return ctx, workerTaskListNames
 }
 
-func (s *Scanner) startWorkflowWithRetry(
-	options client.StartWorkflowOptions,
-	workflowType string,
-	workflowArg interface{},
-) {
-	// let history / matching service warm up
-	time.Sleep(scannerStartUpDelay)
-	res := s.context.resource
-	sdkClient := client.NewClient(
-		res.GetSDKClient(),
-		common.SystemLocalDomainName,
-		nil, /* &client.Options{} */
-	)
-	policy := backoff.NewExponentialRetryPolicy(time.Second)
-	policy.SetMaximumInterval(time.Minute)
-	policy.SetExpirationInterval(backoff.NoInterval)
-	throttleRetry := backoff.NewThrottleRetry(
-		backoff.WithRetryPolicy(policy),
-		backoff.WithRetryableError(func(_ error) bool { return true }),
-	)
-	err := throttleRetry.Do(context.Background(), func() error {
-		return s.startWorkflow(sdkClient, options, workflowType, workflowArg)
-	})
-	if err != nil {
-		res.GetLogger().Fatal("unable to start scanner", tag.WorkflowType(workflowType), tag.Error(err))
-	} else {
-		res.GetLogger().Info("starting scanner", tag.WorkflowType(workflowType))
-	}
-}
-
 func (s *Scanner) startWorkflow(
 	client client.Client,
 	options client.StartWorkflowOptions,
diff --git a/service/worker/service.go b/service/worker/service.go
index 9939de83034..42390d8d4ab 100644
--- a/service/worker/service.go
+++ b/service/worker/service.go
@@ -40,6 +40,7 @@ import (
 	"github.com/uber/cadence/common/types"
 	"github.com/uber/cadence/service/worker/archiver"
 	"github.com/uber/cadence/service/worker/batcher"
+	"github.com/uber/cadence/service/worker/esanalyzer"
 	"github.com/uber/cadence/service/worker/failovermanager"
 	"github.com/uber/cadence/service/worker/indexer"
 	"github.com/uber/cadence/service/worker/parentclosepolicy"
@@ -72,6 +73,7 @@ type (
 		IndexerCfg                        *indexer.Config
 		ScannerCfg                        *scanner.Config
 		BatcherCfg                        *batcher.Config
+		ESAnalyzerCfg                     *esanalyzer.Config
 		failoverManagerCfg                *failovermanager.Config
 		ThrottledLogRPS                   dynamicconfig.IntPropertyFn
 		PersistenceGlobalMaxQPS           dynamicconfig.IntPropertyFn
@@ -81,6 +83,7 @@ type (
 		EnableFailoverManager             dynamicconfig.BoolPropertyFn
 		EnableWorkflowShadower            dynamicconfig.BoolPropertyFn
 		DomainReplicationMaxRetryDuration dynamicconfig.DurationPropertyFn
+		EnableESAnalyzer                  dynamicconfig.BoolPropertyFn
 	}
 )
 
@@ -155,8 +158,20 @@ func NewConfig(params *resource.Params) *Config {
 			AdminOperationToken: dc.GetStringProperty(dynamicconfig.AdminOperationToken, common.DefaultAdminOperationToken),
 			ClusterMetadata:     params.ClusterMetadata,
 		},
+		ESAnalyzerCfg: &esanalyzer.Config{
+			ESAnalyzerPause:                 dc.GetBoolProperty(dynamicconfig.ESAnalyzerPause, common.DefaultESAnalyzerPause),
+			ESAnalyzerTimeWindow:            dc.GetDurationProperty(dynamicconfig.ESAnalyzerTimeWindow, common.DefaultESAnalyzerTimeWindow),
+			ESAnalyzerMaxNumDomains:         dc.GetIntProperty(dynamicconfig.ESAnalyzerMaxNumDomains, common.DefaultESAnalyzerMaxNumDomains),
+			ESAnalyzerMaxNumWorkflowTypes:   dc.GetIntProperty(dynamicconfig.ESAnalyzerMaxNumWorkflowTypes, common.DefaultESAnalyzerMaxNumWorkflowTypes),
+			ESAnalyzerLimitToTypes:          dc.GetStringProperty(dynamicconfig.ESAnalyzerLimitToTypes, common.DefaultESAnalyzerLimitToTypes),
+			ESAnalyzerLimitToDomains:        dc.GetStringProperty(dynamicconfig.ESAnalyzerLimitToDomains, common.DefaultESAnalyzerLimitToDomains),
+			ESAnalyzerNumWorkflowsToRefresh: dc.GetIntPropertyFilteredByWorkflowType(dynamicconfig.ESAnalyzerNumWorkflowsToRefresh, common.DefaultESAnalyzerNumWorkflowsToRefresh),
+			ESAnalyzerBufferWaitTime:        dc.GetDurationPropertyFilteredByWorkflowType(dynamicconfig.ESAnalyzerBufferWaitTime, common.DefaultESAnalyzerBufferWaitTime),
+			ESAnalyzerMinNumWorkflowsForAvg: dc.GetIntPropertyFilteredByWorkflowType(dynamicconfig.ESAnalyzerMinNumWorkflowsForAvg, common.DefaultESAnalyzerMinNumWorkflowsForAvg),
+		},
 		EnableBatcher:                 dc.GetBoolProperty(dynamicconfig.EnableBatcher, true),
 		EnableParentClosePolicyWorker: dc.GetBoolProperty(dynamicconfig.EnableParentClosePolicyWorker, true),
+		EnableESAnalyzer:              dc.GetBoolProperty(dynamicconfig.EnableESAnalyzer, false),
 		EnableFailoverManager:         dc.GetBoolProperty(dynamicconfig.EnableFailoverManager, true),
 		EnableWorkflowShadower:        dc.GetBoolProperty(dynamicconfig.EnableWorkflowShadower, true),
 		ThrottledLogRPS:               dc.GetIntProperty(dynamicconfig.WorkerThrottledLogRPS, 20),
@@ -212,6 +227,9 @@ func (s *Service) Start() {
 	if s.config.EnableParentClosePolicyWorker() {
 		s.startParentClosePolicyProcessor()
 	}
+	if s.config.EnableESAnalyzer() {
+		s.startESAnalyzer()
+	}
 	if s.config.EnableFailoverManager() {
 		s.startFailoverManager()
 	}
@@ -252,6 +270,26 @@ func (s *Service) startParentClosePolicyProcessor() {
 	}
 }
 
+func (s *Service) startESAnalyzer() {
+	analyzer := esanalyzer.New(
+		s.params.PublicClient,
+		s.GetFrontendClient(),
+		s.GetClientBean(),
+		s.params.ESClient,
+		s.params.ESConfig,
+		s.GetLogger(),
+		s.GetMetricsClient(),
+		s.params.MetricScope,
+		s.Resource,
+		s.GetDomainCache(),
+		s.config.ESAnalyzerCfg,
+	)
+
+	if err := analyzer.Start(); err != nil {
+		s.GetLogger().Fatal("error starting esanalyzer", tag.Error(err))
+	}
+}
+
 func (s *Service) startBatcher() {
 	params := &batcher.BootstrapParams{
 		Config: *s.config.BatcherCfg,
diff --git a/service/worker/workercommon/util.go b/service/worker/workercommon/util.go
new file mode 100644
index 00000000000..0e124c3b434
--- /dev/null
+++ b/service/worker/workercommon/util.go
@@ -0,0 +1,65 @@
+// Copyright (c) 2017 Uber Technologies, Inc.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package workercommon
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"go.uber.org/cadence/client"
+
+	"github.com/uber/cadence/common"
+	"github.com/uber/cadence/common/backoff"
+	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/common/resource"
+)
+
+// StartWorkflowWithRetry waits for the given start-up delay, then keeps retrying
+// startWorkflow with an SDK client for the system-local domain until it succeeds.
+func StartWorkflowWithRetry(
+	workflowType string,
+	startUpDelay time.Duration,
+	resource resource.Resource,
+	startWorkflow func(client client.Client) error,
+) error {
+	// let history / matching service warm up
+	time.Sleep(startUpDelay)
+	sdkClient := client.NewClient(
+		resource.GetSDKClient(),
+		common.SystemLocalDomainName,
+		nil, /* &client.Options{} */
+	)
+	policy := backoff.NewExponentialRetryPolicy(time.Second)
+	policy.SetMaximumInterval(time.Minute)
+	policy.SetExpirationInterval(backoff.NoInterval)
+	throttleRetry := backoff.NewThrottleRetry(
+		backoff.WithRetryPolicy(policy),
+		backoff.WithRetryableError(func(_ error) bool { return true }),
+	)
+	err := throttleRetry.Do(context.Background(), func() error {
+		return startWorkflow(sdkClient)
+	})
+	if err != nil {
+		// The retry policy above never expires and treats every error as retryable,
+		// so Do can only return nil; reaching this branch indicates a bug.
+		panic(fmt.Sprintf("unreachable: %#v", err))
+	} else {
+		resource.GetLogger().Info("starting workflow", tag.WorkflowType(workflowType))
+	}
+	return err
+}
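
Usage mirrors the scanner call sites above: the helper blocks for the delay and then retries indefinitely, so callers run it in a goroutine. A sketch for a hypothetical worker module, where res is a resource.Resource the caller owns and startMyWorkflow is an invented start function:

    // Sketch only; the workflow type, delay, and start function are hypothetical.
    go func() {
        _ = workercommon.StartWorkflowWithRetry(
            "my-module-workflow", // workflow type, used for logging
            time.Minute,          // give history/matching time to warm up
            res,                  // resource.Resource owned by the caller
            func(c client.Client) error { // invoked with a system-domain SDK client
                return startMyWorkflow(c)
            },
        )
    }()
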