Skip to content

Commit

Permalink
add StateDuration & StateCount nodes
Browse files Browse the repository at this point in the history
The StateDuration node calculates the duration for which a given expression evaluates as true.
The StateCount node calculates the number of consecutive points for which a given expression evaluates as true.
  • Loading branch information
phemmer committed Mar 6, 2017
1 parent 10f4f90 commit 9163ee8
Show file tree
Hide file tree
Showing 9 changed files with 616 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
- [#922](https://github.com/influxdata/kapacitor/issues/922): Expose server specific information in alert templates.
- [#1162](https://github.com/influxdata/kapacitor/pulls/1162): Add Pushover integration.
- [#1221](https://github.com/influxdata/kapacitor/pull/1221): Add `working_cardinality` stat to each node type that tracks the number of groups per node.
- [#1211](https://github.com/influxdata/kapacitor/issues/1211): Add StateDuration node.

### Bugfixes

Expand Down
137 changes: 137 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2326,6 +2326,143 @@ batch
testBatcherWithOutput(t, "TestBatch_JoinOn_Fill", script, 30*time.Second, er, true)
}

func TestBatch_StateDuration(t *testing.T) {
var script = `
batch
|query('SELECT value FROM "telegraf"."default"."cpu"')
.period(4s)
.every(4s)
.groupBy('host')
|stateDuration(lambda: "value" > 95)
.unit(1ms)
.as('my_duration')
|httpOut('TestBatch_StateTracking')
`
er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "my_duration", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
0.0,
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
1000.0,
96.6,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
-1.0,
83.6,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
0.0,
99.1,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverB"},
Columns: []string{"time", "my_duration", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
-1.0,
47.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
0.0,
95.1,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
2000.0,
96.1,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false)
}

func TestBatch_StateCount(t *testing.T) {
var script = `
batch
|query('SELECT value FROM "telegraf"."default"."cpu"')
.period(4s)
.every(4s)
.groupBy('host')
|stateCount(lambda: "value" > 95)
.as('my_count')
|httpOut('TestBatch_StateTracking')
`
er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "my_count", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
1.0,
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
2.0,
96.6,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
-1.0,
83.6,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
1.0,
99.1,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverB"},
Columns: []string{"time", "my_count", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
-1.0,
47.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
1.0,
95.1,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
2.0,
96.1,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_StateTracking", script, 8*time.Second, er, false)
}

// Helper test function for batcher
func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.ExecutingTask, <-chan error, *kapacitor.TaskMaster) {
if testing.Verbose() {
Expand Down
4 changes: 4 additions & 0 deletions integrations/data/TestBatch_StateTracking.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"name":"cpu","tags":{"host":"serverA"},"points":[{"time":"1971-01-01T00:00:00Z","fields":{"value":100}}]}
{"name":"cpu","tags":{"host":"serverB"},"points":[{"time":"1971-01-01T00:00:00Z","fields":{"value":100}}]}
{"name":"cpu","tags":{"host":"serverA"},"points":[{"time":"1971-01-01T00:00:04Z","fields":{"value":97.1}},{"time":"1971-01-01T00:00:05Z","fields":{"value":96.6}},{"time":"1971-01-01T00:00:06Z","fields":{"value":83.6}},{"time":"1971-01-01T00:00:07Z","fields":{"value":99.1}}]}
{"name":"cpu","tags":{"host":"serverB"},"points":[{"time":"1971-01-01T00:00:04Z","fields":{"value":47}},{"time":"1971-01-01T00:00:05Z","fields":{"value":95.1}},{"time":"1971-01-01T00:00:06Z","fields":{"value":null}},{"time":"1971-01-01T00:00:07Z","fields":{"value":96.1}}]}
30 changes: 30 additions & 0 deletions integrations/data/TestStream_StateTracking.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
dbname
rpname
cpu,host=serverA value=97.1 0000000001
dbname
rpname
cpu,host=serverB value=47.0 0000000001
dbname
rpname
cpu,host=serverA value=96.6 0000000002
dbname
rpname
cpu,host=serverB value=95.1 0000000002
dbname
rpname
cpu,host=serverA value=83.6 0000000003
dbname
rpname
cpu,host=serverB x=95.6 0000000003
dbname
rpname
cpu,host=serverA value=99.1 0000000004
dbname
rpname
cpu,host=serverB value=96.1 0000000004
dbname
rpname
cpu,host=serverA value=0 0000000005
dbname
rpname
cpu,host=serverB value=0 0000000005
135 changes: 135 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9334,6 +9334,141 @@ func testStreamerCardinality(
}
}

func TestStream_StateDuration(t *testing.T) {
var script = `
stream
|from().measurement('cpu')
|groupBy('host')
|stateDuration(lambda: "value" > 95)
.unit(1ms)
.as('my_duration')
|window().period(4s).every(4s)
|httpOut('TestStream_StateTracking')
`
er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "my_duration", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.0,
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
1000.0,
96.6,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
-1.0,
83.6,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
0.0,
99.1,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverB"},
Columns: []string{"time", "my_duration", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
-1.0,
47.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
0.0,
95.1,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
2000.0,
96.1,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_StateTracking", script, 4*time.Second, er, false, nil)
}

func TestStream_StateCount(t *testing.T) {
var script = `
stream
|from().measurement('cpu')
|groupBy('host')
|stateCount(lambda: "value" > 95)
.as('my_count')
|window().period(4s).every(4s)
|httpOut('TestStream_StateTracking')
`
er := models.Result{
Series: models.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "my_count", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
1.0,
97.1,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
2.0,
96.6,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
-1.0,
83.6,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
1.0,
99.1,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverB"},
Columns: []string{"time", "my_count", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
-1.0,
47.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
1.0,
95.1,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
2.0,
96.1,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_StateTracking", script, 4*time.Second, er, false, nil)
}

// Helper test function for streamer
func testStreamer(
t *testing.T,
Expand Down
14 changes: 14 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,3 +451,17 @@ func (n *chainnode) K8sAutoscale() *K8sAutoscaleNode {
n.linkChild(k)
return k
}

// Create a node that tracks duration in a given state.
func (n *chainnode) StateDuration(expression *ast.LambdaNode) *StateDurationNode {
sd := newStateDurationNode(n.provides, expression)
n.linkChild(sd)
return sd
}

// Create a node that tracks number of consecutive points in a given state.
func (n *chainnode) StateCount(expression *ast.LambdaNode) *StateCountNode {
sc := newStateCountNode(n.provides, expression)
n.linkChild(sc)
return sc
}
Loading

0 comments on commit 9163ee8

Please sign in to comment.