Skip to content

Commit

Permalink
Merge pull request influxdata#231 from influxdata/nc-shift
Browse files Browse the repository at this point in the history
Add shift node so values from different times can be joined
  • Loading branch information
Nathaniel Cook committed Feb 11, 2016
2 parents 6d2c50b + 9019c78 commit 4e5a5e5
Show file tree
Hide file tree
Showing 11 changed files with 508 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ kapacitor*.zip
*.pyc
*.test
/test-logs
*.prof
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
### Release Notes

### Features
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.


### Bugfixes
- [#199](https://github.com/influxdata/kapacitor/issues/199): BREAKING: Various fixes for the Alerta integration.
Expand Down
78 changes: 78 additions & 0 deletions integrations/data/TestStream_Shift.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
dbname
rpname
cpu,type=idle,host=serverA value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=97.1 0000000001
dbname
rpname
disk,type=sda,host=serverB value=39 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverB value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverB value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverB value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverC value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverB value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverB value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverB value=93.4 0000000009
dbname
rpname
disk,type=sda,host=serverB value=423 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverB value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverB value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverB value=95.1 0000000012
204 changes: 204 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,210 @@ stream
testStreamerWithOutput(t, "TestStream_Window", script, 13*time.Second, er, nil, false)
}

func TestStream_Shift(t *testing.T) {

var script = `
var period = 5s
var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
.shift(period)
var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftBatch(t *testing.T) {

var script = `
var period = 5s
var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
var past = data
.window()
.period(period)
.every(period)
.align()
.shift(period)
.mapReduce(influxql.count('value'))
var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftNegative(t *testing.T) {

var script = `
var period = 5s
var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
var current = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
.shift(-period)
past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_ShiftBatchNegative(t *testing.T) {

var script = `
var period = 5s
var data = stream
.from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
var past = data
.window()
.period(period)
.every(period)
.align()
.mapReduce(influxql.count('value'))
var current = data
.window()
.period(period)
.every(period)
.align()
.shift(-period)
.mapReduce(influxql.count('value'))
past.join(current)
.as('past', 'current')
.eval(lambda: "current.count" - "past.count")
.keep()
.as('diff')
.httpOut('TestStream_Shift')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "current.count", "diff", "past.count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
5.0,
1.0,
4.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Shift", script, 15*time.Second, er, nil, false)
}

func TestStream_SimpleMR(t *testing.T) {

var script = `
Expand Down
7 changes: 7 additions & 0 deletions pipeline/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,3 +428,10 @@ func (n *chainnode) Derivative(field string) *DerivativeNode {
n.linkChild(s)
return s
}

// Create a new node that shifts the incoming points or batches in time.
func (n *chainnode) Shift(shift time.Duration) *ShiftNode {
s := newShiftNode(n.Provides(), shift)
n.linkChild(s)
return s
}
34 changes: 34 additions & 0 deletions pipeline/shift.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package pipeline

import (
"time"
)

// Shift points and batches in time, this is useful for comparing
// batches or points from different times.
//
// Example:
// stream
// .shift(5m)
//
// Shift all data points 5m forward in time.
//
// Example:
// stream
// .shift(-10s)
//
// Shift all data points 10s backward in time.
type ShiftNode struct {
chainnode

// Keep one point or batch every Duration
// tick:ignore
Shift time.Duration
}

func newShiftNode(wants EdgeType, shift time.Duration) *ShiftNode {
return &ShiftNode{
chainnode: newBasicChainNode("shift", wants, wants),
Shift: shift,
}
}
Loading

0 comments on commit 4e5a5e5

Please sign in to comment.