Skip to content

Commit

Permalink
support counting and summing empty batches (influxdata#595)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook committed Jun 2, 2016
1 parent 6379768 commit ccf6922
Show file tree
Hide file tree
Showing 50 changed files with 1,166 additions and 794 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ In order to know if subscription writes are being dropped you should monitor the
- [#558](https://github.com/influxdata/kapacitor/pull/558): Preserve fields as well as tags on selector InfluxQL functions.
- [#259](https://github.com/influxdata/kapacitor/issues/259): Template Tasks have been added.
- [#562](https://github.com/influxdata/kapacitor/pull/562): HTTP based subscriptions.
- [#595](https://github.com/influxdata/kapacitor/pull/595): Support counting and summing empty batches to 0.


### Bugfixes
Expand Down
1 change: 1 addition & 0 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ func (b *QueryNode) doQuery() error {
continue
}
for _, bch := range batches {
bch.TMax = stop
b.batchesQueried.Add(1)
b.pointsQueried.Add(int64(len(bch.Points)))
b.timer.Pause()
Expand Down
139 changes: 110 additions & 29 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1584,29 +1584,66 @@ func TestServer_BatchTask(t *testing.T) {
c := NewConfig()
c.InfluxDB[0].Enabled = true
count := 0
stopTimeC := make(chan time.Time, 1)

db := NewInfluxDB(func(q string) *iclient.Response {
if len(q) > 6 && q[:6] == "SELECT" {
count++
stmt, err := influxql.ParseStatement(q)
if err != nil {
return &iclient.Response{Err: err.Error()}
}
slct, ok := stmt.(*influxql.SelectStatement)
if !ok {
return nil
}
cond, ok := slct.Condition.(*influxql.BinaryExpr)
if !ok {
return &iclient.Response{Err: "expected select condition to be binary expression"}
}
stopTimeExpr, ok := cond.RHS.(*influxql.BinaryExpr)
if !ok {
return &iclient.Response{Err: "expected select condition rhs to be binary expression"}
}
stopTL, ok := stopTimeExpr.RHS.(*influxql.StringLiteral)
if !ok {
return &iclient.Response{Err: "expected select condition rhs to be string literal"}
}
count++
switch count {
case 1:
stopTime, err := time.Parse(time.RFC3339Nano, stopTL.Val)
if err != nil {
return &iclient.Response{Err: err.Error()}
}
stopTimeC <- stopTime
return &iclient.Response{
Results: []iclient.Result{{
Series: []models.Row{{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 1, int(time.Millisecond), time.UTC).Format(time.RFC3339Nano),
stopTime.Add(-time.Millisecond).Format(time.RFC3339Nano),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 1, 2*int(time.Millisecond), time.UTC).Format(time.RFC3339Nano),
stopTime.Add(-2 * time.Millisecond).Format(time.RFC3339Nano),
1.0,
},
},
}},
}},
}
default:
return &iclient.Response{
Results: []iclient.Result{{
Series: []models.Row{{
Name: "cpu",
Columns: []string{"time", "value"},
Values: [][]interface{}{},
}},
}},
}
}
return nil
})
c.InfluxDB[0].URLs = []string{db.URL()}
s := OpenServer(c)
Expand All @@ -1620,10 +1657,12 @@ func TestServer_BatchTask(t *testing.T) {
RetentionPolicy: "myrp",
}}
tick := `batch
|query(' SELECT value from mydb.myrp.cpu ')
|query('SELECT value from mydb.myrp.cpu')
.period(5ms)
.every(5ms)
.align()
|count('value')
|where(lambda: "count" == 2)
|httpOut('count')
`

Expand All @@ -1647,20 +1686,23 @@ func TestServer_BatchTask(t *testing.T) {

endpoint := fmt.Sprintf("%s/tasks/%s/count", s.URL(), id)

exp := `{"series":[{"name":"cpu","columns":["time","count"],"values":[["1971-01-01T00:00:01.002Z",2]]}]}`
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}
err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{
Status: client.Disabled,
})
if err != nil {
t.Fatal(err)
}

if count == 0 {
t.Error("unexpected query count", count)
timeout := time.NewTicker(100 * time.Millisecond)
defer timeout.Stop()
select {
case <-timeout.C:
t.Fatal("timedout waiting for query")
case stopTime := <-stopTimeC:
exp := fmt.Sprintf(`{"series":[{"name":"cpu","columns":["time","count"],"values":[["%s",2]]}]}`, stopTime.Local().Format(time.RFC3339Nano))
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*5)
if err != nil {
t.Error(err)
}
err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{
Status: client.Disabled,
})
if err != nil {
t.Fatal(err)
}
}
}

Expand Down Expand Up @@ -3252,9 +3294,36 @@ func TestServer_UDFBatchAgents(t *testing.T) {

func testBatchAgent(t *testing.T, c *run.Config) {
count := 0
stopTimeC := make(chan time.Time, 2)
db := NewInfluxDB(func(q string) *iclient.Response {
if len(q) > 6 && q[:6] == "SELECT" {
count++
stmt, err := influxql.ParseStatement(q)
if err != nil {
return &iclient.Response{Err: err.Error()}
}
slct, ok := stmt.(*influxql.SelectStatement)
if !ok {
return nil
}
cond, ok := slct.Condition.(*influxql.BinaryExpr)
if !ok {
return &iclient.Response{Err: "expected select condition to be binary expression"}
}
stopTimeExpr, ok := cond.RHS.(*influxql.BinaryExpr)
if !ok {
return &iclient.Response{Err: "expected select condition rhs to be binary expression"}
}
stopTL, ok := stopTimeExpr.RHS.(*influxql.StringLiteral)
if !ok {
return &iclient.Response{Err: "expected select condition rhs to be string literal"}
}
count++
switch count {
case 1, 2:
stopTime, err := time.Parse(time.RFC3339Nano, stopTL.Val)
if err != nil {
return &iclient.Response{Err: err.Error()}
}
stopTimeC <- stopTime
data := []float64{
5,
6,
Expand Down Expand Up @@ -3289,7 +3358,7 @@ func testBatchAgent(t *testing.T, c *run.Config) {
values := make([][]interface{}, len(data))
for i, value := range data {
values[i] = []interface{}{
time.Date(1971, 1, 1, 0, 0, 0, (i+1)*int(time.Millisecond), time.UTC).Format(time.RFC3339Nano),
stopTime.Add(time.Duration(i-len(data)) * time.Millisecond).Format(time.RFC3339Nano),
value,
}
}
Expand All @@ -3306,8 +3375,9 @@ func testBatchAgent(t *testing.T, c *run.Config) {
}},
}},
}
default:
return nil
}
return nil
})
c.InfluxDB[0].URLs = []string{db.URL()}
c.InfluxDB[0].Enabled = true
Expand Down Expand Up @@ -3355,8 +3425,23 @@ func testBatchAgent(t *testing.T, c *run.Config) {
t.Fatal(err)
}

stopTimes := make([]time.Time, 2)
for i := range stopTimes {
timeout := time.NewTicker(100 * time.Millisecond)
defer timeout.Stop()
select {
case <-timeout.C:
t.Fatal("timedout waiting for query")
case stopTime := <-stopTimeC:
stopTimes[i] = stopTime
}
}
endpoint := fmt.Sprintf("%s/tasks/%s/count", s.URL(), id)
exp := `{"series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["1971-01-01T00:00:00.02Z",5]]}]}`
exp := fmt.Sprintf(
`{"series":[{"name":"cpu","tags":{"count":"1"},"columns":["time","count"],"values":[["%s",5]]},{"name":"cpu","tags":{"count":"0"},"columns":["time","count"],"values":[["%s",5]]}]}`,
stopTimes[0].Format(time.RFC3339Nano),
stopTimes[1].Format(time.RFC3339Nano),
)
err = s.HTTPGetRetry(endpoint, exp, 100, time.Millisecond*50)
if err != nil {
t.Error(err)
Expand All @@ -3367,10 +3452,6 @@ func testBatchAgent(t *testing.T, c *run.Config) {
if err != nil {
t.Fatal(err)
}

if count == 0 {
t.Error("unexpected query count", count)
}
}

func TestServer_CreateTask_Defaults(t *testing.T) {
Expand Down
24 changes: 16 additions & 8 deletions influxql.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package kapacitor

import (
"errors"
"fmt"
"log"
"time"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/pkg/errors"
)

// tmpl -- go get github.com/benbjohnson/tmpl
Expand Down Expand Up @@ -132,12 +132,8 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
}

func (n *InfluxQLNode) runBatchInfluxQL() error {
var exampleValue interface{}
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
// Skip empty batches
if len(b.Points) == 0 {
continue
}

// Create new base context
c := baseReduceContext{
as: n.n.As,
Expand All @@ -149,7 +145,19 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
time: b.TMax,
pointTimes: n.n.PointTimes,
}
createFn, err := n.getCreateFn(b.Points[0].Fields[c.field])
if len(b.Points) == 0 {
if !n.n.ReduceCreater.IsEmptyOK {
// If the reduce does not handle empty batches continue
continue
}
if exampleValue == nil {
// If we have no points and have never seen a point assume float64
exampleValue = float64(0)
}
} else {
exampleValue = b.Points[0].Fields[c.field]
}
createFn, err := n.getCreateFn(exampleValue)
if err != nil {
return err
}
Expand All @@ -173,7 +181,7 @@ func (n *InfluxQLNode) getCreateFn(value interface{}) (createReduceContextFunc,
}
createFn, err := determineReduceContextCreateFn(n.n.Method, value, n.n.ReduceCreater)
if err != nil {
return nil, err
return nil, errors.Wrapf(err, "invalid influxql func %s with field %s", n.n.Method, n.n.Field)
}
n.createFn = createFn
return n.createFn, nil
Expand Down
Loading

0 comments on commit ccf6922

Please sign in to comment.