Skip to content

Commit

Permalink
Preserving tags on influxql simple selectors - first, last, max, min,…
Browse files Browse the repository at this point in the history
… percentile
  • Loading branch information
yosiat committed Apr 14, 2016
1 parent 58dd5db commit 4cd6a37
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ Example UDF config for a socket based UDF.
- [#429](https://github.com/influxdata/kapacitor/issues/429): BREAKING: Change TICKscript parser to be left-associative on equal precedence operators. For example previously this statement `(1+2-3*4/5)` was evaluated as `(1+(2-(3*(4/5))))`
which is not the typical/expected behavior. Now using left-associative parsing the statement is evaluated as `((1+2)-((3*4)/5))`.
- [#456](https://github.com/influxdata/kapacitor/pull/456): Fixes Alerta integration to let server set status, fix `rawData` attribute and set default severity to `indeterminate`.
- [#425](https://github.com/influxdata/kapacitor/pull/425): BREAKING: Preserving tags on influxql simple selectors - first, last, max, min, percentile

## v0.12.0 [2016-04-04]

Expand Down
152 changes: 112 additions & 40 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

type floatPointAggregator struct {
field string
topBottomInfo *pipeline.TopBottomCallInfo
aggregator influxql.FloatPointAggregator
field string
topBottomInfo *pipeline.TopBottomCallInfo
isSimpleSelector bool
aggregator influxql.FloatPointAggregator
}

func floatPopulateAuxFieldsAndTags(ap *influxql.FloatPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) {
Expand All @@ -44,6 +45,11 @@ func (a *floatPointAggregator) AggregateBatch(b *models.Batch) {
// We need to populate the Aux fields
floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateFloat(ap)
}
}
Expand All @@ -59,13 +65,19 @@ func (a *floatPointAggregator) AggregatePoint(p *models.Point) {
// We need to populate the Aux fields
floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateFloat(ap)
}

type floatPointBulkAggregator struct {
field string
topBottomInfo *pipeline.TopBottomCallInfo
aggregator pipeline.FloatBulkPointAggregator
field string
topBottomInfo *pipeline.TopBottomCallInfo
isSimpleSelector bool
aggregator pipeline.FloatBulkPointAggregator
}

func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) {
Expand All @@ -81,6 +93,10 @@ func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) {
// We need to populate the Aux fields
floatPopulateAuxFieldsAndTags(&slice[i], a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
slice[i].Aux = []interface{}{p.Tags}
}
}
a.aggregator.AggregateFloatBulk(slice)
}
Expand All @@ -96,12 +112,18 @@ func (a *floatPointBulkAggregator) AggregatePoint(p *models.Point) {
// We need to populate the Aux fields
floatPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateFloat(ap)
}

type floatPointEmitter struct {
baseReduceContext
emitter influxql.FloatPointEmitter
emitter influxql.FloatPointEmitter
isSimpleSelector bool
}

func (e *floatPointEmitter) EmitPoint() (models.Point, error) {
Expand All @@ -120,12 +142,18 @@ func (e *floatPointEmitter) EmitPoint() (models.Point, error) {
} else {
t = e.time
}

tags := e.tags
if e.isSimpleSelector {
tags = ap.Aux[0].(models.Tags)
}

return models.Point{
Name: e.name,
Time: t,
Group: e.group,
Dimensions: e.dimensions,
Tags: e.tags,
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}, nil
}
Expand Down Expand Up @@ -160,9 +188,10 @@ func (e *floatPointEmitter) EmitBatch() models.Batch {
}

type integerPointAggregator struct {
field string
topBottomInfo *pipeline.TopBottomCallInfo
aggregator influxql.IntegerPointAggregator
field string
topBottomInfo *pipeline.TopBottomCallInfo
isSimpleSelector bool
aggregator influxql.IntegerPointAggregator
}

func integerPopulateAuxFieldsAndTags(ap *influxql.IntegerPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) {
Expand All @@ -188,6 +217,11 @@ func (a *integerPointAggregator) AggregateBatch(b *models.Batch) {
// We need to populate the Aux fields
integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateInteger(ap)
}
}
Expand All @@ -203,13 +237,19 @@ func (a *integerPointAggregator) AggregatePoint(p *models.Point) {
// We need to populate the Aux fields
integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateInteger(ap)
}

type integerPointBulkAggregator struct {
field string
topBottomInfo *pipeline.TopBottomCallInfo
aggregator pipeline.IntegerBulkPointAggregator
field string
topBottomInfo *pipeline.TopBottomCallInfo
isSimpleSelector bool
aggregator pipeline.IntegerBulkPointAggregator
}

func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) {
Expand All @@ -225,6 +265,10 @@ func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) {
// We need to populate the Aux fields
integerPopulateAuxFieldsAndTags(&slice[i], a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
slice[i].Aux = []interface{}{p.Tags}
}
}
a.aggregator.AggregateIntegerBulk(slice)
}
Expand All @@ -240,12 +284,18 @@ func (a *integerPointBulkAggregator) AggregatePoint(p *models.Point) {
// We need to populate the Aux fields
integerPopulateAuxFieldsAndTags(ap, a.topBottomInfo.FieldsAndTags, p.Fields, p.Tags)
}

if a.isSimpleSelector {
ap.Aux = []interface{}{p.Tags}
}

a.aggregator.AggregateInteger(ap)
}

type integerPointEmitter struct {
baseReduceContext
emitter influxql.IntegerPointEmitter
emitter influxql.IntegerPointEmitter
isSimpleSelector bool
}

func (e *integerPointEmitter) EmitPoint() (models.Point, error) {
Expand All @@ -264,12 +314,18 @@ func (e *integerPointEmitter) EmitPoint() (models.Point, error) {
} else {
t = e.time
}

tags := e.tags
if e.isSimpleSelector {
tags = ap.Aux[0].(models.Tags)
}

return models.Point{
Name: e.name,
Time: t,
Group: e.group,
Dimensions: e.dimensions,
Tags: e.tags,
Tags: tags,
Fields: map[string]interface{}{e.as: ap.Value},
}, nil
}
Expand Down Expand Up @@ -362,13 +418,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateFloatReducer()
return &floatReduceContext{
floatPointAggregator: floatPointAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
floatPointEmitter: floatPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -377,13 +435,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateFloatBulkReducer()
return &floatBulkReduceContext{
floatPointBulkAggregator: floatPointBulkAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
floatPointEmitter: floatPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -393,13 +453,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateFloatIntegerReducer()
return &floatIntegerReduceContext{
floatPointAggregator: floatPointAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
integerPointEmitter: integerPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -408,13 +470,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateFloatBulkIntegerReducer()
return &floatBulkIntegerReduceContext{
floatPointBulkAggregator: floatPointBulkAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
integerPointEmitter: integerPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -431,13 +495,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateIntegerFloatReducer()
return &integerFloatReduceContext{
integerPointAggregator: integerPointAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
floatPointEmitter: floatPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -446,13 +512,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateIntegerBulkFloatReducer()
return &integerBulkFloatReduceContext{
integerPointBulkAggregator: integerPointBulkAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
floatPointEmitter: floatPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -462,13 +530,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateIntegerReducer()
return &integerReduceContext{
integerPointAggregator: integerPointAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
integerPointEmitter: integerPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand All @@ -477,13 +547,15 @@ func determineReduceContextCreateFn(method string, value interface{}, rc pipelin
a, e := rc.CreateIntegerBulkReducer()
return &integerBulkReduceContext{
integerPointBulkAggregator: integerPointBulkAggregator{
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
aggregator: a,
field: c.field,
topBottomInfo: rc.TopBottomCallInfo,
isSimpleSelector: rc.IsSimpleSelector,
aggregator: a,
},
integerPointEmitter: integerPointEmitter{
baseReduceContext: c,
emitter: e,
isSimpleSelector: rc.IsSimpleSelector,
},
}
}
Expand Down
Loading

0 comments on commit 4cd6a37

Please sign in to comment.