Skip to content

Commit

Permalink
Add multiple field support to Change Detect
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Nov 21, 2018
1 parent ca77ad0 commit ec22ea9
Show file tree
Hide file tree
Showing 18 changed files with 238 additions and 73 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
- [#2095](https://github.com/influxdata/kapacitor/issues/2095): Add barrier node support to join node.
- [#1157](https://github.com/influxdata/kapacitor/issues/1157): Add ability to expire groups using the barrier node.
- [#2099](https://github.com/influxdata/kapacitor/issues/2099): Add `alert/persist-topics` to config
- [#2101](https://github.com/influxdata/kapacitor/issues/2101): Add multiple field support to the change detect node.

### Bugfixes

- [#2048](https://github.com/influxdata/kapacitor/pull/2048): Fix join not catching up fast enough after a pause in the data stream.
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile_build_ubuntu32
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO

# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.10.2
ENV GO_VERSION 1.11.2
ENV GO_ARCH 386
RUN wget -q https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
Expand Down
2 changes: 1 addition & 1 deletion Dockerfile_build_ubuntu64
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ RUN wget -q https://github.com/google/protobuf/releases/download/v${PROTO_VERSIO

# Install go
ENV GOPATH /root/go
ENV GO_VERSION 1.10.2
ENV GO_VERSION 1.11.2
ENV GO_ARCH amd64
RUN wget -q https://storage.googleapis.com/golang/go${GO_VERSION}.linux-${GO_ARCH}.tar.gz; \
tar -C /usr/local/ -xf /go${GO_VERSION}.linux-${GO_ARCH}.tar.gz ; \
Expand Down
40 changes: 19 additions & 21 deletions change_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (g *changeDetectGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Messa
}

func (g *changeDetectGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
emit := g.doChangeDetect(bp)
if emit {
changed := g.doChangeDetect(bp)
if changed {
return bp, nil
}
return nil, nil
Expand All @@ -74,8 +74,8 @@ func (g *changeDetectGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, er
}

func (g *changeDetectGroup) Point(p edge.PointMessage) (edge.Message, error) {
emit := g.doChangeDetect(p)
if emit {
changed := g.doChangeDetect(p)
if changed {
return p, nil
}
return nil, nil
Expand All @@ -89,9 +89,9 @@ func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter) bool {
prevFields = g.previous.Fields()
}
currFields = p.Fields()
emit := g.n.changeDetect(prevFields, currFields)
changed := g.n.changeDetect(prevFields, currFields)

if !emit {
if !changed {
return false
}
g.previous = p
Expand All @@ -106,21 +106,19 @@ func (g *changeDetectGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message
}
func (g *changeDetectGroup) Done() {}

// changeDetect calculates the changeDetect between prev and cur.
// Return is the resulting changeDetect, whether the current point should be
// stored as previous, and whether the point result should be emitted.
// changeDetect reports whether there was a change between prev and cur.
func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields) bool {

value, ok := curr[n.d.Field]
if !ok {
n.diag.Error("Invalid field in change detect",
fmt.Errorf("expected field %s not found", n.d.Field),
keyvalue.KV("field", n.d.Field))
return false
}
if prev[n.d.Field] == value {
return false
for _, field := range n.d.Fields {
value, ok := curr[field]
if !ok {
n.diag.Error("Invalid field in change detect",
fmt.Errorf("expected field %s not found", field),
keyvalue.KV("field", field))
continue
}
if prev[field] != value {
return true
}
}

return true
return false
}
4 changes: 2 additions & 2 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type InfluxQLNode struct {

func newInfluxQLNode(et *ExecutingTask, n *pipeline.InfluxQLNode, d NodeDiagnostic) (*InfluxQLNode, error) {
m := &InfluxQLNode{
node: node{Node: n, et: et, diag: d},
n: n,
node: node{Node: n, et: et, diag: d},
n: n,
isStreamTransformation: n.ReduceCreater.IsStreamTransformation,
}
m.node.runF = m.runInfluxQL
Expand Down
50 changes: 50 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,56 @@ batch
testBatcherWithOutput(t, "TestBatch_ChangeDetect", script, 21*time.Second, er, false)
}

func TestBatch_ChangeDetect_Many(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
|changeDetect('a','b')
|httpOut('TestBatch_ChangeDetect_Many')
`

er := models.Result{
Series: models.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "a", "b"},
Values: [][]interface{}{
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
"bad",
0.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
"good",
0.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
"bad",
1.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
"bad",
0.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_ChangeDetect_Many", script, 21*time.Second, er, false)
}

func TestBatch_Derivative(t *testing.T) {

var script = `
Expand Down
57 changes: 51 additions & 6 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,51 @@ func TestStream_ChangeDetect(t *testing.T) {

testStreamerWithOutput(t, "TestStream_ChangeDetect", script, 15*time.Second, er, false, nil)
}
func TestStream_ChangeDetect_Many(t *testing.T) {

var script = `stream
|from().measurement('packets')
|changeDetect('a','b')
|window()
.period(6s)
.every(6s)
|httpOut('TestStream_ChangeDetect_Many')
`

er := models.Result{
Series: models.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "a", "b"},
Values: [][]interface{}{
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
"bad",
0.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
"good",
0.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
"bad",
1.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
"bad",
0.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_ChangeDetect_Many", script, 15*time.Second, er, false, nil)
}

func TestStream_Derivative(t *testing.T) {

Expand Down Expand Up @@ -10071,9 +10116,9 @@ Value: {{ index .Fields "count" }}
"Mime-Version": []string{"1.0"},
"Content-Type": []string{"text/html; charset=UTF-8"},
"Content-Transfer-Encoding": []string{"quoted-printable"},
"To": []string{"[email protected], [email protected]"},
"From": []string{"[email protected]"},
"Subject": []string{"kapacitor.cpu.serverA is CRITICAL"},
"To": []string{"[email protected], [email protected]"},
"From": []string{"[email protected]"},
"Subject": []string{"kapacitor.cpu.serverA is CRITICAL"},
},
Body: `
<b>kapacitor.cpu.serverA is CRITICAL</b>
Expand All @@ -10087,9 +10132,9 @@ Value: 10
"Mime-Version": []string{"1.0"},
"Content-Type": []string{"text/html; charset=UTF-8"},
"Content-Transfer-Encoding": []string{"quoted-printable"},
"To": []string{"[email protected], [email protected]"},
"From": []string{"[email protected]"},
"Subject": []string{"kapacitor.cpu.serverA is CRITICAL"},
"To": []string{"[email protected], [email protected]"},
"From": []string{"[email protected]"},
"Subject": []string{"kapacitor.cpu.serverA is CRITICAL"},
},
Body: `
<b>kapacitor.cpu.serverA is CRITICAL</b>
Expand Down
12 changes: 11 additions & 1 deletion integrations/testdata/TestBatch_ChangeDetect.0.brpl
Original file line number Diff line number Diff line change
@@ -1 +1,11 @@
{"name":"packets","points":[{"fields":{"value":"bad"},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":"good"},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":"bad"},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":"good"},"time":"2015-10-18T00:00:08Z"}]}
{
"name":"packets",
"points":[
{"fields":{"value":"bad"},"time":"2015-10-18T00:00:00Z"},
{"fields":{"value":"good"},"time":"2015-10-18T00:00:02Z"},
{"fields":{"value":"good"},"time":"2015-10-18T00:00:04Z"},
{"fields":{"value2":"good"},"time":"2015-10-18T00:00:05Z"},
{"fields":{"value":"bad"},"time":"2015-10-18T00:00:06Z"},
{"fields":{"value":"good"},"time":"2015-10-18T00:00:08Z"}
]
}
11 changes: 11 additions & 0 deletions integrations/testdata/TestBatch_ChangeDetect_Many.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"name":"packets",
"points":[
{"fields":{"a":"bad","b":0},"time":"2015-10-18T00:00:00Z"},
{"fields":{"a":"good","b":0},"time":"2015-10-18T00:00:02Z"},
{"fields":{"a":"good","b":0},"time":"2015-10-18T00:00:04Z"},
{"fields":{"a":"good","b":0,"c":"something else"},"time":"2015-10-18T00:00:05Z"},
{"fields":{"a":"bad","b":1},"time":"2015-10-18T00:00:06Z"},
{"fields":{"a":"bad","b":0},"time":"2015-10-18T00:00:08Z"}
]
}
21 changes: 21 additions & 0 deletions integrations/testdata/TestStream_ChangeDetect_Many.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
dbname
rpname
packets a="bad",b=0 0000000000
dbname
rpname
packets a="good",b=0 0000000001
dbname
rpname
packets a="good",b=0 0000000002
dbname
rpname
packets a="good",b=0,c="something else" 0000000003
dbname
rpname
packets a="bad",b=1 0000000004
dbname
rpname
packets a="bad",b=0 0000000005
dbname
rpname
packets a="pump",b=2 0000000006
30 changes: 26 additions & 4 deletions pipeline/change_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,19 +36,41 @@ import (
// Where the data are unchanged, but only the points
// where the value changes from the previous value are
// emitted.

//
// It is also possible to provide a list of many fields and if any field changes the point will be emitted.
//
// Example:
// stream
// |from()
// .measurement('packets')
// |changeDetect('in','out')
// ...
//
// with source data:
// packets in=0,out=0 0000000000
// packets in=1,out=0 0000000001
// packets in=1,out=1 0000000002
// packets in=1,out=1 0000000003
// packets in=2,out=1 0000000004
// packets in=2,out=1 0000000005
//
// Would have output:
// packets in=0,out=0 0000000000
// packets in=1,out=0 0000000001
// packets in=1,out=1 0000000002
// packets in=2,out=1 0000000004
type ChangeDetectNode struct {
chainnode `json:"-"`

// The field to use when calculating the changeDetect
// tick:ignore
Field string `json:"field"`
Fields []string `json:"fields"`
}

func newChangeDetectNode(wants EdgeType, field string) *ChangeDetectNode {
func newChangeDetectNode(wants EdgeType, fields []string) *ChangeDetectNode {
return &ChangeDetectNode{
chainnode: newBasicChainNode("changeDetect", wants, wants),
Field: field,
Fields: fields,
}
}

Expand Down
2 changes: 1 addition & 1 deletion pipeline/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,7 +521,7 @@ type chainnodeAlias interface {
Default() *DefaultNode
Delete() *DeleteNode
Derivative(string) *DerivativeNode
ChangeDetect(string) *ChangeDetectNode
ChangeDetect(...string) *ChangeDetectNode
Desc() string
Difference(string) *InfluxQLNode
Distinct(string) *InfluxQLNode
Expand Down
4 changes: 2 additions & 2 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,8 +460,8 @@ func (n *chainnode) Derivative(field string) *DerivativeNode {
}

// Create a new node that only emits new points if different from the previous point
func (n *chainnode) ChangeDetect(field string) *ChangeDetectNode {
s := newChangeDetectNode(n.Provides(), field)
func (n *chainnode) ChangeDetect(fields ...string) *ChangeDetectNode {
s := newChangeDetectNode(n.Provides(), fields)
n.linkChild(s)
return s
}
Expand Down
6 changes: 5 additions & 1 deletion pipeline/tick/change_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func NewChangeDetect(parents []ast.Node) *ChangeDetectNode {

// Build creates a ChangeDetect ast.Node
func (n *ChangeDetectNode) Build(d *pipeline.ChangeDetectNode) (ast.Node, error) {
n.Pipe("changeDetect", d.Field)
fields := make([]interface{}, len(d.Fields))
for i, f := range d.Fields {
fields[i] = f
}
n.Pipe("changeDetect", fields...)
return n.prev, n.err
}
Loading

0 comments on commit ec22ea9

Please sign in to comment.