Skip to content

Commit

Permalink
normalize throttler errors
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Jan 24, 2021
1 parent 251b293 commit 6b1a11f
Show file tree
Hide file tree
Showing 11 changed files with 560 additions and 194 deletions.
44 changes: 22 additions & 22 deletions README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -134,34 +134,34 @@ thr := NewThrottlerAll( // throttles only if all children throttle

| Throttler | Definition | Description |
|---|---|---|
| echo | `func NewThrottlerEcho(err error) Throttler` | Always throttles with the specified error back. |
| echo | `func NewThrottlerEcho(err error) Throttler` | Always throttles with the specified error back.<br> - could return any specified error; |
| wait | `func NewThrottlerWait(duration time.Duration) Throttler` | Always waits for the specified duration. |
| square | `func NewThrottlerSquare(duration time.Duration, limit time.Duration, reset bool) Throttler` | Always waits for square growing *[1, 4, 9, 16, ...]* multiplier on the specified initial duration, up until the specified duration limit is reached.<br> If reset is set then after throttler riches the specified duration limit next multiplier value will be reseted. |
| jitter | `func NewThrottlerJitter(initial time.Duration, limit time.Duration, reset bool, jitter float64) Throttler` | Waits accordingly to undelying square throttler but also adds the provided jitter delta distribution on top.<br> Jitter value is normalized to [0.0, 1.0] range and defines which part of square delay could be randomized in percents.<br> Implementation uses `math/rand` as PRNG function and expects rand seeding by a client. |
| context | `func NewThrottlerContext() Throttler` | Always throttless on *done* context. |
| panic | `func NewThrottlerPanic() Throttler` | Always panics. |
| each | `func NewThrottlerEach(threshold uint64) Throttler` | Throttles each periodic *i-th* call defined by the specified threshold. |
| before | `func NewThrottlerBefore(threshold uint64) Throttler` | Throttles each call below the *i-th* call defined by the specified threshold. |
| after | `func NewThrottlerAfter(threshold uint64) Throttler` | Throttles each call after the *i-th* call defined by the specified threshold. |
| chance | `func NewThrottlerChance(threshold float64) Throttler` | Throttles each call with the chance *p* defined by the specified threshold.<br> Chance value is normalized to *[0.0, 1.0]* range.<br> Implementation uses `math/rand` as PRNG function and expects rand seeding by a client. |
| running | `func NewThrottlerRunning(threshold uint64) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold. |
| context | `func NewThrottlerContext() Throttler` | Always throttless on *done* context.<br> - could return `ErrorInternal`; |
| panic | `func NewThrottlerPanic() Throttler` | Always panics with `ErrorInternal`. |
| each | `func NewThrottlerEach(threshold uint64) Throttler` | Throttles each periodic *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| before | `func NewThrottlerBefore(threshold uint64) Throttler` | Throttles each call below the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| after | `func NewThrottlerAfter(threshold uint64) Throttler` | Throttles each call after the *i-th* call defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| chance | `func NewThrottlerChance(threshold float64) Throttler` | Throttles each call with the chance *p* defined by the specified threshold.<br> Chance value is normalized to *[0.0, 1.0]* range.<br> Implementation uses `math/rand` as PRNG function and expects rand seeding by a client.<br> - could return `ErrorThreshold`; |
| running | `func NewThrottlerRunning(threshold uint64) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold.<br> - could return `ErrorThreshold`; |
| buffered | `func NewThrottlerBuffered(threshold uint64) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again. |
| priority | `func NewThrottlerPriority(threshold uint64, levels uint8) Throttler` | Waits on call which exeeds the running quota *acquired - release* *q* defined by the specified threshold until the running quota is available again.<br> Running quota is not equally distributed between *n* levels of priority defined by the specified levels.<br> Use `func WithPriority(ctx context.Context, priority uint8) context.Context` to override context call priority, *1* by default. |
| timed | `func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates. |
| latency | `func NewThrottlerLatency(threshold time.Duration, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
| percentile | `func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity. <br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*. |
| monitor | `func NewThrottlerMonitor(mnt Monitor, threshold Stats) Throttler` | Throttles call if any of the stats returned by provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred.<br> Builtin `Monitor` implementations come with stats caching by default.<br> Use builtin `NewMonitorSystem` to create go system monitor instance. |
| metric | `func NewThrottlerMetric(mtc Metric) Throttler` | Throttles call if boolean metric defined by the specified boolean metric is reached or if any internal error occurred.<br> Builtin `Metric` implementations come with boolean metric caching by default.<br> Use builtin `NewMetricPrometheus` to create Prometheus metric instance. |
| enqueuer | `func NewThrottlerEnqueue(enq Enqueuer) Throttler` | Always enqueues message to the specified queue throttles only if any internal error occurred.<br> Use `func WithMessage(ctx context.Context, message interface{}) context.Context` to specify context message for enqueued message and `func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context message marshaler.<br> Builtin `Enqueuer` implementations come with connection reuse and retries by default.<br> Use builtin `func NewEnqueuerRabbit(url string, queue string, retries uint64) Enqueuer` to create RabbitMQ enqueuer instance or `func NewEnqueuerKafka(net string, url string, topic string, retries uint64) Enqueuer` to create Kafka enqueuer instance. |
| adaptive | `func NewThrottlerAdaptive(threshold uint64, interval time.Duration, quantum time.Duration, step uint64, thr Throttler) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> Provided adapted throttler adjusts the running quota of adapter throttler by changing the value by *d* defined by the specified step, it subtracts *d^2* from the running quota if adapted throttler throttles or adds *d* to the running quota if it doesn't. |
| pattern | `func NewThrottlerPattern(patterns ...Pattern) Throttler` | Throttles if matching throttler from provided patterns throttles.<br> Use `func WithKey(ctx context.Context, key string) context.Context` to specify key for regexp pattern throttler matching.<br> `Pattern` defines a pair of regexp and related throttler. |
| ring | `func NewThrottlerRing(thrs ...Throttler) Throttler` | Throttles if the *i-th* call throttler from provided list throttle. |
| all | `func NewThrottlerAll(thrs ...Throttler) Throttler` | Throttles call if all provided throttlers throttle. |
| any | `func NewThrottlerAny(thrs ...Throttler) Throttler` | Throttles call if any of provided throttlers throttle. |
| not | `func NewThrottlerNot(thr Throttler) Throttler` | Throttles call if provided throttler doesn't throttle. |
| timed | `func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> - could return `ErrorThreshold`; |
| latency | `func NewThrottlerLatency(threshold time.Duration, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once.<br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*.<br> - could return `ErrorThreshold`; |
| percentile | `func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) Throttler` | Throttles each call after the call latency *l* defined by the specified threshold was exeeded once considering the specified percentile.<br> Percentile values are kept in bounded buffer with capacity *c* defined by the specified capacity. <br> If retention is set then throttler state will be reseted after retention duration.<br> Use `func WithTimestamp(ctx context.Context, ts time.Time) context.Context` to specify running duration between throttler *acquire* and *release*.<br> - could return `ErrorThreshold`; |
| monitor | `func NewThrottlerMonitor(mnt Monitor, threshold Stats) Throttler` | Throttles call if any of the stats returned by provided monitor exceeds any of the stats defined by the specified threshold or if any internal error occurred.<br> Builtin `Monitor` implementations come with stats caching by default.<br> Use builtin `NewMonitorSystem` to create go system monitor instance.<br> - could return `ErrorInternal`;<br> - could return `ErrorThreshold`; |
| metric | `func NewThrottlerMetric(mtc Metric) Throttler` | Throttles call if boolean metric defined by the specified boolean metric is reached or if any internal error occurred.<br> Builtin `Metric` implementations come with boolean metric caching by default.<br> Use builtin `NewMetricPrometheus` to create Prometheus metric instance.<br> - could return `ErrorInternal`;<br> - could return `ErrorThreshold`; |
| enqueuer | `func NewThrottlerEnqueue(enq Enqueuer) Throttler` | Always enqueues message to the specified queue throttles only if any internal error occurred.<br> Use `func WithMessage(ctx context.Context, message interface{}) context.Context` to specify context message for enqueued message and `func WithMarshaler(ctx context.Context, mrsh Marshaler) context.Context` to specify context message marshaler.<br> Builtin `Enqueuer` implementations come with connection reuse and retries by default.<br> Use builtin `func NewEnqueuerRabbit(url string, queue string, retries uint64) Enqueuer` to create RabbitMQ enqueuer instance or `func NewEnqueuerKafka(net string, url string, topic string, retries uint64) Enqueuer` to create Kafka enqueuer instance.<br> - could return `ErrorInternal`; |
| adaptive | `func NewThrottlerAdaptive(threshold uint64, interval time.Duration, quantum time.Duration, step uint64, thr Throttler) Throttler` | Throttles each call which exeeds the running quota *acquired - release* *q* defined by the specified threshold in the specified interval.<br> Periodically each specified interval the running quota number is reseted.<br> If quantum is set then quantum will be used instead of interval to provide the running quota delta updates.<br> Provided adapted throttler adjusts the running quota of adapter throttler by changing the value by *d* defined by the specified step, it subtracts *d^2* from the running quota if adapted throttler throttles or adds *d* to the running quota if it doesn't.<br> - could return `ErrorThreshold`; |
| pattern | `func NewThrottlerPattern(patterns ...Pattern) Throttler` | Throttles if matching throttler from provided patterns throttles.<br> Use `func WithKey(ctx context.Context, key string) context.Context` to specify key for regexp pattern throttler matching.<br> `Pattern` defines a pair of regexp and related throttler.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
| ring | `func NewThrottlerRing(thrs ...Throttler) Throttler` | Throttles if the *i-th* call throttler from provided list throttle.<br> - could return `ErrorInternal`;<br> - could return any underlying throttler error; |
| all | `func NewThrottlerAll(thrs ...Throttler) Throttler` | Throttles call if all provided throttlers throttle.<br> - could return `ErrorInternal`; |
| any | `func NewThrottlerAny(thrs ...Throttler) Throttler` | Throttles call if any of provided throttlers throttle.<br> - could return `ErrorInternal`; |
| not | `func NewThrottlerNot(thr Throttler) Throttler` | Throttles call if provided throttler doesn't throttle.<br> - could return `ErrorInternal`; |
| suppress | `func NewThrottlerSuppress(thr Throttler) Throttler` | Suppresses provided throttler to never throttle. |
| retry | `func NewThrottlerRetry(thr Throttler, retries uint64) Throttler` | Retries provided throttler error up until the provided retries threshold.<br> Internally retry uses square throttler with `DefaultRetriedDuration` initial duration. |
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.<br> Only non throttling calls are cached for the provided cache duration. |
| retry | `func NewThrottlerRetry(thr Throttler, retries uint64) Throttler` | Retries provided throttler error up until the provided retries threshold.<br> Internally retry uses square throttler with `DefaultRetriedDuration` initial duration.<br> - could return any underlying throttler error; |
| cache | `func NewThrottlerCache(thr Throttler, cache time.Duration) Throttler` | Caches provided throttler calls for the provided cache duration, throttler release resulting resets cache.<br> Only non throttling calls are cached for the provided cache duration.<br> - could return any underlying throttler error; |

## Integrations

Expand Down
2 changes: 1 addition & 1 deletion context.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (ctx ctxthr) Done() <-chan struct{} {
err := ctx.Err()
if err != nil {
close(ch)
log("context is canceled due %v", err)
log("context is canceled due: %v", err)
}
return err
}))
Expand Down
10 changes: 7 additions & 3 deletions context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
func TestContext(t *testing.T) {
cctx, cancel := context.WithCancel(context.Background())
cancel()
testerr := errors.New("test")
ctx := WithParams(
context.Background(),
time.Now(),
Expand All @@ -26,12 +27,15 @@ func TestContext(t *testing.T) {
err error
}{
"Context with throttler should be done on throttling": {
ctx: WithThrottler(context.Background(), tmock{aerr: errors.New("test")}, ms1_0),
err: fmt.Errorf("throttler error has happened %w", errors.New("test")),
ctx: WithThrottler(context.Background(), tmock{aerr: testerr}, ms1_0),
err: fmt.Errorf("throttler error has happened %w", testerr),
},
"Context with throttler should be done on throttling after": {
ctx: WithThrottler(context.Background(), NewThrottlerAfter(1), ms1_0),
err: fmt.Errorf("throttler error has happened %w", errors.New("throttler has exceed threshold")),
err: fmt.Errorf(
"throttler error has happened %w",
ErrorThreshold{Throttler: "after", Threshold: strpair{current: 3, threshold: 1}},
),
},
"Context with throttler should be done with canceled context": {
ctx: WithThrottler(cctx, tmock{}, ms1_0),
Expand Down
90 changes: 90 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package gohalt

import (
"fmt"
"time"
)

type strbool bool

func (b strbool) String() string {
return fmt.Sprintf("%t", bool(b))
}

type strpair struct {
current uint64
threshold uint64
}

func (p strpair) String() string {
return fmt.Sprintf("%d out of %d", p.current, p.threshold)
}

type strpercent float64

func (p strpercent) String() string {
return fmt.Sprintf("%.4f%%", float64(p)*100)
}

type strdurations struct {
current time.Duration
threshold time.Duration
}

func (d strdurations) String() string {
return fmt.Sprintf("%s out of %s", d.current, d.threshold)
}

type strstats struct {
current Stats
threshold Stats
}

func (s strstats) String() string {
return fmt.Sprintf(
`
%d out of %d bytes
%d out of %d bytes
%d out of %d ns
%.4f out of %.4f %%
`,
s.current.MEMAlloc,
s.threshold.MEMAlloc,
s.current.MEMSystem,
s.threshold.MEMSystem,
s.current.CPUPause,
s.threshold.CPUPause,
s.current.CPUUsage*100,
s.threshold.CPUUsage*100,
)
}

// ErrorThreshold defines error type
// that occurs if throttler reaches specified threshold.
type ErrorThreshold struct {
Throttler string
Threshold fmt.Stringer
}

func (err ErrorThreshold) Error() string {
return fmt.Sprintf(
"throttler %q has reached its threshold: %s",
err.Throttler,
err.Threshold,
)
}

// ErrorInternal defines error type
// that occurs if throttler internal error happens.
type ErrorInternal struct {
Throttler string
Message string
}

func (err ErrorInternal) Error() string {
return fmt.Sprintf(
"throttler %q internal error happened: %s",
err.Throttler,
err.Message,
)
}
2 changes: 1 addition & 1 deletion executors.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func retried(retries uint64, run Runnable) Runnable {
if err == nil {
return
}
log("retry error happened %v", err)
log("retry error happened: %v", err)
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (mtc *mtcprometheus) pull(ctx context.Context, api prometheus.API, query st
return err
}
for _, warn := range warns {
log("prometheus warning happened %s", warn)
log("prometheus warning happened: %s", warn)
}
vec, ok := val.(model.Vector)
if !ok || vec.Len() != 1 {
Expand Down
8 changes: 8 additions & 0 deletions monitors.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ type Stats struct {
CPUUsage float64
}

// Compare checks if provided stats is below current stats.
func (s Stats) Compare(stats Stats) bool {
return (s.MEMAlloc > 0 && stats.MEMAlloc >= s.MEMAlloc) ||
(s.MEMSystem > 0 && stats.MEMSystem >= s.MEMSystem) ||
(s.CPUPause > 0 && stats.CPUPause >= s.CPUPause) ||
(s.CPUUsage > 0 && stats.CPUUsage >= s.CPUUsage)
}

// Monitor defines system monitor interface that returns the system stats.
type Monitor interface {
// Stats returns system stats or internal error if any happened.
Expand Down
6 changes: 3 additions & 3 deletions runners.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func NewRunnerSync(ctx context.Context, thr Throttler) Runner {
r.err = err
cancel()
}
log("sync runner error happened %v", err)
log("sync runner error happened: %v", err)
}
}
return &r
Expand Down Expand Up @@ -97,7 +97,7 @@ func NewRunnerAsync(ctx context.Context, thr Throttler) Runner {
r.err = err
cancel()
})
log("async runner error happened %v", err)
log("async runner error happened: %v", err)
}
}
return &r
Expand All @@ -109,7 +109,7 @@ func (r *rasync) Run(run Runnable) {
defer r.wg.Done()
select {
case <-r.ctx.Done():
r.report(fmt.Errorf("context error has happened %w", r.ctx.Err()))
r.report(fmt.Errorf("context error happened %w", r.ctx.Err()))
return
default:
}
Expand Down
Loading

0 comments on commit 6b1a11f

Please sign in to comment.