Skip to content

Commit

Permalink
Changes in response to review
Browse files Browse the repository at this point in the history
  • Loading branch information
Adam committed Mar 9, 2018
1 parent 5e7df2a commit 4f6e0c6
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 38 deletions.
40 changes: 13 additions & 27 deletions change_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package kapacitor

import (
"fmt"
"time"

"github.com/influxdata/kapacitor/edge"
"github.com/influxdata/kapacitor/keyvalue"
Expand Down Expand Up @@ -56,17 +55,16 @@ type changeDetectGroup struct {
func (g *changeDetectGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) {
if s := begin.SizeHint(); s > 0 {
begin = begin.ShallowCopy()
begin.SetSizeHint(s - 1)
begin.SetSizeHint(0)
}
g.previous = nil
return begin, nil
}

func (g *changeDetectGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) {
np := bp.ShallowCopy()
emit := g.doChangeDetect(bp, np)
emit := g.doChangeDetect(bp)
if emit {
return np, nil
return bp, nil
}
return nil, nil
}
Expand All @@ -76,39 +74,27 @@ func (g *changeDetectGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, er
}

func (g *changeDetectGroup) Point(p edge.PointMessage) (edge.Message, error) {
np := p.ShallowCopy()
emit := g.doChangeDetect(p, np)
emit := g.doChangeDetect(p)
if emit {
return np, nil
return p, nil
}
return nil, nil
}

// doChangeDetect computes the changeDetect with respect to g.previous and p.
// The resulting changeDetect value will be set on n.
func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter, n edge.FieldsTagsTimeSetter) bool {
func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter) bool {
var prevFields, currFields models.Fields
var prevTime, currTime time.Time
if g.previous != nil {
prevFields = g.previous.Fields()
prevTime = g.previous.Time()
}
currFields = p.Fields()
currTime = p.Time()
value, store, emit := g.n.changeDetect(
prevFields, currFields,
prevTime, currTime,
)
if store {
g.previous = p
}
emit := g.n.changeDetect(prevFields, currFields)

if !emit {
return false
}

fields := n.Fields().Copy()
fields[g.n.d.Field] = value
n.SetFields(fields)
g.previous = p
return true
}

Expand All @@ -122,18 +108,18 @@ func (g *changeDetectGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message
// changeDetect calculates the changeDetect between prev and cur.
// Return is the resulting changeDetect, whether the current point should be
// stored as previous, and whether the point result should be emitted.
func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields, prevTime, currTime time.Time) (interface{}, bool, bool) {
func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields) bool {

value, ok := curr[n.d.Field]
if !ok {
n.diag.Error("Invalid field in change detect",
fmt.Errorf("expected field %s not found", n.d.Field),
keyvalue.KV("field", n.d.Field))
return 0, false, false
return false
}
if prev[n.d.Field] == value {
return value, false, false
return false
}

return value, true, true
return true
}
3 changes: 1 addition & 2 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ stream

func TestStream_ChangeDetect(t *testing.T) {

var script = `
stream
var script = `stream
|from().measurement('packets')
|changeDetect('value')
|window()
Expand Down
19 changes: 18 additions & 1 deletion pipeline/change_detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,27 @@ import (
// Example:
// stream
// |from()
// .measurement('net_rx_packets')
// .measurement('packets')
// |changeDetect('value')
// ...
//
// with source data:
// packets value="bad" 0000000000
// packets value="good" 0000000001
// packets value="bad" 0000000002
// packets value="bad" 0000000003
// packets value="bad" 0000000004
// packets value="good" 0000000005
//
// Would have output:
// packets value="bad" 0000000000
// packets value="good" 0000000001
// packets value="bad" 0000000002
// packets value="good" 0000000005
//
// Where the data are unchanged, but only the points
// where the value changes from the previous value are
// emitted.

type ChangeDetectNode struct {
chainnode `json:"-"`
Expand Down
9 changes: 1 addition & 8 deletions pipeline/tick/change_detect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,15 @@ package tick_test

import (
"testing"
"time"
)

func TestChangeDetect(t *testing.T) {
pipe, _, from := StreamFrom()
d := from.ChangeDetect("work")
d.As = "very important"
d.Unit = time.Hour
d.NonNegative()
from.ChangeDetect("work")

want := `stream
|from()
|changeDetect('work')
.as('very important')
.unit(1h)
.nonNegative()
`
PipelineTickTestHelper(t, pipe, want)
}

0 comments on commit 4f6e0c6

Please sign in to comment.