diff --git a/change_detect.go b/change_detect.go index 19974ce47..67436585d 100644 --- a/change_detect.go +++ b/change_detect.go @@ -2,7 +2,6 @@ package kapacitor import ( "fmt" - "time" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/keyvalue" @@ -56,17 +55,16 @@ type changeDetectGroup struct { func (g *changeDetectGroup) BeginBatch(begin edge.BeginBatchMessage) (edge.Message, error) { if s := begin.SizeHint(); s > 0 { begin = begin.ShallowCopy() - begin.SetSizeHint(s - 1) + begin.SetSizeHint(0) } g.previous = nil return begin, nil } func (g *changeDetectGroup) BatchPoint(bp edge.BatchPointMessage) (edge.Message, error) { - np := bp.ShallowCopy() - emit := g.doChangeDetect(bp, np) + emit := g.doChangeDetect(bp) if emit { - return np, nil + return bp, nil } return nil, nil } @@ -76,39 +74,27 @@ func (g *changeDetectGroup) EndBatch(end edge.EndBatchMessage) (edge.Message, er } func (g *changeDetectGroup) Point(p edge.PointMessage) (edge.Message, error) { - np := p.ShallowCopy() - emit := g.doChangeDetect(p, np) + emit := g.doChangeDetect(p) if emit { - return np, nil + return p, nil } return nil, nil } // doChangeDetect computes the changeDetect with respect to g.previous and p. // The resulting changeDetect value will be set on n. -func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter, n edge.FieldsTagsTimeSetter) bool { +func (g *changeDetectGroup) doChangeDetect(p edge.FieldsTagsTimeGetter) bool { var prevFields, currFields models.Fields - var prevTime, currTime time.Time if g.previous != nil { prevFields = g.previous.Fields() - prevTime = g.previous.Time() } currFields = p.Fields() - currTime = p.Time() - value, store, emit := g.n.changeDetect( - prevFields, currFields, - prevTime, currTime, - ) - if store { - g.previous = p - } + emit := g.n.changeDetect(prevFields, currFields) + if !emit { return false } - - fields := n.Fields().Copy() - fields[g.n.d.Field] = value - n.SetFields(fields) + g.previous = p return true } @@ -122,18 +108,18 @@ func (g *changeDetectGroup) DeleteGroup(d edge.DeleteGroupMessage) (edge.Message // changeDetect calculates the changeDetect between prev and cur. // Return is the resulting changeDetect, whether the current point should be // stored as previous, and whether the point result should be emitted. -func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields, prevTime, currTime time.Time) (interface{}, bool, bool) { +func (n *ChangeDetectNode) changeDetect(prev, curr models.Fields) bool { value, ok := curr[n.d.Field] if !ok { n.diag.Error("Invalid field in change detect", fmt.Errorf("expected field %s not found", n.d.Field), keyvalue.KV("field", n.d.Field)) - return 0, false, false + return false } if prev[n.d.Field] == value { - return value, false, false + return false } - return value, true, true + return true } diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 82d06b67f..e6649e645 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -146,8 +146,7 @@ stream func TestStream_ChangeDetect(t *testing.T) { - var script = ` -stream + var script = `stream |from().measurement('packets') |changeDetect('value') |window() diff --git a/pipeline/change_detect.go b/pipeline/change_detect.go index c1c08d87f..b709f5fdd 100644 --- a/pipeline/change_detect.go +++ b/pipeline/change_detect.go @@ -15,10 +15,27 @@ import ( // Example: // stream // |from() -// .measurement('net_rx_packets') +// .measurement('packets') // |changeDetect('value') // ... // +// with source data: +// packets value="bad" 0000000000 +// packets value="good" 0000000001 +// packets value="bad" 0000000002 +// packets value="bad" 0000000003 +// packets value="bad" 0000000004 +// packets value="good" 0000000005 +// +// Would have output: +// packets value="bad" 0000000000 +// packets value="good" 0000000001 +// packets value="bad" 0000000002 +// packets value="good" 0000000005 +// +// Where the data are unchanged, but only the points +// where the value changes from the previous value are +// emitted. type ChangeDetectNode struct { chainnode `json:"-"` diff --git a/pipeline/tick/change_detect_test.go b/pipeline/tick/change_detect_test.go index 1af0720a9..31dd3c621 100644 --- a/pipeline/tick/change_detect_test.go +++ b/pipeline/tick/change_detect_test.go @@ -2,22 +2,15 @@ package tick_test import ( "testing" - "time" ) func TestChangeDetect(t *testing.T) { pipe, _, from := StreamFrom() - d := from.ChangeDetect("work") - d.As = "very important" - d.Unit = time.Hour - d.NonNegative() + from.ChangeDetect("work") want := `stream |from() |changeDetect('work') - .as('very important') - .unit(1h) - .nonNegative() ` PipelineTickTestHelper(t, pipe, want) }