Skip to content

Commit

Permalink
Merge pull request influxdata#1844 from influxdata/aa_643_changedetect
Browse files Browse the repository at this point in the history
Node to detect changes between consecutive values in a series field
  • Loading branch information
aanthony1243 authored Mar 9, 2018
2 parents e48a302 + 193c1b8 commit c7339c8
Show file tree
Hide file tree
Showing 13 changed files with 415 additions and 0 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

### Features


- [#1844](https://github.com/influxdata/kapacitor/pull/1844): Add a new kapacitor node changeDetect that emits a value
each time a series field changes.
- [#1828](https://github.com/influxdata/kapacitor/pull/1828): Add recoverable field to JSON alert response to indicate whether the
alert will auto-recover.
- [#1823](https://github.com/influxdata/kapacitor/pull/1823): Update OpsGenie integration to use the v2 API.
Expand Down
125 changes: 125 additions & 0 deletions change_detect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package kapacitor

import (
"fmt"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

// ChangeDetectNode is the executing node for the changeDetect pipeline node.
// It forwards only those points whose watched field value differs from the
// value of the previously emitted point in the same group.
type ChangeDetectNode struct {
	node
	// d is the pipeline definition, which carries the field to watch.
	d *pipeline.ChangeDetectNode
}

// newChangeDetectNode builds the executing ChangeDetectNode for the given
// pipeline definition and wires up its run function.
func newChangeDetectNode(et *ExecutingTask, n *pipeline.ChangeDetectNode, d NodeDiagnostic) (*ChangeDetectNode, error) {
	cdn := &ChangeDetectNode{
		d:    n,
		node: node{Node: n, et: et, diag: d},
	}
	cdn.node.runF = cdn.runChangeDetect
	return cdn, nil
}

// runChangeDetect consumes the node's single input edge grouped by series
// group, so change detection is tracked independently per group.
func (n *ChangeDetectNode) runChangeDetect([]byte) error {
	gc := edge.NewGroupedConsumer(n.ins[0], n)
	n.statMap.Set(statCardinalityGauge, gc.CardinalityVar())
	return gc.Consume()
}

// NewGroup creates the receiver for a newly observed group, timing each
// message and forwarding results to the node's outputs.
func (n *ChangeDetectNode) NewGroup(group edge.GroupInfo, first edge.PointMeta) (edge.Receiver, error) {
	timed := edge.NewTimedForwardReceiver(n.timer, n.newGroup())
	return edge.NewReceiverFromForwardReceiverWithStats(n.outs, timed), nil
}

// newGroup returns fresh per-group change-detection state.
func (n *ChangeDetectNode) newGroup() *changeDetectGroup {
	g := new(changeDetectGroup)
	g.n = n
	return g
}

// changeDetectGroup holds the change-detection state for a single group.
type changeDetectGroup struct {
	n *ChangeDetectNode
	// previous is the last emitted point; nil until the group's first
	// point has been seen (so the first point always emits).
	previous edge.FieldsTagsTimeGetter
}

// BeginBatch resets the comparison state and forwards the begin message.
// The size hint is cleared because an unknown number of points may be
// dropped from the batch.
func (g *changeDetectGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
	if begin.SizeHint() > 0 {
		begin = begin.ShallowCopy()
		begin.SetSizeHint(0)
	}
	g.previous = nil
	return begin, nil
}

// BatchPoint forwards the batch point only when its watched field value
// differs from the previously emitted value.
func (g *changeDetectGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
	if !g.doChangeDetect(bp) {
		return nil, nil
	}
	return bp, nil
}

// EndBatch forwards the end-of-batch message unchanged.
func (g *changeDetectGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
	return end, nil
}

// Point forwards the stream point only when its watched field value differs
// from the previously emitted value.
func (g *changeDetectGroup) Point(p edge.PointMessage) (edge.Message, error) {
	if !g.doChangeDetect(p) {
		return nil, nil
	}
	return p, nil
}

// doChangeDetect compares p's watched field against the previously emitted
// point and reports whether p should be emitted. When p is emitted it
// becomes the new comparison point; otherwise the previous point is kept,
// so runs of duplicate values are compared against the run's first value.
func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter) bool {
	var prevFields, currFields models.Fields
	if g.previous != nil {
		prevFields = g.previous.Fields()
	}
	currFields = p.Fields()
	emit := g.n.changeDetect(prevFields, currFields)

	if !emit {
		return false
	}
	g.previous = p
	return true
}

// Barrier forwards barrier messages unchanged.
func (g *changeDetectGroup) Barrier(b edge.BarrierMessage) (edge.Message, error) {
	return b, nil
}
// DeleteGroup forwards group-deletion messages unchanged.
func (g *changeDetectGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message, error) {
	return d, nil
}

// changeDetect reports whether the configured field's value in curr differs
// from its value in prev. If the field is missing from curr, an error is
// logged via the node diagnostic and the point is not emitted. A nil prev
// (the first point of a group) compares as different, so the group's first
// point carrying the field is always emitted.
func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields) bool {

	value, ok := curr[n.d.Field]
	if !ok {
		n.diag.Error("Invalid field in change detect",
			fmt.Errorf("expected field %s not found", n.d.Field),
			keyvalue.KV("field", n.d.Field))
		return false
	}
	// A lookup on a nil or missing-key map yields nil, which never equals
	// a present field value, so new fields are treated as changes.
	if prev[n.d.Field] == value {
		return false
	}

	return true
}
53 changes: 53 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,59 @@ func TestBatch_InvalidQuery(t *testing.T) {
}
}

//{"name":"packets","points":[
// {"fields":{"value":"bad"},"time":"2015-10-18T00:00:00Z"},
// {"fields":{"value":"good"},"time":"2015-10-18T00:00:02Z"},
// {"fields":{"value":"good"},"time":"2015-10-18T00:00:04Z"},
// {"fields":{"value2":"good"},"time":"2015-10-18T00:00:05Z"},
// {"fields":{"value":"bad"},"time":"2015-10-18T00:00:06Z"},
// {"fields":{"value":"good"},"time":"2015-10-18T00:00:08Z"}]}
// TestBatch_ChangeDetect verifies that changeDetect drops consecutive
// duplicate values from batched data. Of the six raw points in the fixture,
// only the four where "value" changes are expected; the point carrying only
// "value2" is dropped because the watched field is absent.
func TestBatch_ChangeDetect(t *testing.T) {

	var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
|changeDetect('value')
|httpOut('TestBatch_ChangeDetect')
`

	// Expected output: only the points at which "value" changed.
	er := models.Result{
		Series: models.Rows{
			{
				Name:    "packets",
				Tags:    nil,
				Columns: []string{"time", "value"},
				Values: [][]interface{}{
					{
						time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
						"bad",
					},
					{
						time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
						"good",
					},
					{
						time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
						"bad",
					},
					{
						time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
						"good",
					},
				},
			},
		},
	}

	testBatcherWithOutput(t, "TestBatch_ChangeDetect", script, 21*time.Second, er, false)
}

func TestBatch_Derivative(t *testing.T) {

var script = `
Expand Down
50 changes: 50 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,56 @@ stream
testStreamerWithOutput(t, "TestStream_InfluxQLNodeMissingValue", script, 15*time.Second, er, false, nil)
}

// TestStream_ChangeDetect verifies that changeDetect drops consecutive
// duplicate values from streamed data, emitting only the points where the
// "value" field changes (source data in TestStream_ChangeDetect.srpl; the
// point carrying only "value2" is dropped because the watched field is
// absent).
func TestStream_ChangeDetect(t *testing.T) {

	var script = `stream
|from().measurement('packets')
|changeDetect('value')
|window()
.period(10s)
.every(10s)
|httpOut('TestStream_ChangeDetect')
`

	// Expected output: only the points at which "value" changed, within
	// the first 10s window.
	er := models.Result{
		Series: models.Rows{
			{
				Name:    "packets",
				Tags:    nil,
				Columns: []string{"time", "value"},
				Values: [][]interface{}{
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
						"bad",
					},
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
						"good",
					},
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
						"bad",
					},
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
						"good",
					},
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
						"bad",
					},
					[]interface{}{
						time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
						"good",
					},
				},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_ChangeDetect", script, 15*time.Second, er, false, nil)
}

func TestStream_Derivative(t *testing.T) {

var script = `
Expand Down
1 change: 1 addition & 0 deletions integrations/testdata/TestBatch_ChangeDetect.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name":"packets","points":[{"fields":{"value":"bad"},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":"good"},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":"bad"},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:08Z"}]}
36 changes: 36 additions & 0 deletions integrations/testdata/TestStream_ChangeDetect.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
dbname
rpname
packets value="bad" 0000000000
dbname
rpname
packets value="good" 0000000001
dbname
rpname
packets value="bad" 0000000002
dbname
rpname
packets value="bad" 0000000003
dbname
rpname
packets value="bad" 0000000004
dbname
rpname
packets value="good" 0000000005
dbname
rpname
packets value2="good" 0000000006
dbname
rpname
packets value="bad" 0000000007
dbname
rpname
packets value="good" 0000000008
dbname
rpname
packets value="good" 0000000009
dbname
rpname
packets value="good" 0000000010
dbname
rpname
packets value="bad" 0000000011
92 changes: 92 additions & 0 deletions pipeline/change_detect.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package pipeline

import (
"encoding/json"
"fmt"
)

// Compute the changeDetect of a stream or batch.
// The changeDetect is computed on a single field,
// discarding consecutive duplicate values and
// detecting the points at which the series field
// changes from one value to another.
//
//
// Example:
// stream
// |from()
// .measurement('packets')
// |changeDetect('value')
// ...
//
// with source data:
// packets value="bad" 0000000000
// packets value="good" 0000000001
// packets value="bad" 0000000002
// packets value="bad" 0000000003
// packets value="bad" 0000000004
// packets value="good" 0000000005
//
// Would have output:
// packets value="bad" 0000000000
// packets value="good" 0000000001
// packets value="bad" 0000000002
// packets value="good" 0000000005
//
// Where the data are unchanged, but only the points
// where the value changes from the previous value are
// emitted.

// ChangeDetectNode is the pipeline definition of the changeDetect node: it
// filters a stream or batch down to the points where the configured field
// changes from its previous value.
type ChangeDetectNode struct {
	chainnode `json:"-"`

	// The field to use when calculating the changeDetect
	// tick:ignore
	Field string `json:"field"`
}

// newChangeDetectNode creates a ChangeDetectNode watching field, with the
// same edge type for input and output.
func newChangeDetectNode(wants EdgeType, field string) *ChangeDetectNode {
	n := new(ChangeDetectNode)
	n.chainnode = newBasicChainNode("changeDetect", wants, wants)
	n.Field = field
	return n
}

// MarshalJSON converts ChangeDetectNode to JSON, adding the type and ID
// fields expected by the pipeline JSON format.
// tick:ignore
func (n *ChangeDetectNode) MarshalJSON() ([]byte, error) {
	// Alias strips the methods so marshaling does not recurse.
	type Alias ChangeDetectNode
	raw := struct {
		TypeOf
		*Alias
	}{
		TypeOf: TypeOf{Type: "changeDetect", ID: n.ID()},
		Alias:  (*Alias)(n),
	}
	return json.Marshal(&raw)
}

// UnmarshalJSON converts JSON to a ChangeDetectNode, verifying that the
// encoded node type is "changeDetect" before adopting its ID.
// tick:ignore
func (n *ChangeDetectNode) UnmarshalJSON(data []byte) error {
	// Alias strips the methods so unmarshaling does not recurse.
	type Alias ChangeDetectNode
	raw := struct {
		TypeOf
		*Alias
	}{Alias: (*Alias)(n)}
	if err := json.Unmarshal(data, &raw); err != nil {
		return err
	}
	if raw.Type != "changeDetect" {
		return fmt.Errorf("error unmarshaling node %d of type %s as ChangeDetectNode", raw.ID, raw.Type)
	}

	n.setID(raw.ID)
	return nil
}
Loading

0 comments on commit c7339c8

Please sign in to comment.