Skip to content

Commit

Permalink
Make persistence config dynamic for system scanner (cadence-workflow#…
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored Feb 20, 2020
1 parent 9975e7d commit 909c329
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 43 deletions.
11 changes: 6 additions & 5 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/uber/cadence/common/persistence/sql"
"github.com/uber/cadence/common/quotas"
"github.com/uber/cadence/common/service/config"
"github.com/uber/cadence/common/service/dynamicconfig"
)

type (
Expand Down Expand Up @@ -138,7 +139,7 @@ func NewFactory(
logger: logger,
clusterName: clusterName,
}
limiters := buildRatelimiters(cfg)
limiters := buildRateLimiters(cfg)
factory.init(clusterName, limiters)
return factory
}
Expand Down Expand Up @@ -322,18 +323,18 @@ func (f *factoryImpl) init(clusterName string, limiters map[string]quotas.Limite
f.datastores[storeTypeVisibility] = visibilityDataStore
}

func buildRatelimiters(cfg *config.Persistence) map[string]quotas.Limiter {
func buildRateLimiters(cfg *config.Persistence) map[string]quotas.Limiter {
result := make(map[string]quotas.Limiter, len(cfg.DataStores))
for dsName, ds := range cfg.DataStores {
qps := 0
var qps dynamicconfig.IntPropertyFn
if ds.Cassandra != nil {
qps = ds.Cassandra.MaxQPS
}
if ds.SQL != nil {
qps = ds.SQL.MaxQPS
}
if qps > 0 {
result[dsName] = quotas.NewSimpleRateLimiter(qps)
if qps != nil {
result[dsName] = quotas.NewDynamicRateLimiter(func() float64 { return float64(qps()) })
}
}
return result
Expand Down
4 changes: 2 additions & 2 deletions common/service/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ type (
// Datacenter is the data center filter arg for cassandra
Datacenter string `yaml:"datacenter"`
// MaxQPS is the max request rate to this datastore
MaxQPS int `yaml:"maxQPS"`
MaxQPS dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
// MaxConns is the max number of connections to this datastore for a single keyspace
MaxConns int `yaml:"maxConns"`
// TLS configuration
Expand All @@ -211,7 +211,7 @@ type (
// ConnectAttributes is a set of key-value attributes to be sent as part of connect data_source_name url
ConnectAttributes map[string]string `yaml:"connectAttributes"`
// MaxQPS the max request rate on this datastore
MaxQPS int `yaml:"maxQPS"`
MaxQPS dynamicconfig.IntPropertyFn `yaml:"-" json:"-"`
// MaxConns the max number of connections to this datastore
MaxConns int `yaml:"maxConns"`
// MaxIdleConns is the max number of idle connections to this datastore
Expand Down
8 changes: 6 additions & 2 deletions common/service/config/persistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@

package config

import "fmt"
import (
"fmt"

"github.com/uber/cadence/common/service/dynamicconfig"
)

const (
// StoreTypeSQL refers to sql based storage as persistence store
Expand All @@ -30,7 +34,7 @@ const (
)

// SetMaxQPS sets the MaxQPS value for the given datastore
func (c *Persistence) SetMaxQPS(key string, qps int) {
func (c *Persistence) SetMaxQPS(key string, qps dynamicconfig.IntPropertyFn) {
ds, ok := c.DataStores[key]
if !ok {
return
Expand Down
2 changes: 1 addition & 1 deletion service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func NewService(
serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger), params.PersistenceConfig.NumHistoryShards, isAdvancedVisExistInConfig)

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS())
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityListMaxQPS: serviceConfig.VisibilityListMaxQPS,
EnableSampling: serviceConfig.EnableVisibilitySampling,
Expand Down
2 changes: 1 addition & 1 deletion service/history/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func NewService(
params.PersistenceConfig.IsAdvancedVisibilityConfigExist())

params.PersistenceConfig.HistoryMaxConns = serviceConfig.HistoryMgrNumConns()
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS())
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
params.PersistenceConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityOpenMaxQPS: serviceConfig.VisibilityOpenMaxQPS,
VisibilityClosedMaxQPS: serviceConfig.VisibilityClosedMaxQPS,
Expand Down
5 changes: 1 addition & 4 deletions service/matching/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ func NewService(
) (resource.Resource, error) {

serviceConfig := NewConfig(dynamicconfig.NewCollection(params.DynamicConfig, params.Logger))
params.PersistenceConfig.SetMaxQPS(
params.PersistenceConfig.DefaultStore,
serviceConfig.PersistenceMaxQPS(),
)
params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, serviceConfig.PersistenceMaxQPS)
serviceResource, err := resource.New(
params,
common.MatchingServiceName,
Expand Down
1 change: 0 additions & 1 deletion service/worker/scanner/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ func New(
) *Scanner {

cfg := params.Config
cfg.Persistence.SetMaxQPS(cfg.Persistence.DefaultStore, cfg.PersistenceMaxQPS())
zapLogger, err := zap.NewProduction()
if err != nil {
resource.GetLogger().Fatal("failed to initialize zap logger", tag.Error(err))
Expand Down
65 changes: 39 additions & 26 deletions service/worker/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,11 @@ type (
Service struct {
resource.Resource

status int32
stopC chan struct{}
params *service.BootstrapParams
config *Config
scannerResource resource.Resource // separate out scanner resource because it requires a different persistence QPS
status int32
stopC chan struct{}
params *service.BootstrapParams
config *Config
}

// Config contains all the service config for worker
Expand All @@ -76,33 +77,23 @@ func NewService(
) (resource.Resource, error) {

serviceConfig := NewConfig(params)

params.PersistenceConfig.SetMaxQPS(
params.PersistenceConfig.DefaultStore,
serviceConfig.ReplicationCfg.PersistenceMaxQPS(),
)

serviceResource, err := resource.New(
params,
common.WorkerServiceName,
serviceConfig.ThrottledLogRPS,
func(
persistenceBean persistenceClient.Bean,
logger log.Logger,
) (persistence.VisibilityManager, error) {
return persistenceBean.GetVisibilityManager(), nil
},
)
serviceResource, err := getResource(params, serviceConfig.ThrottledLogRPS, serviceConfig.ReplicationCfg.PersistenceMaxQPS)
if err != nil {
return nil, err
}
scannerResource, err := getResource(params, serviceConfig.ThrottledLogRPS, serviceConfig.ScannerCfg.PersistenceMaxQPS)
if err != nil {
return nil, err
}

return &Service{
Resource: serviceResource,
status: common.DaemonStatusInitialized,
config: serviceConfig,
params: params,
stopC: make(chan struct{}),

scannerResource: scannerResource,
status: common.DaemonStatusInitialized,
config: serviceConfig,
params: params,
stopC: make(chan struct{}),
}, nil
}

Expand Down Expand Up @@ -169,6 +160,7 @@ func (s *Service) Start() {
logger.Info("worker starting", tag.ComponentWorker)

s.Resource.Start()
s.scannerResource.Start()

s.ensureSystemDomainExists()
s.startScanner()
Expand Down Expand Up @@ -202,6 +194,7 @@ func (s *Service) Stop() {
close(s.stopC)

s.Resource.Stop()
s.scannerResource.Stop()

s.params.Logger.Info("worker stopped", tag.ComponentWorker)
}
Expand Down Expand Up @@ -239,7 +232,7 @@ func (s *Service) startScanner() {
Config: *s.config.ScannerCfg,
TallyScope: s.params.MetricScope,
}
if err := scanner.New(s.Resource, params).Start(); err != nil {
if err := scanner.New(s.scannerResource, params).Start(); err != nil {
s.GetLogger().Fatal("error starting scanner", tag.Error(err))
}
}
Expand Down Expand Up @@ -341,3 +334,23 @@ func (s *Service) registerSystemDomain() {
s.GetLogger().Fatal("failed to register system domain", tag.Error(err))
}
}

func getResource(
params *service.BootstrapParams,
throttledLogRPS dynamicconfig.IntPropertyFn,
persistenceMaxQPS dynamicconfig.IntPropertyFn,
) (resource.Resource, error) {

params.PersistenceConfig.SetMaxQPS(params.PersistenceConfig.DefaultStore, persistenceMaxQPS)
return resource.New(
params,
common.WorkerServiceName,
throttledLogRPS,
func(
persistenceBean persistenceClient.Bean,
logger log.Logger,
) (persistence.VisibilityManager, error) {
return persistenceBean.GetVisibilityManager(), nil
},
)
}
2 changes: 1 addition & 1 deletion tools/cli/domainUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ func initializeMetadataMgr(
) persistence.MetadataManager {

pConfig := serviceConfig.Persistence
pConfig.SetMaxQPS(pConfig.DefaultStore, dependencyMaxQPS)
pConfig.SetMaxQPS(pConfig.DefaultStore, dynamicconfig.GetIntPropertyFn(dependencyMaxQPS))
pConfig.VisibilityConfig = &config.VisibilityConfig{
VisibilityListMaxQPS: dynamicconfig.GetIntPropertyFilteredByDomain(dependencyMaxQPS),
EnableSampling: dynamicconfig.GetBoolPropertyFn(false), // not used by domain operation
Expand Down

0 comments on commit 909c329

Please sign in to comment.