batch.go
package pipeline

import (
	"bytes"
	"encoding/json"
	"fmt"
	"reflect"
	"time"

	"github.com/influxdata/influxdb/influxql"
)
// A node that handles creating several child QueryNodes.
// Each call to `query` creates a child batch node that
// can be further configured. See QueryNode.
// The `batch` variable in batch tasks is an instance of
// a BatchNode.
//
// Example:
//    var errors = batch
//        |query('SELECT value from errors')
//        ...
//    var views = batch
//        |query('SELECT value from views')
//        ...
//
// Available Statistics:
//
//    * query_errors -- number of errors when querying
//    * batches_queried -- number of batches returned from queries
//    * points_queried -- total number of points in batches
//
type BatchNode struct {
	node
}

func newBatchNode() *BatchNode {
	return &BatchNode{
		node: node{
			desc:     "batch",
			wants:    NoEdge,
			provides: BatchEdge,
		},
	}
}
// MarshalJSON converts BatchNode to JSON
// tick:ignore
func (n *BatchNode) MarshalJSON() ([]byte, error) {
	type Alias BatchNode
	var raw = &struct {
		TypeOf
		*Alias
	}{
		TypeOf: TypeOf{
			Type: "batch",
			ID:   n.ID(),
		},
		Alias: (*Alias)(n),
	}
	return json.Marshal(raw)
}

// UnmarshalJSON converts JSON to a BatchNode
// tick:ignore
func (n *BatchNode) UnmarshalJSON(data []byte) error {
	type Alias BatchNode
	var raw = &struct {
		TypeOf
		*Alias
	}{
		Alias: (*Alias)(n),
	}
	err := json.Unmarshal(data, raw)
	if err != nil {
		return err
	}
	if raw.Type != "batch" {
		return fmt.Errorf("error unmarshaling node %d of type %s as BatchNode", raw.ID, raw.Type)
	}
	n.setID(raw.ID)
	return nil
}
// The query to execute. Must not contain a time condition
// in the `WHERE` clause or contain a `GROUP BY` clause.
// The time conditions are added dynamically according to the period, offset and schedule.
// The `GROUP BY` clause is added dynamically according to the dimensions
// passed to the `groupBy` method.
func (b *BatchNode) Query(q string) *QueryNode {
	n := newQueryNode()
	n.QueryStr = q
	b.linkChild(n)
	return n
}

// Do not add the source batch node to the dot output
// since it's not really an edge.
// tick:ignore
func (b *BatchNode) dot(buf *bytes.Buffer) {
}
// A QueryNode defines a source and a schedule for
// processing batch data. The data is queried from
// an InfluxDB database and then passed into the data pipeline.
//
// Example:
//    batch
//        |query('''
//            SELECT mean("value")
//            FROM "telegraf"."default".cpu_usage_idle
//            WHERE "host" = 'serverA'
//        ''')
//            .period(1m)
//            .every(20s)
//            .groupBy(time(10s), 'cpu')
//        ...
//
// In the above example InfluxDB is queried every 20 seconds; the window of time returned
// spans 1 minute and is grouped into 10 second buckets.
type QueryNode struct {
	chainnode `json:"-"`

	// The query text
	//tick:ignore
	QueryStr string `json:"queryStr"`

	// The period or length of time that will be queried from InfluxDB
	Period time.Duration `json:"period"`

	// How often to query InfluxDB.
	//
	// The Every property is mutually exclusive with the Cron property.
	Every time.Duration `json:"every"`

	// Align start and end times with the Every value.
	// Does not apply if Cron is used.
	// tick:ignore
	AlignFlag bool `tick:"Align" json:"align"`

	// Define a schedule using a cron syntax.
	//
	// The specific cron implementation is documented here:
	// https://github.com/gorhill/cronexpr#implementation
	//
	// The Cron property is mutually exclusive with the Every property.
	Cron string `json:"cron"`

	// How far back in time to query from the current time.
	//
	// For example, with an Offset of 2 hours and an Every of 5m,
	// Kapacitor will query InfluxDB every 5 minutes for the window of data from 2 hours ago.
	//
	// This applies to Cron schedules as well. If the cron specifies to run every Sunday at
	// 1 AM and the Offset is 1 hour, then at 1 AM on Sunday the data from 12 AM will be queried.
	//
	// A sketch of this schedule follows the struct definition below.
	Offset time.Duration `json:"offset"`

	// Align the group by time intervals with the start time of the query.
	// tick:ignore
	AlignGroupFlag bool `tick:"AlignGroup" json:"alignGroup"`

	// The list of dimensions for the group-by clause.
	//tick:ignore
	Dimensions []interface{} `tick:"GroupBy" json:"groupBy"`

	// Whether to include the measurement in the group ID.
	// tick:ignore
	GroupByMeasurementFlag bool `tick:"GroupByMeasurement" json:"groupByMeasurement"`

	// Fill the data.
	// Options are:
	//
	//    - Any numerical value
	//    - null - exhibits the same behavior as the default
	//    - previous - reports the value of the previous window
	//    - none - suppresses timestamps and values where the value is null
	//    - linear - reports the results of linear interpolation
	Fill interface{} `json:"fill"`

	// The name of a configured InfluxDB cluster.
	// If empty the default cluster will be used.
	Cluster string `json:"cluster"`
}
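
// exampleOffsetQuery is an editor's sketch, not part of the upstream file: it shows
// how the schedule fields above combine, assuming a *BatchNode obtained from a task's
// pipeline. With an Offset of 2 hours and an Every of 5m, Kapacitor queries InfluxDB
// every 5 minutes for the 5-minute window of data from 2 hours ago.
func exampleOffsetQuery(b *BatchNode) *QueryNode { // hypothetical helper, for illustration only
	q := b.Query(`SELECT mean("value") FROM "telegraf"."autogen"."cpu_usage_idle"`)
	q.Period = 5 * time.Minute // length of the window queried each run
	q.Every = 5 * time.Minute  // how often the query is issued
	q.Offset = 2 * time.Hour   // shift the queried window 2 hours into the past
	return q
}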
func newQueryNode() *QueryNode {
	b := &QueryNode{
		chainnode: newBasicChainNode("query", BatchEdge, BatchEdge),
	}
	return b
}

// MarshalJSON converts QueryNode to JSON
// tick:ignore
func (n *QueryNode) MarshalJSON() ([]byte, error) {
	type Alias QueryNode
	var raw = &struct {
		TypeOf
		*Alias
		Period string `json:"period"`
		Every  string `json:"every"`
		Offset string `json:"offset"`
	}{
		TypeOf: TypeOf{
			Type: "query",
			ID:   n.ID(),
		},
		Alias:  (*Alias)(n),
		Period: influxql.FormatDuration(n.Period),
		Every:  influxql.FormatDuration(n.Every),
		Offset: influxql.FormatDuration(n.Offset),
	}
	return json.Marshal(raw)
}
// UnmarshalJSON converts JSON to a QueryNode
// tick:ignore
func (n *QueryNode) UnmarshalJSON(data []byte) error {
	type Alias QueryNode
	var raw = &struct {
		TypeOf
		*Alias
		Period string `json:"period"`
		Every  string `json:"every"`
		Offset string `json:"offset"`
	}{
		Alias: (*Alias)(n),
	}
	err := json.Unmarshal(data, raw)
	if err != nil {
		return err
	}
	if raw.Type != "query" {
		return fmt.Errorf("error unmarshaling node %d of type %s as QueryNode", raw.ID, raw.Type)
	}
	n.Period, err = influxql.ParseDuration(raw.Period)
	if err != nil {
		return err
	}
	n.Every, err = influxql.ParseDuration(raw.Every)
	if err != nil {
		return err
	}
	n.Offset, err = influxql.ParseDuration(raw.Offset)
	if err != nil {
		return err
	}
	n.setID(raw.ID)
	return nil
}
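
// exampleQueryNodeJSONRoundTrip is an editor's sketch, not part of the upstream file:
// the MarshalJSON/UnmarshalJSON pair above serializes Period, Every and Offset as
// InfluxQL duration strings (e.g. "5m", "2h") via influxql.FormatDuration and parses
// them back with influxql.ParseDuration, so a round trip through encoding/json
// preserves the durations.
func exampleQueryNodeJSONRoundTrip() error { // hypothetical helper, for illustration only
	q := newQueryNode()
	q.Period = 5 * time.Minute
	q.Every = 5 * time.Minute
	q.Offset = 2 * time.Hour

	data, err := json.Marshal(q) // durations are encoded as "5m", "5m", "2h"
	if err != nil {
		return err
	}

	var out QueryNode
	// The Type field must be "query" or UnmarshalJSON returns an error.
	return json.Unmarshal(data, &out)
}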
//tick:ignore
func (n *QueryNode) ChainMethods() map[string]reflect.Value {
	return map[string]reflect.Value{
		"GroupBy": reflect.ValueOf(n.chainnode.GroupBy),
	}
}
// Group the data by a set of dimensions.
// Can specify one time dimension.
//
// This property adds a `GROUP BY` clause to the query
// so all the normal behaviors when querying InfluxDB with a `GROUP BY` apply.
//
// Use group by time when your period is longer than your group by time interval.
//
// Example:
//    batch
//        |query(...)
//            .period(1m)
//            .every(1m)
//            .groupBy(time(10s), 'tag1', 'tag2')
//            .align()
//
// A group by time offset is also possible.
//
// Example:
//    batch
//        |query(...)
//            .period(1m)
//            .every(1m)
//            .groupBy(time(10s, -5s), 'tag1', 'tag2')
//            .align()
//            .offset(5s)
//
// It is recommended to use QueryNode.Align and QueryNode.Offset in conjunction with
// group by time dimensions so that the time bounds match up with the group by intervals.
// To automatically align the group by intervals to the start of the query time,
// use QueryNode.AlignGroup. This is useful in more complex situations, such as when
// the groupBy time period is longer than the query frequency.
//
// Example:
//    batch
//        |query(...)
//            .period(5m)
//            .every(30s)
//            .groupBy(time(1m), 'tag1', 'tag2')
//            .align()
//            .alignGroup()
//
// For the above example, without QueryNode.AlignGroup, every other query issued by Kapacitor
// (at :30 past the minute) would align to :00 seconds instead of the desired :30 seconds,
// which would create 6 group by intervals instead of 5, the first and last of which
// would only have 30 seconds of data instead of a full minute.
// If the group by time offset (i.e. time(t, offset)) is used in conjunction with
// QueryNode.AlignGroup, the alignment will occur first, and the result will then be
// offset by the specified amount.
//
// NOTE: Since QueryNode.Offset is inherently a negative property, the second "offset" argument to the "time" function is negative to match.
//
// tick:property
func (b *QueryNode) GroupBy(d ...interface{}) *QueryNode {
	b.Dimensions = d
	return b
}
// If set, will include the measurement name in the group ID
// along with any other group by dimensions.
//
// Example:
//    batch
//        |query('SELECT sum("value") FROM "telegraf"."autogen"./process_.*/')
//            .groupByMeasurement()
//            .groupBy('host')
//
// The above example selects data from several measurements matching `/process_.*/` and
// then groups each point by the host tag and the measurement name,
// thus keeping measurements in their own groups.
// tick:property
func (n *QueryNode) GroupByMeasurement() *QueryNode {
	n.GroupByMeasurementFlag = true
	return n
}
// Align start and stop times for queries with even boundaries of the QueryNode.Every property.
// Does not apply if using the QueryNode.Cron property.
// tick:property
func (b *QueryNode) Align() *QueryNode {
	b.AlignFlag = true
	return b
}

// Align the group by time intervals with the start time of the query.
// tick:property
func (b *QueryNode) AlignGroup() *QueryNode {
	b.AlignGroupFlag = true
	return b
}
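
// exampleFluentQueryConfig is an editor's sketch, not part of the upstream file: the
// tick:property methods above all return the receiver, so a QueryNode can be configured
// fluently from Go as well as from TICKscript. Only tag dimensions are used here, and
// the names are illustrative.
func exampleFluentQueryConfig(b *BatchNode) *QueryNode { // hypothetical helper, for illustration only
	q := b.Query(`SELECT sum("value") FROM "telegraf"."autogen"./process_.*/`)
	q.Period = time.Minute
	q.Every = time.Minute
	return q.GroupBy("host").
		GroupByMeasurement().
		Align().
		AlignGroup()
}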