Skip to content

Commit

Permalink
Added non-transactional rate limit handler, much improved performance
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Mar 30, 2016
1 parent eab6c1f commit 5ce7e04
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 13 deletions.
25 changes: 13 additions & 12 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,19 @@ type Config struct {
EnableHealthChecks bool `json:"enable_health_checks"`
HealthCheckValueTimeout int64 `json:"health_check_value_timeouts"`
} `json:"health_check"`
UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"`
AllowMasterKeys bool `json:"allow_master_keys"`
HashKeys bool `json:"hash_keys"`
SuppressRedisSignalReload bool `json:"suppress_redis_signal_reload"`
SupressDefaultOrgStore bool `json:"suppress_default_org_store"`
SentryCode string `json:"sentry_code"`
UseSentry bool `json:"use_sentry"`
EnforceOrgDataAge bool `json:"enforce_org_data_age"`
EnforceOrgDataDeailLogging bool `json:"enforce_org_data_detail_logging"`
EnforceOrgQuotas bool `json:"enforce_org_quotas"`
ExperimentalProcessOrgOffThread bool `json:"experimental_process_org_off_thread"`
Monitor struct {
UseAsyncSessionWrite bool `json:"optimisations_use_async_session_write"`
AllowMasterKeys bool `json:"allow_master_keys"`
HashKeys bool `json:"hash_keys"`
SuppressRedisSignalReload bool `json:"suppress_redis_signal_reload"`
SupressDefaultOrgStore bool `json:"suppress_default_org_store"`
SentryCode string `json:"sentry_code"`
UseSentry bool `json:"use_sentry"`
EnforceOrgDataAge bool `json:"enforce_org_data_age"`
EnforceOrgDataDeailLogging bool `json:"enforce_org_data_detail_logging"`
EnforceOrgQuotas bool `json:"enforce_org_quotas"`
ExperimentalProcessOrgOffThread bool `json:"experimental_process_org_off_thread"`
EnableNonTransactionalRateLimiter bool `json:"enable_non_transactional_rate_limiter"`
Monitor struct {
EnableTriggerMonitors bool `json:"enable_trigger_monitors"`
Config WebHookHandlerConf `json:"configuration"`
GlobalTriggerLimit float64 `json:"global_trigger_limit"`
Expand Down
5 changes: 5 additions & 0 deletions ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (s *LDAPStorageHandler) SetRollingWindow(keyName string, per int64, val str
return 0, []interface{}{}
}

func (s *LDAPStorageHandler) SetRollingWindowPipeline(keyName string, per int64, val string) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0, []interface{}{}
}

func (s LDAPStorageHandler) GetSet(keyName string) (map[string]string, error) {
log.Error("Not implemented")
return map[string]string{}, nil
Expand Down
50 changes: 50 additions & 0 deletions redis_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,3 +663,53 @@ func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64,
}
return 0, []interface{}{}
}

func (r *RedisClusterStorageManager) SetRollingWindowPipeline(keyName string, per int64, value_override string) (int, []interface{}) {

log.Debug("Incrementing raw key: ", keyName)
if r.db == nil {
log.Info("Connection dropped, connecting..")
r.Connect()
return r.SetRollingWindow(keyName, per, value_override)
} else {
log.Debug("keyName is: ", keyName)
now := time.Now()
log.Debug("Now is:", now)
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
log.Debug("Then is: ", onePeriodAgo)

ZREMRANGEBYSCORE := rediscluster.ClusterTransaction{}
ZREMRANGEBYSCORE.Cmd = "ZREMRANGEBYSCORE"
ZREMRANGEBYSCORE.Args = []interface{}{keyName, "-inf", onePeriodAgo.UnixNano()}

ZRANGE := rediscluster.ClusterTransaction{}
ZRANGE.Cmd = "ZRANGE"
ZRANGE.Args = []interface{}{keyName, 0, -1}

ZADD := rediscluster.ClusterTransaction{}
ZADD.Cmd = "ZADD"

if value_override != "-1" {
ZADD.Args = []interface{}{keyName, now.UnixNano(), value_override}
} else {
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(now.UnixNano()))}
}

EXPIRE := rediscluster.ClusterTransaction{}
EXPIRE.Cmd = "EXPIRE"
EXPIRE.Args = []interface{}{keyName, per}

redVal, err := redis.Values(r.db.DoPipeline([]rediscluster.ClusterTransaction{ZREMRANGEBYSCORE, ZRANGE, ZADD, EXPIRE}))

intVal := len(redVal[1].([]interface{}))

log.Debug("Returned: ", intVal)

if err != nil {
log.Error("Multi command failed: ", err)
}

return intVal, redVal[1].([]interface{})
}
return 0, []interface{}{}
}
22 changes: 22 additions & 0 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,28 @@ func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, val stri

}

// SetScrollingWindow is used in the rate limiter to handle rate limits fairly.
func (r *RPCStorageHandler) SetRollingWindowPipeline(keyName string, per int64, val string) (int, []interface{}) {
start := time.Now() // get current time
ibd := InboundData{
KeyName: keyName,
Per: per,
Expire: -1,
}

intVal, err := r.Client.Call("SetRollingWindow", ibd)
if r.IsAccessError(err) {
r.Login()
return r.SetRollingWindow(keyName, per, val)
}

elapsed := time.Since(start)
log.Debug("SetRollingWindow took ", elapsed)

return intVal.(int), []interface{}{}

}

func (r RPCStorageHandler) GetSet(keyName string) (map[string]string, error) {
log.Error("Not implemented")
return map[string]string{}, nil
Expand Down
7 changes: 6 additions & 1 deletion session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ type SessionLimiter struct{}
func (l SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *SessionState, store StorageHandler) {
log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), "-1")
var ratePerPeriodNow int
if config.EnableNonTransactionalRateLimiter {
ratePerPeriodNow, _ = store.SetRollingWindowPipeline(rateLimiterKey, int64(currentSession.Per), "-1")
} else {
ratePerPeriodNow, _ = store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), "-1")
}

log.Debug("Num Requests: ", ratePerPeriodNow)

Expand Down
47 changes: 47 additions & 0 deletions storage_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type StorageHandler interface {
Decrement(string)
IncrememntWithExpire(string, int64) int64
SetRollingWindow(string, int64, string) (int, []interface{})
SetRollingWindowPipeline(string, int64, string) (int, []interface{})
GetSet(string) (map[string]string, error)
AddToSet(string, string)
RemoveFromSet(string, string)
Expand All @@ -64,6 +65,11 @@ func (s *InMemoryStorageManager) SetRollingWindow(keyName string, per int64, val
return 0, []interface{}{}
}

func (s *InMemoryStorageManager) SetRollingWindowPipeline(keyName string, per int64, val string) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0, []interface{}{}
}

func (s *InMemoryStorageManager) IncrememntWithExpire(n string, i int64) int64 {
log.Warning("Not implemented!")
return 0
Expand Down Expand Up @@ -770,6 +776,47 @@ func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire
return 0, []interface{}{}
}

// IncrementWithExpire will increment a key in redis - NOT IMPLEMENTED
func (r *RedisStorageManager) SetRollingWindowPipeline(keyName string, per int64, expire string) (int, []interface{}) {
db := r.pool.Get()
defer db.Close()

log.Debug("Incrementing raw key: ", keyName)
if db == nil {
log.Info("Connection dropped, connecting..")
r.Connect()
r.SetRollingWindow(keyName, per, expire)
} else {
log.Debug("keyName is: ", keyName)
now := time.Now()
log.Debug("Now is:", now)
onePeriodAgo := now.Add(time.Duration(-1*per) * time.Second)
log.Debug("Then is: ", onePeriodAgo)

db.Send("MULTI")
// Drop the last period so we get current bucket
db.Send("ZREMRANGEBYSCORE", keyName, "-inf", onePeriodAgo.UnixNano())
// Get the set
db.Send("ZRANGE", keyName, 0, -1)
// Add this request to the pile
db.Send("ZADD", keyName, now.UnixNano(), strconv.Itoa(int(now.UnixNano())))
// REset the TTL so the key lives as long as the requests pile in
db.Send("EXPIRE", keyName, per)
r, err := redis.Values(db.Do("EXEC"))

intVal := len(r[1].([]interface{}))

log.Debug("Returned: ", intVal)

if err != nil {
log.Error("Multi command failed: ", err)
}

return intVal, r[1].([]interface{})
}
return 0, []interface{}{}
}

func (r *RedisStorageManager) GetSet(keyName string) (map[string]string, error) {
log.Debug("Getting from key set: ", keyName)
log.Info("Getting from fixed key set: ", r.fixKey(keyName))
Expand Down

0 comments on commit 5ce7e04

Please sign in to comment.