Skip to content

Commit

Permalink
Implement ItemBucketRateLimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
mborsz committed Feb 18, 2020
1 parent 208df38 commit 6846a0a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/util/workqueue/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_test(
deps = [
"//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/golang.org/x/time/rate:go_default_library",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,54 @@ func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
func (r *BucketRateLimiter) Forget(item interface{}) {
}

// ItemBucketRateLimiter implements a workqueue ratelimiter API using standard rate.Limiter.
// Each key is using a separate limiter.
type ItemBucketRateLimiter struct {
r rate.Limit
burst int

limitersLock sync.Mutex
limiters map[interface{}]*rate.Limiter
}

var _ RateLimiter = &ItemBucketRateLimiter{}

// NewItemBucketRateLimiter creates new ItemBucketRateLimiter instance.
func NewItemBucketRateLimiter(r rate.Limit, burst int) *ItemBucketRateLimiter {
return &ItemBucketRateLimiter{
r: r,
burst: burst,
limiters: make(map[interface{}]*rate.Limiter),
}
}

// When returns a time.Duration which we need to wait before item is processed.
func (r *ItemBucketRateLimiter) When(item interface{}) time.Duration {
r.limitersLock.Lock()
defer r.limitersLock.Unlock()

limiter, ok := r.limiters[item]
if !ok {
limiter = rate.NewLimiter(r.r, r.burst)
r.limiters[item] = limiter
}

return limiter.Reserve().Delay()
}

// NumRequeues returns always 0 (doesn't apply to ItemBucketRateLimiter).
func (r *ItemBucketRateLimiter) NumRequeues(item interface{}) int {
return 0
}

// Forget removes item from the internal state.
func (r *ItemBucketRateLimiter) Forget(item interface{}) {
r.limitersLock.Lock()
defer r.limitersLock.Unlock()

delete(r.limiters, item)
}

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit
// dealing with max failures and expiration are up to the caller
type ItemExponentialFailureRateLimiter struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package workqueue
import (
"testing"
"time"

"golang.org/x/time/rate"
)

func TestItemExponentialFailureRateLimiter(t *testing.T) {
Expand Down Expand Up @@ -96,6 +98,33 @@ func TestItemExponentialFailureRateLimiterOverFlow(t *testing.T) {

}

func TestItemBucketRateLimiter(t *testing.T) {
limiter := NewItemBucketRateLimiter(rate.Every(100*time.Millisecond), 1)

// Use initial burst.
if got := limiter.When("one"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}
for i := 0; i < 1000; i++ {
limiter.When("one")
}
// limiter.When should be at this point = 1000 * rate.Limit.
// We set the threshold 1s below this value to avoid race conditions.
if got, want := limiter.When("one"), 990*100*time.Millisecond; got < want {
t.Errorf("limiter.When(one) = %v; want at least %v", got, want)
}

if got := limiter.When("two"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}

limiter.Forget("one")
// Use new budget.
if got := limiter.When("one"); got != 0 {
t.Errorf("limiter.When(two) = %v; want 0", got)
}
}

func TestItemFastSlowRateLimiter(t *testing.T) {
limiter := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)

Expand Down

0 comments on commit 6846a0a

Please sign in to comment.