Skip to content

Commit

Permalink
Merge pull request influxdata#7293 from influxdata/js-7110-cannot-use…
Browse files Browse the repository at this point in the history
…-derivative-function

Skip past points at the same time in derivative call within a merged series
  • Loading branch information
jsternberg authored Sep 19, 2016
2 parents 796f35b + 0b94f5d commit f44dd4e
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
- [#7272](https://github.com/influxdata/influxdb/issues/7272): Report cmdline and memstats in /debug/vars.
- [#7299](https://github.com/influxdata/influxdb/ssues/7299): Ensure fieldsCreated stat available in shard measurement.
- [#6846](https://github.com/influxdata/influxdb/issues/6846): Read an invalid JSON response as an error in the influx client.
- [#7110](https://github.com/influxdata/influxdb/issues/7110): Skip past points at the same time in derivative call within a merged series.

## v1.0.0 [2016-09-07]

Expand Down
40 changes: 40 additions & 0 deletions influxql/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ func NewFloatDerivativeReducer(interval Interval, isNonNegative, ascending bool)

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDerivativeReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -107,6 +114,9 @@ func (r *FloatDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand Down Expand Up @@ -138,6 +148,13 @@ func NewIntegerDerivativeReducer(interval Interval, isNonNegative, ascending boo

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDerivativeReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -154,6 +171,9 @@ func (r *IntegerDerivativeReducer) Emit() []FloatPoint {
}
value := diff / (float64(elapsed) / float64(r.interval.Duration))

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true

// Drop negative values for non-negative derivatives.
if r.isNonNegative && diff < 0 {
return nil
Expand All @@ -179,6 +199,13 @@ func NewFloatDifferenceReducer() *FloatDifferenceReducer {

// AggregateFloat aggregates a point into the reducer and updates the current window.
func (r *FloatDifferenceReducer) AggregateFloat(p *FloatPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -188,6 +215,9 @@ func (r *FloatDifferenceReducer) Emit() []FloatPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []FloatPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand All @@ -209,6 +239,13 @@ func NewIntegerDifferenceReducer() *IntegerDifferenceReducer {

// AggregateInteger aggregates a point into the reducer and updates the current window.
func (r *IntegerDifferenceReducer) AggregateInteger(p *IntegerPoint) {
// Skip past a point when it does not advance the stream. A joined series
// may have multiple points at the same time so we will discard anything
// except the first point we encounter.
if !r.curr.Nil && r.curr.Time == p.Time {
return
}

r.prev = r.curr
r.curr = *p
}
Expand All @@ -218,6 +255,9 @@ func (r *IntegerDifferenceReducer) Emit() []IntegerPoint {
if !r.prev.Nil {
// Calculate the difference of successive points.
value := r.curr.Value - r.prev.Value

// Mark this point as read by changing the previous point to nil.
r.prev.Nil = true
return []IntegerPoint{{Time: r.curr.Time, Value: value}}
}
return nil
Expand Down
96 changes: 96 additions & 0 deletions influxql/select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2205,6 +2205,54 @@ func TestSelect_Derivative_Desc_Integer(t *testing.T) {
}
}

func TestSelect_Derivative_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Derivative_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT derivative(value, 1s) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -2.5}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down Expand Up @@ -2257,6 +2305,54 @@ func TestSelect_Difference_Integer(t *testing.T) {
}
}

func TestSelect_Difference_Duplicate_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &FloatIterator{Points: []influxql.FloatPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Difference_Duplicate_Integer(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
return &IntegerIterator{Points: []influxql.IntegerPoint{
{Name: "cpu", Time: 0 * Second, Value: 20},
{Name: "cpu", Time: 0 * Second, Value: 19},
{Name: "cpu", Time: 4 * Second, Value: 10},
{Name: "cpu", Time: 4 * Second, Value: 3},
}}, nil
}

// Execute selection.
itrs, err := influxql.Select(MustParseSelectStatement(`SELECT difference(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil)
if err != nil {
t.Fatal(err)
} else if a, err := Iterators(itrs).ReadAll(); err != nil {
t.Fatalf("unexpected error: %s", err)
} else if !deep.Equal(a, [][]influxql.Point{
{&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: -10}},
}) {
t.Fatalf("unexpected points: %s", spew.Sdump(a))
}
}

func TestSelect_Elapsed_Float(t *testing.T) {
var ic IteratorCreator
ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {
Expand Down

0 comments on commit f44dd4e

Please sign in to comment.