Skip to content

Commit

Permalink
fix window node so that when every==0, we can immediately emit the cu…
Browse files Browse the repository at this point in the history
…rrent window
  • Loading branch information
nathanielc committed Nov 10, 2016
1 parent b5bd467 commit be4ab12
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features

- [#913](https://github.com/influxdata/kapacitor/issues/913): Add fillPeriod option to Window node, so that the first emit waits till the period has elapsed before emitting.
- [#898](https://github.com/influxdata/kapacitor/issues/898): Now when the Window node every value is zero, the window will be emitted immediately for each new point.

### Bugfixes

Expand Down
6 changes: 6 additions & 0 deletions integrations/data/TestStream_Window.srpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ rpname
cpu,type=idle,host=serverB value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverC value=96.4 0000000001
dbname
rpname
disk,type=sda,host=serverB value=39 0000000001
dbname
rpname
Expand Down Expand Up @@ -79,3 +82,6 @@ cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverB value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverC value=95.8 0000000012
139 changes: 139 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,145 @@ stream
testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, false, nil)
}

func TestStream_Window_Every_0(t *testing.T) {

var script = `
var period = 10s
// Emit the window on every point
var every = 0s
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('cpu')
.groupBy('host')
|window()
.period(period)
.every(every)
|count('value')
|window()
.period(10s)
.every(10s)
|httpOut('TestStream_Window')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: map[string]string{"host": "serverA"},
Columns: []string{"time", "count"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
2.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
3.0,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
4.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
5.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
6.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
7.0,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
8.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
9.0,
},
{
time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC),
10.0,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverB"},
Columns: []string{"time", "count"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
2.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
3.0,
},
{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
4.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
5.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
6.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
7.0,
},
{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
8.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
9.0,
},
{
time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC),
10.0,
},
},
},
{
Name: "cpu",
Tags: map[string]string{"host": "serverC"},
Columns: []string{"time", "count"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
2.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, false, nil)
}

func TestStream_Window_Overlapping(t *testing.T) {

var script = `
Expand Down
1 change: 1 addition & 0 deletions pipeline/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type WindowNode struct {
// The period, or length in time, of the window.
Period time.Duration
// How often the current window is emitted into the pipeline.
// If equal to zero, then every new point will emit the current window.
Every time.Duration
// Whether to align the window edges with the zero time
// tick:ignore
Expand Down
11 changes: 10 additions & 1 deletion window.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,13 @@ func (w *WindowNode) runWindow([]byte) error {
}
windows[p.Group] = wnd
}
if w.w.Every == 0 {
// Insert the point now since we know we do not need to wait.
wnd.buf.insert(p)
// We are emitting on every point, so the nextEmit is always
// the time of the current point.
wnd.nextEmit = p.Time
}
if !p.Time.Before(wnd.nextEmit) {
points := wnd.emit(p.Time)
// Send window to all children
Expand All @@ -81,7 +88,9 @@ func (w *WindowNode) runWindow([]byte) error {
}
w.timer.Resume()
}
wnd.buf.insert(p)
if w.w.Every != 0 {
wnd.buf.insert(p)
}
w.timer.Stop()
}
return nil
Expand Down

0 comments on commit be4ab12

Please sign in to comment.