From be4ab120f78081f7297232dc1772b84e32b97366 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Thu, 10 Nov 2016 14:00:10 -0700 Subject: [PATCH] fix window node so that when every==0, we can immediately emit the current window --- CHANGELOG.md | 1 + integrations/data/TestStream_Window.srpl | 6 + integrations/streamer_test.go | 139 +++++++++++++++++++++++ pipeline/window.go | 1 + window.go | 11 +- 5 files changed, 157 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8280b8a2..2a6f01523 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### Features - [#913](https://github.com/influxdata/kapacitor/issues/913): Add fillPeriod option to Window node, so that the first emit waits till the period has elapsed before emitting. +- [#898](https://github.com/influxdata/kapacitor/issues/898): Now when the Window node every value is zero, the window will be emitted immediately for each new point. ### Bugfixes diff --git a/integrations/data/TestStream_Window.srpl b/integrations/data/TestStream_Window.srpl index d327ab258..ac6b0c405 100644 --- a/integrations/data/TestStream_Window.srpl +++ b/integrations/data/TestStream_Window.srpl @@ -6,6 +6,9 @@ rpname cpu,type=idle,host=serverB value=97.1 0000000001 dbname rpname +cpu,type=idle,host=serverC value=96.4 0000000001 +dbname +rpname disk,type=sda,host=serverB value=39 0000000001 dbname rpname @@ -79,3 +82,6 @@ cpu,type=idle,host=serverA value=95.1 0000000012 dbname rpname cpu,type=idle,host=serverB value=95.1 0000000012 +dbname +rpname +cpu,type=idle,host=serverC value=95.8 0000000012 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 4c0e76407..55945ac0e 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -648,6 +648,145 @@ stream testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, false, nil) } +func TestStream_Window_Every_0(t *testing.T) { + + var script = ` +var period = 10s +// Emit the window on every point +var every = 0s +stream + |from() + .database('dbname') + .retentionPolicy('rpname') + .measurement('cpu') + .groupBy('host') + |window() + .period(period) + .every(every) + |count('value') + |window() + .period(10s) + .every(10s) + |httpOut('TestStream_Window') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"host": "serverA"}, + Columns: []string{"time", "count"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 2.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 3.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 4.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 5.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 6.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 7.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + 8.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 9.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 10.0, + }, + }, + }, + { + Name: "cpu", + Tags: map[string]string{"host": "serverB"}, + Columns: []string{"time", "count"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC), + 2.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 3.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), + 4.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 5.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 6.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 7.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC), + 8.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 9.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC), + 10.0, + }, + }, + }, + { + Name: "cpu", + Tags: map[string]string{"host": "serverC"}, + Columns: []string{"time", "count"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 1.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC), + 2.0, + }, + }, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, false, nil) +} + func TestStream_Window_Overlapping(t *testing.T) { var script = ` diff --git a/pipeline/window.go b/pipeline/window.go index 2457e133c..0a29416e9 100644 --- a/pipeline/window.go +++ b/pipeline/window.go @@ -31,6 +31,7 @@ type WindowNode struct { // The period, or length in time, of the window. Period time.Duration // How often the current window is emitted into the pipeline. + // If equal to zero, then every new point will emit the current window. Every time.Duration // Whether to align the window edges with the zero time // tick:ignore diff --git a/window.go b/window.go index d0617e549..af40050b2 100644 --- a/window.go +++ b/window.go @@ -69,6 +69,13 @@ func (w *WindowNode) runWindow([]byte) error { } windows[p.Group] = wnd } + if w.w.Every == 0 { + // Insert the point now since we know we do not need to wait. + wnd.buf.insert(p) + // We are emitting on every point, so the nextEmit is always + // the time of the current point. + wnd.nextEmit = p.Time + } if !p.Time.Before(wnd.nextEmit) { points := wnd.emit(p.Time) // Send window to all children @@ -81,7 +88,9 @@ func (w *WindowNode) runWindow([]byte) error { } w.timer.Resume() } - wnd.buf.insert(p) + if w.w.Every != 0 { + wnd.buf.insert(p) + } w.timer.Stop() } return nil