Skip to content

Commit

Permalink
fix bug where aggregate operations could not change type
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 12, 2017
1 parent d13c950 commit 7dfb18d
Showing 8 changed files with 97 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -83,6 +83,7 @@ kapacitor define-handler system aggregate_by_1m.yaml
- [#1068](https://github.com/influxdata/kapacitor/issues/1068): Fix dot view syntax to use xlabels and not create invalid quotes.
- [#1295](https://github.com/influxdata/kapacitor/issues/1295): Fix curruption of recordings list after deleting all recordings.
- [#1237](https://github.com/influxdata/kapacitor/issues/1237): Fix missing "vars" key when listing tasks.
- [#1271](https://github.com/influxdata/kapacitor/issues/1271): Fix bug where aggregates would not be able to change type.

## v1.2.0 [2017-01-23]

15 changes: 8 additions & 7 deletions influxql.gen.go
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ package kapacitor

import (
"fmt"
"reflect"
"time"

"github.com/influxdata/influxdb/influxql"
@@ -1151,10 +1152,10 @@ type booleanBulkReduceContext struct {
booleanPointEmitter
}

func determineReduceContextCreateFn(method string, value interface{}, rc pipeline.ReduceCreater) (fn createReduceContextFunc, err error) {
switch value.(type) {
func determineReduceContextCreateFn(method string, kind reflect.Kind, rc pipeline.ReduceCreater) (fn createReduceContextFunc, err error) {
switch kind {

case float64:
case reflect.Float64:
switch {

case rc.CreateFloatReducer != nil:
@@ -1301,7 +1302,7 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
err = fmt.Errorf("cannot apply %s to float64 field", method)
}

case int64:
case reflect.Int64:
switch {

case rc.CreateIntegerFloatReducer != nil:
@@ -1448,7 +1449,7 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
err = fmt.Errorf("cannot apply %s to int64 field", method)
}

case string:
case reflect.String:
switch {

case rc.CreateStringFloatReducer != nil:
@@ -1595,7 +1596,7 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
err = fmt.Errorf("cannot apply %s to string field", method)
}

case bool:
case reflect.Bool:
switch {

case rc.CreateBooleanFloatReducer != nil:
@@ -1743,7 +1744,7 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
}

default:
err = fmt.Errorf("invalid field type: %T", value)
err = fmt.Errorf("invalid field kind: %v", kind)
}
return
}
9 changes: 5 additions & 4 deletions influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ package kapacitor
import (
"fmt"
"time"
"reflect"

"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/models"
@@ -274,10 +275,10 @@ type {{$a.name}}Bulk{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}ReduceContext str

{{/* Define switch cases for reduceContext contruction */}}

func determineReduceContextCreateFn(method string, value interface{}, rc pipeline.ReduceCreater) (fn createReduceContextFunc, err error) {
switch value.(type) {
func determineReduceContextCreateFn(method string, kind reflect.Kind, rc pipeline.ReduceCreater) (fn createReduceContextFunc, err error) {
switch kind {
{{range $a := $types}}
case {{.Type}}:
case {{.Kind}}:
switch {
{{range $e := $types}}
case rc.Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer != nil:
@@ -320,7 +321,7 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
}
{{end}}
default:
err = fmt.Errorf("invalid field type: %T", value)
err = fmt.Errorf("invalid field kind: %v", kind)
}
return
}
29 changes: 19 additions & 10 deletions influxql.go
Original file line number Diff line number Diff line change
@@ -3,6 +3,7 @@ package kapacitor
import (
"fmt"
"log"
"reflect"
"sync"
"time"

@@ -13,7 +14,7 @@ import (
)

// tmpl -- go get github.com/benbjohnson/tmpl
//go:generate tmpl -data=@tmpldata influxql.gen.go.tmpl
//go:generate tmpl -data=@tmpldata.json influxql.gen.go.tmpl

type createReduceContextFunc func(c baseReduceContext) reduceContext

@@ -82,6 +83,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
}
n.statMap.Set(statCardinalityGauge, expvar.NewIntFuncGauge(valueF))

var kind reflect.Kind
for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
mu.RLock()
@@ -101,7 +103,11 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
pointTimes: n.n.PointTimes || n.isStreamTransformation,
}

createFn, err := n.getCreateFn(p.Fields[c.field])
k := reflect.TypeOf(p.Fields[c.field]).Kind()
kindChanged := k != kind
kind = k

createFn, err := n.getCreateFn(kindChanged, kind)
if err != nil {
return err
}
@@ -155,7 +161,8 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
}

func (n *InfluxQLNode) runBatchInfluxQL() error {
var exampleValue interface{}
var kind reflect.Kind
kindChanged := true
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
// Create new base context
@@ -175,14 +182,16 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
n.timer.Stop()
continue
}
if exampleValue == nil {
if kind == reflect.Invalid {
// If we have no points and have never seen a point assume float64
exampleValue = float64(0)
kind = reflect.Float64
}
} else {
exampleValue = b.Points[0].Fields[c.field]
k := reflect.TypeOf(b.Points[0].Fields[c.field]).Kind()
kindChanged = k != kind
kind = k
}
createFn, err := n.getCreateFn(exampleValue)
createFn, err := n.getCreateFn(kindChanged, kind)
if err != nil {
return err
}
@@ -241,11 +250,11 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
return nil
}

func (n *InfluxQLNode) getCreateFn(value interface{}) (createReduceContextFunc, error) {
if n.createFn != nil {
func (n *InfluxQLNode) getCreateFn(changed bool, kind reflect.Kind) (createReduceContextFunc, error) {
if !changed && n.createFn != nil {
return n.createFn, nil
}
createFn, err := determineReduceContextCreateFn(n.n.Method, value, n.n.ReduceCreater)
createFn, err := determineReduceContextCreateFn(n.n.Method, kind, n.n.ReduceCreater)
if err != nil {
return nil, errors.Wrapf(err, "invalid influxql func %s with field %s", n.n.Method, n.n.Field)
}
21 changes: 21 additions & 0 deletions integrations/data/TestStream_Aggregate_Changing_Type.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
dbname
rpname
m c=false,value=0i 0000000000
dbname
rpname
m c=false,value=0i 0000000002
dbname
rpname
m c=false,value=0i 0000000008
dbname
rpname
m c=false,value=0i 0000000010
dbname
rpname
m c=true,value=0i 0000000011
dbname
rpname
m c=false,value=0i 0000000012
dbname
rpname
m c=false,value=0i 0000000021
38 changes: 38 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
@@ -701,6 +701,7 @@ stream

testStreamerWithOutput(t, "TestStream_Window_Count", script, 2*time.Second, er, false, nil)
}

func TestStream_Window_Count_Overlapping(t *testing.T) {

var script = `
@@ -1323,6 +1324,43 @@ stream
testStreamerWithOutput(t, "TestStream_Window_FillPeriod_Aligned", script, 21*time.Second, er, false, nil)
}

func TestStream_Aggregate_Changing_Type(t *testing.T) {

var script = `
var period = 10s
var every = 10s
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('m')
|window()
.period(period)
.every(every)
|where(lambda: "c")
|count('value')
|httpOut('TestStream_Aggregate_Changing_Type')
`

er := models.Result{
Series: models.Rows{
{
Name: "m",
Tags: nil,
Columns: []string{"time", "count"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
1.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_Aggregate_Changing_Type", script, 25*time.Second, er, false, nil)
}

func TestStream_Shift(t *testing.T) {

var script = `
2 changes: 1 addition & 1 deletion pipeline/influxql.go
Original file line number Diff line number Diff line change
@@ -7,7 +7,7 @@ import (
)

// tmpl -- go get github.com/benbjohnson/tmpl
//go:generate tmpl -data=@../tmpldata influxql.gen.go.tmpl
//go:generate tmpl -data=@../tmpldata.json influxql.gen.go.tmpl

// An InfluxQLNode performs the available function from the InfluxQL language.
// These function can be performed on a stream or batch edge.
5 changes: 4 additions & 1 deletion tmpldata → tmpldata.json
Original file line number Diff line number Diff line change
@@ -3,29 +3,32 @@
"Name":"Float",
"name":"float",
"Type":"float64",
"Kind":"reflect.Float64",
"Nil":"0",
"Zero":"float64(0)"
},
{
"Name":"Integer",
"name":"integer",
"Type":"int64",
"Kind":"reflect.Int64",
"Nil":"0",
"Zero":"int64(0)"
},
{
"Name":"String",
"name":"string",
"Type":"string",
"Kind":"reflect.String",
"Nil":"\"\"",
"Zero":"\"\""
},
{
"Name":"Boolean",
"name":"boolean",
"Type":"bool",
"Kind":"reflect.Bool",
"Nil":"false",
"Zero":"false"
}
]

0 comments on commit 7dfb18d

Please sign in to comment.