Skip to content

Commit

Permalink
dynamic config: disable eventsv2 when using SQL persistence (cadence-…
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Mar 22, 2019
1 parent aba0009 commit a2df255
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 6 deletions.
15 changes: 15 additions & 0 deletions common/service/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,13 @@ package config

import "fmt"

const (
// StoreTypeSQL refers to sql based storage as persistence store
StoreTypeSQL = "sql"
// StoreTypeCassandra refers to cassandra as persistence store
StoreTypeCassandra = "cassandra"
)

// SetMaxQPS sets the MaxQPS value for the given datastore
func (c *Persistence) SetMaxQPS(key string, qps int) {
ds, ok := c.DataStores[key]
Expand All @@ -35,6 +42,14 @@ func (c *Persistence) SetMaxQPS(key string, qps int) {
ds.SQL.MaxQPS = qps
}

// DefaultStoreType returns the storeType for the default persistence store
func (c *Persistence) DefaultStoreType() string {
if c.DataStores[c.DefaultStore].SQL != nil {
return StoreTypeSQL
}
return StoreTypeCassandra
}

// Validate validates the persistence config
func (c *Persistence) Validate() error {
stores := []string{c.DefaultStore, c.VisibilityStore}
Expand Down
2 changes: 1 addition & 1 deletion host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ func (c *cadenceImpl) startHistory(rpHosts []string, startWG *sync.WaitGroup, en
params.MetricsClient = metrics.NewClient(params.MetricScope, service.GetMetricsServiceIdx(params.Name, params.Logger))
params.DynamicConfig = dynamicconfig.NewNopClient()
service := service.New(params)
historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards, c.enableVisibilityToKafka)
historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards, c.enableVisibilityToKafka, config.StoreTypeCassandra)
historyConfig.HistoryMgrNumConns = dynamicconfig.GetIntPropertyFn(c.numberOfHistoryShards)
historyConfig.ExecutionMgrNumConns = dynamicconfig.GetIntPropertyFn(c.numberOfHistoryShards)
historyConfig.EnableEventsV2 = dynamicconfig.GetBoolPropertyFnFilteredByDomain(enableEventsV2)
Expand Down
5 changes: 3 additions & 2 deletions service/history/historyTestBase.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/uber/cadence/common/persistence"
persistencetests "github.com/uber/cadence/common/persistence/persistence-tests"
"github.com/uber/cadence/common/service"
cconfig "github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
)

Expand Down Expand Up @@ -511,14 +512,14 @@ func (s *TestShardContext) GetCurrentTime(cluster string) time.Time {
// NewDynamicConfigForTest return dc for test
func NewDynamicConfigForTest() *Config {
dc := dynamicconfig.NewNopCollection()
config := NewConfig(dc, 1, false)
config := NewConfig(dc, 1, false, cconfig.StoreTypeCassandra)
return config
}

// NewDynamicConfigForEventsV2Test with enableEventsV2 = true
func NewDynamicConfigForEventsV2Test() *Config {
dc := dynamicconfig.NewNopCollection()
config := NewConfig(dc, 1, false)
config := NewConfig(dc, 1, false, cconfig.StoreTypeCassandra)
config.EnableEventsV2 = dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, true)
return config
}
Expand Down
14 changes: 11 additions & 3 deletions service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ type Config struct {
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilityToKafka bool) *Config {
return &Config{
func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilityToKafka bool, storeType string) *Config {
cfg := &Config{
NumberOfShards: numberOfShards,
RPS: dc.GetIntProperty(dynamicconfig.HistoryRPS, 3000),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
Expand Down Expand Up @@ -224,6 +224,13 @@ func NewConfig(dc *dynamicconfig.Collection, numberOfShards int, enableVisibilit

ThrottledLogRPS: dc.GetIntProperty(dynamicconfig.HistoryThrottledLogRPS, 20),
}

if storeType == config.StoreTypeSQL {
// SQL based stores don't have support for historyv2 yet, so set default to false
cfg.EnableEventsV2 = dc.GetBoolPropertyFnWithDomainFilter(dynamicconfig.EnableEventsV2, false)
}

return cfg
}

// GetShardID return the corresponding shard ID for a given workflow ID
Expand All @@ -244,7 +251,8 @@ func NewService(params *service.BootstrapParams) common.Daemon {
params.UpdateLoggerWithServiceName(common.HistoryServiceName)
config := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger),
params.PersistenceConfig.NumHistoryShards,
params.ESConfig.Enable)
params.ESConfig.Enable,
params.PersistenceConfig.DefaultStoreType())
params.ThrottledLogger = logging.NewThrottledLogger(params.Logger, config.ThrottledLogRPS)
return &Service{
params: params,
Expand Down

0 comments on commit a2df255

Please sign in to comment.