This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

Revert "PIP-675: Add support for persistent store"
thrawn01 authored Dec 10, 2019
1 parent 965bdf0 commit 19da569
Showing 25 changed files with 359 additions and 844 deletions.
34 changes: 0 additions & 34 deletions README.md
@@ -140,40 +140,6 @@ Examples when using `Behavior = DURATION_IS_GREGORIAN`
* If `Duration = 0` (Minutes) then the rate limit will reset to `Current = 0` at the end of the minute the rate limit was created.
* If `Duration = 4` (Months) then the rate limit will reset to `Current = 0` at the end of the month the rate limit was created.

## Gubernator as a library
If you are using Go, you can use Gubernator as a library. This is useful
if you wish to implement a rate limit service with your own
company-specific model on top. We do this internally at Mailgun with a
service we creatively called `ratelimits`, which keeps track of the
limits imposed on a per-account basis. In this way you can utilize the
power and speed of Gubernator but still layer business logic and
integrate domain-specific problems into your rate limiting service.

When you use the library, your service becomes a full member of the
cluster, participating in the same consistent hashing and caching as a
standalone Gubernator server would. All you need to do is provide the
GRPC server instance and tell Gubernator where the peers in your cluster
are located. `cmd/gubernator/main.go` is a great example of how to use
Gubernator as a library.

### Optional Disk Persistence
While the Gubernator server doesn't currently support disk persistence
directly, the Gubernator library provides interfaces through which
library users can implement it. The library offers two interfaces for
disk persistence. Depending on the use case, an implementer can
implement the [Loader](/store.go) interface and support persistence of
rate limits only at startup and shutdown, or implement the
[Store](/store.go) interface, in which case Gubernator will continuously
call `OnChange()` and `Get()` to keep the in-memory cache and the
persistent store up to date with the latest rate limit data. Both
interfaces *can* be implemented simultaneously to ensure data is always
saved to persistent storage.

For those who choose to implement the `Store` interface, it is not
required to store ALL the rate limits received via `OnChange()`. For
instance, if you wish to support rate limit durations longer than a
minute, day, or month, calls to `OnChange()` can check the duration of a
rate limit and persist only those rate limits whose durations exceed a
self-determined threshold.
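The selective-persistence idea can be sketched as follows. To keep the
example self-contained, it redeclares simplified stand-ins for
`RateLimitReq`, `CacheItem`, and the `Store` methods referenced above
(`OnChange()`, `Get()`, `Remove()`); the real interfaces live in
[store.go](/store.go) and may differ in detail:

```go
package main

import "fmt"

// Simplified stand-ins for the library's types; field and method names
// follow the ones used in this repository but are illustrative only.
type RateLimitReq struct {
	Name     string
	Duration int64 // milliseconds
}

func (r *RateLimitReq) HashKey() string { return r.Name }

type CacheItem struct {
	Key      string
	Value    interface{}
	ExpireAt int64
}

// DurationFilterStore persists only rate limits whose duration exceeds
// minDuration; short-lived limits stay cache-only, as suggested above.
type DurationFilterStore struct {
	minDuration int64
	persisted   map[string]*CacheItem // stand-in for a real disk store
}

func (s *DurationFilterStore) OnChange(r *RateLimitReq, item *CacheItem) {
	if r.Duration <= s.minDuration {
		return // not worth persisting; it will expire from the cache soon
	}
	s.persisted[item.Key] = item
}

func (s *DurationFilterStore) Get(r *RateLimitReq) (*CacheItem, bool) {
	item, ok := s.persisted[r.HashKey()]
	return item, ok
}

func (s *DurationFilterStore) Remove(key string) { delete(s.persisted, key) }

func main() {
	s := &DurationFilterStore{minDuration: 60_000, persisted: map[string]*CacheItem{}}
	s.OnChange(&RateLimitReq{Name: "short", Duration: 1_000},
		&CacheItem{Key: "short"})
	s.OnChange(&RateLimitReq{Name: "monthly", Duration: 2_592_000_000},
		&CacheItem{Key: "monthly"})
	_, ok := s.Get(&RateLimitReq{Name: "short"})
	fmt.Println(ok) // false: under the threshold, never persisted
	_, ok = s.Get(&RateLimitReq{Name: "monthly"})
	fmt.Println(ok) // true
}
```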

### API
All methods are accessed via GRPC but are also exposed via HTTP using the
[GRPC Gateway](https://github.com/grpc-ecosystem/grpc-gateway)
127 changes: 41 additions & 86 deletions algorithms.go
@@ -17,47 +17,25 @@ limitations under the License.
package gubernator

import (
"github.com/mailgun/gubernator/cache"
"time"
)

// Implements token bucket algorithm for rate limiting. https://en.wikipedia.org/wiki/Token_bucket
func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
item, ok := c.GetItem(r.HashKey())
if s != nil {
if !ok {
// Check our store for the item
if item, ok = s.Get(r); ok {
c.Add(item)
}
}
}

func tokenBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
item, ok := c.Get(r.HashKey())
if ok {
// The following semantic allows for requests of more than the limit to be rejected, but subsequent
// requests within the same duration that are under the limit to succeed. IE: client attempts to
// send 1000 emails but 100 is their limit. The request is rejected as over the limit, but since we
// don't store OVER_LIMIT in the cache the client can retry within the same rate limit duration with
// 100 emails and the request will succeed.

rl, ok := item.Value.(*RateLimitResp)
rl, ok := item.(*RateLimitResp)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(r.HashKey())
if s != nil {
s.Remove(r.HashKey())
}
return tokenBucket(s, c, r)
}

// Client is only interested in retrieving the current status
if r.Hits == 0 {
return rl, nil
}

if s != nil {
defer func() {
s.OnChange(r, item)
}()
return tokenBucket(c, r)
}

// If we are already at the limit
@@ -66,6 +44,11 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
return rl, nil
}

// Client is only interested in retrieving the current status
if r.Hits == 0 {
return rl, nil
}

// If requested hits takes the remainder
if rl.Remaining == r.Hits {
rl.Remaining = 0
@@ -84,8 +67,9 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

// Add a new rate limit to the cache
expire := MillisecondNow() + r.Duration
expire := cache.MillisecondNow() + r.Duration
if r.Behavior == Behavior_DURATION_IS_GREGORIAN {
var err error
expire, err = GregorianExpiration(time.Now(), r.Duration)
if err != nil {
return nil, err
@@ -104,42 +88,27 @@ func tokenBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
status.Remaining = r.Limit
}

item = &CacheItem{
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: status,
ExpireAt: expire,
}

c.Add(item)
if s != nil {
s.OnChange(r, item)
}
c.Add(r.HashKey(), status, expire)
return status, nil
}

// Implements leaky bucket algorithm for rate limiting https://en.wikipedia.org/wiki/Leaky_bucket
func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err error) {
now := MillisecondNow()
item, ok := c.GetItem(r.HashKey())
if s != nil {
if !ok {
// Check our store for the item
if item, ok = s.Get(r); ok {
c.Add(item)
}
}
func leakyBucket(c cache.Cache, r *RateLimitReq) (*RateLimitResp, error) {
type LeakyBucket struct {
Limit int64
Duration int64
LimitRemaining int64
TimeStamp int64
}

now := cache.MillisecondNow()
item, ok := c.Get(r.HashKey())
if ok {
b, ok := item.Value.(*LeakyBucketItem)
b, ok := item.(*LeakyBucket)
if !ok {
// Client switched algorithms; perhaps due to a migration?
c.Remove(r.HashKey())
if s != nil {
s.Remove(r.HashKey())
}
return leakyBucket(s, c, r)
return leakyBucket(c, r)
}

duration := r.Duration
@@ -165,25 +134,19 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
elapsed := now - b.TimeStamp
leak := int64(elapsed / rate)

b.Remaining += leak
if b.Remaining > b.Limit {
b.Remaining = b.Limit
b.LimitRemaining += leak
if b.LimitRemaining > b.Limit {
b.LimitRemaining = b.Limit
}

rl := &RateLimitResp{
Limit: b.Limit,
Remaining: b.Remaining,
Remaining: b.LimitRemaining,
Status: Status_UNDER_LIMIT,
}

if s != nil {
defer func() {
s.OnChange(r, item)
}()
}

// If we are already at the limit
if b.Remaining == 0 {
if b.LimitRemaining == 0 {
rl.Status = Status_OVER_LIMIT
rl.ResetTime = now + rate
return rl, nil
@@ -195,15 +158,15 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

// If requested hits takes the remainder
if b.Remaining == r.Hits {
b.Remaining = 0
if b.LimitRemaining == r.Hits {
b.LimitRemaining = 0
rl.Remaining = 0
return rl, nil
}

// If requested is more than available, then return over the limit
// without updating the bucket.
if r.Hits > b.Remaining {
if r.Hits > b.LimitRemaining {
rl.Status = Status_OVER_LIMIT
rl.ResetTime = now + rate
return rl, nil
@@ -214,8 +177,8 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
return rl, nil
}

b.Remaining -= r.Hits
rl.Remaining = b.Remaining
b.LimitRemaining -= r.Hits
rl.Remaining = b.LimitRemaining
c.UpdateExpiration(r.HashKey(), now*duration)
return rl, nil
}
@@ -233,11 +196,11 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
}

// Create a new leaky bucket
b := LeakyBucketItem{
Remaining: r.Limit - r.Hits,
Limit: r.Limit,
Duration: duration,
TimeStamp: now,
b := LeakyBucket{
LimitRemaining: r.Limit - r.Hits,
Limit: r.Limit,
Duration: duration,
TimeStamp: now,
}

rl := RateLimitResp{
@@ -251,18 +214,10 @@ func leakyBucket(s Store, c Cache, r *RateLimitReq) (resp *RateLimitResp, err er
if r.Hits > r.Limit {
rl.Status = Status_OVER_LIMIT
rl.Remaining = 0
b.Remaining = 0
b.LimitRemaining = 0
}

item = &CacheItem{
ExpireAt: now + duration,
Algorithm: r.Algorithm,
Key: r.HashKey(),
Value: &b,
}
c.Add(item)
if s != nil {
s.OnChange(r, item)
}
c.Add(r.HashKey(), &b, now+duration)

return &rl, nil
}
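The refill arithmetic in the `leakyBucket` hunks above (`rate` is
`duration / limit` milliseconds per request, and `elapsed / rate` is the
number of whole requests leaked since the last update) can be sketched
standalone. This is an illustration of the math only, not the library's
code:

```go
package main

import "fmt"

// leak computes how much capacity has "drained" back into a leaky
// bucket since it was last touched, mirroring the arithmetic above:
// one request's worth of capacity returns every rate milliseconds,
// and remaining capacity is capped at the limit.
func leak(limit, duration, remaining, lastTimeStamp, now int64) int64 {
	rate := duration / limit
	elapsed := now - lastTimeStamp
	remaining += elapsed / rate
	if remaining > limit {
		remaining = limit
	}
	return remaining
}

func main() {
	// A limit of 100 requests per minute leaks one request every 600 ms,
	// so after 3 seconds, 5 requests' worth of capacity is restored.
	fmt.Println(leak(100, 60_000, 0, 0, 3_000)) // 5
	// Capacity never exceeds the limit, however long the bucket idles.
	fmt.Println(leak(100, 60_000, 90, 0, 60_000)) // 100
}
```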
12 changes: 12 additions & 0 deletions architecture.md
@@ -76,5 +76,17 @@ limit request if the cluster is large enough. GLOBAL should only be used for
extremely high volume rate limits that don't scale well with the traditional
non `GLOBAL` behavior.

## Gubernator as a library
If you are using Go, you can use Gubernator as a library. This is useful
if you wish to implement a rate limit service with your own
company-specific model on top. We do this internally at Mailgun with a
service we creatively called `ratelimits`, which keeps track of the
limits imposed on a per-account basis. In this way you can utilize the
power and speed of Gubernator but still layer business logic and
integrate domain-specific problems into your rate limiting service.

When you use the library, your service becomes a full member of the
cluster, participating in the same consistent hashing and caching as a
standalone Gubernator server would. All you need to do is provide the
GRPC server instance and tell Gubernator where the peers in your cluster
are located.

4 changes: 2 additions & 2 deletions benchmark_test.go
@@ -18,8 +18,8 @@ package gubernator_test

import (
"context"
guber "github.com/mailgun/gubernator/v2"
"github.com/mailgun/gubernator/v2/cluster"
guber "github.com/mailgun/gubernator"
"github.com/mailgun/gubernator/cluster"
"github.com/mailgun/holster"
"testing"
)
