Skip to content

Commit

Permalink
Refactor advanced visibility config (cadence-workflow#2400)
Browse files Browse the repository at this point in the history
  • Loading branch information
vancexu authored Aug 31, 2019
1 parent faec6e5 commit e8c8e50
Show file tree
Hide file tree
Showing 27 changed files with 322 additions and 117 deletions.
27 changes: 19 additions & 8 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,24 +146,35 @@ func (s *server) startService() common.Daemon {
log.Fatalf("need to provide an endpoint config for PublicClient")
}

params.ESConfig = &s.cfg.ElasticSearch
params.ESConfig.Enable = dc.GetBoolProperty(dynamicconfig.EnableVisibilityToKafka, params.ESConfig.Enable)() // force override with dynamic config
advancedVisMode := dc.GetStringProperty(
dynamicconfig.AdvancedVisibilityWritingMode,
common.GetDefaultAdvancedVisibilityWritingMode(params.PersistenceConfig.IsAdvancedVisibilityConfigExist()),
)()
isAdvancedVisEnabled := advancedVisMode != common.AdvancedVisibilityWritingModeOff
if params.ClusterMetadata.IsGlobalDomainEnabled() {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, true, params.ESConfig.Enable)
} else if params.ESConfig.Enable {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, false, params.ESConfig.Enable)
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, true, isAdvancedVisEnabled)
} else if isAdvancedVisEnabled {
params.MessagingClient = messaging.NewKafkaClient(&s.cfg.Kafka, params.MetricsClient, zap.NewNop(), params.Logger, params.MetricScope, false, isAdvancedVisEnabled)
} else {
params.MessagingClient = nil
}

// enable visibility to kafka and enable visibility to elastic search are using one config
if params.ESConfig.Enable {
esClient, err := elasticsearch.NewClient(&s.cfg.ElasticSearch)
if isAdvancedVisEnabled {
// verify config of advanced visibility store
advancedVisStoreKey := s.cfg.Persistence.AdvancedVisibilityStore
advancedVisStore, ok := s.cfg.Persistence.DataStores[advancedVisStoreKey]
if !ok {
log.Fatalf("not able to find advanced visibility store in config: %v", advancedVisStoreKey)
}

params.ESConfig = advancedVisStore.ElasticSearch
esClient, err := elasticsearch.NewClient(params.ESConfig)
if err != nil {
log.Fatalf("error creating elastic search client: %v", err)
}
params.ESClient = esClient

// verify index name
indexName, ok := params.ESConfig.Indices[common.VisibilityAppName]
if !ok || len(indexName) == 0 {
log.Fatalf("elastic search config missing visibility index")
Expand Down
10 changes: 10 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,3 +118,13 @@ const (
// ArchivalPaused is the status for pausing archival
ArchivalPaused = "paused"
)

// enum for dynamic config AdvancedVisibilityWritingMode
const (
// AdvancedVisibilityWritingModeOff means do not write to advanced visibility store
AdvancedVisibilityWritingModeOff = "off"
// AdvancedVisibilityWritingModeOn means only write to advanced visibility store
AdvancedVisibilityWritingModeOn = "on"
// AdvancedVisibilityWritingModeDual means write to both normal visibility and advanced visibility store
AdvancedVisibilityWritingModeDual = "dual"
)
1 change: 0 additions & 1 deletion common/elasticsearch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
// Config for connecting to ElasticSearch
type (
Config struct {
Enable bool `yaml:enable`
URL url.URL `yaml:url`
Indices map[string]string `yaml:indices`
}
Expand Down
7 changes: 4 additions & 3 deletions common/persistence/elasticsearch/esVisibilityManager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,17 @@ func NewESVisibilityManager(indexName string, esClient es.Client, config *config

if config != nil {
// wrap with rate limiter
if config.MaxQPS() != 0 {
if config.MaxQPS != nil && config.MaxQPS() != 0 {
esRateLimiter := quotas.NewDynamicRateLimiter(
func() float64 {
return float64(config.MaxQPS())
},
)
visibilityFromES = p.NewVisibilityPersistenceRateLimitedClient(visibilityFromES, esRateLimiter, log)
}
// wrap with advanced rate limit for list
visibilityFromES = p.NewVisibilitySamplingClient(visibilityFromES, config, metricsClient, log)
if config.EnableSampling != nil && config.EnableSampling() {
visibilityFromES = p.NewVisibilitySamplingClient(visibilityFromES, config, metricsClient, log)
}
}
if metricsClient != nil {
// wrap with metrics
Expand Down
35 changes: 30 additions & 5 deletions common/persistence/visibilityWrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
package persistence

import (
"fmt"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/service/dynamicconfig"
)

Expand All @@ -29,18 +33,21 @@ type (
visibilityManager VisibilityManager
esVisibilityManager VisibilityManager
enableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithDomainFilter
advancedVisWritingMode dynamicconfig.StringPropertyFn
}
)

var _ VisibilityManager = (*visibilityManagerWrapper)(nil)

// NewVisibilityManagerWrapper create a visibility manager that operate on DB or ElasticSearch based on dynamic config.
func NewVisibilityManagerWrapper(visibilityManager, esVisibilityManager VisibilityManager,
enableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithDomainFilter) VisibilityManager {
enableReadVisibilityFromES dynamicconfig.BoolPropertyFnWithDomainFilter,
advancedVisWritingMode dynamicconfig.StringPropertyFn) VisibilityManager {
return &visibilityManagerWrapper{
visibilityManager: visibilityManager,
esVisibilityManager: esVisibilityManager,
enableReadVisibilityFromES: enableReadVisibilityFromES,
advancedVisWritingMode: advancedVisWritingMode,
}
}

Expand All @@ -58,21 +65,39 @@ func (v *visibilityManagerWrapper) GetName() string {
}

func (v *visibilityManagerWrapper) RecordWorkflowExecutionStarted(request *RecordWorkflowExecutionStartedRequest) error {
if v.esVisibilityManager != nil {
switch v.advancedVisWritingMode() {
case common.AdvancedVisibilityWritingModeOff:
return v.visibilityManager.RecordWorkflowExecutionStarted(request)
case common.AdvancedVisibilityWritingModeOn:
return v.esVisibilityManager.RecordWorkflowExecutionStarted(request)
case common.AdvancedVisibilityWritingModeDual:
if err := v.esVisibilityManager.RecordWorkflowExecutionStarted(request); err != nil {
return err
}
return v.visibilityManager.RecordWorkflowExecutionStarted(request)
default:
return &shared.InternalServiceError{
Message: fmt.Sprintf("Unknown advanced visibility writing mode: %s", v.advancedVisWritingMode()),
}
}
return v.visibilityManager.RecordWorkflowExecutionStarted(request)
}

func (v *visibilityManagerWrapper) RecordWorkflowExecutionClosed(request *RecordWorkflowExecutionClosedRequest) error {
if v.esVisibilityManager != nil {
switch v.advancedVisWritingMode() {
case common.AdvancedVisibilityWritingModeOff:
return v.visibilityManager.RecordWorkflowExecutionClosed(request)
case common.AdvancedVisibilityWritingModeOn:
return v.esVisibilityManager.RecordWorkflowExecutionClosed(request)
case common.AdvancedVisibilityWritingModeDual:
if err := v.esVisibilityManager.RecordWorkflowExecutionClosed(request); err != nil {
return err
}
return v.visibilityManager.RecordWorkflowExecutionClosed(request)
default:
return &shared.InternalServiceError{
Message: fmt.Sprintf("Unknown advanced visibility writing mode: %s", v.advancedVisWritingMode()),
}
}
return v.visibilityManager.RecordWorkflowExecutionClosed(request)
}

func (v *visibilityManagerWrapper) UpsertWorkflowExecution(request *UpsertWorkflowExecutionRequest) error {
Expand Down
6 changes: 4 additions & 2 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@ type (
Kafka messaging.KafkaConfig `yaml:"kafka"`
// Archival is the config for archival
Archival Archival `yaml:"archival"`
// ElasticSearch is config for connecting to ElasticSearch
ElasticSearch elasticsearch.Config `yaml:"elasticsearch"`
// PublicClient is config for connecting to cadence frontend
PublicClient PublicClient `yaml:"publicClient"`
// DynamicConfigClient is the config for setting up the file based dynamic config client
Expand Down Expand Up @@ -123,6 +121,8 @@ type (
DefaultStore string `yaml:"defaultStore" validate:"nonzero"`
// VisibilityStore is the name of the datastore to be used for visibility records
VisibilityStore string `yaml:"visibilityStore" validate:"nonzero"`
// AdvancedVisibilityStore is the name of the datastore to be used for visibility records
AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"`
// HistoryMaxConns is the desired number of conns to history store. Value specified
// here overrides the MaxConns config specified as part of datastore
HistoryMaxConns int `yaml:"historyMaxConns"`
Expand All @@ -143,6 +143,8 @@ type (
Cassandra *Cassandra `yaml:"cassandra"`
// SQL contains the config for a SQL based datastore
SQL *SQL `yaml:"sql"`
// ElasticSearch contains the config for a ElasticSearch datastore
ElasticSearch *elasticsearch.Config `yaml:"elasticsearch"`
}

// VisibilityConfig is config for visibility sampling
Expand Down
5 changes: 5 additions & 0 deletions common/service/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,3 +70,8 @@ func (c *Persistence) Validate() error {
}
return nil
}

// IsAdvancedVisibilityConfigExist returns whether user specified advancedVisibilityStore in config
func (c *Persistence) IsAdvancedVisibilityConfigExist() bool {
return len(c.AdvancedVisibilityStore) != 0
}
6 changes: 3 additions & 3 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var keys = map[Key]string{
EnableNewKafkaClient: "system.enableNewKafkaClient",
EnableVisibilitySampling: "system.enableVisibilitySampling",
EnableReadFromClosedExecutionV2: "system.enableReadFromClosedExecutionV2",
EnableVisibilityToKafka: "system.enableVisibilityToKafka",
AdvancedVisibilityWritingMode: "system.advancedVisibilityWritingMode",
EnableReadVisibilityFromES: "system.enableReadVisibilityFromES",
HistoryArchivalStatus: "system.historyArchivalStatus",
EnableReadFromHistoryArchival: "system.enableReadFromHistoryArchival",
Expand Down Expand Up @@ -235,8 +235,8 @@ const (
EnableVisibilitySampling
// EnableReadFromClosedExecutionV2 is key for enable read from cadence_visibility.closed_executions_v2
EnableReadFromClosedExecutionV2
// EnableVisibilityToKafka is key for enable kafka
EnableVisibilityToKafka
// AdvancedVisibilityWritingMode is key for how to write to advanced visibility
AdvancedVisibilityWritingMode
// EmitShardDiffLog whether emit the shard diff log
EmitShardDiffLog
// EnableReadVisibilityFromES is key for enable read from elastic search
Expand Down
9 changes: 9 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,3 +501,12 @@ func ConvertIndexedValueTypeToThriftType(fieldType interface{}, logger log.Logge
return fieldType.(workflow.IndexedValueType) // it will panic and been captured by logger
}
}

// GetDefaultAdvancedVisibilityWritingMode get default advancedVisibilityWritingMode based on
// whether related config exists in static config file.
func GetDefaultAdvancedVisibilityWritingMode(isAdvancedVisConfigExist bool) string {
if isAdvancedVisConfigExist {
return AdvancedVisibilityWritingModeOn
}
return AdvancedVisibilityWritingModeOff
}
12 changes: 0 additions & 12 deletions config/development.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,6 @@ kafka:
cluster: test
cadence-visibility-dev-dlq:
cluster: test
applications:
visibility:
topic: cadence-visibility-dev
dlq-topic: cadence-visibility-dev-dlq

elasticsearch:
enable: false
url:
scheme: "http"
host: "127.0.0.1:9200"
indices:
visibility: cadence-visibility-dev

publicClient:
hostPort: "localhost:7933"
Expand Down
135 changes: 135 additions & 0 deletions config/development_es.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
persistence:
defaultStore: cass-default
visibilityStore: cass-visibility
advancedVisibilityStore: es-visibility
numHistoryShards: 4
datastores:
cass-default:
cassandra:
hosts: "127.0.0.1"
keyspace: "cadence"
consistency: "One"
cass-visibility:
cassandra:
hosts: "127.0.0.1"
keyspace: "cadence_visibility"
consistency: "One"
es-visibility:
elasticsearch:
url:
scheme: "http"
host: "127.0.0.1:9200"
indices:
visibility: cadence-visibility-dev

ringpop:
name: cadence
bootstrapMode: hosts
bootstrapHosts: ["127.0.0.1:7933", "127.0.0.1:7934", "127.0.0.1:7935"]
maxJoinDuration: 30s

services:
frontend:
rpc:
port: 7933
bindOnLocalHost: true
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7936

matching:
rpc:
port: 7935
bindOnLocalHost: true
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7938

history:
rpc:
port: 7934
bindOnLocalHost: true
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7937

worker:
rpc:
port: 7939
bindOnLocalHost: true
metrics:
statsd:
hostPort: "127.0.0.1:8125"
prefix: "cadence"
pprof:
port: 7940

clusterMetadata:
enableGlobalDomain: false
failoverVersionIncrement: 10
masterClusterName: "active"
currentClusterName: "active"
clusterInformation:
active:
enabled: true
initialFailoverVersion: 0
rpcName: "cadence-frontend"
rpcAddress: "localhost:7933"

dcRedirectionPolicy:
policy: "noop"
toDC: ""

archival:
history:
status: "enabled"
enableRead: true
provider:
filestore:
fileMode: "0666"
dirMode: "0766"
visibility:
status: "disabled"
enableRead: false

domainDefaults:
archival:
history:
status: "enabled"
URI: "file:///tmp/cadence_archival/development"
visibility:
status: "disabled"

kafka:
tls:
enabled: false
clusters:
test:
brokers:
- 127.0.0.1:9092
topics:
cadence-visibility-dev:
cluster: test
cadence-visibility-dev-dlq:
cluster: test
applications:
visibility:
topic: cadence-visibility-dev
dlq-topic: cadence-visibility-dev-dlq

publicClient:
hostPort: "localhost:7933"

dynamicConfigClient:
filepath: "config/dynamicconfig/development_es.yaml"
pollInterval: "10s"

Loading

0 comments on commit e8c8e50

Please sign in to comment.