diff --git a/CHANGELOG.md b/CHANGELOG.md index d738c910ca6..fea0632aafe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ - [#7268](https://github.com/influxdata/influxdb/pull/7268): More man pages for the other tools we package and compress man pages fully. - [#7305](https://github.com/influxdata/influxdb/pull/7305): UDP Client: Split large points. Thanks @vlasad - [#7115](https://github.com/influxdata/influxdb/issues/7115): Feature request: `influx inspect -export` should dump WAL files. +- [#7388](https://github.com/influxdata/influxdb/pull/7388): Implement cumulative_sum() function. ### Bugfixes diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index a0b52e7600e..cd92f963368 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -2421,6 +2421,226 @@ cpu value=35 1278010025000000000 } } +// Ensure the server can handle various group by time cumulative sum queries. +func TestServer_Query_SelectGroupByTimeCumulativeSum(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000 +cpu value=15 1278010021000000000 +cpu value=20 1278010022000000000 +cpu value=25 1278010023000000000 +`)}, + } + + test.addQueries([]*Query{ + &Query{ + name: "calculate cumulative sum of count", + command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mean", + command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of median", + command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",12.5],["2010-07-01T18:47:02Z",35]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mode", + command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of sum", + command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",25],["2010-07-01T18:47:02Z",70]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of first", + command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of last", + command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of min", + command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of max", + command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",40]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of percentile", + command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + +// Ensure the server can handle various group by time cumulative sum queries with fill. +func TestServer_Query_SelectGroupByTimeCumulativeSumWithFill(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: fmt.Sprintf(`cpu value=10 1278010020000000000 +cpu value=20 1278010021000000000 +`)}, + } + + test.addQueries([]*Query{ + &Query{ + name: "calculate cumulative sum of count with fill 0", + command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",2]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of count with fill previous", + command: `SELECT cumulative_sum(count(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",2],["2010-07-01T18:47:02Z",4]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mean with fill 0", + command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mean with fill previous", + command: `SELECT cumulative_sum(mean(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of median with fill 0", + command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",15]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of median with fill previous", + command: `SELECT cumulative_sum(median(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",15],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mode with fill 0", + command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of mode with fill previous", + command: `SELECT cumulative_sum(mode(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of sum with fill 0", + command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",30]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of sum with fill previous", + command: `SELECT cumulative_sum(sum(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",30],["2010-07-01T18:47:02Z",60]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of first with fill 0", + command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of first with fill previous", + command: `SELECT cumulative_sum(first(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of last with fill 0", + command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of last with fill previous", + command: `SELECT cumulative_sum(last(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of min with fill 0", + command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of min with fill previous", + command: `SELECT cumulative_sum(min(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of max with fill 0", + command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of max with fill previous", + command: `SELECT cumulative_sum(max(value)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",20],["2010-07-01T18:47:02Z",40]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of percentile with fill 0", + command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(0)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",10]]}]}]}`, + }, + &Query{ + name: "calculate cumulative sum of percentile with fill previous", + command: `SELECT cumulative_sum(percentile(value, 50)) from db0.rp0.cpu where time >= '2010-07-01 18:47:00' and time <= '2010-07-01 18:47:03' group by time(2s) fill(previous)`, + exp: `{"results":[{"series":[{"name":"cpu","columns":["time","cumulative_sum"],"values":[["2010-07-01T18:47:00Z",10],["2010-07-01T18:47:02Z",20]]}]}]}`, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + func TestServer_Query_MathWithFill(t *testing.T) { t.Parallel() s := OpenServer(NewConfig()) diff --git a/influxql/ast.go b/influxql/ast.go index 50a6b8823e4..ff9336061dc 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -1643,7 +1643,7 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { for _, f := range s.Fields { for _, expr := range walkFunctionCalls(f.Expr) { switch expr.Name { - case "derivative", "non_negative_derivative", "difference", "moving_average", "elapsed": + case "derivative", "non_negative_derivative", "difference", "moving_average", "cumulative_sum", "elapsed": if err := s.validSelectWithAggregate(); err != nil { return err } @@ -1663,9 +1663,9 @@ func (s *SelectStatement) validateAggregates(tr targetRequirement) error { return errors.New("elapsed requires a duration argument") } } - case "difference": + case "difference", "cumulative_sum": if got := len(expr.Args); got != 1 { - return fmt.Errorf("invalid number of arguments for difference, expected 1, got %d", got) + return fmt.Errorf("invalid number of arguments for %s, expected 1, got %d", expr.Name, got) } case "moving_average": if got := len(expr.Args); got != 2 { diff --git a/influxql/call_iterator.go b/influxql/call_iterator.go index 0c5b05ec38e..a77af391e3c 100644 --- a/influxql/call_iterator.go +++ b/influxql/call_iterator.go @@ -1227,6 +1227,26 @@ func newMovingAverageIterator(input Iterator, n int, opt IteratorOptions) (Itera } } +// newCumulativeSumIterator returns an iterator for operating on a cumulative_sum() call. +func newCumulativeSumIterator(input Iterator, opt IteratorOptions) (Iterator, error) { + switch input := input.(type) { + case FloatIterator: + createFn := func() (FloatPointAggregator, FloatPointEmitter) { + fn := NewFloatCumulativeSumReducer() + return fn, fn + } + return newFloatStreamFloatIterator(input, createFn, opt), nil + case IntegerIterator: + createFn := func() (IntegerPointAggregator, IntegerPointEmitter) { + fn := NewIntegerCumulativeSumReducer() + return fn, fn + } + return newIntegerStreamIntegerIterator(input, createFn, opt), nil + default: + return nil, fmt.Errorf("unsupported cumulative sum iterator type: %T", input) + } +} + // newHoltWintersIterator returns an iterator for operating on a elapsed() call. func newHoltWintersIterator(input Iterator, opt IteratorOptions, h, m int, includeFitData bool, interval time.Duration) (Iterator, error) { switch input := input.(type) { diff --git a/influxql/functions.go b/influxql/functions.go index 3a6a268a766..f48a400ab23 100644 --- a/influxql/functions.go +++ b/influxql/functions.go @@ -357,6 +357,58 @@ func (r *IntegerMovingAverageReducer) Emit() []FloatPoint { } } +// FloatCumulativeSumReducer cumulates the values from each point. +type FloatCumulativeSumReducer struct { + curr FloatPoint +} + +// NewFloatCumulativeSumReducer creates a new FloatCumulativeSumReducer. +func NewFloatCumulativeSumReducer() *FloatCumulativeSumReducer { + return &FloatCumulativeSumReducer{ + curr: FloatPoint{Nil: true}, + } +} + +func (r *FloatCumulativeSumReducer) AggregateFloat(p *FloatPoint) { + r.curr.Value += p.Value + r.curr.Time = p.Time + r.curr.Nil = false +} + +func (r *FloatCumulativeSumReducer) Emit() []FloatPoint { + var pts []FloatPoint + if !r.curr.Nil { + pts = []FloatPoint{r.curr} + } + return pts +} + +// IntegerCumulativeSumReducer cumulates the values from each point. +type IntegerCumulativeSumReducer struct { + curr IntegerPoint +} + +// NewIntegerCumulativeSumReducer creates a new IntegerCumulativeSumReducer. +func NewIntegerCumulativeSumReducer() *IntegerCumulativeSumReducer { + return &IntegerCumulativeSumReducer{ + curr: IntegerPoint{Nil: true}, + } +} + +func (r *IntegerCumulativeSumReducer) AggregateInteger(p *IntegerPoint) { + r.curr.Value += p.Value + r.curr.Time = p.Time + r.curr.Nil = false +} + +func (r *IntegerCumulativeSumReducer) Emit() []IntegerPoint { + var pts []IntegerPoint + if !r.curr.Nil { + pts = []IntegerPoint{r.curr} + } + return pts +} + // FloatHoltWintersReducer forecasts a series into the future. // This is done using the Holt-Winters damped method. // 1. Using the series the initial values are calculated using a SSE. diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 252f6f5a8dc..63fe4f74bec 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -341,6 +341,67 @@ func TestParser_ParseStatement(t *testing.T) { }, }, }, + + // cumulative_sum + { + s: fmt.Sprintf(`SELECT cumulative_sum(field1) FROM myseries WHERE time > '%s'`, now.UTC().Format(time.RFC3339Nano)), + stmt: &influxql.SelectStatement{ + Fields: []*influxql.Field{ + { + Expr: &influxql.Call{ + Name: "cumulative_sum", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + }, + }, + }, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + Condition: &influxql.BinaryExpr{ + Op: influxql.GT, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.StringLiteral{Val: now.UTC().Format(time.RFC3339Nano)}, + }, + }, + }, + + { + s: fmt.Sprintf(`SELECT cumulative_sum(mean(field1)) FROM myseries WHERE time > '%s' GROUP BY time(1m)`, now.UTC().Format(time.RFC3339Nano)), + stmt: &influxql.SelectStatement{ + Fields: []*influxql.Field{ + { + Expr: &influxql.Call{ + Name: "cumulative_sum", + Args: []influxql.Expr{ + &influxql.Call{ + Name: "mean", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "field1"}, + }, + }, + }, + }, + }, + }, + Sources: []influxql.Source{&influxql.Measurement{Name: "myseries"}}, + Dimensions: []*influxql.Dimension{ + { + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: time.Minute}, + }, + }, + }, + }, + Condition: &influxql.BinaryExpr{ + Op: influxql.GT, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.StringLiteral{Val: now.UTC().Format(time.RFC3339Nano)}, + }, + }, + }, + // holt_winters { s: fmt.Sprintf(`SELECT holt_winters(first(field1), 3, 1) FROM myseries WHERE time > '%s' GROUP BY time(1h);`, now.UTC().Format(time.RFC3339Nano)), @@ -2229,6 +2290,14 @@ func TestParser_ParseStatement(t *testing.T) { {s: `SELECT moving_average(max(), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, {s: `SELECT moving_average(percentile(value), 2) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, {s: `SELECT moving_average(mean(value), 2) FROM myseries where time < now() and time > now() - 1d`, err: `moving_average aggregate requires a GROUP BY interval`}, + {s: `SELECT cumulative_sum(), field1 FROM myseries`, err: `mixing aggregate and non-aggregate queries is not supported`}, + {s: `SELECT cumulative_sum() from myseries`, err: `invalid number of arguments for cumulative_sum, expected 1, got 0`}, + {s: `SELECT cumulative_sum(value) FROM myseries group by time(1h)`, err: `aggregate function required inside the call to cumulative_sum`}, + {s: `SELECT cumulative_sum(top(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for top, expected at least 2, got 1`}, + {s: `SELECT cumulative_sum(bottom(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for bottom, expected at least 2, got 1`}, + {s: `SELECT cumulative_sum(max()) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for max, expected 1, got 0`}, + {s: `SELECT cumulative_sum(percentile(value)) FROM myseries where time < now() and time > now() - 1d group by time(1h)`, err: `invalid number of arguments for percentile, expected 2, got 1`}, + {s: `SELECT cumulative_sum(mean(value)) FROM myseries where time < now() and time > now() - 1d`, err: `cumulative_sum aggregate requires a GROUP BY interval`}, {s: `SELECT holt_winters(value) FROM myseries where time < now() and time > now() - 1d`, err: `invalid number of arguments for holt_winters, expected 3, got 1`}, {s: `SELECT holt_winters(value, 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `must use aggregate function with holt_winters`}, {s: `SELECT holt_winters(min(value), 10, 2) FROM myseries where time < now() and time > now() - 1d`, err: `holt_winters aggregate requires a GROUP BY interval`}, diff --git a/influxql/select.go b/influxql/select.go index 219a6772756..b7675f29b12 100644 --- a/influxql/select.go +++ b/influxql/select.go @@ -315,6 +315,12 @@ func buildExprIterator(expr Expr, ic IteratorCreator, opt IteratorOptions, selec return newMovingAverageIterator(input, int(n.Val), opt) } panic(fmt.Sprintf("invalid series aggregate function: %s", expr.Name)) + case "cumulative_sum": + input, err := buildExprIterator(expr.Args[0], ic, opt, selector) + if err != nil { + return nil, err + } + return newCumulativeSumIterator(input, opt) default: itr, err := func() (Iterator, error) { switch expr.Name { diff --git a/influxql/select_test.go b/influxql/select_test.go index 529ddf9cfc4..2b9decf344b 100644 --- a/influxql/select_test.go +++ b/influxql/select_test.go @@ -2734,6 +2734,114 @@ func TestSelect_MovingAverage_Integer(t *testing.T) { } } +func TestSelect_CumulativeSum_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 30}}, + {&influxql.FloatPoint{Name: "cpu", Time: 8 * Second, Value: 49}}, + {&influxql.FloatPoint{Name: "cpu", Time: 12 * Second, Value: 52}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_CumulativeSum_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 8 * Second, Value: 19}, + {Name: "cpu", Time: 12 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: 30}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 8 * Second, Value: 49}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 12 * Second, Value: 52}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_CumulativeSum_Duplicate_Float(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &FloatIterator{Points: []influxql.FloatPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.FloatPoint{Name: "cpu", Time: 0 * Second, Value: 39}}, + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 49}}, + {&influxql.FloatPoint{Name: "cpu", Time: 4 * Second, Value: 52}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestSelect_CumulativeSum_Duplicate_Integer(t *testing.T) { + var ic IteratorCreator + ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) { + return &IntegerIterator{Points: []influxql.IntegerPoint{ + {Name: "cpu", Time: 0 * Second, Value: 20}, + {Name: "cpu", Time: 0 * Second, Value: 19}, + {Name: "cpu", Time: 4 * Second, Value: 10}, + {Name: "cpu", Time: 4 * Second, Value: 3}, + }}, nil + } + + // Execute selection. + itrs, err := influxql.Select(MustParseSelectStatement(`SELECT cumulative_sum(value) FROM cpu WHERE time >= '1970-01-01T00:00:00Z' AND time < '1970-01-01T00:00:16Z'`), &ic, nil) + if err != nil { + t.Fatal(err) + } else if a, err := Iterators(itrs).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.IntegerPoint{Name: "cpu", Time: 0 * Second, Value: 20}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 0 * Second, Value: 39}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: 49}}, + {&influxql.IntegerPoint{Name: "cpu", Time: 4 * Second, Value: 52}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + func TestSelect_HoltWinters_GroupBy_Agg(t *testing.T) { var ic IteratorCreator ic.CreateIteratorFn = func(opt influxql.IteratorOptions) (influxql.Iterator, error) {