Skip to content

Commit

Permalink
Refactor: not require db visibility when ES visibility is provided (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng authored Jun 3, 2021
1 parent 723ecf5 commit 208edf4
Show file tree
Hide file tree
Showing 25 changed files with 574 additions and 514 deletions.
28 changes: 5 additions & 23 deletions common/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,10 @@ type (
// DefaultStore is the name of the default data store to use
DefaultStore string `yaml:"defaultStore" validate:"nonzero"`
// VisibilityStore is the name of the datastore to be used for visibility records
VisibilityStore string `yaml:"visibilityStore" validate:"nonzero"`
// Must provide one of VisibilityStore and AdvancedVisibilityStore
VisibilityStore string `yaml:"visibilityStore"`
// AdvancedVisibilityStore is the name of the datastore to be used for visibility records
// Must provide one of VisibilityStore and AdvancedVisibilityStore
AdvancedVisibilityStore string `yaml:"advancedVisibilityStore"`
// HistoryMaxConns is the desired number of conns to history store. Value specified
// here overrides the MaxConns config specified as part of datastore
Expand All @@ -140,10 +142,10 @@ type (
NumHistoryShards int `yaml:"numHistoryShards" validate:"nonzero"`
// DataStores contains the configuration for all datastores
DataStores map[string]DataStore `yaml:"datastores"`
// VisibilityConfig is config for visibility sampling
VisibilityConfig *VisibilityConfig `yaml:"-" json:"-"`
// TODO: move dynamic config out of static config
// TransactionSizeLimit is the largest allowed transaction size
TransactionSizeLimit dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
// TODO: move dynamic config out of static config
// ErrorInjectionRate is the the rate for injecting random error
ErrorInjectionRate dynamicconfig.FloatPropertyFn `yaml:"-" json:"-"`
}
Expand All @@ -161,26 +163,6 @@ type (
ElasticSearch *ElasticSearchConfig `yaml:"elasticsearch"`
}

// VisibilityConfig is config for visibility
VisibilityConfig struct {
// EnableSampling for visibility
EnableSampling dynamicconfig.BoolPropertyFn `yaml:"-" json:"-"`
// EnableReadFromClosedExecutionV2 read closed from v2 table
EnableReadFromClosedExecutionV2 dynamicconfig.BoolPropertyFn `yaml:"-" json:"-"`
// VisibilityOpenMaxQPS max QPS for record open workflows
VisibilityOpenMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"`
// VisibilityClosedMaxQPS max QPS for record closed workflows
VisibilityClosedMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"`
// VisibilityListMaxQPS max QPS for list workflow
VisibilityListMaxQPS dynamicconfig.IntPropertyFnWithDomainFilter `yaml:"-" json:"-"`
// ESIndexMaxResultWindow ElasticSearch index setting max_result_window
ESIndexMaxResultWindow dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
// MaxQPS is overall max QPS
MaxQPS dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
// ValidSearchAttributes is legal indexed keys that can be used in list APIs
ValidSearchAttributes dynamicconfig.MapPropertyFn `yaml:"-" json:"-"`
}

// Cassandra contains configuration to connect to Cassandra cluster
// Deprecated: please use NoSQL instead, the structure is backward-compatible
Cassandra = NoSQL
Expand Down
13 changes: 11 additions & 2 deletions common/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,17 @@ func (c *Persistence) DefaultStoreType() string {

// Validate validates the persistence config
func (c *Persistence) Validate() error {
stores := []string{c.DefaultStore, c.VisibilityStore}
for _, st := range stores {
dbStoreKeys := []string{c.DefaultStore}

if _, ok := c.DataStores[c.VisibilityStore]; ok {
dbStoreKeys = append(dbStoreKeys, c.VisibilityStore)
} else {
if _, ok := c.DataStores[c.AdvancedVisibilityStore]; !ok {
return fmt.Errorf(" Must provide one of VisibilityStore and AdvancedVisibilityStore")
}
}

for _, st := range dbStoreKeys {
ds, ok := c.DataStores[st]
if !ok {
return fmt.Errorf("persistence config: missing config for datastore %v", st)
Expand Down
11 changes: 9 additions & 2 deletions common/persistence/client/bean.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"sync"

"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/resource/config"
"github.com/uber/cadence/common/service"
)

type (
Expand Down Expand Up @@ -73,6 +75,8 @@ type (
// NewBeanFromFactory crate a new store bean using factory
func NewBeanFromFactory(
factory Factory,
params *service.BootstrapParams,
resourceConfig *config.ResourceConfig,
) (*BeanImpl, error) {

metadataMgr, err := factory.NewMetadataManager()
Expand All @@ -85,7 +89,7 @@ func NewBeanFromFactory(
return nil, err
}

visibilityMgr, err := factory.NewVisibilityManager()
visibilityMgr, err := factory.NewVisibilityManager(params, resourceConfig)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -309,7 +313,10 @@ func (s *BeanImpl) Close() {

s.metadataManager.Close()
s.taskManager.Close()
s.visibilityManager.Close()
if s.visibilityManager != nil {
// visibilityManager can be nil
s.visibilityManager.Close()
}
s.domainReplicationQueueManager.Close()
s.shardManager.Close()
s.historyManager.Close()
Expand Down
108 changes: 95 additions & 13 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,22 @@ package client
import (
"sync"

"github.com/uber/cadence/common/log/tag"

"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/metrics"
p "github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/persistence/cassandra"
"github.com/uber/cadence/common/persistence/elasticsearch"
"github.com/uber/cadence/common/persistence/serialization"
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/quotas"
rc "github.com/uber/cadence/common/resource/config"
"github.com/uber/cadence/common/service"
)

type (
Expand All @@ -55,7 +59,7 @@ type (
// NewExecutionManager returns a new execution manager for a given shardID
NewExecutionManager(shardID int) (p.ExecutionManager, error)
// NewVisibilityManager returns a new visibility manager
NewVisibilityManager() (p.VisibilityManager, error)
NewVisibilityManager(params *service.BootstrapParams, resourceConfig *rc.ResourceConfig) (p.VisibilityManager, error)
// NewDomainReplicationQueueManager returns a new queue for domain replication
NewDomainReplicationQueueManager() (p.QueueManager, error)
}
Expand Down Expand Up @@ -246,13 +250,81 @@ func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, erro
}

// NewVisibilityManager returns a new visibility manager
func (f *factoryImpl) NewVisibilityManager() (p.VisibilityManager, error) {
visConfig := f.config.VisibilityConfig
func (f *factoryImpl) NewVisibilityManager(
params *service.BootstrapParams,
resourceConfig *rc.ResourceConfig,
) (p.VisibilityManager, error) {
if resourceConfig.EnableReadVisibilityFromES == nil && resourceConfig.AdvancedVisibilityWritingMode == nil {
// No need to create visibility manager as no read/write needed
return nil, nil
}
var visibilityFromDB, visibilityFromES p.VisibilityManager
var err error
if params.PersistenceConfig.VisibilityStore != "" {
visibilityFromDB, err = f.newDBVisibilityManager(resourceConfig)
if err != nil {
return nil, err
}
}
if params.PersistenceConfig.AdvancedVisibilityStore != "" {
visibilityIndexName := params.ESConfig.Indices[common.VisibilityAppName]
visibilityProducer, err := params.MessagingClient.NewProducer(common.VisibilityAppName)
if err != nil {
f.logger.Fatal("Creating visibility producer failed", tag.Error(err))
}
visibilityFromES = newESVisibilityManager(
visibilityIndexName, params.ESClient, resourceConfig, visibilityProducer, params.MetricsClient, f.logger,
)
}
return p.NewVisibilityDualManager(
visibilityFromDB,
visibilityFromES,
resourceConfig.EnableReadVisibilityFromES,
resourceConfig.AdvancedVisibilityWritingMode,
f.logger,
), nil
}

// NewESVisibilityManager create a visibility manager for ElasticSearch
// In history, it only needs kafka producer for writing data;
// In frontend, it only needs ES client and related config for reading data
func newESVisibilityManager(
indexName string,
esClient es.GenericClient,
visibilityConfig *rc.ResourceConfig,
producer messaging.Producer,
metricsClient metrics.Client,
log log.Logger,
) p.VisibilityManager {

visibilityFromESStore := elasticsearch.NewElasticSearchVisibilityStore(esClient, indexName, producer, visibilityConfig, log)
visibilityFromES := p.NewVisibilityManagerImpl(visibilityFromESStore, log)

// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
esRateLimiter := quotas.NewDynamicRateLimiter(
func() float64 {
return float64(visibilityConfig.PersistenceMaxQPS())
},
)
visibilityFromES = p.NewVisibilityPersistenceRateLimitedClient(visibilityFromES, esRateLimiter, log)
}
if metricsClient != nil {
// wrap with metrics
visibilityFromES = elasticsearch.NewVisibilityMetricsClient(visibilityFromES, metricsClient, log)
}

return visibilityFromES
}

func (f *factoryImpl) newDBVisibilityManager(
visibilityConfig *rc.ResourceConfig,
) (p.VisibilityManager, error) {
enableReadFromClosedExecutionV2 := false
if visConfig != nil && visConfig.EnableReadFromClosedExecutionV2 != nil {
enableReadFromClosedExecutionV2 = visConfig.EnableReadFromClosedExecutionV2()
if visibilityConfig.EnableReadDBVisibilityFromClosedExecutionV2 != nil {
enableReadFromClosedExecutionV2 = visibilityConfig.EnableReadDBVisibilityFromClosedExecutionV2()
} else {
f.logger.Warn("missing visibility and EnableReadFromClosedExecutionV2 config", tag.Value(visConfig))
f.logger.Warn("missing visibility and EnableReadFromClosedExecutionV2 config", tag.Value(visibilityConfig))
}

ds := f.datastores[storeTypeVisibility]
Expand All @@ -267,8 +339,12 @@ func (f *factoryImpl) NewVisibilityManager() (p.VisibilityManager, error) {
if ds.ratelimit != nil {
result = p.NewVisibilityPersistenceRateLimitedClient(result, ds.ratelimit, f.logger)
}
if visConfig != nil && visConfig.EnableSampling() {
result = p.NewVisibilitySamplingClient(result, visConfig, f.metricsClient, f.logger)
if visibilityConfig.EnableDBVisibilitySampling != nil && visibilityConfig.EnableDBVisibilitySampling() {
result = p.NewVisibilitySamplingClient(result, &p.SamplingConfig{
VisibilityClosedMaxQPS: visibilityConfig.WriteDBVisibilityClosedMaxQPS,
VisibilityListMaxQPS: visibilityConfig.DBVisibilityListMaxQPS,
VisibilityOpenMaxQPS: visibilityConfig.WriteDBVisibilityOpenMaxQPS,
}, f.metricsClient, f.logger)
}
if f.metricsClient != nil {
result = p.NewVisibilityPersistenceMetricsClient(result, f.metricsClient, f.logger)
Expand Down Expand Up @@ -332,7 +408,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
f.logger,
getSQLParser(f.logger, common.EncodingType(defaultCfg.SQL.EncodingType), decodingTypes...))
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified")
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for defaultDataStore")
}

for _, st := range storeTypes {
Expand All @@ -341,7 +417,13 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
}
}

visibilityCfg := f.config.DataStores[f.config.VisibilityStore]
visibilityCfg, ok := f.config.DataStores[f.config.VisibilityStore]
if !ok {
f.logger.Info("no visibilityStore is configured, will use advancedVisibilityStore")
// NOTE: f.datastores[storeTypeVisibility] will be nil
return
}

if visibilityCfg.Cassandra != nil {
f.logger.Warn("Cassandra config is deprecated, please use NoSQL with pluginName of cassandra.")
}
Expand All @@ -360,7 +442,7 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
f.logger,
getSQLParser(f.logger, common.EncodingType(visibilityCfg.SQL.EncodingType), decodingTypes...))
default:
f.logger.Fatal("invalid config: one of cassandra or sql params must be specified")
f.logger.Fatal("invalid config: one of nosql or sql params must be specified for visibilityStore")
}

f.datastores[storeTypeVisibility] = visibilityDataStore
Expand Down
62 changes: 0 additions & 62 deletions common/persistence/elasticsearch/esVisibilityManager.go

This file was deleted.

6 changes: 3 additions & 3 deletions common/persistence/elasticsearch/esVisibilityStore.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ import (
"github.com/uber/cadence/.gen/go/indexer"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/definition"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/messaging"
p "github.com/uber/cadence/common/persistence"
rc "github.com/uber/cadence/common/resource/config"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/common/types/mapper/thrift"
)
Expand All @@ -58,7 +58,7 @@ type (
index string
producer messaging.Producer
logger log.Logger
config *config.VisibilityConfig
config *rc.ResourceConfig
}
)

Expand All @@ -69,7 +69,7 @@ func NewElasticSearchVisibilityStore(
esClient es.GenericClient,
index string,
producer messaging.Producer,
config *config.VisibilityConfig,
config *rc.ResourceConfig,
logger log.Logger,
) p.VisibilityStore {
return &esVisibilityStore{
Expand Down
Loading

0 comments on commit 208edf4

Please sign in to comment.