forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwhere.go
107 lines (89 loc) · 2.53 KB
/
where.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package kapacitor
import (
"errors"
"fmt"
"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
)
// WhereNode is the executing counterpart of a pipeline.WhereNode. It holds the
// compiled where expression and the scope pool used to evaluate it.
type WhereNode struct {
node
// w is the pipeline definition this node was created from.
w *pipeline.WhereNode
// NOTE(review): endpoint is not referenced in the visible code; confirm it is still needed.
endpoint string
// expression is the compiled where lambda; each group works on its own
// CopyReset copy of it.
expression stateful.Expression
// scopePool is built from the variables referenced by the lambda and is
// handed to EvalPredicate for every evaluated message.
scopePool stateful.ScopePool
}
// newWhereNode creates a new WhereNode which filters down the batch or stream
// by a condition.
//
// The lambda carried by n is compiled once here and the node's run function
// is set to runWhere. An error is returned when n has no lambda or when the
// lambda expression fails to compile; in both cases the returned node is nil.
func newWhereNode(et *ExecutingTask, n *pipeline.WhereNode, d NodeDiagnostic) (wn *WhereNode, err error) {
	// Validate the lambda before anything dereferences it: reading
	// n.Lambda.Expression on a nil lambda would panic instead of reporting
	// the error below.
	if n.Lambda == nil {
		return nil, errors.New("nil expression passed to WhereNode")
	}

	expr, err := stateful.NewExpression(n.Lambda.Expression)
	if err != nil {
		return nil, fmt.Errorf("Failed to compile expression in where clause: %v", err)
	}

	wn = &WhereNode{
		node:       node{Node: n, et: et, diag: d},
		w:          n,
		expression: expr,
		scopePool:  stateful.NewScopePool(ast.FindReferenceVariables(n.Lambda.Expression)),
	}
	wn.runF = wn.runWhere
	return wn, nil
}
// runWhere is the node's run function. It wraps the first input edge in a
// grouped consumer (passing the node itself as the second argument),
// registers the consumer's cardinality variable under statCardinalityGauge
// and then consumes until the consumer returns.
//
// The snapshot argument is not used.
func (n *WhereNode) runWhere(snapshot []byte) error {
	grouped := edge.NewGroupedConsumer(n.ins[0], n)
	n.statMap.Set(statCardinalityGauge, grouped.CardinalityVar())
	return grouped.Consume()
}
// NewGroup returns the receiver for a group: a fresh whereGroup wrapped in a
// timed forward receiver driven by n.timer, which is in turn wrapped in a
// stats-collecting receiver that forwards to n.outs.
//
// Neither group nor first is consulted; the error result is always nil.
func (n *WhereNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
	timed := edge.NewTimedForwardReceiver(n.timer, n.newGroup())
	return edge.NewReceiverFromForwardReceiverWithStats(n.outs, timed), nil
}
// newGroup allocates the per-group state: a back-reference to the node and a
// CopyReset copy of the node's compiled expression.
func (n *WhereNode) newGroup() *whereGroup {
	grp := new(whereGroup)
	grp.n = n
	grp.expr = n.expression.CopyReset()
	return grp
}
// whereGroup filters the messages of a single group through the where
// expression.
type whereGroup struct {
// n is the owning node; it supplies the scope pool and the diagnostic sink.
n *WhereNode
// expr is this group's own copy of the compiled expression, obtained via
// CopyReset in newGroup.
expr stateful.Expression
}
// BeginBatch forwards a shallow copy of the begin message with its size hint
// set to 0, leaving the incoming message unmodified.
// NOTE(review): presumably the hint is cleared because the number of points
// surviving the filter is not known up front; confirm.
func (g *whereGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	var header edge.BeginBatchMessage = begin.ShallowCopy()
	header.SetSizeHint(0)
	return header, nil
}
// BatchPoint runs a single batch point through the where predicate and
// returns whatever doWhere returns for it.
func (g *whereGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	msg, err := g.doWhere(bp)
	return msg, err
}
// EndBatch passes the end-of-batch message through unchanged.
func (g *whereGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
	var out edge.Message = end
	return out, nil
}
// Point runs a single stream point through the where predicate and returns
// whatever doWhere returns for it.
func (g *whereGroup) Point(p edge.PointMessage) (edge.Message, error) {
	msg, err := g.doWhere(p)
	return msg, err
}
// doWhere evaluates the group's expression against p using the node's scope
// pool. It returns p when the predicate passes and a nil message otherwise.
//
// An evaluation error is reported through the node's diagnostic and the
// message is dropped; the error is deliberately not returned to the caller.
func (g *whereGroup) doWhere(p edge.FieldsTagsTimeGetterMessage) (edge.Message, error) {
	keep, evalErr := EvalPredicate(g.expr, g.n.scopePool, p)
	switch {
	case evalErr != nil:
		g.n.diag.Error("error while evaluating expression", evalErr)
		return nil, nil
	case !keep:
		return nil, nil
	default:
		return p, nil
	}
}
// Barrier passes the barrier message through unchanged.
func (g *whereGroup) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	var out edge.Message = b
	return out, nil
}
// DeleteGroup passes the delete-group message through unchanged.
func (g *whereGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	var out edge.Message = d
	return out, nil
}