Skip to content

Commit

Permalink
Domain specific rate limiter (cadence-workflow#2208)
Browse files Browse the repository at this point in the history
Implementation and benchmarks, not used yet
  • Loading branch information
shreyassrivatsan authored Jul 17, 2019
1 parent 8982453 commit 4c63136
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 4 deletions.
45 changes: 44 additions & 1 deletion common/quotas/limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package quotas

import (
"fmt"
"testing"
"time"

Expand All @@ -31,8 +32,8 @@ import (
)

const (
defaultDomain = "test"
defaultRps = 1200
defaultDomain = "test"
_minBurst = 10000
)

Expand All @@ -57,3 +58,45 @@ func BenchmarkRateLimiter(b *testing.B) {
policy.Allow()
}
}

func BenchmarkDomainRateLimiter(b *testing.B) {
policy := newDomainRateLimiter(defaultRps)
for n := 0; n < b.N; n++ {
policy.Allow(defaultDomain)
}
}

func BenchmarkDomainRateLimiter20Domains(b *testing.B) {
numDomains := 100
policy := newDomainRateLimiter(defaultRps)
domains := getDomains(numDomains)
for n := 0; n < b.N; n++ {
policy.Allow(domains[n%numDomains])
}
}

func BenchmarkDomainRateLimiter100Domains(b *testing.B) {
numDomains := 100
policy := newDomainRateLimiter(defaultRps)
domains := getDomains(numDomains)
for n := 0; n < b.N; n++ {
policy.Allow(domains[n%numDomains])
}
}

func BenchmarkDomainRateLimiter1000Domains(b *testing.B) {
numDomains := 100
policy := newDomainRateLimiter(defaultRps)
domains := getDomains(numDomains)
for n := 0; n < b.N; n++ {
policy.Allow(domains[n%numDomains])
}
}

func getDomains(n int) []string {
domains := make([]string, n)
for i := 0; i < n; i++ {
domains = append(domains, fmt.Sprintf("domains%v", i))
}
return domains
}
72 changes: 69 additions & 3 deletions common/quotas/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@
package quotas

import (
"time"
"sync"

"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/common/tokenbucket"
"golang.org/x/time/rate"
)

const domainRps = 400

type simpleRateLimitPolicy struct {
tb tokenbucket.TokenBucket
}
Expand All @@ -40,6 +44,68 @@ func (s *simpleRateLimitPolicy) Allow() bool {
return ok
}

func (s *simpleRateLimitPolicy) Wait(d time.Duration) bool {
return s.tb.Consume(1, d)
// DomainRateLimitPolicy indicates a domain specific rate limit policy
type DomainRateLimitPolicy struct {
sync.RWMutex
rps dynamicconfig.IntPropertyFn
domainLimiters map[string]*rate.Limiter
globalLimiter Policy
}

// NewDomainRateLimiter returns a new domain quota rate limiter. This is about
// an order of magnitude slower than
func NewDomainRateLimiter(rps RPSFunc) *DomainRateLimitPolicy {
rl := &DomainRateLimitPolicy{
domainLimiters: map[string]*rate.Limiter{},
globalLimiter: NewDynamicRateLimiter(rps),
}
return rl
}

func newDomainRateLimiter(rps int) *DomainRateLimitPolicy {
initialRps := float64(rps)
rl := &DomainRateLimitPolicy{
domainLimiters: map[string]*rate.Limiter{},
globalLimiter: NewRateLimiter(&initialRps, _defaultRPSTTL, rps),
}
return rl
}

// Allow attempts to allow a request to go through. The method returns
// immediately with a true or false indicating if the request can make
// progress
func (d *DomainRateLimitPolicy) Allow(domain string) bool {
// check if we have a per-domain limiter - if not create a default one for
// the domain.
d.RLock()
limiter, ok := d.domainLimiters[domain]
d.RUnlock()

if !ok {
// create a new limiter
domainLimiter := rate.NewLimiter(rate.Limit(domainRps), domainRps)

// verify that it is needed and add to map
d.Lock()
limiter, ok = d.domainLimiters[domain]
if !ok {
d.domainLimiters[domain] = domainLimiter
limiter = domainLimiter
}
d.Unlock()
}

// take a reservation with the domain limiter first
rsv := limiter.Reserve()
if !rsv.OK() {
return false
}

// ensure that the reservation does not break the global rate limit, if it
// does, cancel the reservation and do not allow to proceed.
if !d.globalLimiter.Allow() {
rsv.Cancel()
return false
}
return true
}

0 comments on commit 4c63136

Please sign in to comment.