Skip to content

Commit

Permalink
ElasticSearch Analyzer (cadence-workflow#4598)
Browse files Browse the repository at this point in the history
  • Loading branch information
demirkayaender authored Nov 5, 2021
1 parent e13b668 commit 2fa2787
Show file tree
Hide file tree
Showing 20 changed files with 1,604 additions and 47 deletions.
21 changes: 21 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,27 @@ const (
TaskTypeCrossCluster TaskType = 6
)

const (
// DefaultESAnalyzerPause controls if we want to dynamically pause the analyzer workflow
DefaultESAnalyzerPause = false
// DefaultESAnalyzerTimeWindow controls how many days to go back for ElasticSearch Analyzer
DefaultESAnalyzerTimeWindow = time.Hour * 24 * 30
// DefaultESAnalyzerMaxNumDomains controls how many domains to check
DefaultESAnalyzerMaxNumDomains = 500
// DefaultESAnalyzerMaxNumWorkflowTypes controls how many workflow types per domain to check
DefaultESAnalyzerMaxNumWorkflowTypes = 100
// DefaultESAnalyzerNumWorkflowsToRefresh controls how many workflows per workflow type should be refreshed
DefaultESAnalyzerNumWorkflowsToRefresh = 100
// DefaultESAnalyzerBufferWaitTime controls min time required to consider a workflow stuck
DefaultESAnalyzerBufferWaitTime = time.Minute * 30
// DefaultESAnalyzerMinNumWorkflowsForAvg controls how many workflows to have at least to rely on workflow run time avg per type
DefaultESAnalyzerMinNumWorkflowsForAvg = 100
// DefaultESAnalyzerLimitToTypes controls if we want to limit ESAnalyzer only to some workflow types ("" means no limitation)
DefaultESAnalyzerLimitToTypes = ""
// DefaultESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains ("" means no limitation)
DefaultESAnalyzerLimitToDomains = ""
)

// StickyTaskConditionFailedErrorMsg error msg for sticky task ConditionFailedError
const StickyTaskConditionFailedErrorMsg = "StickyTaskConditionFailedError"

Expand Down
46 changes: 46 additions & 0 deletions common/dynamicconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ type BoolPropertyFnWithDomainIDAndWorkflowIDFilter func(domainID string, workflo
// BoolPropertyFnWithTaskListInfoFilters is a wrapper to get bool property from dynamic config with three filters: domain, taskList, taskType
type BoolPropertyFnWithTaskListInfoFilters func(domain string, taskList string, taskType int) bool

// IntPropertyFnWithWorkflowTypeFilter is a wrapper to get int property from dynamic config with domain and workflow type as filters
type IntPropertyFnWithWorkflowTypeFilter func(domainName string, workflowType string) int

// DurationPropertyFnWithWorkflowTypeFilter is a wrapper to get duration property from dynamic config with domain and workflow type as filters
type DurationPropertyFnWithWorkflowTypeFilter func(domainName string, workflowType string) time.Duration

// GetProperty gets a interface property and returns defaultValue if property is not found
func (c *Collection) GetProperty(key Key, defaultValue interface{}) PropertyFn {
return func() interface{} {
Expand Down Expand Up @@ -210,6 +216,46 @@ func (c *Collection) GetIntPropertyFilteredByDomain(key Key, defaultValue int) I
}
}

// GetIntPropertyFilteredByWorkflowType gets property with domain and workflow
// type as filters and asserts that it's an integer.
func (c *Collection) GetIntPropertyFilteredByWorkflowType(key Key, defaultValue int) IntPropertyFnWithWorkflowTypeFilter {
	return func(domainName string, workflowType string) int {
		// Build the filter map once and reuse it for lookup and logging.
		filterMap := c.toFilterMap(DomainFilter(domainName), WorkflowTypeFilter(workflowType))
		result, getErr := c.client.GetIntValue(key, filterMap, defaultValue)
		if getErr != nil {
			c.logError(key, filterMap, getErr)
		}
		c.logValue(key, filterMap, result, defaultValue, intCompareEquals)
		return result
	}
}

// GetDurationPropertyFilteredByWorkflowType gets property with domain and
// workflow type as filters and asserts that it's a duration.
func (c *Collection) GetDurationPropertyFilteredByWorkflowType(key Key, defaultValue time.Duration) DurationPropertyFnWithWorkflowTypeFilter {
	return func(domainName string, workflowType string) time.Duration {
		// Build the filter map once and reuse it for lookup and logging.
		filterMap := c.toFilterMap(DomainFilter(domainName), WorkflowTypeFilter(workflowType))
		result, getErr := c.client.GetDurationValue(key, filterMap, defaultValue)
		if getErr != nil {
			c.logError(key, filterMap, getErr)
		}
		c.logValue(key, filterMap, result, defaultValue, durationCompareEquals)
		return result
	}
}

// GetIntPropertyFilteredByTaskListInfo gets property with taskListInfo as filters and asserts that it's an integer
func (c *Collection) GetIntPropertyFilteredByTaskListInfo(key Key, defaultValue int) IntPropertyFnWithTaskListInfoFilters {
return func(domain string, taskList string, taskType int) int {
Expand Down
10 changes: 10 additions & 0 deletions common/dynamicconfig/config_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ func GetIntPropertyFilteredByShardID(value int) func(shardID int) int {
return func(shardID int) int { return value }
}

// GetIntPropertyFilteredByWorkflowType returns value as IntPropertyFnWithWorkflowTypeFilter.
func GetIntPropertyFilteredByWorkflowType(value int) func(domainName string, workflowType string) int {
	// The returned function ignores its filters and always yields the fixed value.
	return func(string, string) int {
		return value
	}
}

// GetDurationPropertyFilteredByWorkflowType returns values as IntPropertyFnWithWorkflowTypeFilters
func GetDurationPropertyFilteredByWorkflowType(value time.Duration) func(domainName string, workflowType string) time.Duration {
return func(domainName string, workflowType string) time.Duration { return value }
}

// GetFloatPropertyFn returns value as FloatPropertyFn
func GetFloatPropertyFn(value float64) func(opts ...FilterOption) float64 {
return func(...FilterOption) float64 { return value }
Expand Down
65 changes: 64 additions & 1 deletion common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -1886,12 +1886,18 @@ const (
// Default value: true
// Allowed filters: N/A
EnableBatcher
// EnableParentClosePolicyWorker is decides whether or not enable system workers for processing parent close policy task
// EnableParentClosePolicyWorker decides whether or not enable system workers for processing parent close policy task
// KeyName: system.enableParentClosePolicyWorker
// Value type: Bool
// Default value: true
// Allowed filters: N/A
EnableParentClosePolicyWorker
// EnableESAnalyzer decides whether to enable system workers for processing ElasticSearch Analyzer
// KeyName: system.enableESAnalyzer
// Value type: Bool
// Default value: false
// Allowed filters: N/A
EnableESAnalyzer
// EnableStickyQuery is indicates if sticky query should be enabled per domain
// KeyName: system.enableStickyQuery
// Value type: Bool
Expand Down Expand Up @@ -2030,6 +2036,52 @@ const (
// TODO: https://github.com/uber/cadence/issues/3861
WorkerBlobIntegrityCheckProbability

// ESAnalyzerPause defines if we want to dynamically pause the analyzer workflow
// KeyName: worker.ESAnalyzerPause
// Value type: bool
// Default value: false
ESAnalyzerPause
// ESAnalyzerTimeWindow defines the time window ElasticSearch Analyzer will consider while taking workflow averages
// KeyName: worker.ESAnalyzerTimeWindow
// Value type: Duration
// Default value: 30 days
ESAnalyzerTimeWindow
// ESAnalyzerMaxNumDomains defines how many domains to check
// KeyName: worker.ESAnalyzerMaxNumDomains
// Value type: int
// Default value: 500
ESAnalyzerMaxNumDomains
// ESAnalyzerMaxNumWorkflowTypes defines how many workflow types to check per domain
// KeyName: worker.ESAnalyzerMaxNumWorkflowTypes
// Value type: int
// Default value: 100
ESAnalyzerMaxNumWorkflowTypes
// ESAnalyzerNumWorkflowsToRefresh controls how many workflows per workflow type should be refreshed
// KeyName: worker.ESAnalyzerNumWorkflowsToRefresh
// Value type: Int
// Default value: 100
ESAnalyzerNumWorkflowsToRefresh
// ESAnalyzerBufferWaitTime controls min time required to consider a workflow stuck
// KeyName: worker.ESAnalyzerBufferWaitTime
// Value type: Duration
// Default value: 30 minutes
ESAnalyzerBufferWaitTime
// ESAnalyzerMinNumWorkflowsForAvg controls how many workflows to have at least to rely on workflow run time avg per type
// KeyName: worker.ESAnalyzerMinNumWorkflowsForAvg
// Value type: Int
// Default value: 100
ESAnalyzerMinNumWorkflowsForAvg
// ESAnalyzerLimitToTypes controls if we want to limit ESAnalyzer only to some workflow types
// KeyName: worker.ESAnalyzerLimitToTypes
// Value type: String
// Default value: "" => means no limitation
ESAnalyzerLimitToTypes
// ESAnalyzerLimitToDomains controls if we want to limit ESAnalyzer only to some domains
// KeyName: worker.ESAnalyzerLimitToDomains
// Value type: String
// Default value: "" => means no limitation
ESAnalyzerLimitToDomains

// LastKeyForTest must be the last one in this const group for testing purpose
LastKeyForTest
)
Expand Down Expand Up @@ -2073,6 +2125,7 @@ var Keys = map[Key]string{
DisallowQuery: "system.disallowQuery",
EnableBatcher: "worker.enableBatcher",
EnableParentClosePolicyWorker: "system.enableParentClosePolicyWorker",
EnableESAnalyzer: "system.enableESAnalyzer",
EnableFailoverManager: "system.enableFailoverManager",
EnableWorkflowShadower: "system.enableWorkflowShadower",
EnableStickyQuery: "system.enableStickyQuery",
Expand Down Expand Up @@ -2400,6 +2453,16 @@ var Keys = map[Key]string{
EnableArchivalCompression: "worker.EnableArchivalCompression",
WorkerDeterministicConstructionCheckProbability: "worker.DeterministicConstructionCheckProbability",
WorkerBlobIntegrityCheckProbability: "worker.BlobIntegrityCheckProbability",

ESAnalyzerPause: "worker.ESAnalyzerPause",
ESAnalyzerTimeWindow: "worker.ESAnalyzerTimeWindow",
ESAnalyzerMaxNumDomains: "worker.ESAnalyzerMaxNumDomains",
ESAnalyzerMaxNumWorkflowTypes: "worker.ESAnalyzerMaxNumWorkflowTypes",
ESAnalyzerNumWorkflowsToRefresh: "worker.ESAnalyzerNumWorkflowsToRefresh",
ESAnalyzerBufferWaitTime: "worker.ESAnalyzerBufferWaitTime",
ESAnalyzerMinNumWorkflowsForAvg: "worker.ESAnalyzerMinNumWorkflowsForAvg",
ESAnalyzerLimitToTypes: "worker.ESAnalyzerLimitToTypes",
ESAnalyzerLimitToDomains: "worker.ESAnalyzerLimitToDomains",
}

var KeyNames map[string]Key
Expand Down
12 changes: 12 additions & 0 deletions common/dynamicconfig/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ func ParseFilter(filterName string) Filter {
return ClusterName
case "workflowID":
return WorkflowID
case "workflowType":
return WorkflowType
default:
return UnknownFilter
}
Expand All @@ -60,6 +62,7 @@ var filters = []string{
"shardID",
"clusterName",
"workflowID",
"workflowType",
}

const (
Expand All @@ -78,6 +81,8 @@ const (
ClusterName
// WorkflowID is the workflow id
WorkflowID
// WorkflowType is the workflow type name
WorkflowType

// LastFilterTypeForTest must be the last one in this const group for testing purpose
LastFilterTypeForTest
Expand Down Expand Up @@ -134,3 +139,10 @@ func WorkflowIDFilter(workflowID string) FilterOption {
filterMap[WorkflowID] = workflowID
}
}

// WorkflowTypeFilter returns a FilterOption that filters by workflow type name.
func WorkflowTypeFilter(name string) FilterOption {
return func(filterMap map[Filter]interface{}) {
filterMap[WorkflowType] = name
}
}
45 changes: 45 additions & 0 deletions common/elasticsearch/client_v6.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,49 @@ func (c *elasticV6) search(ctx context.Context, p *searchParametersV6) (*elastic
return searchService.Do(ctx)
}

// SearchRaw runs the given raw JSON query against the index and converts the
// ES v6 response into the version-agnostic RawResponse shape.
// Note: slight differences between the v6 and v7 responses prevent moving
// this into a common function.
func (c *elasticV6) SearchRaw(ctx context.Context, index string, query string) (*RawResponse, error) {
	esResult, err := c.client.Search(index).Source(query).Do(ctx)
	if err != nil {
		return nil, err
	}

	// Surface server-side failures and timeouts as internal service errors.
	if esResult.Error != nil {
		return nil, types.InternalServiceError{
			Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error),
		}
	}
	if esResult.TimedOut {
		return nil, types.InternalServiceError{
			Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis),
		}
	}

	result := RawResponse{
		TookInMillis: esResult.TookInMillis,
		Hits: SearchHits{
			TotalHits: esResult.TotalHits(),
		},
	}

	if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 {
		result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits))
		for _, hit := range esResult.Hits.Hits {
			result.Hits.Hits = append(result.Hits.Hits, c.convertSearchResultToVisibilityRecord(hit))
		}
	}

	if len(esResult.Aggregations) > 0 {
		// v6 aggregations are map[string]*json.RawMessage; guard against nil
		// entries before dereferencing (v7 uses plain json.RawMessage values).
		result.Aggregations = make(map[string]json.RawMessage, len(esResult.Aggregations))
		for key, agg := range esResult.Aggregations {
			if agg != nil {
				result.Aggregations[key] = *agg
			}
		}
	}

	return &result, nil
}

// searchWithDSL executes the given raw DSL query string against the index.
func (c *elasticV6) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) {
	searchService := c.client.Search(index).Source(query)
	return searchService.Do(ctx)
}
Expand Down Expand Up @@ -609,6 +652,8 @@ func (c *elasticV6) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
}

record := &p.InternalVisibilityWorkflowExecutionInfo{
DomainID: source.DomainID,
WorkflowType: source.WorkflowType,
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
Expand Down
39 changes: 39 additions & 0 deletions common/elasticsearch/client_v7.go
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,43 @@ func (c *elasticV7) search(ctx context.Context, p *searchParametersV7) (*elastic
return searchService.Do(ctx)
}

// SearchRaw runs the given raw JSON query against the index and converts the
// ES v7 response into the version-agnostic RawResponse shape.
// Note: slight differences between the v6 and v7 responses prevent moving
// this into a common function.
func (c *elasticV7) SearchRaw(ctx context.Context, index string, query string) (*RawResponse, error) {
	esResult, err := c.client.Search(index).Source(query).Do(ctx)
	if err != nil {
		return nil, err
	}

	// Surface server-side failures and timeouts as internal service errors.
	if esResult.Error != nil {
		return nil, types.InternalServiceError{
			Message: fmt.Sprintf("ElasticSearch Error: %#v", esResult.Error),
		}
	}
	if esResult.TimedOut {
		return nil, types.InternalServiceError{
			Message: fmt.Sprintf("ElasticSearch Error: Request timed out: %v ms", esResult.TookInMillis),
		}
	}

	// v7 aggregations are already map[string]json.RawMessage, so they can be
	// assigned directly without the per-entry copy the v6 client needs.
	result := RawResponse{
		TookInMillis: esResult.TookInMillis,
		Aggregations: esResult.Aggregations,
		Hits: SearchHits{
			TotalHits: esResult.TotalHits(),
		},
	}

	if esResult.Hits != nil && len(esResult.Hits.Hits) > 0 {
		result.Hits.Hits = make([]*p.InternalVisibilityWorkflowExecutionInfo, 0, len(esResult.Hits.Hits))
		for _, hit := range esResult.Hits.Hits {
			result.Hits.Hits = append(result.Hits.Hits, c.convertSearchResultToVisibilityRecord(hit))
		}
	}

	return &result, nil
}

// searchWithDSL executes the given raw DSL query string against the index.
func (c *elasticV7) searchWithDSL(ctx context.Context, index, query string) (*elastic.SearchResult, error) {
	searchService := c.client.Search(index).Source(query)
	return searchService.Do(ctx)
}
Expand Down Expand Up @@ -608,6 +645,8 @@ func (c *elasticV7) convertSearchResultToVisibilityRecord(hit *elastic.SearchHit
}

record := &p.InternalVisibilityWorkflowExecutionInfo{
DomainID: source.DomainID,
WorkflowType: source.WorkflowType,
WorkflowID: source.WorkflowID,
RunID: source.RunID,
TypeName: source.WorkflowType,
Expand Down
15 changes: 15 additions & 0 deletions common/elasticsearch/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package elasticsearch

import (
"context"
"encoding/json"
"fmt"
"time"

Expand Down Expand Up @@ -58,6 +59,8 @@ type (
Search(ctx context.Context, request *SearchRequest) (*SearchResponse, error)
// SearchByQuery is the generic purpose searching
SearchByQuery(ctx context.Context, request *SearchByQueryRequest) (*SearchResponse, error)
// SearchRaw is for searching with raw json. Returns RawResult object which is subset of ESv6 and ESv7 response
SearchRaw(ctx context.Context, index, query string) (*RawResponse, error)
// ScanByQuery is also generic purpose searching, but implemented with ScrollService of ElasticSearch,
// which is more performant for pagination, but comes with some limitation of in-parallel requests.
ScanByQuery(ctx context.Context, request *ScanByQueryRequest) (*SearchResponse, error)
Expand Down Expand Up @@ -211,6 +214,7 @@ type (
WorkflowID string
RunID string
WorkflowType string
DomainID string
StartTime int64
ExecutionTime int64
CloseTime int64
Expand All @@ -223,4 +227,15 @@ type (
NumClusters int16
Attr map[string]interface{}
}

// SearchHits carries the documents matched by a raw search, converted to
// internal visibility records.
SearchHits struct {
// TotalHits is the total number of matching documents reported by ElasticSearch.
TotalHits int64
// Hits holds the converted visibility records for the returned page.
Hits []*p.InternalVisibilityWorkflowExecutionInfo
}

// RawResponse is the version-agnostic subset of the ESv6 and ESv7 search
// responses returned by SearchRaw.
RawResponse struct {
// TookInMillis is the server-reported query execution time in milliseconds.
TookInMillis int64
// Hits contains the matched documents.
Hits SearchHits
// Aggregations holds raw aggregation results keyed by aggregation name,
// left as raw JSON for the caller to decode.
Aggregations map[string]json.RawMessage
}
)
Loading

0 comments on commit 2fa2787

Please sign in to comment.