Skip to content

Commit

Permalink
add alert levels
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Oct 19, 2015
1 parent 9fd27ba commit 1da6b70
Show file tree
Hide file tree
Showing 17 changed files with 482 additions and 176 deletions.
28 changes: 19 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ stream
.period(10s)
.every(5s)
.mapReduce(influxql.mean("value"))
.where("value < 30")
.alert()
.crit("value < 30")
.email("[email protected]");
```

Expand Down Expand Up @@ -182,8 +182,8 @@ stream
.every(5s)
.groupBy("dc")
.mapReduce(influxql.mean("value"))
.where("value < 30")
.alert()
.crit("value < 30")
.email("[email protected]");
```

Expand Down Expand Up @@ -244,8 +244,8 @@ batch
.period(15m)
// or .cron("*/15 * * * 0")
.groupBy(time(1h), "dc")
.where("value < 30")
.alert()
.crit("value < 30")
.email("[email protected]");
```

Expand Down Expand Up @@ -285,7 +285,9 @@ stream
.period(1m)
.every(1m)
.mapReduce(influxql.count("value"))
.alert();
.alert()
.crit("true")
.email("[email protected]");

//Now define normal processing on the stream
stream
Expand All @@ -299,7 +301,9 @@ stream
.window()
...
.alert()
.flapping(25.0, 50.0);
.crit("true")
.flapping(25.0, 50.0)
.email("[email protected]");
```
#### Aggregate alerts
Expand All @@ -317,13 +321,17 @@ var redis = stream
.from("redis")
.where("instantaneous_ops_per_sec < 10")
.groupBy("host")
.alert();
.alert()
.crit("true")
.email("[email protected]");
var cpu = stream
.from("cpu")
.where("idle < 20")
.groupBy("host")
.alert();
.alert()
.crit("true")
.email("[email protected]");
```
Now let's say we want to combine the alerts so that if either alert triggers we can send them in the same alert.
Expand All @@ -342,7 +350,9 @@ redis.union(cpu)
.window()
.period(10s)
.every(10s)
.alert();
.alert()
.crit("true")
.email("[email protected]");
```
This will aggregate the union of all alerts every 10s by host. Then it will send out one alert with the aggregate information.
Expand Down Expand Up @@ -399,7 +409,7 @@ stream
.every(1s)
.map(fMap, "idle")
.reduce(fReduce)
.cache();
.httpOut("http://example.com/path");
```
The `mapFunction.py` and `reduceFunction.py` files contain Python scripts that read data on an incoming stream, perform their function, and output the result.
Expand Down
11 changes: 7 additions & 4 deletions TUTORIAL.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ stream
.alert()
// We are using idle so we want to check
// if idle drops below 70% (aka cpu used > 30%)
.predicate("value < 70")
.crit("value < 70")
// Post the data for the point to a URL
.post("http://localhost:8000");
```
Expand Down Expand Up @@ -121,7 +121,7 @@ If not then lets lower the threshold so we will see some alerts.
Note the `-fast` flag tells Kapacitor to replay the data as fast as possible but it still emulates the time in the recording.
Without the `-fast` Kapacitor would replay the data in real time.

Edit the `.predicate("value < 70")` line to be `.predicate("value < 99")`.
Edit the `.crit("value < 70")` line to be `.crit("value < 99")`.
Now if your system is at least 1% busy you will get an alert.

Redefine the `task` so that Kapacitor knows about your update.
Expand Down Expand Up @@ -153,7 +153,7 @@ We want to only get alerts when things are really bad. Try this:
stream
.from("cpu_usage_idle")
.alert()
.predicate("sigma(value) > 3")
.crit("sigma(value) > 3")
.post("http://localhost:8000");
```

Expand Down Expand Up @@ -181,8 +181,11 @@ stream
.period(10s)
.every(5s)
.mapReduce(influxql.mean("value"))
.where("sigma(value) > 3")
.apply(expr("sigma", "sigma(value)"))
.alert()
.info("sigma > 2")
.warn("sigma > 2.5")
.crit("sigma > 3")
.post("http://localhost:8000");
```

Expand Down
135 changes: 118 additions & 17 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,54 @@ import (
"encoding/json"
"net/http"

"github.com/influxdb/influxdb/influxql"
imodels "github.com/influxdb/influxdb/models"
"github.com/influxdb/kapacitor/expr"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)

type AlertHandler func(batch models.Batch)
// AlertHandler is a callback invoked with the alert payload each time an
// alert triggers (e.g. the HTTP POST and email handlers).
type AlertHandler func(ad AlertData)

// AlertLevel is the severity of a triggered alert.
type AlertLevel int

const (
	// NoAlert means no alert expression matched.
	NoAlert AlertLevel = iota
	// InfoAlert is the lowest severity.
	InfoAlert
	// WarnAlert is intermediate severity.
	WarnAlert
	// CritAlert is the highest severity.
	CritAlert
)

// String returns the display name of the alert level, e.g. "CRITICAL"
// for CritAlert. It panics on an unknown level.
func (l AlertLevel) String() string {
	switch l {
	case CritAlert:
		return "CRITICAL"
	case WarnAlert:
		return "WARNING"
	case InfoAlert:
		return "INFO"
	case NoAlert:
		return "noalert"
	}
	panic("unknown AlertLevel")
}

// MarshalText implements encoding.TextMarshaler so the level serializes
// as its string name (see String), e.g. in the JSON alert payload.
func (l AlertLevel) MarshalText() ([]byte, error) {
	return []byte(l.String()), nil
}

// AlertData is the payload passed to each AlertHandler; it is marshaled
// to JSON for the HTTP POST and email handlers.
type AlertData struct {
	// Level is the severity that triggered the alert.
	Level AlertLevel `json:"level"`
	// Data holds the triggering points as a query result
	// (built via batchToResult).
	Data influxql.Result `json:"data"`
}

// AlertNode evaluates per-level predicate expressions against incoming
// points and dispatches alert data to the configured handlers.
type AlertNode struct {
	node
	a        *pipeline.AlertNode
	endpoint string
	// handlers are all invoked with the AlertData of each triggered alert.
	handlers []AlertHandler
	// levels is indexed by AlertLevel; a nil entry means no expression
	// is configured for that level.
	levels []*expr.StatefulExpr
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
Expand All @@ -33,37 +70,101 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err
if n.From != "" && len(n.ToList) != 0 {
an.handlers = append(an.handlers, an.handleEmail)
}
// Parse level expressions
an.levels = make([]*expr.StatefulExpr, CritAlert+1)
if n.Info != "" {
tree, err := expr.ParseForType(n.Info, expr.ReturnBool)
if err != nil {
return nil, err
}
an.levels[InfoAlert] = &expr.StatefulExpr{tree, expr.Functions()}
}
if n.Warn != "" {
tree, err := expr.ParseForType(n.Warn, expr.ReturnBool)
if err != nil {
return nil, err
}
an.levels[WarnAlert] = &expr.StatefulExpr{tree, expr.Functions()}
}
if n.Crit != "" {
tree, err := expr.ParseForType(n.Crit, expr.ReturnBool)
if err != nil {
return nil, err
}
an.levels[CritAlert] = &expr.StatefulExpr{tree, expr.Functions()}
}
return
}

func (a *AlertNode) runAlert() error {
switch a.Wants() {
case pipeline.StreamEdge:
for p, ok := a.ins[0].NextPoint(); ok; p, ok = a.ins[0].NextPoint() {
batch := models.Batch{
Name: p.Name,
Group: p.Group,
Tags: p.Tags,
Points: []models.TimeFields{{Time: p.Time, Fields: p.Fields}},
}
for _, h := range a.handlers {
h(batch)
l := a.determineLevel(p.Fields, p.Tags)
if l > NoAlert {
batch := models.Batch{
Name: p.Name,
Group: p.Group,
Tags: p.Tags,
Points: []models.TimeFields{{Time: p.Time, Fields: p.Fields}},
}

ad := AlertData{
l,
a.batchToResult(batch),
}
for _, h := range a.handlers {
h(ad)
}
}

}
case pipeline.BatchEdge:
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
for _, h := range a.handlers {
h(b)
for _, p := range b.Points {
l := a.determineLevel(p.Fields, b.Tags)
if l > NoAlert {
ad := AlertData{l, a.batchToResult(b)}
for _, h := range a.handlers {
h(ad)
}
break
}
}
}
}
return nil
}

func (a *AlertNode) handlePost(batch models.Batch) {
b, err := json.Marshal(batch)
// determineLevel returns the highest alert level whose predicate passes
// for the given fields and tags.
//
// Levels are evaluated in ascending severity; evaluation stops at the
// first configured level that fails (or errors), so the result is the
// highest consecutively-passing level. Evaluation errors are logged and
// treated as a failing predicate.
func (a *AlertNode) determineLevel(fields models.Fields, tags map[string]string) (level AlertLevel) {
	for l, se := range a.levels {
		if se == nil {
			// No expression configured for this level.
			continue
		}
		// Check the error before the value, per Go convention.
		// EvalPredicate returns false whenever err != nil, so this is
		// equivalent to the previous value-first check.
		pass, err := EvalPredicate(se, fields, tags)
		if err != nil {
			a.logger.Println("E! error evaluating expression:", err)
			return
		}
		if !pass {
			return
		}
		level = AlertLevel(l)
	}
	return
}

// batchToResult converts a batch of points into an influxql.Result
// containing a single row, suitable for the alert JSON payload.
func (a *AlertNode) batchToResult(b models.Batch) influxql.Result {
	return influxql.Result{
		Series: imodels.Rows{models.BatchToRow(b)},
	}
}

func (a *AlertNode) handlePost(ad AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal batch json", err)
a.logger.Println("E! failed to marshal alert data json", err)
return
}
buf := bytes.NewBuffer(b)
Expand All @@ -73,10 +174,10 @@ func (a *AlertNode) handlePost(batch models.Batch) {
}
}

func (a *AlertNode) handleEmail(batch models.Batch) {
b, err := json.Marshal(batch)
func (a *AlertNode) handleEmail(ad AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal batch json", err)
a.logger.Println("E! failed to marshal alert data json", err)
return
}
if a.et.tm.SMTPService != nil {
Expand Down
30 changes: 24 additions & 6 deletions expr.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ func ExprFunc(field, e string) (TransFunc, error) {
}
x := &expression{
field: field,
t: t,
se: &expr.StatefulExpr{t, expr.Functions()},
}
return TransFunc(x.Eval), nil
}

type expression struct {
field string
t *expr.Tree
se *expr.StatefulExpr
}

func (x *expression) Eval(fields models.Fields) (models.Fields, error) {
Expand All @@ -37,11 +37,29 @@ func (x *expression) Eval(fields models.Fields) (models.Fields, error) {
}
}

nfields := make(models.Fields, 1)
v, err := x.t.EvalNumber(vars, nil)
v, err := x.se.EvalNum(vars)
if err != nil {
return nil, err
}
nfields[x.field] = v
return nfields, nil
fields[x.field] = v
return fields, nil
}

// EvalPredicate evaluates the given stateful expression as a boolean
// predicate against a set of fields and tags.
//
// Field and tag values are merged into a single variable scope. A name
// that appears both as a field and as a tag is an error, since the
// expression could not distinguish between the two values.
func EvalPredicate(se *expr.StatefulExpr, fields models.Fields, tags map[string]string) (bool, error) {
	vars := make(expr.Vars, len(fields)+len(tags))
	for k, v := range fields {
		// Use a presence check rather than comparing to "" so that a
		// tag that exists with an empty value still triggers the
		// duplicate-name error instead of silently shadowing the field.
		if _, ok := tags[k]; ok {
			return false, fmt.Errorf("cannot have field and tags with same name %q", k)
		}
		vars[k] = v
	}
	for k, v := range tags {
		vars[k] = v
	}
	return se.EvalBool(vars)
}
Loading

0 comments on commit 1da6b70

Please sign in to comment.