Skip to content

Commit

Permalink
Moved healthcheck to sorted sets
Browse files Browse the repository at this point in the history
  • Loading branch information
lonelycode committed Oct 7, 2015
1 parent b3f97e0 commit db8d253
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 33 deletions.
39 changes: 23 additions & 16 deletions api_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"strconv"
"strings"
"time"
)

type HealthPrefix string
Expand Down Expand Up @@ -48,10 +47,11 @@ func (h *DefaultHealthChecker) Init(storeType StorageHandler) {

func (h *DefaultHealthChecker) CreateKeyName(subKey HealthPrefix) string {
var newKey string
now := time.Now().UnixNano()
//now := time.Now().UnixNano()

// Key should be API-ID.SubKey.123456789
newKey = strings.Join([]string{h.APIID, string(subKey), strconv.FormatInt(now, 10)}, ".")
//newKey = strings.Join([]string{h.APIID, string(subKey), strconv.FormatInt(now, 10)}, ".")
newKey = strings.Join([]string{h.APIID, string(subKey)}, ".")

return newKey
}
Expand All @@ -66,24 +66,29 @@ func (h *DefaultHealthChecker) StoreCounterVal(counterType HealthPrefix, value s
if config.HealthCheck.EnableHealthChecks {
searchStr := h.CreateKeyName(counterType)
log.Debug("Adding Healthcheck to: ", searchStr)
go h.storage.SetKey(searchStr, value, config.HealthCheck.HealthCheckValueTimeout)
//go h.storage.SetKey(searchStr, value, config.HealthCheck.HealthCheckValueTimeout)
valConv, _ := strconv.Atoi(value)
go h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, int64(valConv))
}
}

func (h *DefaultHealthChecker) getAvgCount(prefix HealthPrefix) float64 {
searchStr := strings.Join([]string{h.APIID, string(prefix)}, ".")
//searchStr := strings.Join([]string{h.APIID, string(prefix)}, ".")

searchStr := h.CreateKeyName(prefix)
log.Debug("Searching for: ", searchStr)
keys := h.storage.GetKeys(searchStr)
log.Debug("Found ", keys)
var count int64
count = int64(len(keys))

var count int
count, _ = h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, 0)

//count = int64(len(keys))
divisor := float64(config.HealthCheck.HealthCheckValueTimeout)
if divisor == 0 {
log.Warning("The Health Check sample timeout is set to 0, samples will never be deleted!!!")
divisor = 60.0
}
if count > 0 {
return roundValue(float64(count) / divisor)
return roundValue((float64(count) - 1) / divisor)
}

return 0.00
Expand All @@ -107,19 +112,21 @@ func (h *DefaultHealthChecker) GetApiHealthValues() (HealthCheckValues, error) {
// Get the micro latency graph, an average upstream latency
searchStr := strings.Join([]string{h.APIID, string(RequestLog)}, ".")
log.Debug("Searching KV for: ", searchStr)
kv := h.storage.GetKeysAndValuesWithFilter(searchStr)
log.Debug("Found: ", kv)
//kv := h.storage.GetKeysAndValuesWithFilter(searchStr)
_, vals := h.storage.SetRollingWindow(searchStr, config.HealthCheck.HealthCheckValueTimeout, 0)
log.Info("Found: ", vals)
var runningTotal int
if len(kv) > 0 {
for _, v := range kv {
vInt, cErr := strconv.Atoi(v)
if len(vals) > 0 {
for _, v := range vals {
log.Info("V is: ", string(v.([]byte)))
vInt, cErr := strconv.Atoi(string(v.([]byte)))
if cErr != nil {
log.Error("Couldn't convert tracked latency value to Int, vl is: ")
} else {
runningTotal += vInt
}
}
values.AvgUpstreamLatency = roundValue(float64(runningTotal / len(kv)))
values.AvgUpstreamLatency = roundValue(float64(runningTotal / len(vals)))
}

return values, nil
Expand Down
4 changes: 2 additions & 2 deletions ldap_auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,9 @@ func (l *LDAPStorageHandler) notifyReadOnly() bool {
return false
}

func (s *LDAPStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) int {
func (s *LDAPStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0
return 0, []interface{}{}
}

func (s LDAPStorageHandler) GetSet(keyName string) (map[string]string, error) {
Expand Down
17 changes: 11 additions & 6 deletions redis_cluster_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,14 +590,14 @@ func (r *RedisClusterStorageManager) RemoveFromSet(keyName string, value string)
}
}

// IncrementWithExpire will increment a key in redis
func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64, expire int64) int {
// SetRollingWindow will append to a sorted set in redis and extract a timed window of values
func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64, value_override int64) (int, []interface{}) {

log.Debug("Incrementing raw key: ", keyName)
if r.db == nil {
log.Info("Connection dropped, connecting..")
r.Connect()
r.SetRollingWindow(keyName, per, expire)
r.SetRollingWindow(keyName, per, value_override)
} else {
log.Debug("keyName is: ", keyName)
now := time.Now()
Expand All @@ -615,7 +615,12 @@ func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64,

ZADD := rediscluster.ClusterTransaction{}
ZADD.Cmd = "ZADD"
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(now.UnixNano()))}

if value_override != -1 {
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(value_override))}
} else {
ZADD.Args = []interface{}{keyName, now.UnixNano(), strconv.Itoa(int(now.UnixNano()))}
}

EXPIRE := rediscluster.ClusterTransaction{}
EXPIRE.Cmd = "EXPIRE"
Expand All @@ -631,7 +636,7 @@ func (r *RedisClusterStorageManager) SetRollingWindow(keyName string, per int64,
log.Error("Multi command failed: ", err)
}

return intVal
return intVal, r[1].([]interface{})
}
return 0
return 0, []interface{}{}
}
4 changes: 2 additions & 2 deletions rpc_storage_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (r *RPCStorageHandler) AppendToSet(keyName string, value string) {
}

// SetScrollingWindow is used in the rate limiter to handle rate limits fairly.
func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) int {
func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
start := time.Now() // get current time
ibd := InboundData{
KeyName: keyName,
Expand All @@ -424,7 +424,7 @@ func (r *RPCStorageHandler) SetRollingWindow(keyName string, per int64, expire i
elapsed := time.Since(start)
log.Debug("SetRollingWindow took ", elapsed)

return intVal.(int)
return intVal.(int), []interface{}{}

}

Expand Down
2 changes: 1 addition & 1 deletion session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type SessionLimiter struct{}
func (l SessionLimiter) doRollingWindowWrite(key, rateLimiterKey, rateLimiterSentinelKey string, currentSession *SessionState, store StorageHandler) {
log.Debug("[RATELIMIT] Inbound raw key is: ", key)
log.Debug("[RATELIMIT] Rate limiter key is: ", rateLimiterKey)
ratePerPeriodNow := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), int64(currentSession.Per))
ratePerPeriodNow, _ := store.SetRollingWindow(rateLimiterKey, int64(currentSession.Per), -1)

log.Debug("Num Requests: ", ratePerPeriodNow)

Expand Down
12 changes: 6 additions & 6 deletions storage_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type StorageHandler interface {
DeleteKeys([]string) bool
Decrement(string)
IncrememntWithExpire(string, int64) int64
SetRollingWindow(string, int64, int64) int
SetRollingWindow(string, int64, int64) (int, []interface{})
GetSet(string) (map[string]string, error)
AddToSet(string, string)
RemoveFromSet(string, string)
Expand All @@ -59,9 +59,9 @@ func (s *InMemoryStorageManager) Decrement(n string) {
log.Warning("Not implemented!")
}

func (s *InMemoryStorageManager) SetRollingWindow(keyName string, per int64, expire int64) int {
func (s *InMemoryStorageManager) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
log.Warning("Not Implemented!")
return 0
return 0, []interface{}{}
}

func (s *InMemoryStorageManager) IncrememntWithExpire(n string, i int64) int64 {
Expand Down Expand Up @@ -730,7 +730,7 @@ func (r *RedisStorageManager) AppendToSet(keyName string, value string) {
}

// IncrementWithExpire will increment a key in redis
func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire int64) int {
func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire int64) (int, []interface{}) {
db := r.pool.Get()
defer db.Close()

Expand Down Expand Up @@ -765,9 +765,9 @@ func (r *RedisStorageManager) SetRollingWindow(keyName string, per int64, expire
log.Error("Multi command failed: ", err)
}

return intVal
return intVal, r[1].([]interface{})
}
return 0
return 0, []interface{}{}
}

func (r *RedisStorageManager) GetSet(keyName string) (map[string]string, error) {
Expand Down

0 comments on commit db8d253

Please sign in to comment.