Alert data outputs to children nodes
nathanielc committed Apr 14, 2016
1 parent c043293 commit 552a49a
Showing 12 changed files with 1,069 additions and 138 deletions.
22 changes: 22 additions & 0 deletions CHANGELOG.md
@@ -18,13 +18,35 @@ Example UDF config for a socket based UDF.
timeout = "10s"
```

Alert data can now be consumed directly from within TICKscripts.
For example, let's say we want to store all data that triggered an alert in InfluxDB, with a tag `level` containing the alert level as a string (e.g. CRITICAL).

```javascript
...
|alert()
    .warn(...)
    .crit(...)
    .levelTag('level')
    // and/or use a field
    //.levelField('level')
    // Also tag the data with the alert ID
    .idTag('id')
    // and/or use a field
    //.idField('id')
|influxDBOut()
    .database('alerts')
...
```


### Features

- [#360](https://github.com/influxdata/kapacitor/pull/360): Fork tasks by measurement in order to improve performance.
- [#386](https://github.com/influxdata/kapacitor/issues/386): Add an official Go HTTP client package.
- [#399](https://github.com/influxdata/kapacitor/issues/399): Allow disabling of subscriptions.
- [#417](https://github.com/influxdata/kapacitor/issues/417): UDFs can be connected over a Unix socket. This enables running UDFs from separate Docker containers.
- [#451](https://github.com/influxdata/kapacitor/issues/451): StreamNode supports `|groupBy` and `|where` methods; see the sketch after this list.
- [#93](https://github.com/influxdata/kapacitor/issues/93): AlertNode now outputs data to child nodes. The output data can have either a tag or a field indicating the alert level.
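
For example, the new StreamNode methods from #451 let filtering and grouping be chained directly in the pipeline. A minimal sketch of the new form; the measurement, tag, and threshold values here are illustrative, not taken from this commit:

```javascript
stream
    |from()
        .measurement('cpu')
    // Chaining methods added in #451
    |where(lambda: "host" == 'serverA')
    |groupBy('host')
    |alert()
        .crit(lambda: "usage_idle" < 5)
        // Tag output data with the alert level (#93)
        .levelTag('level')
```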

### Bugfixes

86 changes: 83 additions & 3 deletions alert.go
@@ -318,32 +318,66 @@ func (a *AlertNode) runAlert([]byte) error {
return err
}
a.handleAlert(ad)
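// Annotate the outgoing point with the alert level and ID as tags and/or fields so child nodes can consume them.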
if a.a.LevelTag != "" || a.a.IdTag != "" {
p.Tags = p.Tags.Copy()
if a.a.LevelTag != "" {
p.Tags[a.a.LevelTag] = l.String()
}
if a.a.IdTag != "" {
p.Tags[a.a.IdTag] = ad.ID
}
}
if a.a.LevelField != "" || a.a.IdField != "" {
p.Fields = p.Fields.Copy()
if a.a.LevelField != "" {
p.Fields[a.a.LevelField] = l.String()
}
if a.a.IdField != "" {
p.Fields[a.a.IdField] = ad.ID
}
}
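// Forward the annotated point to all child nodes; the timer is paused so downstream collection time is not charged to this node.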
a.timer.Pause()
for _, child := range a.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
a.timer.Resume()
}
a.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := a.ins[0].NextBatch(); ok; b, ok = a.ins[0].NextBatch() {
a.timer.Start()
allOK := true
triggered := false
// l and id record the alert level and ID for annotating the output batch.
var l AlertLevel
var id string
for _, p := range b.Points {
l = a.determineLevel(p.Time, p.Fields, p.Tags)
if l > OKAlert {
allOK = false
state := a.updateState(l, b.Group)
if (a.a.UseFlapping && state.flapping) || (a.a.IsStateChangesOnly && !state.changed) {
break
}
triggered = true
ad, err := a.alertData(b.Name, b.Group, b.Tags, p.Fields, l, p.Time, b)
if err != nil {
return err
}
id = ad.ID
a.handleAlert(ad)
break
}
}
// Check for recovery
if allOK {
l = OKAlert
state := a.updateState(OKAlert, b.Group)
if state.changed {
triggered = true
var fields models.Fields
if l := len(b.Points); l > 0 {
fields = b.Points[l-1].Fields
@@ -352,9 +386,55 @@
if err != nil {
return err
}
id = ad.ID
a.handleAlert(ad)
}
}
if triggered {
// Update tags or fields for Level property
if a.a.LevelTag != "" ||
a.a.LevelField != "" ||
a.a.IdTag != "" ||
a.a.IdField != "" {
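// Annotate every point in the batch, copying tag and field maps so data shared with other nodes is not mutated.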
for i := range b.Points {
if a.a.LevelTag != "" || a.a.IdTag != "" {
b.Points[i].Tags = b.Points[i].Tags.Copy()
if a.a.LevelTag != "" {
b.Points[i].Tags[a.a.LevelTag] = l.String()
}
if a.a.IdTag != "" {
b.Points[i].Tags[a.a.IdTag] = id
}
}
if a.a.LevelField != "" || a.a.IdField != "" {
b.Points[i].Fields = b.Points[i].Fields.Copy()
if a.a.LevelField != "" {
b.Points[i].Fields[a.a.LevelField] = l.String()
}
if a.a.IdField != "" {
b.Points[i].Fields[a.a.IdField] = id
}
}
}
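// Keep the batch-level tags consistent with the annotated points.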
if a.a.LevelTag != "" || a.a.IdTag != "" {
b.Tags = b.Tags.Copy()
if a.a.LevelTag != "" {
b.Tags[a.a.LevelTag] = l.String()
}
if a.a.IdTag != "" {
b.Tags[a.a.IdTag] = id
}
}
}
a.timer.Pause()
for _, child := range a.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
a.timer.Resume()
}
a.timer.Stop()
}
}
117 changes: 117 additions & 0 deletions integrations/batcher_test.go
@@ -284,6 +284,123 @@ batch

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}
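
// TestBatch_AlertLevelField verifies that .levelField and .idField add the alert level and ID as fields on every point of the output batch.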
func TestBatch_AlertLevelField(t *testing.T) {

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda: "mean" > 95)
.levelField('level')
.idField('id')
|httpOut('TestBatch_SimpleMR')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1"},
Columns: []string{"time", "id", "level", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
"cpu_usage_idle:cpu=cpu1,",
"CRITICAL",
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
"cpu_usage_idle:cpu=cpu1,",
"CRITICAL",
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
"cpu_usage_idle:cpu=cpu1,",
"CRITICAL",
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
"cpu_usage_idle:cpu=cpu1,",
"CRITICAL",
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
"cpu_usage_idle:cpu=cpu1,",
"CRITICAL",
90.99999999998545,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

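// TestBatch_AlertLevelTag verifies that .levelTag and .idTag set the alert level and ID as tags on the output batch.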
func TestBatch_AlertLevelTag(t *testing.T) {

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" != 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
|alert()
.crit(lambda: "mean" > 95)
.levelTag('level')
.idTag('id')
|httpOut('TestBatch_SimpleMR')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"cpu": "cpu1", "level": "CRITICAL", "id": "cpu_usage_idle:cpu=cpu1,"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 20, 0, time.UTC),
96.49999999996908,
},
{
time.Date(1971, 1, 1, 0, 0, 22, 0, time.UTC),
93.46464646468584,
},
{
time.Date(1971, 1, 1, 0, 0, 24, 0, time.UTC),
95.00950095007724,
},
{
time.Date(1971, 1, 1, 0, 0, 26, 0, time.UTC),
92.99999999998636,
},
{
time.Date(1971, 1, 1, 0, 0, 28, 0, time.UTC),
90.99999999998545,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_Join(t *testing.T) {

24 changes: 23 additions & 1 deletion integrations/streamer_test.go
@@ -2184,13 +2184,35 @@ stream
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.details('details')
.idField('id')
.idTag('id')
.levelField('level')
.levelTag('level')
.info(lambda: "count" > infoThreshold)
.warn(lambda: "count" > warnThreshold)
.crit(lambda: "count" > critThreshold)
.post('` + ts.URL + `')
|log()
|httpOut('TestStream_Alert')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA", "level": "CRITICAL", "id": "kapacitor/cpu/serverA"},
Columns: []string{"time", "count", "id", "level"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
10.0,
"kapacitor/cpu/serverA",
"CRITICAL",
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Alert", script, 13*time.Second, er, nil, false)

if rc := atomic.LoadInt32(&requestCount); rc != 1 {
t.Errorf("got %v exp %v", rc, 1)
3 changes: 3 additions & 0 deletions node.go
@@ -156,6 +156,9 @@ func (n *node) addChild(c Node) (*Edge, error) {
if n.Provides() != c.Wants() {
return nil, fmt.Errorf("cannot add child mismatched edges: %s:%s -> %s:%s", n.Name(), n.Provides(), c.Name(), c.Wants())
}
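// Nodes that provide no edge type cannot have children.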
if n.Provides() == pipeline.NoEdge {
return nil, fmt.Errorf("cannot add child no edge expected: %s:%s -> %s:%s", n.Name(), n.Provides(), c.Name(), c.Wants())
}
n.children = append(n.children, c)

edge := newEdge(n.et.Task.Name, n.Name(), c.Name(), n.Provides(), defaultEdgeBufferSize, n.et.tm.LogService)