Skip to content

Commit

Permalink
fix: return an error when aggregating a point fails
Browse files Browse the repository at this point in the history
When the `AggregatePoint` method failed to convert the point to the
proper type, an error would happen and the underlying influxql code
would never receive the point within the aggregator.

But, the batch size would be incremented because the error was swallowed
instead of being reported back to the task node. This meant the task
believed that at least one point had been aggregated and it would
attempt to emit a point from aggregators that could not emit without at
least one point.

This change fixes the task so that it reports the error when it happens
instead of swallowing the error. This way, the error doesn't hide itself
with little evidence.

This has also been changed so the batch size is only incremented when an
error doesn't happen. The code had a path flow of incrementing the batch
size before it knew if an error happened when aggregating or not.
  • Loading branch information
jsternberg committed May 14, 2019
1 parent 72e88b9 commit d4c1792
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 6 deletions.
8 changes: 4 additions & 4 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func floatPopulateAuxFieldsAndTags(ap *influxql.FloatPoint, fieldsAndTags []stri
func (a *floatPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTimeGetter) error {
ap, err := convertFloatPoint(name, p, a.field, a.isSimpleSelector, a.topBottomInfo)
if err != nil {
return nil
return err
}
a.aggregator.AggregateFloat(ap)
return nil
Expand Down Expand Up @@ -229,7 +229,7 @@ func integerPopulateAuxFieldsAndTags(ap *influxql.IntegerPoint, fieldsAndTags []
func (a *integerPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTimeGetter) error {
ap, err := convertIntegerPoint(name, p, a.field, a.isSimpleSelector, a.topBottomInfo)
if err != nil {
return nil
return err
}
a.aggregator.AggregateInteger(ap)
return nil
Expand Down Expand Up @@ -387,7 +387,7 @@ func stringPopulateAuxFieldsAndTags(ap *influxql.StringPoint, fieldsAndTags []st
func (a *stringPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTimeGetter) error {
ap, err := convertStringPoint(name, p, a.field, a.isSimpleSelector, a.topBottomInfo)
if err != nil {
return nil
return err
}
a.aggregator.AggregateString(ap)
return nil
Expand Down Expand Up @@ -545,7 +545,7 @@ func booleanPopulateAuxFieldsAndTags(ap *influxql.BooleanPoint, fieldsAndTags []
func (a *booleanPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTimeGetter) error {
ap, err := convertBooleanPoint(name, p, a.field, a.isSimpleSelector, a.topBottomInfo)
if err != nil {
return nil
return err
}
a.aggregator.AggregateBoolean(ap)
return nil
Expand Down
2 changes: 1 addition & 1 deletion influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func {{.name}}PopulateAuxFieldsAndTags(ap *influxql.{{.Name}}Point, fieldsAndTag
func (a *{{.name}}PointAggregator) AggregatePoint(name string, p edge.FieldsTagsTimeGetter) error {
ap, err := convert{{.Name}}Point(name, p, a.field, a.isSimpleSelector, a.topBottomInfo)
if err != nil {
return nil
return err
}
a.aggregator.Aggregate{{.Name}}(ap)
return nil
Expand Down
2 changes: 1 addition & 1 deletion influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ func (g *influxqlGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, err
return nil, nil
}
}
g.batchSize++
if err := g.rc.AggregatePoint(g.begin.Name(), bp); err != nil {
g.n.diag.Error("failed to aggregate point in batch", err)
}
g.batchSize++
return nil, nil
}

Expand Down

0 comments on commit d4c1792

Please sign in to comment.