Skip to content

Commit

Permalink
fix percetile throttler tests
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Aug 24, 2020
1 parent 28b460e commit 117438f
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 28 deletions.
12 changes: 12 additions & 0 deletions executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gohalt
import (
"context"
"sync"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -32,6 +33,17 @@ func delayed(after time.Duration, run Runnable) Runnable {
}
}

func locked(run Runnable) Runnable {
var lock uint64
return func(ctx context.Context) error {
defer atomic.AddUint64(&lock, ^uint64(0))
if atomic.AddUint64(&lock, 1) > 1 {
return nil
}
return run(ctx)
}
}

func cached(cache time.Duration, run Runnable) Runnable {
var ts time.Time
return func(ctx context.Context) error {
Expand Down
52 changes: 27 additions & 25 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,7 @@ type tbackoff struct {
}

func NewThrottlerBackoff(duration time.Duration, limit time.Duration, reset bool) *tbackoff {
return &tbackoff{
duration: duration,
limit: limit,
reset: reset,
}
return &tbackoff{duration: duration, limit: limit, reset: reset}
}

func (thr *tbackoff) accept(ctx context.Context, v tvisitor) {
Expand Down Expand Up @@ -346,13 +342,14 @@ type ttimed struct {
}

func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) ttimed {
thr := NewThrottlerAfter(threshold)
tafter := NewThrottlerAfter(threshold)
delta, window := threshold, interval
if quantum > 0 && interval > quantum {
delta = uint64(math.Ceil(float64(threshold) / (float64(interval) / float64(quantum))))
window = quantum
}
loop := once(
thr := ttimed{tafter: tafter, interval: interval, quantum: quantum}
thr.loop = once(
loop(window, func(ctx context.Context) error {
if current := atomic.AddUint64(&thr.current, ^(delta - 1)); int64(current) < 0 {
// fix running discrepancies
Expand All @@ -361,7 +358,7 @@ func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Du
return ctx.Err()
}),
)
return ttimed{tafter: thr, loop: loop, interval: interval, quantum: quantum}
return thr
}

func (thr ttimed) accept(ctx context.Context, v tvisitor) {
Expand Down Expand Up @@ -440,13 +437,19 @@ func (thr tmetric) Release(context.Context) error {
}

type tlatency struct {
reset Runnable
latency uint64
threshold time.Duration
retention time.Duration
}

func NewThrottlerLatency(threshold time.Duration, retention time.Duration) *tlatency {
return &tlatency{threshold: threshold, retention: retention}
thr := &tlatency{threshold: threshold, retention: retention}
thr.reset = delayed(retention, func(context.Context) error {
atomic.StoreUint64(&thr.latency, 0)
return nil
})
return thr
}

func (thr *tlatency) accept(ctx context.Context, v tvisitor) {
Expand All @@ -461,18 +464,17 @@ func (thr *tlatency) Acquire(context.Context) error {
}

func (thr *tlatency) Release(ctx context.Context) error {
latency := uint64(time.Now().UTC().UnixNano() - ctxTimestamp(ctx))
ctxTs := ctxTimestamp(ctx)
latency := uint64(time.Now().UTC().UnixNano() - ctxTs)
if latency >= uint64(thr.threshold) && atomic.LoadUint64(&thr.latency) == 0 {
atomic.StoreUint64(&thr.latency, latency)
gorun(ctx, delayed(thr.retention, func(context.Context) error {
atomic.StoreUint64(&thr.latency, 0)
return nil
}))
gorun(ctx, thr.reset)
}
return nil
}

type tpercentile struct {
reset Runnable
latencies *blatheap
threshold time.Duration
percentile float64
Expand All @@ -484,12 +486,14 @@ func NewThrottlerPercentile(threshold time.Duration, percentile float64, retenti
if percentile > 1.0 {
percentile = 1.0
}
return tpercentile{
latencies: &blatheap{},
threshold: threshold,
percentile: percentile,
retention: retention,
}
thr := tpercentile{latencies: &blatheap{}, threshold: threshold, percentile: percentile, retention: retention}
thr.reset = locked(
delayed(thr.retention, func(context.Context) error {
thr.latencies.Prune()
return nil
}),
)
return thr
}

func (thr tpercentile) accept(ctx context.Context, v tvisitor) {
Expand All @@ -500,18 +504,16 @@ func (thr tpercentile) Acquire(ctx context.Context) error {
if length := thr.latencies.Len(); length > 0 {
at := int(math.Round(float64(length-1) * thr.percentile))
if latency := thr.latencies.At(at); latency >= uint64(thr.threshold) {
gorun(ctx, delayed(thr.retention, func(context.Context) error {
thr.latencies.Prune()
return nil
}))
gorun(ctx, thr.reset)
return errors.New("throttler has exceed latency threshold")
}
}
return nil
}

func (thr tpercentile) Release(ctx context.Context) error {
latency := uint64(time.Now().UTC().UnixNano() - ctxTimestamp(ctx))
ctxTs := ctxTimestamp(ctx)
latency := uint64(time.Now().UTC().UnixNano() - ctxTs)
heap.Push(thr.latencies, latency)
return nil
}
Expand Down
39 changes: 36 additions & 3 deletions throttlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ms2_0 time.Duration = 2 * time.Millisecond
ms3_0 time.Duration = 3 * time.Millisecond
ms5_0 time.Duration = 5 * time.Millisecond
ms7_0 time.Duration = 7 * time.Millisecond
)

type tcase struct {
Expand All @@ -29,6 +30,7 @@ type tcase struct {
errs []error
durs []time.Duration
idx int64
over bool
}

func (t *tcase) run(index int) (err error, dur time.Duration) {
Expand Down Expand Up @@ -67,8 +69,11 @@ func (t *tcase) run(index int) (err error, dur time.Duration) {
_ = act(ctx)
}
}
// imitate over releasing
for i := 0; i < index+1; i++ {
limit := 1
if t.over { // imitate over releasing
limit = index + 1
}
for i := 0; i < limit; i++ {
if err := t.thr.Release(ctx); err != nil {
return err, dur
}
Expand Down Expand Up @@ -201,6 +206,7 @@ func TestThrottlerPattern(t *testing.T) {
errors.New("throttler has exceed running threshold"),
errors.New("throttler has exceed running threshold"),
},
over: true,
},
"Throttler buffered should throttle on threshold": {
tms: 3,
Expand All @@ -215,6 +221,7 @@ func TestThrottlerPattern(t *testing.T) {
ms0_9,
ms0_9,
},
over: true,
},
"Throttler priority should throttle on threshold": {
tms: 3,
Expand All @@ -229,6 +236,7 @@ func TestThrottlerPattern(t *testing.T) {
ms0_9,
ms0_9,
},
over: true,
},
"Throttler priority should not throttle on priority": {
tms: 7,
Expand Down Expand Up @@ -429,7 +437,7 @@ func TestThrottlerPattern(t *testing.T) {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 0.5, ms5_0),
ctxs: []context.Context{
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms1_0).UTC().UnixNano()),
context.Background(),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms5_0).UTC().UnixNano()),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms5_0).UTC().UnixNano()),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms1_0).UTC().UnixNano()),
Expand All @@ -443,6 +451,31 @@ func TestThrottlerPattern(t *testing.T) {
errors.New("throttler has exceed latency threshold"),
},
},
"Throttler percentile should throttle on latency above threshold after retention": {
tms: 5,
thr: NewThrottlerPercentile(ms3_0, 1.5, ms5_0),
ctxs: []context.Context{
context.Background(),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms5_0).UTC().UnixNano()),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms5_0).UTC().UnixNano()),
context.WithValue(context.Background(), ghctxtimestamp, time.Now().Add(-ms1_0).UTC().UnixNano()),
context.Background(),
},
pres: []Runnable{
nil,
nil,
nil,
nil,
delayed(ms7_0, nope),
},
errs: []error{
nil,
nil,
errors.New("throttler has exceed latency threshold"),
errors.New("throttler has exceed latency threshold"),
nil,
},
},
}
for tname, tcase := range table {
t.Run(tname, func(t *testing.T) {
Expand Down

0 comments on commit 117438f

Please sign in to comment.