Skip to content

Commit

Permalink
Merged pull request influxdata#1030 from influxdata/nc-issue#913
Browse files Browse the repository at this point in the history
Add fillPeriod option to Window node that wait till the period has el…
  • Loading branch information
nathanielc committed Nov 10, 2016
2 parents 939fdf6 + dcec00a commit b5bd467
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 4 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
# Changelog

## v1.1.1 [unreleased]

### Release Notes

### Features

- [#913](https://github.com/influxdata/kapacitor/issues/913): Add fillPeriod option to Window node, so that the first emit waits till the period has elapsed before emitting.

### Bugfixes



## v1.1.0 [2016-10-07]

### Release Notes
Expand Down
48 changes: 48 additions & 0 deletions integrations/data/TestStream_Window_FillPeriod.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000000
dbname
rpname
cpu,type=idle,host=serverA value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverA value=91.1 0000000013
dbname
rpname
cpu,type=idle,host=serverA value=95.7 0000000014
dbname
rpname
cpu,type=idle,host=serverA value=96.2 0000000015
63 changes: 63 additions & 0 deletions integrations/data/TestStream_Window_FillPeriod_Aligned.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000000
dbname
rpname
cpu,type=idle,host=serverA value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverA value=91.1 0000000013
dbname
rpname
cpu,type=idle,host=serverA value=95.7 0000000014
dbname
rpname
cpu,type=idle,host=serverA value=96.2 0000000015
dbname
rpname
cpu,type=idle,host=serverA value=96.6 0000000016
dbname
rpname
cpu,type=idle,host=serverA value=91.2 0000000017
dbname
rpname
cpu,type=idle,host=serverA value=98.2 0000000018
dbname
rpname
cpu,type=idle,host=serverA value=96.1 0000000019
dbname
rpname
cpu,type=idle,host=serverA value=97.2 0000000020
172 changes: 172 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,6 +648,178 @@ stream
testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, false, nil)
}

func TestStream_Window_Overlapping(t *testing.T) {

var script = `
var period = 14s
var every = 10s
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('cpu')
.where(lambda: "host" == 'serverA')
|window()
.period(period)
.every(every)
|httpOut('TestStream_Window_FillPeriod')
`

nums := []float64{
93.1,
97.1,
92.6,
95.6,
93.1,
92.6,
95.8,
92.7,
96.0,
93.4,
}

values := make([][]interface{}, len(nums))
for i, num := range nums {
values[i] = []interface{}{
time.Date(1971, 1, 1, 0, 0, i, 0, time.UTC),
"serverA",
"idle",
num,
}
}

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "host", "type", "value"},
Values: values,
},
},
}

testStreamerWithOutput(t, "TestStream_Window_FillPeriod", script, 16*time.Second, er, false, nil)
}

func TestStream_Window_FillPeriod(t *testing.T) {

var script = `
var period = 14s
var every = 10s
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('cpu')
.where(lambda: "host" == 'serverA')
|window()
.period(period)
.every(every)
.fillPeriod()
|httpOut('TestStream_Window_FillPeriod')
`

nums := []float64{
93.1,
97.1,
92.6,
95.6,
93.1,
92.6,
95.8,
92.7,
96.0,
93.4,
95.3,
96.4,
95.1,
91.1,
}

values := make([][]interface{}, len(nums))
for i, num := range nums {
values[i] = []interface{}{
time.Date(1971, 1, 1, 0, 0, i, 0, time.UTC),
"serverA",
"idle",
num,
}
}

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "host", "type", "value"},
Values: values,
},
},
}

testStreamerWithOutput(t, "TestStream_Window_FillPeriod", script, 16*time.Second, er, false, nil)
}
func TestStream_Window_FillPeriod_Aligned(t *testing.T) {

var script = `
var period = 14s
var every = 10s
stream
|from()
.database('dbname')
.retentionPolicy('rpname')
.measurement('cpu')
.where(lambda: "host" == 'serverA')
|window()
.period(period)
.every(every)
.fillPeriod()
.align()
|httpOut('TestStream_Window_FillPeriod_Aligned')
`

nums := []float64{
95.8,
92.7,
96.0,
93.4,
95.3,
96.4,
95.1,
91.1,
95.7,
96.2,
96.6,
91.2,
98.2,
96.1,
}

values := make([][]interface{}, len(nums))
for i, num := range nums {
values[i] = []interface{}{
time.Date(1971, 1, 1, 0, 0, i+6, 0, time.UTC),
"serverA",
"idle",
num,
}
}

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "host", "type", "value"},
Values: values,
},
},
}

testStreamerWithOutput(t, "TestStream_Window_FillPeriod_Aligned", script, 21*time.Second, er, false, nil)
}

func TestStream_Shift(t *testing.T) {

var script = `
Expand Down
13 changes: 12 additions & 1 deletion pipeline/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@ type WindowNode struct {
Period time.Duration
// How often the current window is emitted into the pipeline.
Every time.Duration
// Wether to align the window edges with the zero time
// Whether to align the window edges with the zero time
// tick:ignore
AlignFlag bool `tick:"Align"`
// Whether to wait till the period is full before the first emit.
// tick:ignore
FillPeriodFlag bool `tick:"FillPeriod"`
}

func newWindowNode() *WindowNode {
Expand All @@ -54,3 +57,11 @@ func (w *WindowNode) Align() *WindowNode {
w.AlignFlag = true
return w
}

// FillPeriod instructs the WindowNode to wait till the period has elapsed before emitting the first batch.
// This only applies if the period is greater than the every value.
// tick:property
func (w *WindowNode) FillPeriod() *WindowNode {
w.FillPeriodFlag = true
return w
}
21 changes: 18 additions & 3 deletions window.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,24 @@ func (w *WindowNode) runWindow([]byte) error {
for _, dim := range p.Dimensions.TagNames {
tags[dim] = p.Tags[dim]
}
nextEmit := p.Time.Add(w.w.Every)
if w.w.AlignFlag {
nextEmit = nextEmit.Truncate(w.w.Every)
// Determine first next emit time.
var nextEmit time.Time
if w.w.FillPeriodFlag {
nextEmit = p.Time.Add(w.w.Period)
if w.w.AlignFlag {
firstPeriod := nextEmit
// Needs to be aligned with Every and be greater than now+Period
nextEmit = nextEmit.Truncate(w.w.Every)
if !nextEmit.After(firstPeriod) {
// This means we will drop the first few points
nextEmit = nextEmit.Add(w.w.Every)
}
}
} else {
nextEmit = p.Time.Add(w.w.Every)
if w.w.AlignFlag {
nextEmit = nextEmit.Truncate(w.w.Every)
}
}
wnd = &window{
buf: &windowBuffer{logger: w.logger},
Expand Down

0 comments on commit b5bd467

Please sign in to comment.