diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fd08def5..85e278220 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,6 +120,7 @@ In order to know if subscription writes are being dropped you should monitor the - [#259](https://github.com/influxdata/kapacitor/issues/259): Template Tasks have been added. - [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions. - [#595](https://github.com/influxdata/kapacitor/pull/595): Support counting and summing empty batches to 0. +- [#596](https://github.com/influxdata/kapacitor/pull/596): Support new group by time offset i.e. time(30s, 5s) ### Bugfixes diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index 8fb8b8152..5a4eb6ba6 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -288,9 +288,7 @@ batch testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er) } - func TestBatch_CountEmptyBatch(t *testing.T) { - var script = ` batch |query(''' @@ -397,6 +395,61 @@ batch testBatcherWithOutput(t, "TestBatch_CountEmptyBatch", script, 30*time.Second, er) } +func TestBatch_GroupBy_TimeOffset(t *testing.T) { + + var script = ` +batch + |query(''' + SELECT mean("value") + FROM "telegraf"."default".cpu_usage_idle + WHERE "host" = 'serverA' +''') + .period(10s) + .every(10s) + .groupBy(time(2s, 1s), 'cpu') + |count('mean') + |window() + .period(20s) + .every(20s) + |sum('count') + |httpOut('TestBatch_SimpleMR') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu-total"}, + Columns: []string{"time", "sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 10.0, + }}, + }, + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu0"}, + Columns: []string{"time", "sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 10.0, + }}, + }, + { + Name: "cpu_usage_idle", + Tags: map[string]string{"cpu": "cpu1"}, + Columns: []string{"time", "sum"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC), + 10.0, + }}, + }, + }, + } + + testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er) +} + func TestBatch_Default(t *testing.T) { var script = ` diff --git a/pipeline/batch.go b/pipeline/batch.go index f152b7d9c..718dd103e 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -151,12 +151,26 @@ func (n *QueryNode) ChainMethods() map[string]reflect.Value { // // This property adds a `GROUP BY` clause to the query // so all the normal behaviors when quering InfluxDB with a `GROUP BY` apply. -// More details: https://influxdb.com/docs/v0.9/query_language/data_exploration.html#the-group-by-clause // // Example: // batch // |query(...) // .groupBy(time(10s), 'tag1', 'tag2')) +// .align() +// +// A group by time offset is also possible +// +// Example: +// batch +// |query(...) +// .groupBy(time(10s, -5s), 'tag1', 'tag2')) +// .align() +// .offset(5s) +// +// It is recommended to use QueryNode.Align and QueryNode.Offset in conjunction with +// group by time dimensions so that the time bounds match up with the group by intervals. +// +// NOTE: Since QueryNode.Offset is inherently a negative property the second "offset" argument to the "time" function is negative to match. // // tick:property func (b *QueryNode) GroupBy(d ...interface{}) *QueryNode { diff --git a/query.go b/query.go index a3f76634a..c5e7d2f94 100644 --- a/query.go +++ b/query.go @@ -124,6 +124,21 @@ func (q *Query) Dimensions(dims []interface{}) error { &influxql.Dimension{ Expr: &influxql.Wildcard{}, }) + case TimeDimension: + q.stmt.Dimensions = append(q.stmt.Dimensions, + &influxql.Dimension{ + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{ + Val: dim.Length, + }, + &influxql.DurationLiteral{ + Val: dim.Offset, + }, + }, + }, + }) default: return fmt.Errorf("invalid dimension type:%T, must be string or time.Duration", d) @@ -141,3 +156,22 @@ func (q *Query) Fill(option influxql.FillOption, value interface{}) { func (q *Query) String() string { return q.stmt.String() } + +type TimeDimension struct { + Length time.Duration + Offset time.Duration +} + +func groupByTime(length time.Duration, offset ...time.Duration) (TimeDimension, error) { + var o time.Duration + if l := len(offset); l == 1 { + o = offset[0] + + } else if l != 0 { + return TimeDimension{}, fmt.Errorf("time() function expects 1 or 2 args, got %d", l+1) + } + return TimeDimension{ + Length: length, + Offset: o, + }, nil +} diff --git a/task_master.go b/task_master.go index 7810fd1db..0abc34b8b 100644 --- a/task_master.go +++ b/task_master.go @@ -292,7 +292,7 @@ func (tm *TaskMaster) waitForForks() { func (tm *TaskMaster) CreateTICKScope() *stateful.Scope { scope := stateful.NewScope() - scope.Set("time", func(d time.Duration) time.Duration { return d }) + scope.Set("time", groupByTime) // Add dynamic methods to the scope for UDFs if tm.UDFService != nil { for _, f := range tm.UDFService.List() {