Skip to content

Commit

Permalink
log errors about missing or badly type fields, do not panic
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed May 4, 2016
1 parent aa267cf commit 3ff4696
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ For example, let's say we want to store all data that triggered an alert in Infl

### Bugfixes

- [#499](https://github.com/influxdata/kapacitor/issues/499): Fix panic in InfluxQL nodes if field is missing or incorrect type.
- [#441](https://github.com/influxdata/kapacitor/issues/441): Fix panic in UDF code.
- [#429](https://github.com/influxdata/kapacitor/issues/429): BREAKING: Change TICKscript parser to be left-associative on equal precedence operators. For example previously this statement `(1+2-3*4/5)` was evaluated as `(1+(2-(3*(4/5))))`
which is not the typical/expected behavior. Now using left-associative parsing the statement is evaluated as `((1+2)-((3*4)/5))`.
Expand Down
104 changes: 88 additions & 16 deletions influxql.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,21 @@ func floatPopulateAuxFieldsAndTags(ap *influxql.FloatPoint, fieldsAndTags []stri
}
}

func (a *floatPointAggregator) AggregateBatch(b *models.Batch) {
func (a *floatPointAggregator) AggregateBatch(b *models.Batch) error {
for _, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(float64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp float64", a.field, value)
}
ap := &influxql.FloatPoint{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(float64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -52,14 +60,23 @@ func (a *floatPointAggregator) AggregateBatch(b *models.Batch) {

a.aggregator.AggregateFloat(ap)
}
return nil
}

func (a *floatPointAggregator) AggregatePoint(p *models.Point) {
func (a *floatPointAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(float64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp float64", a.field, value)
}
ap := &influxql.FloatPoint{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(float64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -71,6 +88,7 @@ func (a *floatPointAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.AggregateFloat(ap)
return nil
}

type floatPointBulkAggregator struct {
Expand All @@ -80,14 +98,22 @@ type floatPointBulkAggregator struct {
aggregator pipeline.FloatBulkPointAggregator
}

func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) {
func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) error {
slice := make([]influxql.FloatPoint, len(b.Points))
for i, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(float64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp float64", a.field, value)
}
slice[i] = influxql.FloatPoint{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(float64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -99,14 +125,23 @@ func (a *floatPointBulkAggregator) AggregateBatch(b *models.Batch) {
}
}
a.aggregator.AggregateFloatBulk(slice)
return nil
}

func (a *floatPointBulkAggregator) AggregatePoint(p *models.Point) {
func (a *floatPointBulkAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(float64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp float64", a.field, value)
}
ap := &influxql.FloatPoint{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(float64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -118,6 +153,7 @@ func (a *floatPointBulkAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.AggregateFloat(ap)
return nil
}

type floatPointEmitter struct {
Expand Down Expand Up @@ -205,13 +241,21 @@ func integerPopulateAuxFieldsAndTags(ap *influxql.IntegerPoint, fieldsAndTags []
}
}

func (a *integerPointAggregator) AggregateBatch(b *models.Batch) {
func (a *integerPointAggregator) AggregateBatch(b *models.Batch) error {
for _, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(int64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp int64", a.field, value)
}
ap := &influxql.IntegerPoint{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(int64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -224,14 +268,23 @@ func (a *integerPointAggregator) AggregateBatch(b *models.Batch) {

a.aggregator.AggregateInteger(ap)
}
return nil
}

func (a *integerPointAggregator) AggregatePoint(p *models.Point) {
func (a *integerPointAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(int64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp int64", a.field, value)
}
ap := &influxql.IntegerPoint{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(int64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -243,6 +296,7 @@ func (a *integerPointAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.AggregateInteger(ap)
return nil
}

type integerPointBulkAggregator struct {
Expand All @@ -252,14 +306,22 @@ type integerPointBulkAggregator struct {
aggregator pipeline.IntegerBulkPointAggregator
}

func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) {
func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) error {
slice := make([]influxql.IntegerPoint, len(b.Points))
for i, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(int64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp int64", a.field, value)
}
slice[i] = influxql.IntegerPoint{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(int64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -271,14 +333,23 @@ func (a *integerPointBulkAggregator) AggregateBatch(b *models.Batch) {
}
}
a.aggregator.AggregateIntegerBulk(slice)
return nil
}

func (a *integerPointBulkAggregator) AggregatePoint(p *models.Point) {
func (a *integerPointBulkAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.(int64)
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp int64", a.field, value)
}
ap := &influxql.IntegerPoint{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].(int64),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -290,6 +361,7 @@ func (a *integerPointBulkAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.AggregateInteger(ap)
return nil
}

type integerPointEmitter struct {
Expand Down
52 changes: 44 additions & 8 deletions influxql.gen.go.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,21 @@ func {{.name}}PopulateAuxFieldsAndTags(ap *influxql.{{.Name}}Point, fieldsAndTag
}
}

func (a *{{.name}}PointAggregator) AggregateBatch(b *models.Batch) {
func (a *{{.name}}PointAggregator) AggregateBatch(b *models.Batch) error {
for _, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.({{.Type}})
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp {{.Type}}", a.field, value)
}
ap := &influxql.{{.Name}}Point{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].({{.Type}}),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -50,14 +58,23 @@ func (a *{{.name}}PointAggregator) AggregateBatch(b *models.Batch) {

a.aggregator.Aggregate{{.Name}}(ap)
}
return nil
}

func (a *{{.name}}PointAggregator) AggregatePoint(p *models.Point) {
func (a *{{.name}}PointAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.({{.Type}})
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp {{.Type}}", a.field, value)
}
ap := &influxql.{{.Name}}Point{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].({{.Type}}),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -69,6 +86,7 @@ func (a *{{.name}}PointAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.Aggregate{{.Name}}(ap)
return nil
}


Expand All @@ -80,14 +98,22 @@ type {{.name}}PointBulkAggregator struct {
aggregator pipeline.{{.Name}}BulkPointAggregator
}

func (a *{{.name}}PointBulkAggregator) AggregateBatch(b *models.Batch) {
func (a *{{.name}}PointBulkAggregator) AggregateBatch(b *models.Batch) error {
slice := make([]influxql.{{.Name}}Point, len(b.Points))
for i, p := range b.Points {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.({{.Type}})
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp {{.Type}}", a.field, value)
}
slice[i] = influxql.{{.Name}}Point{
Name: b.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].({{.Type}}),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -99,14 +125,23 @@ func (a *{{.name}}PointBulkAggregator) AggregateBatch(b *models.Batch) {
}
}
a.aggregator.Aggregate{{.Name}}Bulk(slice)
return nil
}

func (a *{{.name}}PointBulkAggregator) AggregatePoint(p *models.Point) {
func (a *{{.name}}PointBulkAggregator) AggregatePoint(p *models.Point) error {
value, ok := p.Fields[a.field]
if !ok {
return fmt.Errorf("field %s missing from point cannot aggregate", a.field)
}
typed, ok := value.({{.Type}})
if !ok {
return fmt.Errorf("field %s has wrong type: got %T exp {{.Type}}", a.field, value)
}
ap := &influxql.{{.Name}}Point{
Name: p.Name,
Tags: influxql.NewTags(p.Tags),
Time: p.Time.UnixNano(),
Value: p.Fields[a.field].({{.Type}}),
Value: typed,
}
if a.topBottomInfo != nil {
// We need to populate the Aux fields
Expand All @@ -118,6 +153,7 @@ func (a *{{.name}}PointBulkAggregator) AggregatePoint(p *models.Point) {
}

a.aggregator.Aggregate{{.Name}}(ap)
return nil
}

type {{.name}}PointEmitter struct {
Expand Down
Loading

0 comments on commit 3ff4696

Please sign in to comment.