Skip to content

Commit

Permalink
add time of day expressions
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 25, 2016
1 parent cf1bc8a commit a820107
Show file tree
Hide file tree
Showing 8 changed files with 168 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ See note on a breaking change in the HTTP API below. #163
- [#72](https://github.com/influxdata/kapacitor/issues/72): Add support for User Defined Functions (UDFs).
- [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454
- [#85](https://github.com/influxdata/kapacitor/issues/85): Sensu support using JIT clients. Thanks @sstarcher!
- [#141](https://github.com/influxdata/kapacitor/issues/141): Time of day expressions for silencing alerts.

### Bugfixes
- [#153](https://github.com/influxdata/kapacitor/issues/153): Fix panic if referencing non existant field in MapReduce function.
Expand Down
8 changes: 4 additions & 4 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (a *AlertNode) runAlert([]byte) error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
l := a.determineLevel(p.Fields, p.Tags)
l := a.determineLevel(p.Time, p.Fields, p.Tags)
state := a.updateState(l, p.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed) {
continue
Expand All @@ -243,7 +243,7 @@ func (a *AlertNode) runAlert([]byte) error {
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
triggered := false
for _, p := range b.Points {
l := a.determineLevel(p.Fields, p.Tags)
l := a.determineLevel(p.Time, p.Fields, p.Tags)
if l > OKAlert {
triggered = true
state := a.updateState(l, b.Group)
Expand Down Expand Up @@ -281,12 +281,12 @@ func (a *AlertNode) runAlert([]byte) error {
return nil
}

func (a *AlertNode) determineLevel(fields models.Fields, tags map[string]string) (level AlertLevel) {
func (a *AlertNode) determineLevel(now time.Time, fields models.Fields, tags map[string]string) (level AlertLevel) {
for l, se := range a.levels {
if se == nil {
continue
}
if pass, err := EvalPredicate(se, fields, tags); pass {
if pass, err := EvalPredicate(se, now, fields, tags); pass {
level = AlertLevel(l)
} else if err != nil {
a.logger.Println("E! error evaluating expression:", err)
Expand Down
9 changes: 5 additions & 4 deletions eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kapacitor
import (
"errors"
"log"
"time"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
Expand Down Expand Up @@ -38,7 +39,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
fields, err := e.eval(p.Fields, p.Tags)
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
Expand All @@ -53,7 +54,7 @@ func (e *EvalNode) runEval(snapshot []byte) error {
case pipeline.BatchEdge:
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
for i, p := range b.Points {
fields, err := e.eval(p.Fields, p.Tags)
fields, err := e.eval(p.Time, p.Fields, p.Tags)
if err != nil {
return err
}
Expand All @@ -70,8 +71,8 @@ func (e *EvalNode) runEval(snapshot []byte) error {
return nil
}

func (e *EvalNode) eval(fields models.Fields, tags map[string]string) (models.Fields, error) {
vars, err := mergeFieldsAndTags(fields, tags)
func (e *EvalNode) eval(now time.Time, fields models.Fields, tags map[string]string) (models.Fields, error) {
vars, err := mergeFieldsAndTags(now, fields, tags)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@ package kapacitor

import (
"fmt"
"time"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/tick"
)

// Evaluate a given expression as a boolean predicate against a set of fields and tags
func EvalPredicate(se *tick.StatefulExpr, fields models.Fields, tags map[string]string) (bool, error) {
vars, err := mergeFieldsAndTags(fields, tags)
func EvalPredicate(se *tick.StatefulExpr, now time.Time, fields models.Fields, tags models.Tags) (bool, error) {
vars, err := mergeFieldsAndTags(now, fields, tags)
if err != nil {
return false, err
}
Expand All @@ -20,8 +21,9 @@ func EvalPredicate(se *tick.StatefulExpr, fields models.Fields, tags map[string]
return b, nil
}

func mergeFieldsAndTags(fields models.Fields, tags map[string]string) (*tick.Scope, error) {
func mergeFieldsAndTags(now time.Time, fields models.Fields, tags models.Tags) (*tick.Scope, error) {
scope := tick.NewScope()
scope.Set("time", now)
for k, v := range fields {
if _, ok := tags[k]; ok {
return nil, fmt.Errorf("cannot have field and tags with same name %q", k)
Expand Down
23 changes: 21 additions & 2 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ const intervalMarker = "INTERVAL"
//
// - Threshold -- trigger alert if throughput drops below threshold in points/interval.
// - Interval -- how often to check the throughput.
// - Expressions -- optional list of expressions to also evaluate. Useful for time of day alerting.
//
// Example:
// var data = stream.from()...
Expand Down Expand Up @@ -226,13 +227,22 @@ const intervalMarker = "INTERVAL"
// //Do normal processing of data
// data....
//
func (n *node) Deadman(threshold float64, interval time.Duration) *AlertNode {
// You can specify additional lambda expressions to further constrain when the deadman's switch is triggered.
// Example:
// var data = stream.from()...
// // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
// // Only trigger the alert if the time of day is between 8am-5pm.
// data.deadman(100.0, 10s, lambda: hour("time") >= 8 AND hour("time") <= 17)
// //Do normal processing of data
// data....
//
func (n *node) Deadman(threshold float64, interval time.Duration, expr ...tick.Node) *AlertNode {
dn := n.Stats(interval).
Derivative("collected").NonNegative()
dn.Unit = interval

an := dn.Alert()
an.Crit = &tick.BinaryNode{
critExpr := &tick.BinaryNode{
Operator: tick.TokenLessEqual,
Left: &tick.ReferenceNode{
Reference: "collected",
Expand All @@ -242,6 +252,15 @@ func (n *node) Deadman(threshold float64, interval time.Duration) *AlertNode {
Float64: threshold,
},
}
// Add any additional expressions
for _, e := range expr {
critExpr = &tick.BinaryNode{
Operator: tick.TokenAnd,
Left: critExpr,
Right: e,
}
}
an.Crit = critExpr
// Replace NODE_NAME with actual name of the node in the Id.
an.Id = strings.Replace(n.pipeline().deadman.Id(), nodeNameMarker, n.Name(), 1)
// Set the message on the alert node.
Expand Down
2 changes: 1 addition & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *StreamNode) matches(p models.Point) bool {
return false
}
if s.expression != nil {
if pass, err := EvalPredicate(s.expression, p.Fields, p.Tags); err != nil {
if pass, err := EvalPredicate(s.expression, p.Time, p.Fields, p.Tags); err != nil {
s.logger.Println("E! error while evaluating WHERE expression:", err)
return false
} else {
Expand Down
129 changes: 129 additions & 0 deletions tick/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math"
"strconv"
"time"
)

var ErrNotFloat = errors.New("value is not a float")
Expand Down Expand Up @@ -70,6 +71,14 @@ func init() {
statelessFuncs["y0"] = newMath1("y0", math.Y0)
statelessFuncs["y1"] = newMath1("y1", math.Y1)
statelessFuncs["yn"] = newMathIF("yn", math.Yn)

// Time functions
statelessFuncs["minute"] = &minute{}
statelessFuncs["hour"] = &hour{}
statelessFuncs["weekday"] = &weekday{}
statelessFuncs["day"] = &day{}
statelessFuncs["month"] = &month{}
statelessFuncs["year"] = &year{}
}

// Return set of built-in Funcs
Expand Down Expand Up @@ -330,3 +339,123 @@ func (s *sigma) Call(args ...interface{}) (interface{}, error) {
}
return math.Abs(x-s.mean) / math.Sqrt(s.variance), nil
}

type minute struct {
}

func (*minute) Reset() {
}

// Return the minute within the hour for the given time, within the range [0,59].
func (*minute) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("minute expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Minute())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}

type hour struct {
}

func (*hour) Reset() {
}

// Return the hour within the day for the given time, within the range [0,23].
func (*hour) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("hour expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Hour())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}

type weekday struct {
}

func (*weekday) Reset() {
}

// Return the weekday within the week for the given time, within the range [0,6] where 0 is Sunday.
func (*weekday) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("weekday expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Weekday())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}

type day struct {
}

func (*day) Reset() {
}

// Return the day within the month for the given time, within the range [1,31] depending on the month.
func (*day) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("day expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Day())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}

type month struct {
}

func (*month) Reset() {
}

// Return the month within the year for the given time, within the range [1,12].
func (*month) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("month expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Month())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}

type year struct {
}

func (*year) Reset() {
}

// Return the year for the given time.
func (*year) Call(args ...interface{}) (v interface{}, err error) {
if len(args) != 1 {
return 0, errors.New("year expects exactly one argument")
}
switch a := args[0].(type) {
case time.Time:
v = int64(a.Year())
default:
err = fmt.Errorf("cannot convert %T to time.Time", a)
}
return
}
4 changes: 2 additions & 2 deletions where.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
expr = tick.NewStatefulExpr(w.w.Expression)
w.expressions[p.Group] = expr
}
if pass, err := EvalPredicate(expr, p.Fields, p.Tags); pass {
if pass, err := EvalPredicate(expr, p.Time, p.Fields, p.Tags); pass {
for _, child := range w.outs {
err := child.CollectPoint(p)
if err != nil {
Expand All @@ -58,7 +58,7 @@ func (w *WhereNode) runWhere(snapshot []byte) error {
w.expressions[b.Group] = expr
}
for i, p := range b.Points {
if pass, err := EvalPredicate(expr, p.Fields, p.Tags); !pass {
if pass, err := EvalPredicate(expr, p.Time, p.Fields, p.Tags); !pass {
if err != nil {
w.logger.Println("E! error while evaluating WHERE expression:", err)
}
Expand Down

0 comments on commit a820107

Please sign in to comment.