forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtimed.go
84 lines (74 loc) · 1.85 KB
/
timed.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package edge
import "github.com/influxdata/kapacitor/timer"
type timedForwardReceiver struct {
timer timer.Timer
r ForwardReceiver
}
type timedForwardBufferedReceiver struct {
timedForwardReceiver
b ForwardBufferedReceiver
}
// NewTimedForwardReceiver creates a forward receiver which times the time spent in r.
func NewTimedForwardReceiver(t timer.Timer, r ForwardReceiver) ForwardReceiver {
b, ok := r.(ForwardBufferedReceiver)
if ok {
return &timedForwardBufferedReceiver{
timedForwardReceiver: timedForwardReceiver{
timer: t,
r: r,
},
b: b,
}
}
return &timedForwardReceiver{
timer: t,
r: r,
}
}
func (tr *timedForwardReceiver) BeginBatch(begin BeginBatchMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.BeginBatch(begin)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) BatchPoint(bp BatchPointMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.BatchPoint(bp)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) EndBatch(end EndBatchMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.EndBatch(end)
tr.timer.Stop()
return
}
func (tr *timedForwardBufferedReceiver) BufferedBatch(batch BufferedBatchMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.b.BufferedBatch(batch)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) Point(p PointMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.Point(p)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) Barrier(b BarrierMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.Barrier(b)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) DeleteGroup(d DeleteGroupMessage) (m Message, err error) {
tr.timer.Start()
m, err = tr.r.DeleteGroup(d)
tr.timer.Stop()
return
}
func (tr *timedForwardReceiver) Done() {
tr.timer.Start()
tr.r.Done()
tr.timer.Stop()
}