forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
78 lines (70 loc) · 1.73 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package kapacitor
import (
"log"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/tick"
)
// StreamNode is the executing counterpart of a pipeline.StreamNode. It holds
// the filter criteria and grouping settings that newStreamNode copies out of
// the pipeline definition.
type StreamNode struct {
	node

	// s is the pipeline definition this node was built from.
	s *pipeline.StreamNode
	// expression is the stateful predicate built from the pipeline node's
	// Expression; it stays nil when the pipeline node has no expression.
	expression *tick.StatefulExpr

	// dimensions and allDimensions are the results of determineDimensions
	// and are passed to setGroupOnPoint for every forwarded point.
	dimensions    []string
	allDimensions bool

	// db, rp and name are the database, retention policy and measurement a
	// point is compared against; an empty string disables that comparison.
	db, rp, name string
}
// newStreamNode builds the StreamNode that executes pipeline node n for task
// et, logging through l.
//
// The database, retention policy and measurement filters are copied from n,
// the grouping settings are derived from n.Dimensions, and a stateful
// expression is created only when n.Expression is set. The returned error is
// always nil.
func newStreamNode(et *ExecutingTask, n *pipeline.StreamNode, l *log.Logger) (*StreamNode, error) {
	stream := new(StreamNode)
	stream.node = node{Node: n, et: et, logger: l}
	stream.s = n
	stream.db = n.Database
	stream.rp = n.RetentionPolicy
	stream.name = n.Measurement

	// Register the method the embedded node runs for this node type.
	stream.node.runF = stream.runStream

	all, dims := determineDimensions(n.Dimensions)
	stream.allDimensions = all
	stream.dimensions = dims

	if expr := n.Expression; expr != nil {
		stream.expression = tick.NewStatefulExpr(expr)
	}
	return stream, nil
}
// runStream reads points from the first input until NextPoint reports that no
// more are available, and forwards every point accepted by matches to all
// outputs. The byte-slice argument is ignored.
//
// Before forwarding, the point's time is truncated when the pipeline node's
// Truncate is non-zero, and its group is set from the node's dimensions.
// It returns the first error reported by an output's CollectPoint, or nil once
// the input is exhausted.
func (s *StreamNode) runStream([]byte) error {
	for {
		pt, ok := s.ins[0].NextPoint()
		if !ok {
			return nil
		}
		if !s.matches(pt) {
			// Filtered out: nothing is sent downstream for this point.
			continue
		}
		if d := s.s.Truncate; d != 0 {
			pt.Time = pt.Time.Truncate(d)
		}
		pt = setGroupOnPoint(pt, s.allDimensions, s.dimensions)
		for _, out := range s.outs {
			if err := out.CollectPoint(pt); err != nil {
				return err
			}
		}
	}
}
// matches reports whether point p passes this node's filters.
//
// The database, retention policy and measurement name are each compared only
// when the corresponding filter is non-empty. If an expression is configured,
// its result decides the outcome; an evaluation error is logged and the point
// is treated as not matching. With no expression, a point that passed the
// preceding checks matches.
func (s *StreamNode) matches(p models.Point) bool {
	if s.db != "" && p.Database != s.db {
		return false
	}
	if s.rp != "" && p.RetentionPolicy != s.rp {
		return false
	}
	if s.name != "" && p.Name != s.name {
		return false
	}
	if s.expression == nil {
		return true
	}

	pass, err := EvalPredicate(s.expression, p.Fields, p.Tags)
	if err != nil {
		// Best effort: a failing predicate drops the point instead of
		// stopping the node.
		s.logger.Println("E! error while evaluating WHERE expression:", err)
		return false
	}
	return pass
}