Skip to content

Commit

Permalink
support group by time offset (influxdata#596)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook committed Jun 2, 2016
1 parent 909ad89 commit cbfe211
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ In order to know if subscription writes are being dropped you should monitor the
- [#259](https://github.com/influxdata/kapacitor/issues/259): Template Tasks have been added.
- [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions.
- [#595](https://github.com/influxdata/kapacitor/pull/595): Support counting and summing empty batches to 0.
- [#596](https://github.com/influxdata/kapacitor/pull/596): Support new group by time offset i.e. time(30s, 5s)


### Bugfixes
Expand Down
57 changes: 55 additions & 2 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,7 @@ batch

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_CountEmptyBatch(t *testing.T) {

var script = `
batch
|query('''
Expand Down Expand Up @@ -397,6 +395,61 @@ batch
testBatcherWithOutput(t, "TestBatch_CountEmptyBatch", script, 30*time.Second, er)
}

func TestBatch_GroupBy_TimeOffset(t *testing.T) {

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA'
''')
.period(10s)
.every(10s)
.groupBy(time(2s, 1s), 'cpu')
|count('mean')
|window()
.period(20s)
.every(20s)
|sum('count')
|httpOut('TestBatch_SimpleMR')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu-total"},
Columns: []string{"time", "sum"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
10.0,
}},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu0"},
Columns: []string{"time", "sum"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
10.0,
}},
},
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "sum"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
10.0,
}},
},
},
}

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_Default(t *testing.T) {

var script = `
Expand Down
16 changes: 15 additions & 1 deletion pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,12 +151,26 @@ func (n *QueryNode) ChainMethods() map[string]reflect.Value {
//
// This property adds a `GROUP BY` clause to the query
// so all the normal behaviors when quering InfluxDB with a `GROUP BY` apply.
// More details: https://influxdb.com/docs/v0.9/query_language/data_exploration.html#the-group-by-clause
//
// Example:
// batch
// |query(...)
// .groupBy(time(10s), 'tag1', 'tag2'))
// .align()
//
// A group by time offset is also possible
//
// Example:
// batch
// |query(...)
// .groupBy(time(10s, -5s), 'tag1', 'tag2'))
// .align()
// .offset(5s)
//
// It is recommended to use QueryNode.Align and QueryNode.Offset in conjunction with
// group by time dimensions so that the time bounds match up with the group by intervals.
//
// NOTE: Since QueryNode.Offset is inherently a negative property the second "offset" argument to the "time" function is negative to match.
//
// tick:property
func (b *QueryNode) GroupBy(d ...interface{}) *QueryNode {
Expand Down
34 changes: 34 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ func (q *Query) Dimensions(dims []interface{}) error {
&influxql.Dimension{
Expr: &influxql.Wildcard{},
})
case TimeDimension:
q.stmt.Dimensions = append(q.stmt.Dimensions,
&influxql.Dimension{
Expr: &influxql.Call{
Name: "time",
Args: []influxql.Expr{
&influxql.DurationLiteral{
Val: dim.Length,
},
&influxql.DurationLiteral{
Val: dim.Offset,
},
},
},
})

default:
return fmt.Errorf("invalid dimension type:%T, must be string or time.Duration", d)
Expand All @@ -141,3 +156,22 @@ func (q *Query) Fill(option influxql.FillOption, value interface{}) {
func (q *Query) String() string {
return q.stmt.String()
}

type TimeDimension struct {
Length time.Duration
Offset time.Duration
}

func groupByTime(length time.Duration, offset ...time.Duration) (TimeDimension, error) {
var o time.Duration
if l := len(offset); l == 1 {
o = offset[0]

} else if l != 0 {
return TimeDimension{}, fmt.Errorf("time() function expects 1 or 2 args, got %d", l+1)
}
return TimeDimension{
Length: length,
Offset: o,
}, nil
}
2 changes: 1 addition & 1 deletion task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func (tm *TaskMaster) waitForForks() {

func (tm *TaskMaster) CreateTICKScope() *stateful.Scope {
scope := stateful.NewScope()
scope.Set("time", func(d time.Duration) time.Duration { return d })
scope.Set("time", groupByTime)
// Add dynamic methods to the scope for UDFs
if tm.UDFService != nil {
for _, f := range tm.UDFService.List() {
Expand Down

0 comments on commit cbfe211

Please sign in to comment.