Skip to content

Commit

Permalink
Support float,integer,string and boolean (influxdata#524)
Browse files Browse the repository at this point in the history
* support float,integer,string and boolean

* add functions for converting to bool and strings

* convert number to booleans explicitly as 0 or 1 nothing else

* support str concat
  • Loading branch information
Nathaniel Cook committed May 18, 2016
1 parent ca64974 commit 91238df
Show file tree
Hide file tree
Showing 19 changed files with 2,431 additions and 135 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ kapacitor replay-live query -task cpu_alert -query 'SELECT usage_idle FROM teleg
### Features

- [#283](https://github.com/influxdata/kapacitor/issues/283): Add live replays.
- [#500](https://github.com/influxdata/kapacitor/issues/500): Support Float,Integer,String and Boolean types.
- [#82](https://github.com/influxdata/kapacitor/issues/82): Multiple services for PagerDuty alert
- [#558](https://github.com/influxdata/kapacitor/pull/558): Preserve fields as well as tags on selector InfluxQL functions.

Expand Down
83 changes: 45 additions & 38 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,19 @@ func (e *EvalNode) runEval(snapshot []byte) error {
e.statMap.Set(statsEvalErrors, e.evalErrors)
switch e.Provides() {
case pipeline.StreamEdge:
var err error
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
p.Fields = e.eval(p.Time, p.Fields, p.Tags)
p.Fields, err = e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
e.timer.Stop()
// Skip bad point
continue
}
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
Expand All @@ -67,10 +77,22 @@ func (e *EvalNode) runEval(snapshot []byte) error {
}
}
case pipeline.BatchEdge:
var err error
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i, p := range b.Points {
b.Points[i].Fields = e.eval(p.Time, p.Fields, p.Tags)
for i := 0; i < len(b.Points); {
p := b.Points[i]
b.Points[i].Fields, err = e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
// Remove bad point
b.Points = append(b.Points[:i], b.Points[i+1:]...)
} else {
i++
}
}
e.timer.Stop()
for _, child := range e.outs {
Expand All @@ -84,69 +106,54 @@ func (e *EvalNode) runEval(snapshot []byte) error {
return nil
}

func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) models.Fields {
func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) {
vars := e.scopePool.Get()
defer e.scopePool.Put(vars)
err := fillScope(vars, e.scopePool.ReferenceVariables(), now, fields, tags)
if err != nil {
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
return nil
return nil, err
}
for i, expr := range e.expressions {
if v, err := expr.EvalNum(vars); err == nil {
name := e.e.AsList[i]
vars.Set(name, v)
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := expr.Eval(vars)
if err != nil {
return nil, err
}
name := e.e.AsList[i]
vars.Set(name, v)
}
var newFields models.Fields
if e.e.KeepFlag {
if l := len(e.e.KeepList); l != 0 {
newFields = make(models.Fields, l)
for _, f := range e.e.KeepList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
} else {
newFields = make(models.Fields, len(fields)+len(e.e.AsList))
for f, v := range fields {
newFields[f] = v
}
for _, f := range e.e.AsList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
}
} else {
newFields = make(models.Fields, len(e.e.AsList))
for _, f := range e.e.AsList {
if v, err := vars.Get(f); err == nil {
newFields[f] = v
} else {
e.evalErrors.Add(1)
if !e.e.QuiteFlag {
e.logger.Println("E!", err)
}
v, err := vars.Get(f)
if err != nil {
return nil, err
}
newFields[f] = v
}
}
return newFields
return newFields, nil
}
Loading

0 comments on commit 91238df

Please sign in to comment.