Skip to content

Commit

Permalink
fix derivative to add result to current point, not previous
Browse files Browse the repository at this point in the history
  • Loading branch information
phemmer committed Apr 21, 2017
1 parent 39c759d commit af7de68
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 64 deletions.
73 changes: 35 additions & 38 deletions derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,55 +42,48 @@ func (d *DerivativeNode) runDerivative([]byte) error {
for p, ok := d.ins[0].NextPoint(); ok; p, ok = d.ins[0].NextPoint() {
d.timer.Start()
mu.RLock()
pr, ok := previous[p.Group]
pr := previous[p.Group]
mu.RUnlock()
if !ok {

value, store, emit := d.derivative(pr.Fields, p.Fields, pr.Time, p.Time)
if store {
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
continue
}

value, ok := d.derivative(pr.Fields, p.Fields, pr.Time, p.Time)
if ok {
if emit {
fields := pr.Fields.Copy()
fields[d.d.As] = value
pr.Fields = fields
p.Fields = fields
d.timer.Pause()
for _, child := range d.outs {
err := child.CollectPoint(pr)
err := child.CollectPoint(p)
if err != nil {
return err
}
}
d.timer.Resume()
}
mu.Lock()
previous[p.Group] = p
mu.Unlock()
d.timer.Stop()
}
case pipeline.BatchEdge:
for b, ok := d.ins[0].NextBatch(); ok; b, ok = d.ins[0].NextBatch() {
d.timer.Start()
if len(b.Points) > 0 {
pr := b.Points[0]
var p models.BatchPoint
for i := 1; i < len(b.Points); i++ {
p = b.Points[i]
value, ok := d.derivative(pr.Fields, p.Fields, pr.Time, p.Time)
if ok {
fields := pr.Fields.Copy()
fields[d.d.As] = value
b.Points[i-1].Fields = fields
} else {
b.Points = append(b.Points[:i-1], b.Points[i:]...)
i--
}
var pr, p models.BatchPoint
for i := 0; i < len(b.Points); i++ {
p = b.Points[i]
value, store, emit := d.derivative(pr.Fields, p.Fields, pr.Time, p.Time)
if store {
pr = p
}
b.Points = b.Points[:len(b.Points)-1]
if emit {
fields := pr.Fields.Copy()
fields[d.d.As] = value
b.Points[i].Fields = fields
} else {
b.Points = append(b.Points[:i], b.Points[i+1:]...)
i--
}
}
d.timer.Stop()
for _, child := range d.outs {
Expand All @@ -104,35 +97,39 @@ func (d *DerivativeNode) runDerivative([]byte) error {
return nil
}

func (d *DerivativeNode) derivative(prev, curr models.Fields, prevTime, currTime time.Time) (float64, bool) {
f0, ok := numToFloat(prev[d.d.Field])
// derivative calculates the derivative between prev and cur.
// Return is the resulting derivative, whether the current point should be
// stored as previous, and whether the point result should be emitted.
func (d *DerivativeNode) derivative(prev, curr models.Fields, prevTime, currTime time.Time) (float64, bool, bool) {
f1, ok := numToFloat(curr[d.d.Field])
if !ok {
d.incrementErrorCount()
d.logger.Printf("E! cannot apply derivative to type %T", prev[d.d.Field])
return 0, false
d.logger.Printf("E! cannot apply derivative to type %T", curr[d.d.Field])
return 0, false, false
}

f1, ok := numToFloat(curr[d.d.Field])
f0, ok := numToFloat(prev[d.d.Field])
if !ok {
d.incrementErrorCount()
d.logger.Printf("E! cannot apply derivative to type %T", curr[d.d.Field])
return 0, false
// The only time this will fail to parse is if there is no previous.
// Because we only return `store=true` if current parses successfully, we will
// never get a previous which doesn't parse.
return 0, true, false
}

elapsed := float64(currTime.Sub(prevTime))
if elapsed == 0 {
d.incrementErrorCount()
d.logger.Printf("E! cannot perform derivative elapsed time was 0")
return 0, false
return 0, true, false
}
diff := f1 - f0
// Drop negative values for non-negative derivatives
if d.d.NonNegativeFlag && diff < 0 {
return 0, false
return 0, true, false
}

value := float64(diff) / (elapsed / float64(d.d.Unit))
return value, true
return value, true, true
}

func numToFloat(num interface{}) (float64, bool) {
Expand Down
30 changes: 15 additions & 15 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,6 @@ batch
Tags: nil,
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
0.5,
Expand All @@ -98,6 +94,10 @@ batch
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
0.5,
},
},
},
},
Expand Down Expand Up @@ -129,10 +129,6 @@ batch
Tags: nil,
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
1.0,
Expand All @@ -145,6 +141,10 @@ batch
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1.0,
},
},
},
},
Expand Down Expand Up @@ -176,19 +176,19 @@ batch
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
-501.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
0.5,
},
},
Expand Down Expand Up @@ -223,15 +223,15 @@ batch
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
0.5,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
0.5,
},
},
Expand Down
2 changes: 1 addition & 1 deletion integrations/data/TestBatch_Derivative.0.brpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":0},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
2 changes: 1 addition & 1 deletion integrations/data/TestBatch_DerivativeNN.0.brpl
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value":0},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1},"time":"2015-10-18T00:00:08Z"}]}
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value2":0},"time":"2015-10-18T00:00:05Z"},{"fields":{"value":0},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1},"time":"2015-10-18T00:00:08Z"}]}
2 changes: 1 addition & 1 deletion integrations/data/TestStream_Derivative.srpl
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ rpname
packets value=1001 0000000002
dbname
rpname
packets value=1002 0000000003
packets value2=0 0000000003
dbname
rpname
packets value=1003 0000000004
Expand Down
10 changes: 5 additions & 5 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ stream
Tags: nil,
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
1.0,
}},
},
Expand Down Expand Up @@ -123,7 +123,7 @@ stream
Tags: nil,
Columns: []string{"time", "count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
9.0,
}},
},
Expand Down Expand Up @@ -153,7 +153,7 @@ stream
Tags: nil,
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
10.0,
}},
},
Expand Down Expand Up @@ -183,7 +183,7 @@ stream
Tags: nil,
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
1.0,
}},
},
Expand Down Expand Up @@ -212,7 +212,7 @@ stream
Tags: nil,
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
time.Date(1971, 1, 1, 0, 0, 11, 0, time.UTC),
-99.7,
}},
},
Expand Down
6 changes: 3 additions & 3 deletions pipeline/derivative.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
// Computes the derivative via:
// (current - previous ) / ( time_difference / unit)
//
// For batch edges the derivative is computed for each
// point in the batch and because of boundary conditions
// the number of points is reduced by one.
// The derivative is computed for each point, and
// because of boundary conditions the first point is
// dropped.
type DerivativeNode struct {
chainnode

Expand Down

0 comments on commit af7de68

Please sign in to comment.