forked from influxdata/kapacitor
-
Notifications
You must be signed in to change notification settings - Fork 0
/
alert.go
148 lines (130 loc) · 2.97 KB
/
alert.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package kapacitor
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"math"
"net/http"
"github.com/influxdb/kapacitor/expr"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
)
// AlertHandler is a callback invoked with the point(s) that triggered an alert.
type AlertHandler func(pts []*models.Point)
// AlertNode evaluates a compiled boolean predicate against incoming data and
// dispatches matching points to its configured alert handlers.
type AlertNode struct {
	node
	// a is the pipeline definition backing this node.
	a *pipeline.AlertNode
	endpoint string
	// handlers are invoked for every point (or batch) that matches the predicate.
	handlers []AlertHandler
	// predicate is the compiled alert expression; it must evaluate to bool.
	predicate *expr.Tree
	// funcs maps function names usable inside the predicate to implementations.
	funcs expr.Funcs
	// fs holds the stateful function objects used to populate funcs.
	fs []exprFunc
}
// exprFunc is a named, stateful function that can be exposed to the
// predicate expression language.
type exprFunc interface {
	// name returns the identifier used to invoke the function in a predicate.
	name() string
	// fnc returns the callable implementation.
	fnc() expr.Func
}
// newAlertNode builds an AlertNode from its pipeline definition: it wires up
// the configured alert handlers, compiles the predicate expression (which
// must evaluate to a boolean), and registers the custom predicate functions.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err error) {
	an = &AlertNode{
		node: node{Node: n, et: et},
		a:    n,
	}
	an.node.runF = an.runAlert

	// Register an HTTP POST handler when a post URL is configured.
	an.handlers = make([]AlertHandler, 0)
	if n.Post != "" {
		an.handlers = append(an.handlers, an.handlePost)
	}

	// Compile the predicate; reject expressions that do not yield a bool.
	tree, perr := expr.Parse(n.Predicate)
	if perr != nil {
		return nil, perr
	}
	if tree.RType() != expr.ReturnBool {
		return nil, fmt.Errorf("Predicate does not evaluate to boolean value %q", n.Predicate)
	}
	an.predicate = tree

	// Make the stateful helper functions (currently just sigma) callable
	// from within the predicate.
	an.fs = append(an.fs, &sigma{})
	an.funcs = make(expr.Funcs, len(an.fs))
	for _, f := range an.fs {
		an.funcs[f.name()] = f.fnc()
	}
	return an, nil
}
// runAlert drains the input edge, evaluating the alert predicate against
// each point. In stream mode each matching point is handed to every handler
// individually; in batch mode the whole batch is handed to every handler as
// soon as any point in it matches.
func (a *AlertNode) runAlert() error {
	switch a.Wants() {
	case pipeline.StreamEdge:
		for {
			p := a.ins[0].NextPoint()
			if p == nil {
				return nil
			}
			matched, err := a.check(p)
			if err != nil {
				return err
			}
			if matched {
				for _, h := range a.handlers {
					h([]*models.Point{p})
				}
			}
		}
	case pipeline.BatchEdge:
		for {
			w := a.ins[0].NextBatch()
			if w == nil {
				return nil
			}
			// Fire the handlers once, with the whole batch, on the first match.
			for _, p := range w {
				matched, err := a.check(p)
				if err != nil {
					return err
				}
				if matched {
					for _, h := range a.handlers {
						h(w)
					}
					break
				}
			}
		}
	}
	return nil
}
// check evaluates the alert predicate against a single point's fields.
// Every field value must be a float64; any other field type is an error.
func (a *AlertNode) check(p *models.Point) (bool, error) {
	vars := make(expr.Vars, len(p.Fields))
	for name, value := range p.Fields {
		f, ok := value.(float64)
		if !ok {
			return false, fmt.Errorf("field values must be float64")
		}
		vars[name] = f
	}
	return a.predicate.EvalBool(vars, a.funcs)
}
// handlePost is an AlertHandler that POSTs the triggering points as a JSON
// array to the URL configured on the alert node. Delivery failures are
// logged; they do not stop the task.
func (a *AlertNode) handlePost(pts []*models.Point) {
	b, err := json.Marshal(pts)
	if err != nil {
		a.l.Println("failed to marshal points json:", err)
		return
	}
	buf := bytes.NewBuffer(b)
	// BUG FIX: the error and response from http.Post were previously
	// discarded, hiding delivery failures and leaking the response body
	// (which prevents the transport from reusing the connection).
	resp, err := http.Post(a.a.Post, "application/json", buf)
	if err != nil {
		a.l.Println("failed to POST alert data:", err)
		return
	}
	resp.Body.Close()
}
// sigma maintains a running mean and sample variance using Welford's online
// algorithm and exposes sigma(x) to the predicate language: the number of
// standard deviations x lies from the running mean.
type sigma struct {
	mean     float64
	variance float64
	m2       float64 // running sum of squared deviations from the mean
	n        float64 // count of samples observed so far
}

// name returns the identifier used to call this function in a predicate.
func (s *sigma) name() string {
	return "sigma"
}

// fnc returns the callable bound to this accumulator's state.
func (s *sigma) fnc() expr.Func {
	return s.call
}

// call folds x into the running statistics and returns |x-mean|/stddev.
// It returns 0 until at least two samples have been observed, since the
// sample variance is undefined for a single sample.
func (s *sigma) call(args ...float64) (float64, error) {
	if len(args) != 1 {
		return 0, errors.New("sigma expected exactly one argument")
	}
	x := args[0]
	// Welford's update of the mean and M2.
	s.n++
	delta := x - s.mean
	s.mean += delta / s.n
	s.m2 += delta * (x - s.mean)
	if s.n < 2 {
		// BUG FIX: previously s.variance was assigned m2/(n-1) before this
		// guard, storing NaN (0/0) in the struct on the first sample.
		return 0, nil
	}
	s.variance = s.m2 / (s.n - 1)
	return math.Abs(x-s.mean) / math.Sqrt(s.variance), nil
}