Skip to content

Commit

Permalink
Merged pull request influxdata#1414 from phemmer/fix-derivative-as-st…
Browse files Browse the repository at this point in the history
…ream

fix derivative preserving fields from wrong point in stream
  • Loading branch information
nathanielc committed Jun 1, 2017
2 parents c02ebc8 + 6c77c03 commit ad7cbfd
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

### Bugfixes

- [#1414](https://github.com/influxdata/kapacitor/pull/1414): Fix derivative node preserving fields from previous point in stream tasks.

## v1.3.0 [2017-05-22]

### Release Notes
Expand Down
4 changes: 2 additions & 2 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
mu.Unlock()
}
if emit {
fields := pr.Fields.Copy()
fields := p.Fields.Copy()
fields[d.d.As] = value
p.Fields = fields
d.timer.Pause()
Expand All @@ -78,7 +78,7 @@ func (d *DerivativeNode) runDerivative([]byte) error {
pr = p
}
if emit {
fields := pr.Fields.Copy()
fields := p.Fields.Copy()
fields[d.d.As] = value
b.Points[i].Fields = fields
} else {
Expand Down
53 changes: 52 additions & 1 deletion integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,58 @@ batch
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_Derivative", script, 21*time.Second, er, false)
}

func TestBatch_DerivativeAs(t *testing.T) {

var script = `
batch
|query('''
SELECT sum("value") as "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
|derivative('value')
.as('derivative')
|httpOut('TestBatch_Derivative')
`

er := models.Result{
Series: models.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "derivative", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
0.5,
1001.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
0.5,
1002.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
0.5,
1003.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1.0,
1005.0,
},
},
},
Expand Down Expand Up @@ -144,7 +195,7 @@ batch
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1.0,
2.0,
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion integrations/data/TestBatch_Derivative.0.brpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":0},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":0},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1005},"time":"2015-10-18T00:00:08Z"}]}
72 changes: 72 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,78 @@ stream
testStreamerWithOutput(t, "TestStream_Derivative", script, 15*time.Second, er, false, nil)
}

func TestStream_DerivativeAs(t *testing.T) {

var script = `
stream
|from().measurement('packets')
|derivative('value')
.as('derivative')
|window()
.period(10s)
.every(10s)
|httpOut('TestStream_Derivative')
`
er := models.Result{
Series: models.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "derivative", "value"},
Values: [][]interface{}{
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 1, 0, time.UTC),
1.0,
1001.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC),
1.0,
1003.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
1.0,
1004.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 5, 0, time.UTC),
2.0,
1006.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
1.0,
1007.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 7, 0, time.UTC),
0.0,
1007.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1.0,
1008.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 9, 0, time.UTC),
1.0,
1009.0,
},
[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
1.0,
1010.0,
},
},
},
},
}

testStreamerWithOutput(t, "TestStream_Derivative", script, 15*time.Second, er, false, nil)
}

func TestStream_DerivativeZeroElapsed(t *testing.T) {

var script = `
Expand Down

0 comments on commit ad7cbfd

Please sign in to comment.