Skip to content

Commit

Permalink
add duration to alert data
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 21, 2016
1 parent 0e28571 commit dec73bd
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#384](https://github.com/influxdata/kapacitor/issues/384): Add `elapsed` function to compute the time difference between subsequent points.
- [#230](https://github.com/influxdata/kapacitor/issues/230): Alert.StateChangesOnly now accepts optional duration arg. An alert will be triggered for every interval even if the state has not changed.
- [#426](https://github.com/influxdata/kapacitor/issues/426): Add `skip-format` query parameter to the `GET /task` endpoint so that returned TICKscript content is left unmodified from the user input.
- [#388](https://github.com/influxdata/kapacitor/issues/388): The duration of an alert is now tracked and exposed as part of the alert data as well as can be set as a field via `.durationField('duration')`.


### Bugfixes
Expand Down
91 changes: 64 additions & 27 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,13 @@ func (l *AlertLevel) UnmarshalText(text []byte) error {
}

type AlertData struct {
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Level AlertLevel `json:"level"`
Data influxql.Result `json:"data"`
ID string `json:"id"`
Message string `json:"message"`
Details string `json:"details"`
Time time.Time `json:"time"`
Duration time.Duration `json:"duration"`
Level AlertLevel `json:"level"`
Data influxql.Result `json:"data"`

// Info for custom templates
info detailsInfo
Expand Down Expand Up @@ -313,11 +314,12 @@ func (a *AlertNode) runAlert([]byte) error {
Tags: p.Tags,
Points: []models.BatchPoint{models.BatchPointFromPoint(p)},
}
ad, err := a.alertData(p.Name, p.Group, p.Tags, p.Fields, l, p.Time, batch)
state.triggered(p.Time)
duration := state.duration()
ad, err := a.alertData(p.Name, p.Group, p.Tags, p.Fields, l, p.Time, duration, batch)
if err != nil {
return err
}
state.lastAlert = p.Time
a.handleAlert(ad)
if a.a.LevelTag != "" || a.a.IdTag != "" {
p.Tags = p.Tags.Copy()
Expand All @@ -328,14 +330,17 @@ func (a *AlertNode) runAlert([]byte) error {
p.Tags[a.a.IdTag] = ad.ID
}
}
if a.a.LevelField != "" || a.a.IdField != "" {
if a.a.LevelField != "" || a.a.IdField != "" || a.a.DurationField != "" {
p.Fields = p.Fields.Copy()
if a.a.LevelField != "" {
p.Fields[a.a.LevelField] = l.String()
}
if a.a.IdField != "" {
p.Fields[a.a.IdField] = ad.ID
}
if a.a.DurationField != "" {
p.Fields[a.a.DurationField] = int64(duration)
}
}
a.timer.Pause()
for _, child := range a.outs {
Expand Down Expand Up @@ -390,17 +395,19 @@ func (a *AlertNode) runAlert([]byte) error {
(l != OKAlert &&
!((a.a.UseFlapping && state.flapping) ||
(a.a.IsStateChangesOnly && !state.changed && !state.expired))) {
ad, err := a.alertData(b.Name, b.Group, b.Tags, highestPoint.Fields, l, t, b)
state.triggered(t)
duration := state.duration()
ad, err := a.alertData(b.Name, b.Group, b.Tags, highestPoint.Fields, l, t, duration, b)
if err != nil {
return err
}
state.lastAlert = t
a.handleAlert(ad)
// Update tags or fields for Level property
if a.a.LevelTag != "" ||
a.a.LevelField != "" ||
a.a.IdTag != "" ||
a.a.IdField != "" {
a.a.IdField != "" ||
a.a.DurationField != "" {
for i := range b.Points {
if a.a.LevelTag != "" || a.a.IdTag != "" {
b.Points[i].Tags = b.Points[i].Tags.Copy()
Expand All @@ -411,14 +418,17 @@ func (a *AlertNode) runAlert([]byte) error {
b.Points[i].Tags[a.a.IdTag] = ad.ID
}
}
if a.a.LevelField != "" || a.a.IdField != "" {
if a.a.LevelField != "" || a.a.IdField != "" || a.a.DurationField != "" {
b.Points[i].Fields = b.Points[i].Fields.Copy()
if a.a.LevelField != "" {
b.Points[i].Fields[a.a.LevelField] = l.String()
}
if a.a.IdField != "" {
b.Points[i].Fields[a.a.IdField] = ad.ID
}
if a.a.DurationField != "" {
b.Points[i].Fields[a.a.DurationField] = int64(duration)
}
}
}
if a.a.LevelTag != "" || a.a.IdTag != "" {
Expand Down Expand Up @@ -485,6 +495,7 @@ func (a *AlertNode) alertData(
fields models.Fields,
level AlertLevel,
t time.Time,
d time.Duration,
b models.Batch,
) (*AlertData, error) {
id, err := a.renderID(name, group, tags)
Expand All @@ -496,32 +507,58 @@ func (a *AlertNode) alertData(
return nil, err
}
ad := &AlertData{
ID: id,
Message: msg,
Details: details,
Time: t,
Level: level,
Data: a.batchToResult(b),
info: info,
ID: id,
Message: msg,
Details: details,
Time: t,
Duration: d,
Level: level,
Data: a.batchToResult(b),
info: info,
}
return ad, nil
}

type alertState struct {
history []AlertLevel
idx int
flapping bool
changed bool
lastAlert time.Time
expired bool
history []AlertLevel
idx int
flapping bool
changed bool
// Time when first alert was triggered
firstTriggered time.Time
// Time when last alert was triggered.
// Note: Alerts are not triggered for every event.
lastTriggered time.Time
expired bool
}

// Return the duration of the current alert state.
func (a *alertState) duration() time.Duration {
return a.lastTriggered.Sub(a.firstTriggered)
}

// Record that the alert was triggered at time t.
func (a *alertState) triggered(t time.Time) {
a.lastTriggered = t
// Check if we are being triggered for first time since an OKAlert
// If so reset firstTriggered time
p := a.idx - 1
if p == -1 {
p = len(a.history) - 1
}
if a.history[p] == OKAlert {
a.firstTriggered = t
}
}

// Record an event in the alert history.
func (a *alertState) addEvent(level AlertLevel) {
a.changed = a.history[a.idx] != level
a.idx = (a.idx + 1) % len(a.history)
a.history[a.idx] = level
}

// Compute the percentage change in the alert history.
func (a *alertState) percentChange() float64 {
l := len(a.history)
changes := 0.0
Expand Down Expand Up @@ -564,7 +601,7 @@ func (a *AlertNode) updateState(t time.Time, level AlertLevel, group models.Grou
state.flapping = true
}
}
state.expired = !state.changed && a.a.StateChangesOnlyDuration != 0 && t.Sub(state.lastAlert) >= a.a.StateChangesOnlyDuration
state.expired = !state.changed && a.a.StateChangesOnlyDuration != 0 && t.Sub(state.lastTriggered) >= a.a.StateChangesOnlyDuration
return state
}

Expand Down
108 changes: 84 additions & 24 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,64 @@ batch
testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_AlertDuration(t *testing.T) {

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda:"mean" > 95)
.durationField('duration')
|httpOut('TestBatch_SimpleMR')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "duration", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
float64(14 * time.Second),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
float64(14 * time.Second),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
float64(14 * time.Second),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
float64(14 * time.Second),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
float64(14 * time.Second),
90.99999999998545,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_AlertStateChangesOnly(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -542,14 +600,15 @@ func TestBatch_AlertStateChangesOnly(t *testing.T) {
}
} else {
expAd := kapacitor.AlertData{
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Level: kapacitor.OKAlert,
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Duration: 38 * time.Second,
Level: kapacitor.OKAlert,
}
ad.Data = influxql.Result{}
if eq, msg := compareAlertData(expAd, ad); !eq {
t.Error(msg)
t.Errorf("unexpected alert data for request: %d %s", rc, msg)
}
}
}))
Expand Down Expand Up @@ -592,30 +651,31 @@ func TestBatch_AlertStateChangesOnlyExpired(t *testing.T) {
if err != nil {
t.Fatal(err)
}
// We don't care about the data for this test
ad.Data = influxql.Result{}
var expAd kapacitor.AlertData
atomic.AddInt32(&requestCount, 1)
if rc := atomic.LoadInt32(&requestCount); rc < 3 {
expAd := kapacitor.AlertData{
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, int(rc-1)*20, 0, time.UTC),
Level: kapacitor.CritAlert,
}
ad.Data = influxql.Result{}
if eq, msg := compareAlertData(expAd, ad); !eq {
t.Error(msg)
rc := atomic.LoadInt32(&requestCount)
if rc < 3 {
expAd = kapacitor.AlertData{
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is CRITICAL",
Time: time.Date(1971, 1, 1, 0, 0, int(rc-1)*20, 0, time.UTC),
Duration: time.Duration(rc-1) * 20 * time.Second,
Level: kapacitor.CritAlert,
}
} else {
expAd := kapacitor.AlertData{
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Level: kapacitor.OKAlert,
}
ad.Data = influxql.Result{}
if eq, msg := compareAlertData(expAd, ad); !eq {
t.Error(msg)
expAd = kapacitor.AlertData{
ID: "cpu_usage_idle:cpu=cpu-total,",
Message: "cpu_usage_idle:cpu=cpu-total, is OK",
Time: time.Date(1971, 1, 1, 0, 0, 38, 0, time.UTC),
Duration: 38 * time.Second,
Level: kapacitor.OKAlert,
}
}
if eq, msg := compareAlertData(expAd, ad); !eq {
t.Errorf("unexpected alert data for request: %d %s", rc, msg)
}
}))
defer ts.Close()
var script = `
Expand Down
36 changes: 36 additions & 0 deletions integrations/data/TestStream_AlertDuration.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
dbname
rpname
cpu,type=idle,host=serverA value=9 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=9 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=3 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=5 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=7 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=7 0000000012
Loading

0 comments on commit dec73bd

Please sign in to comment.