forked from bnb-chain/bsc
-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathmeter.go
169 lines (145 loc) · 4.41 KB
/
meter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package metrics
import (
"math"
"sync"
"sync/atomic"
"time"
)
// GetOrRegisterMeter returns an existing Meter or constructs and registers a
// new Meter.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func GetOrRegisterMeter(name string, r Registry) *Meter {
if r == nil {
r = DefaultRegistry
}
return r.GetOrRegister(name, NewMeter).(*Meter)
}
// NewMeter constructs a new Meter and launches a goroutine.
// Be sure to call Stop() once the meter is of no use to allow for garbage collection.
func NewMeter() *Meter {
m := newMeter()
arbiter.add(m)
return m
}
// NewInactiveMeter returns a meter but does not start any goroutines. This
// method is mainly intended for testing.
func NewInactiveMeter() *Meter {
return newMeter()
}
// NewRegisteredMeter constructs and registers a new Meter
// and launches a goroutine.
// Be sure to unregister the meter from the registry once it is of no use to
// allow for garbage collection.
func NewRegisteredMeter(name string, r Registry) *Meter {
return GetOrRegisterMeter(name, r)
}
// MeterSnapshot is a read-only copy of the meter's internal values.
type MeterSnapshot struct {
count int64
rate1, rate5, rate15, rateMean float64
}
// Count returns the count of events at the time the snapshot was taken.
func (m *MeterSnapshot) Count() int64 { return m.count }
// Rate1 returns the one-minute moving average rate of events per second at the
// time the snapshot was taken.
func (m *MeterSnapshot) Rate1() float64 { return m.rate1 }
// Rate5 returns the five-minute moving average rate of events per second at
// the time the snapshot was taken.
func (m *MeterSnapshot) Rate5() float64 { return m.rate5 }
// Rate15 returns the fifteen-minute moving average rate of events per second
// at the time the snapshot was taken.
func (m *MeterSnapshot) Rate15() float64 { return m.rate15 }
// RateMean returns the meter's mean rate of events per second at the time the
// snapshot was taken.
func (m *MeterSnapshot) RateMean() float64 { return m.rateMean }
// Meter count events to produce exponentially-weighted moving average rates
// at one-, five-, and fifteen-minutes and a mean rate.
type Meter struct {
count atomic.Int64
uncounted atomic.Int64 // not yet added to the EWMAs
rateMean atomic.Uint64
a1, a5, a15 *EWMA
startTime time.Time
stopped atomic.Bool
}
func newMeter() *Meter {
return &Meter{
a1: NewEWMA1(),
a5: NewEWMA5(),
a15: NewEWMA15(),
startTime: time.Now(),
}
}
// Stop stops the meter, Mark() will be a no-op if you use it after being stopped.
func (m *Meter) Stop() {
if stopped := m.stopped.Swap(true); !stopped {
arbiter.remove(m)
}
}
// Mark records the occurrence of n events.
func (m *Meter) Mark(n int64) {
m.uncounted.Add(n)
}
// Snapshot returns a read-only copy of the meter.
func (m *Meter) Snapshot() *MeterSnapshot {
return &MeterSnapshot{
count: m.count.Load() + m.uncounted.Load(),
rate1: m.a1.Snapshot().Rate(),
rate5: m.a5.Snapshot().Rate(),
rate15: m.a15.Snapshot().Rate(),
rateMean: math.Float64frombits(m.rateMean.Load()),
}
}
func (m *Meter) tick() {
// Take the uncounted values, add to count
n := m.uncounted.Swap(0)
count := m.count.Add(n)
m.rateMean.Store(math.Float64bits(float64(count) / time.Since(m.startTime).Seconds()))
// Update the EWMA's internal state
m.a1.Update(n)
m.a5.Update(n)
m.a15.Update(n)
// And trigger them to calculate the rates
m.a1.tick()
m.a5.tick()
m.a15.tick()
}
var arbiter = meterTicker{meters: make(map[*Meter]struct{})}
// meterTicker ticks meters every 5s from a single goroutine.
// meters are references in a set for future stopping.
type meterTicker struct {
mu sync.RWMutex
started bool
meters map[*Meter]struct{}
}
// add adds another *Meter ot the arbiter, and starts the arbiter ticker.
func (ma *meterTicker) add(m *Meter) {
ma.mu.Lock()
defer ma.mu.Unlock()
ma.meters[m] = struct{}{}
if !ma.started {
ma.started = true
go ma.loop()
}
}
// remove removes a meter from the set of ticked meters.
func (ma *meterTicker) remove(m *Meter) {
ma.mu.Lock()
delete(ma.meters, m)
ma.mu.Unlock()
}
// loop ticks meters on a 5 second interval.
func (ma *meterTicker) loop() {
ticker := time.NewTicker(5 * time.Second)
for range ticker.C {
if !metricsEnabled {
continue
}
ma.mu.RLock()
for meter := range ma.meters {
meter.tick()
}
ma.mu.RUnlock()
}
}