forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
grouped.go
129 lines (113 loc) · 3.17 KB
/
grouped.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package edge
import (
"errors"
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
)
// GroupedConsumer reads messages off an edge and passes them by group to receivers created from a grouped receiver.
type GroupedConsumer interface {
Consumer
// CardinalityVar is an exported var that indicates the current number of groups being managed.
CardinalityVar() expvar.IntVar
}
// GroupedReceiver creates and deletes receivers as groups are created and deleted.
type GroupedReceiver interface {
// NewGroup signals that a new group has been discovered in the data.
// Information on the group and the message that first triggered its creation are provided.
NewGroup(group GroupInfo, first PointMeta) (Receiver, error)
}
// GroupInfo identifies and contians information about a specific group.
type GroupInfo struct {
ID models.GroupID
Tags models.Tags
Dimensions models.Dimensions
}
type groupedConsumer struct {
consumer Consumer
gr GroupedReceiver
groups map[models.GroupID]Receiver
current Receiver
cardinality *expvar.Int
}
// NewGroupedConsumer creates a new grouped consumer for edge e and grouped receiver r.
func NewGroupedConsumer(e Edge, r GroupedReceiver) GroupedConsumer {
gc := &groupedConsumer{
gr: r,
groups: make(map[models.GroupID]Receiver),
cardinality: new(expvar.Int),
}
gc.consumer = NewConsumerWithReceiver(e, gc)
return gc
}
func (c *groupedConsumer) Consume() error {
return c.consumer.Consume()
}
func (c *groupedConsumer) CardinalityVar() expvar.IntVar {
return c.cardinality
}
func (c *groupedConsumer) getOrCreateGroup(group GroupInfo, first PointMeta) (Receiver, error) {
r, ok := c.groups[group.ID]
if !ok {
c.cardinality.Add(1)
recv, err := c.gr.NewGroup(group, first)
if err != nil {
return nil, err
}
c.groups[group.ID] = recv
r = recv
}
return r, nil
}
func (c *groupedConsumer) BeginBatch(begin BeginBatchMessage) error {
r, err := c.getOrCreateGroup(begin.GroupInfo(), begin)
if err != nil {
return err
}
c.current = r
return r.BeginBatch(begin)
}
func (c *groupedConsumer) BatchPoint(p BatchPointMessage) error {
if c.current == nil {
return errors.New("received batch point without batch")
}
return c.current.BatchPoint(p)
}
func (c *groupedConsumer) EndBatch(end EndBatchMessage) error {
err := c.current.EndBatch(end)
c.current = nil
return err
}
func (c *groupedConsumer) BufferedBatch(batch BufferedBatchMessage) error {
begin := batch.Begin()
r, err := c.getOrCreateGroup(begin.GroupInfo(), begin)
if err != nil {
return err
}
return receiveBufferedBatch(r, batch)
}
func (c *groupedConsumer) Point(p PointMessage) error {
r, err := c.getOrCreateGroup(p.GroupInfo(), p)
if err != nil {
return err
}
return r.Point(p)
}
func (c *groupedConsumer) Barrier(b BarrierMessage) error {
// Barriers messages apply to all gorups
for _, r := range c.groups {
if err := r.Barrier(b); err != nil {
return err
}
}
return nil
}
func (c *groupedConsumer) DeleteGroup(d DeleteGroupMessage) error {
id := d.GroupID()
r, ok := c.groups[id]
if ok {
delete(c.groups, id)
c.cardinality.Add(-1)
return r.DeleteGroup(d)
}
return nil
}