Skip to content

Commit

Permalink
throttlers reorder
Browse files Browse the repository at this point in the history
  • Loading branch information
1pkg committed Sep 28, 2020
1 parent b538f3e commit 0323184
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 226 deletions.
218 changes: 107 additions & 111 deletions throttlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,25 @@ func (thr *tsquare) Release(context.Context) error {
return nil
}

type tcontext struct{}

func NewThrottlerContext() tcontext {
return tcontext{}
}

func (thr tcontext) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("throttler has received context error %w", ctx.Err())
default:
return nil
}
}

func (thr tcontext) Release(ctx context.Context) error {
return nil
}

type tpanic struct{}

func NewThrottlerPanic() tpanic {
Expand Down Expand Up @@ -144,6 +163,26 @@ func (thr *tbefore) Release(context.Context) error {
return nil
}

type tafter struct {
current uint64
threshold uint64
}

func NewThrottlerAfter(threshold uint64) *tafter {
return &tafter{threshold: threshold}
}

func (thr *tafter) Acquire(context.Context) error {
if current := atomicBIncr(&thr.current); current > thr.threshold {
return errors.New("throttler has exceed threshold")
}
return nil
}

func (thr *tafter) Release(context.Context) error {
return nil
}

type tchance struct {
threshold float64
}
Expand All @@ -167,26 +206,6 @@ func (thr tchance) Release(context.Context) error {
return nil
}

type tafter struct {
current uint64
threshold uint64
}

func NewThrottlerAfter(threshold uint64) *tafter {
return &tafter{threshold: threshold}
}

func (thr *tafter) Acquire(context.Context) error {
if current := atomicBIncr(&thr.current); current > thr.threshold {
return errors.New("throttler has exceed threshold")
}
return nil
}

func (thr *tafter) Release(context.Context) error {
return nil
}

type trunning struct {
running uint64
threshold uint64
Expand Down Expand Up @@ -271,9 +290,7 @@ func (thr tpriority) Release(ctx context.Context) error {

type ttimed struct {
*tafter
loop Runnable
interval time.Duration
quantum time.Duration
loop Runnable
}

func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Duration) ttimed {
Expand All @@ -283,7 +300,7 @@ func NewThrottlerTimed(threshold uint64, interval time.Duration, quantum time.Du
delta = uint64(math.Ceil(float64(threshold) / (float64(interval) / float64(quantum))))
window = quantum
}
thr := ttimed{tafter: tafter, interval: interval, quantum: quantum}
thr := ttimed{tafter: tafter}
thr.loop = once(
loop(window, func(ctx context.Context) error {
atomicBSub(&thr.current, delta)
Expand All @@ -307,63 +324,14 @@ func (thr ttimed) Release(ctx context.Context) error {
return thr.tafter.Release(ctx)
}

type tmonitor struct {
mnt Monitor
threshold Stats
}

func NewThrottlerMonitor(mnt Monitor, threshold Stats) tmonitor {
return tmonitor{mnt: mnt, threshold: threshold}
}

func (thr tmonitor) Acquire(ctx context.Context) error {
stats, err := thr.mnt.Stats(ctx)
if err != nil {
return fmt.Errorf("throttler hasn't found any stats %w", err)
}
if stats.MEMAlloc >= thr.threshold.MEMAlloc || stats.MEMSystem >= thr.threshold.MEMSystem ||
stats.CPUPause >= thr.threshold.CPUPause || stats.CPUUsage >= thr.threshold.CPUUsage {
return errors.New("throttler has exceed stats threshold")
}
return nil
}

func (thr tmonitor) Release(context.Context) error {
return nil
}

type tmetric struct {
mtc Metric
}

func NewThrottlerMetric(mtc Metric) tmetric {
return tmetric{mtc: mtc}
}

func (thr tmetric) Acquire(ctx context.Context) error {
val, err := thr.mtc.Query(ctx)
if err != nil {
return fmt.Errorf("throttler hasn't found any metric %w", err)
}
if val {
return errors.New("throttler has reached metric threshold")
}
return nil
}

func (thr tmetric) Release(context.Context) error {
return nil
}

type tlatency struct {
reset Runnable
latency uint64
threshold time.Duration
retention time.Duration
}

func NewThrottlerLatency(threshold time.Duration, retention time.Duration) *tlatency {
thr := &tlatency{threshold: threshold, retention: retention}
thr := &tlatency{threshold: threshold}
thr.reset = delayed(retention, func(context.Context) error {
atomicSet(&thr.latency, 0)
return nil
Expand Down Expand Up @@ -394,19 +362,18 @@ type tpercentile struct {
latencies *percentiles
threshold time.Duration
percentile float64
retention time.Duration
}

func NewThrottlerPercentile(threshold time.Duration, capacity uint8, percentile float64, retention time.Duration) tpercentile {
percentile = math.Abs(percentile)
if percentile > 1.0 {
percentile = 1.0
}
thr := tpercentile{threshold: threshold, percentile: percentile, retention: retention}
thr := tpercentile{threshold: threshold, percentile: percentile}
thr.latencies = &percentiles{cap: capacity}
thr.latencies.Prune()
thr.reset = locked(
delayed(thr.retention, func(context.Context) error {
delayed(retention, func(context.Context) error {
thr.latencies.Prune()
return nil
}),
Expand All @@ -432,56 +399,51 @@ func (thr tpercentile) Release(ctx context.Context) error {
return nil
}

type tadaptive struct {
ttimed
step uint64
thr Throttler
type tmonitor struct {
mnt Monitor
threshold Stats
}

func NewThrottlerAdaptive(
threshold uint64,
interval time.Duration,
quantum time.Duration,
step uint64,
thr Throttler,
) *tadaptive {
return &tadaptive{
ttimed: NewThrottlerTimed(threshold, interval, quantum),
step: step,
thr: thr,
}
func NewThrottlerMonitor(mnt Monitor, threshold Stats) tmonitor {
return tmonitor{mnt: mnt, threshold: threshold}
}

func (thr *tadaptive) Acquire(ctx context.Context) error {
err := thr.thr.Acquire(ctx)
func (thr tmonitor) Acquire(ctx context.Context) error {
stats, err := thr.mnt.Stats(ctx)
if err != nil {
atomicBSub(&thr.ttimed.threshold, thr.step*thr.step)
} else {
atomicBAdd(&thr.ttimed.threshold, thr.step)
return fmt.Errorf("throttler hasn't found any stats %w", err)
}
return thr.ttimed.Acquire(ctx)
if stats.MEMAlloc >= thr.threshold.MEMAlloc || stats.MEMSystem >= thr.threshold.MEMSystem ||
stats.CPUPause >= thr.threshold.CPUPause || stats.CPUUsage >= thr.threshold.CPUUsage {
return errors.New("throttler has exceed stats threshold")
}
return nil
}

func (thr tadaptive) Release(ctx context.Context) error {
return thr.ttimed.Release(ctx)
func (thr tmonitor) Release(context.Context) error {
return nil
}

type tcontext struct{}
type tmetric struct {
mtc Metric
}

func NewThrottlerContext() tcontext {
return tcontext{}
func NewThrottlerMetric(mtc Metric) tmetric {
return tmetric{mtc: mtc}
}

func (thr tcontext) Acquire(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("throttler has received context error %w", ctx.Err())
default:
return nil
func (thr tmetric) Acquire(ctx context.Context) error {
val, err := thr.mtc.Query(ctx)
if err != nil {
return fmt.Errorf("throttler hasn't found any metric %w", err)
}
if val {
return errors.New("throttler has reached metric threshold")
}
return nil
}

func (thr tcontext) Release(ctx context.Context) error {
func (thr tmetric) Release(context.Context) error {
return nil
}

Expand Down Expand Up @@ -512,6 +474,40 @@ func (thr tenqueue) Release(ctx context.Context) error {
return nil
}

type tadaptive struct {
ttimed
step uint64
thr Throttler
}

func NewThrottlerAdaptive(
threshold uint64,
interval time.Duration,
quantum time.Duration,
step uint64,
thr Throttler,
) *tadaptive {
return &tadaptive{
ttimed: NewThrottlerTimed(threshold, interval, quantum),
step: step,
thr: thr,
}
}

func (thr *tadaptive) Acquire(ctx context.Context) error {
err := thr.thr.Acquire(ctx)
if err != nil {
atomicBSub(&thr.ttimed.threshold, thr.step*thr.step)
} else {
atomicBAdd(&thr.ttimed.threshold, thr.step)
}
return thr.ttimed.Acquire(ctx)
}

func (thr tadaptive) Release(ctx context.Context) error {
return thr.ttimed.Release(ctx)
}

type Pattern struct {
Pattern *regexp.Regexp
Throttler Throttler
Expand Down
Loading

0 comments on commit 0323184

Please sign in to comment.