package main

import (
	"time"

	"github.com/Dieterbe/go-metrics"
	"github.com/graphite-ng/carbon-relay-ng/nsqd"
)

// Spool sits in front of an nsqd diskqueue.
// It provides buffering (to keep accepting input while storage is slow, e.g. while sync() runs every 1000 items),
// QoS (realtime vs bulk traffic) and controllable i/o rates.
type Spool struct {
	key          string
	InRT         chan []byte
	InBulk       chan []byte
	Out          chan []byte
	spoolSleep   time.Duration // how long to wait between stores to the spool
	unspoolSleep time.Duration // how long to wait between loads from the spool

	queue       *nsqd.DiskQueue
	queueBuffer chan []byte // buffers metrics on their way into the queue, because Put() can block

	durationWrite  metrics.Timer
	durationBuffer metrics.Timer
	numBuffered    metrics.Counter // tracks the watermark: incremented on write, decremented on read
	// other metrics we could add, but that don't seem that useful: diskqueue depth, volume going in/out of the diskqueue
	numIncomingBulk metrics.Counter // sync channel, so no watermark to track; instead we count messages seen on read
	numIncomingRT   metrics.Counter // more or less sync (small buffer). drops are tracked at the destination, so no watermark needed; instead we count messages seen on read
}

// NewSpool creates a Spool. Its parameters should be tuned so that:
// - we can buffer packets for the duration of one sync
// - we buffer no more than needed, especially if we know the queue is slower than the ingest rate
func NewSpool(key, spoolDir string) *Spool {
	dqName := "spool_" + key
	// on our virtualized box, mean write durations are around 100 to 250 microseconds, with maxima up to 200 milliseconds.
	// in 200 milliseconds we can receive up to 10k metrics, so let's make that our queueBuffer size.
	// for bulk, leaving 500 microseconds between every metric should be enough.
	// TODO make all of these configurable:
	queueBuffer := 10000
	maxBytesPerFile := int64(200 * 1024 * 1024)
	syncEvery := int64(10000)
	periodSync := 1 * time.Second
	queue := nsqd.NewDiskQueue(dqName, spoolDir, maxBytesPerFile, syncEvery, periodSync).(*nsqd.DiskQueue)
	spoolSleep := time.Duration(500) * time.Microsecond
	unspoolSleep := time.Duration(10) * time.Microsecond
	s := Spool{
		key:             key,
		InRT:            make(chan []byte, 10),
		InBulk:          make(chan []byte),
		Out:             NewSlowChan(queue.ReadChan(), unspoolSleep),
		spoolSleep:      spoolSleep,
		unspoolSleep:    unspoolSleep,
		queue:           queue,
		queueBuffer:     make(chan []byte, queueBuffer),
		durationWrite:   Timer("spool=" + key + ".unit=ns.operation=write"),
		durationBuffer:  Timer("spool=" + key + ".unit=ns.operation=buffer"),
		numBuffered:     Counter("spool=" + key + ".unit=Metric.status=buffered"),
		numIncomingRT:   Counter("spool=" + key + ".target_type=count.unit=Metric.status=incomingRT"),
		numIncomingBulk: Counter("spool=" + key + ".target_type=count.unit=Metric.status=incomingBulk"),
	}
	go s.Writer()
	go s.Buffer()
	return &s
}

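// A minimal usage sketch (illustrative only; the key, directory and process
// function below are made up, not part of this file):
//
//	sp := NewSpool("dest1", "/var/spool/carbon-relay-ng")
//	sp.InRT <- []byte("some.metric 3.14 1234567890")       // realtime path
//	sp.Ingest([][]byte{[]byte("old.metric 1 1234567800")}) // bulk path, paced by spoolSleep
//	for buf := range sp.Out { // reads are paced by unspoolSleep
//		process(buf)
//	}
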
// Writer provides a channel-based API to the queue.
func (s *Spool) Writer() {
	// we always try to serve realtime traffic first: it's an input stream
	// arriving at a fixed rate, so it won't slow down.
	// only when no realtime traffic is coming in do we use the spare capacity
	// to read from the Bulk input, which is used to offload a known (and potentially large)
	// set of data that could easily exhaust the capacity of our disk queue.
	// bulk doesn't require realtime processing, so we just handle it to the extent we can.
	// note that this still allows channel ops on InRT to be starved, resulting
	// in some realtime traffic being dropped, but that shouldn't be too much of an issue. experience will tell...
	for {
		select {
		// it looks like this case is never taken; it's always "RT 2" below. oh well.
		// the reason: the first time we get here, there's probably nothing pending, so we fall through to the default.
		// after the default has processed:
		// - if it just read from InRT, it's highly unlikely another message is already ready, so we hit the default again.
		// - if it just read from InBulk, that happened *because* there was nothing on InRT, so it's highly unlikely
		//   something will be there straight after, and we hit the default again.
		case buf := <-s.InRT:
			s.numIncomingRT.Inc(1)
			//pre = time.Now()
			log.Debug("spool %v satisfying spool RT 1", s.key)
			log.Info("spool %s %s Writer -> queueBuffer\n", s.key, string(buf))
			s.durationBuffer.Time(func() { s.queueBuffer <- buf })
			s.numBuffered.Inc(1)
			//post = time.Now()
			//fmt.Println("queueBuffer duration RT 1:", post.Sub(pre).Nanoseconds())
		default:
			select {
			case buf := <-s.InRT:
				s.numIncomingRT.Inc(1)
				//pre = time.Now()
				log.Debug("spool %v satisfying spool RT 2", s.key)
				log.Info("spool %s %s Writer -> queueBuffer\n", s.key, string(buf))
				s.durationBuffer.Time(func() { s.queueBuffer <- buf })
				s.numBuffered.Inc(1)
				//post = time.Now()
				//fmt.Println("queueBuffer duration RT 2:", post.Sub(pre).Nanoseconds())
			case buf := <-s.InBulk:
				s.numIncomingBulk.Inc(1)
				//pre = time.Now()
				log.Debug("spool %v satisfying spool BULK", s.key)
				log.Info("spool %s %s Writer -> queueBuffer\n", s.key, string(buf))
				s.durationBuffer.Time(func() { s.queueBuffer <- buf })
				s.numBuffered.Inc(1)
				//post = time.Now()
				//fmt.Println("queueBuffer duration BULK:", post.Sub(pre).Nanoseconds())
			}
		}
	}
}

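// The nested select above is a standard Go pattern for two-level channel
// priority. A standalone sketch of just the pattern (high, low and handle are
// illustrative names, not from this file):
//
//	for {
//		select {
//		case m := <-high: // take high-priority first if a message is already waiting
//			handle(m)
//		default:
//			select { // nothing waiting on high: block on both, no preference
//			case m := <-high:
//				handle(m)
//			case m := <-low:
//				handle(m)
//			}
//		}
//	}
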
// Ingest spools a known set of bulk data, pacing the writes with spoolSleep.
func (s *Spool) Ingest(bulkData [][]byte) {
	for _, buf := range bulkData {
		s.InBulk <- buf
		time.Sleep(s.spoolSleep)
	}
}

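// With the default spoolSleep of 500 microseconds, Ingest paces bulk data to
// roughly 2000 metrics per second at most (1s / 500µs), regardless of how fast
// InBulk's reader could drain it.
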
// Buffer drains queueBuffer into the disk queue, so Writer never blocks on a
// (potentially slow) queue.Put directly.
func (s *Spool) Buffer() {
	// TODO clean shutdown
	for buf := range s.queueBuffer {
		s.numBuffered.Dec(1)
		//pre := time.Now()
		s.durationWrite.Time(func() { s.queue.Put(buf) })
		//post := time.Now()
		//fmt.Println("PUT DURATION", post.Sub(pre).Nanoseconds())
	}
}