stream.go (forked from influxdata/kapacitor)
package kapacitor

import (
	"errors"
	"fmt"

	"github.com/influxdata/kapacitor/edge"
	"github.com/influxdata/kapacitor/models"
	"github.com/influxdata/kapacitor/pipeline"
	"github.com/influxdata/kapacitor/tick/ast"
	"github.com/influxdata/kapacitor/tick/stateful"
)

type StreamNode struct {
	node
	s *pipeline.StreamNode
}

// newStreamNode creates a new StreamNode which copies all data to its children.
func newStreamNode(et *ExecutingTask, n *pipeline.StreamNode, d NodeDiagnostic) (*StreamNode, error) {
	sn := &StreamNode{
		node: node{Node: n, et: et, diag: d},
		s:    n,
	}
	sn.node.runF = sn.runSourceStream
	return sn, nil
}

// runSourceStream reads every message from the node's single input edge and
// forwards it, unchanged, to each child edge.
func (n *StreamNode) runSourceStream([]byte) error {
	for m, ok := n.ins[0].Emit(); ok; m, ok = n.ins[0].Emit() {
		for _, child := range n.outs {
			err := child.Collect(m)
			if err != nil {
				return err
			}
		}
	}
	return nil
}

type FromNode struct {
	node
	s             *pipeline.FromNode
	expression    stateful.Expression
	scopePool     stateful.ScopePool
	tagNames      []string
	allDimensions bool
	db            string
	rp            string
	name          string
}

// newFromNode creates a new FromNode which filters data from a source.
func newFromNode(et *ExecutingTask, n *pipeline.FromNode, d NodeDiagnostic) (*FromNode, error) {
	sn := &FromNode{
		node: node{Node: n, et: et, diag: d},
		s:    n,
		db:   n.Database,
		rp:   n.RetentionPolicy,
		name: n.Measurement,
	}
	sn.node.runF = sn.runStream
	sn.allDimensions, sn.tagNames = determineTagNames(n.Dimensions, nil)
	// Compile the WHERE lambda, if any, and build a scope pool for its
	// reference variables.
	if n.Lambda != nil {
		expr, err := stateful.NewExpression(n.Lambda.Expression)
		if err != nil {
			return nil, fmt.Errorf("Failed to compile from expression: %v", err)
		}
		sn.expression = expr
		sn.scopePool = stateful.NewScopePool(ast.FindReferenceVariables(n.Lambda.Expression))
	}
	return sn, nil
}

// runStream consumes messages from the input edge and dispatches them to the
// receiver methods below, timing each call and forwarding any returned
// message to all output edges.
func (n *FromNode) runStream([]byte) error {
	consumer := edge.NewConsumerWithReceiver(
		n.ins[0],
		edge.NewReceiverFromForwardReceiverWithStats(
			n.outs,
			edge.NewTimedForwardReceiver(n.timer, n),
		),
	)
	return consumer.Consume()
}

// The from node only operates on stream data, so batch messages are rejected.
func (n *FromNode) BeginBatch(edge.BeginBatchMessage) (edge.Message, error) {
	return nil, errors.New("from does not support batch data")
}

func (n *FromNode) BatchPoint(edge.BatchPointMessage) (edge.Message, error) {
	return nil, errors.New("from does not support batch data")
}

func (n *FromNode) EndBatch(edge.EndBatchMessage) (edge.Message, error) {
	return nil, errors.New("from does not support batch data")
}

// Point filters and transforms a single stream point.
func (n *FromNode) Point(p edge.PointMessage) (edge.Message, error) {
	if n.matches(p) {
		p = p.ShallowCopy()
		// Optionally truncate and/or round the point's time.
		if n.s.Truncate != 0 {
			p.SetTime(p.Time().Truncate(n.s.Truncate))
		}
		if n.s.Round != 0 {
			p.SetTime(p.Time().Round(n.s.Round))
		}
		// Apply the group-by dimensions configured on the from node.
		p.SetDimensions(models.Dimensions{
			ByName:   n.s.GroupByMeasurementFlag,
			TagNames: computeTagNames(p.Tags(), n.allDimensions, n.tagNames, nil),
		})
		return p, nil
	}
	// Non-matching points are dropped.
	return nil, nil
}
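
// Illustrative example for the Truncate/Round handling above: because the code
// uses time.Time's Truncate and Round, a point at 12:00:45 becomes 12:00:00
// with .truncate(1m) but 12:01:00 with .round(1m).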

// Barrier messages pass through unchanged.
func (n *FromNode) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}

// DeleteGroup messages pass through unchanged.
func (n *FromNode) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}

// matches reports whether a point passes the database, retention policy,
// measurement, and WHERE-expression filters configured on the from node.
func (n *FromNode) matches(p edge.PointMessage) bool {
	if n.db != "" && p.Database() != n.db {
		return false
	}
	if n.rp != "" && p.RetentionPolicy() != n.rp {
		return false
	}
	if n.name != "" && p.Name() != n.name {
		return false
	}
	if n.expression != nil {
		if pass, err := EvalPredicate(n.expression, n.scopePool, p); err != nil {
			n.diag.Error("failed to evaluate WHERE expression", err)
			return false
		} else {
			return pass
		}
	}
	return true
}
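
// Illustrative example (TICKscript syntax, not code from this file): for a
// script like
//
//	stream
//	    |from()
//	        .database('telegraf')
//	        .retentionPolicy('autogen')
//	        .measurement('cpu')
//	        .where(lambda: "usage_idle" < 30)
//
// matches() drops any point whose database, retention policy, or measurement
// name differs from those settings, then evaluates the compiled WHERE lambda
// against the point's fields and tags; only points for which the lambda
// returns TRUE are forwarded by Point() above.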