Skip to content

Commit

Permalink
Add per domain limits dynamic config (cadence-workflow#2277)
Browse files Browse the repository at this point in the history
  • Loading branch information
shreyassrivatsan authored Jul 25, 2019
1 parent 3197e1b commit f37857a
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 8 deletions.
3 changes: 3 additions & 0 deletions common/quotas/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import "context"
// RPSFunc returns a float64 as the RPS
type RPSFunc func() float64

// RPSKeyFunc returns a float64 as the RPS for the given key
type RPSKeyFunc func(key string) float64

// Info corresponds to information required to determine rate limits
type Info struct {
Domain string
Expand Down
2 changes: 1 addition & 1 deletion common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func getPolicy() Policy {
func() float64 {
return float64(defaultRps)
},
func() float64 {
func(domain string) float64 {
return float64(defaultRps)
},
)
Expand Down
10 changes: 7 additions & 3 deletions common/quotas/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (
type MultiStageRateLimiter struct {
sync.RWMutex
rps RPSFunc
domainRPS RPSFunc
domainRPS RPSKeyFunc
domainLimiters map[string]*DynamicRateLimiter
globalLimiter *DynamicRateLimiter
}

// NewMultiStageRateLimiter returns a new domain quota rate limiter. This is about
// an order of magnitude slower than
func NewMultiStageRateLimiter(rps RPSFunc, domainRps RPSFunc) *MultiStageRateLimiter {
func NewMultiStageRateLimiter(rps RPSFunc, domainRps RPSKeyFunc) *MultiStageRateLimiter {
rl := &MultiStageRateLimiter{
rps: rps,
domainRPS: domainRps,
Expand All @@ -62,7 +62,11 @@ func (d *MultiStageRateLimiter) Allow(info Info) bool {

if !ok {
// create a new limiter
domainLimiter := NewDynamicRateLimiter(d.domainRPS)
domainLimiter := NewDynamicRateLimiter(
func() float64 {
return d.domainRPS(domain)
},
)

// verify that it is needed and add to map
d.Lock()
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type Config struct {
ESIndexMaxResultWindow dynamicconfig.IntPropertyFn
HistoryMaxPageSize dynamicconfig.IntPropertyFnWithDomainFilter
RPS dynamicconfig.IntPropertyFn
DomainRPS dynamicconfig.IntPropertyFn
DomainRPS dynamicconfig.IntPropertyFnWithDomainFilter
MaxIDLengthLimit dynamicconfig.IntPropertyFn
EnableClientVersionCheck dynamicconfig.BoolPropertyFn
MinRetentionDays dynamicconfig.IntPropertyFn
Expand Down Expand Up @@ -97,7 +97,7 @@ func NewConfig(dc *dynamicconfig.Collection, numHistoryShards int, enableVisibil
ESIndexMaxResultWindow: dc.GetIntProperty(dynamicconfig.FrontendESIndexMaxResultWindow, 10000),
HistoryMaxPageSize: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendHistoryMaxPageSize, common.GetHistoryMaxPageSize),
RPS: dc.GetIntProperty(dynamicconfig.FrontendRPS, 1200),
DomainRPS: dc.GetIntProperty(dynamicconfig.FrontendDomainRPS, 1200),
DomainRPS: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendDomainRPS, 1200),
MaxIDLengthLimit: dc.GetIntProperty(dynamicconfig.MaxIDLengthLimit, 1000),
HistoryMgrNumConns: dc.GetIntProperty(dynamicconfig.FrontendHistoryMgrNumConns, 10),
MaxBadBinaries: dc.GetIntPropertyFilteredByDomain(dynamicconfig.FrontendMaxBadBinaries, 10),
Expand Down
4 changes: 2 additions & 2 deletions service/frontend/workflowHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ func NewWorkflowHandler(
func() float64 {
return float64(config.RPS())
},
func() float64 {
return float64(config.DomainRPS())
func(domain string) float64 {
return float64(config.DomainRPS(domain))
},
),
versionChecker: &versionChecker{checkVersion: config.EnableClientVersionCheck()},
Expand Down

0 comments on commit f37857a

Please sign in to comment.