forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
100 lines (90 loc) · 2.84 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package pipeline
import (
"time"
"github.com/influxdb/kapacitor/tick"
)
// A StreamNode represents the source of data being
// streamed to Kapacitor via any of its inputs.
// The stream node allows you to select which portion of the stream
// you want to process.
// The `stream` variable in stream tasks is an instance of
// a StreamNode.
//
// Example:
// stream
// .from()
// .database('mydb')
// .retentionPolicy('myrp')
// .measurement('mymeasurement')
// .where(lambda: "host" =~ /logger\d+/)
// .window()
// ...
//
// The above example selects only data points from the database `mydb`
// and retention policy `myrp` and measurement `mymeasurement` where
// the tag `host` matches the regex `logger\d+`
type StreamNode struct {
chainnode
// An expression to filter the data stream.
// tick:ignore
Expression tick.Node
// The database name.
// If empty any database will be used.
Database string
// The retention policy name
// If empty any retention policy will be used.
RetentionPolicy string
// The measurement name
// If empty any measurement will be used.
Measurement string
// Optional duration for truncating timestamps.
// Helpful to ensure data points land on specfic boundaries
// Example:
// stream
// .from().measurement('mydata')
// .truncate(1s)
//
// All incoming data will be truncated to 1 second resolution.
Truncate time.Duration
}
func newStreamNode() *StreamNode {
return &StreamNode{
chainnode: newBasicChainNode("stream", StreamEdge, StreamEdge),
}
}
// Creates a new stream node that can be further
// filtered using the Database, RetentionPolicy, Measurement and Where properties.
// From can be called multiple times to create multiple
// independent forks of the data stream.
//
// Example:
// // Select the 'cpu' measurement from just the database 'mydb'
// // and retention policy 'myrp'.
// var cpu = stream.from()
// .database('mydb')
// .retentionPolicy('myrp')
// .measurement('cpu')
// // Select the 'load' measurement from any database and retention policy.
// var load = stream.from()
// .measurement('load')
// // Join cpu and load streams and do further processing.
// cpu.join(load)
// .as('cpu', 'load')
// ...
//
func (s *StreamNode) From() *StreamNode {
f := newStreamNode()
s.linkChild(f)
return f
}
// Filter the current stream using the given expression.
// This expression is a Kapacitor expression. Kapacitor
// expressions are a superset of InfluxQL WHERE expressions.
// See the `Expression` docs for more information.
//
// If empty then all data points are considered to match.
// tick:property
func (s *StreamNode) Where(expression tick.Node) *StreamNode {
s.Expression = expression
return s
}