Skip to content

Commit

Permalink
add timed throttler tests infrastructure
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Aug 1, 2020
1 parent 0b1ad5d commit 59944a6
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 37 deletions.
7 changes: 2 additions & 5 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,12 @@ func WithParams(
}

func WithPriority(ctx context.Context, priority uint8) context.Context {
if priority == 0 {
priority = 1
}
return context.WithValue(ctx, ghctxpriority, priority)
}

func ctxPriority(ctx context.Context, limit uint8) uint8 {
if val := ctx.Value(ghctxpriority); val != nil {
if priority, ok := val.(uint8); ok && priority <= limit {
if priority, ok := val.(uint8); ok && priority > 0 && priority <= limit {
return priority
}
}
Expand Down Expand Up @@ -104,7 +101,7 @@ func (ctx ctxthr) Done() <-chan struct{} {
close(ch)
return ch
}
exec(ctx, loop(ctx.freq, func(ctx context.Context) error {
gorun(ctx, loop(ctx.freq, func(ctx context.Context) error {
err := ctx.Err()
if err != nil {
close(ch)
Expand Down
15 changes: 13 additions & 2 deletions executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gohalt

import (
"context"
"sync"
"time"
)

Expand All @@ -24,7 +25,7 @@ func loop(period time.Duration, run Runnable) Runnable {
}
}

func once(after time.Duration, run Runnable) Runnable {
func delayed(after time.Duration, run Runnable) Runnable {
return func(ctx context.Context) error {
time.Sleep(after)
return run(ctx)
Expand All @@ -46,7 +47,17 @@ func cached(cache time.Duration, run Runnable) Runnable {
}
}

func exec(ctx context.Context, r Runnable) {
func once(run Runnable) Runnable {
var once sync.Once
return func(ctx context.Context) (err error) {
once.Do(func() {
err = run(ctx)
})
return err
}
}

func gorun(ctx context.Context, r Runnable) {
go func() {
_ = r(ctx)
}()
Expand Down
5 changes: 2 additions & 3 deletions generators.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (gen *generator) tvisitPriority(ctx context.Context, thr *tpriority) {
}

func (gen *generator) tvisitTimed(ctx context.Context, thr *ttimed) {
gen.thr = NewThrottlerTimed(ctx, thr.threshold, thr.interval, thr.slide)
gen.thr = NewThrottlerTimed(thr.threshold, thr.interval, thr.quantum)
}

func (gen *generator) tvisitMonitor(ctx context.Context, thr *tmonitor) {
Expand All @@ -88,10 +88,9 @@ func (gen *generator) tvisitPercentile(ctx context.Context, thr *tpercentile) {
func (gen *generator) tvisitAdaptive(ctx context.Context, thr *tadaptive) {
gen = NewGenerator(thr.thr)
gen.thr = NewThrottlerAdaptive(
ctx,
thr.threshold,
thr.interval,
thr.slide,
thr.quantum,
thr.step,
gen.Generate(ctx, nil),
)
Expand Down
46 changes: 24 additions & 22 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,7 @@ func NewThrottlerPriority(threshold uint64, levels uint8) tpriority {
slots := uint64(math.Round(float64(i) * koef))
running.Store(i, make(chan struct{}, slots))
}
thr := tpriority{threshold: threshold, levels: levels}
thr.running = running
return thr
return tpriority{running: running, threshold: threshold, levels: levels}
}

func (thr tpriority) accept(ctx context.Context, v tvisitor) {
Expand Down Expand Up @@ -303,32 +301,37 @@ func (thr tpriority) Release(ctx context.Context) error {

type ttimed struct {
*tafter
loop Runnable
interval time.Duration
slide time.Duration
quantum time.Duration
}

func NewThrottlerTimed(ctx context.Context, threshold uint64, interval time.Duration, slide time.Duration) ttimed {
func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) ttimed {
thr := NewThrottlerAfter(threshold)
delta, window := threshold, interval
if slide > 0 && interval > slide {
delta = uint64(math.Ceil(float64(delta) / float64(slide)))
window /= slide
}
exec(ctx, loop(window, func(ctx context.Context) error {
atomic.AddUint64(&thr.current, ^(delta - 1))
if current := atomic.LoadUint64(&thr.current); current >= ^uint64(0) {
atomic.StoreUint64(&thr.current, 0)
}
return ctx.Err()
}))
return ttimed{tafter: thr, interval: interval, slide: slide}
if quantum > 0 && interval > quantum {
delta = uint64(math.Ceil(float64(delta) / float64(quantum)))
window /= quantum
}
loop := once(
loop(window, func(ctx context.Context) error {
if current := atomic.AddUint64(&thr.current, ^(delta - 1)); int64(current) < 0 {
// fix running discrepancies
atomic.StoreUint64(&thr.current, 0)
}
return ctx.Err()
}),
)
return ttimed{tafter: thr, loop: loop, interval: interval, quantum: quantum}
}

func (thr ttimed) accept(ctx context.Context, v tvisitor) {
v.tvisitTimed(ctx, &thr)
}

func (thr ttimed) Acquire(ctx context.Context) error {
// start loop on first acquire
gorun(ctx, thr.loop)
return thr.tafter.Acquire(ctx)
}

Expand Down Expand Up @@ -417,7 +420,7 @@ func (thr *tlatency) Release(ctx context.Context) error {
if latency := atomic.LoadUint64(&thr.latency); latency < uint64(thr.limit) {
latency := uint64(ctxTimestamp(ctx) - time.Now().UTC().UnixNano())
atomic.StoreUint64(&thr.latency, latency)
exec(ctx, once(thr.retention, func(context.Context) error {
gorun(ctx, delayed(thr.retention, func(context.Context) error {
atomic.StoreUint64(&thr.latency, 0)
return nil
}))
Expand Down Expand Up @@ -452,7 +455,7 @@ func (thr *tpercentile) accept(ctx context.Context, v tvisitor) {
func (thr *tpercentile) Acquire(ctx context.Context) error {
at := int(math.Round(float64(thr.latencies.Len()) * thr.percentile))
if latency := time.Duration(thr.latencies.At(at)); latency > thr.limit {
exec(ctx, once(thr.retention, func(context.Context) error {
gorun(ctx, delayed(thr.retention, func(context.Context) error {
thr.latencies.Prune()
return nil
}))
Expand All @@ -474,15 +477,14 @@ type tadaptive struct {
}

func NewThrottlerAdaptive(
ctx context.Context,
limit uint64,
interval time.Duration,
slide time.Duration,
quantum time.Duration,
step uint64,
thr Throttler,
) *tadaptive {
return &tadaptive{
ttimed: NewThrottlerTimed(ctx, limit, interval, slide),
ttimed: NewThrottlerTimed(limit, interval, quantum),
step: step,
thr: thr,
}
Expand Down
68 changes: 63 additions & 5 deletions throttlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type tcase struct {
tms uint64
thr Throttler
act Runnable
pres []Runnable
ctxs []context.Context
errs []error
durs []time.Duration
Expand All @@ -26,6 +27,12 @@ func (t tcase) run(index int) (err error, dur time.Duration) {
if index < len(t.ctxs) {
ctx = t.ctxs[index]
}
// run additional pre action only if present
if index < len(t.pres) {
if pre := t.pres[index]; pre != nil {
_ = pre(ctx)
}
}
ts := time.Now()
// try catch panic into error
func() {
Expand All @@ -37,7 +44,7 @@ func (t tcase) run(index int) (err error, dur time.Duration) {
err = t.thr.Acquire(ctx)
}()
dur = time.Since(ts)
// run additional payload only if present
// run additional action only if present
if t.act != nil {
_ = t.act(ctx)
}
Expand Down Expand Up @@ -154,7 +161,7 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler running should throttle on threshold": {
tms: 3,
thr: NewThrottlerRunning(1),
act: once(time.Millisecond, nope),
act: delayed(time.Millisecond, nope),
errs: []error{
nil,
errors.New("throttler has exceed running threshold"),
Expand All @@ -164,7 +171,7 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler buffered should throttle on threshold": {
tms: 3,
thr: NewThrottlerBuffered(1),
act: once(time.Millisecond, nope),
act: delayed(time.Millisecond, nope),
durs: []time.Duration{
0,
time.Millisecond,
Expand All @@ -174,7 +181,7 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler priority should throttle on threshold": {
tms: 3,
thr: NewThrottlerPriority(1, 0),
act: once(time.Millisecond, nope),
act: delayed(time.Millisecond, nope),
durs: []time.Duration{
0,
time.Millisecond,
Expand All @@ -184,7 +191,7 @@ func TestThrottlerPattern(t *testing.T) {
"Throttler priority should not throttle on priority": {
tms: 7,
thr: NewThrottlerPriority(5, 2),
act: once(time.Millisecond, nope),
act: delayed(time.Millisecond, nope),
ctxs: []context.Context{
WithPriority(context.Background(), 1),
WithPriority(context.Background(), 1),
Expand All @@ -204,6 +211,56 @@ func TestThrottlerPattern(t *testing.T) {
time.Millisecond,
},
},
"Throttler timed should throttle after threshold": {
tms: 6,
thr: NewThrottlerTimed(
2,
time.Millisecond,
0,
),
act: delayed(time.Millisecond, nope),
pres: []Runnable{
nil,
nil,
nil,
nil,
delayed(2*time.Millisecond, nope),
delayed(2*time.Millisecond, nope),
},
errs: []error{
nil,
nil,
errors.New("throttler has exceed threshold"),
errors.New("throttler has exceed threshold"),
nil,
nil,
},
},
// "Throttler timed should throttle after threshold with quantum": {
// tms: 6,
// thr: NewThrottlerTimed(
// context.Background(),
// 2,
// 2*time.Millisecond,
// time.Millisecond,
// ),
// pres: []Runnable{
// nil,
// nil,
// nil,
// once(time.Millisecond, nope),
// once(time.Millisecond, nope),
// once(2*time.Millisecond, nope),
// },
// errs: []error{
// nil,
// nil,
// errors.New("throttler has exceed threshold"),
// nil,
// errors.New("throttler has exceed threshold"),
// errors.New("throttler has exceed threshold"),
// },
// },
"Throttler monitor should throttle with error on internal error": {
tms: 3,
thr: NewThrottlerMonitor(
Expand Down Expand Up @@ -294,6 +351,7 @@ func TestThrottlerPattern(t *testing.T) {
for i := 0; i < int(tcase.tms); i++ {
t.Run(fmt.Sprintf("run %d", i+1), func(t *testing.T) {
t.Parallel()
fmt.Println("run ", atomic.LoadInt64(&index))
index := int(atomic.AddInt64(&index, 1) - 1)
resErr, resDur := tcase.result(index)
err, dur := tcase.run(index)
Expand Down

0 comments on commit 59944a6

Please sign in to comment.