Skip to content

Commit

Permalink
Set tmax if the batch is empty or not grouping by time (influxdata#927)
Browse files Browse the repository at this point in the history
* set tmax if the batch is empty or not grouping by time

* CHANGELOG.md

* add tests for query IsGroupedByTime
  • Loading branch information
Nathaniel Cook authored Sep 20, 2016
1 parent 3a2fd51 commit 9ea0f90
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
- [#889](https://github.com/influxdata/kapacitor/issues/889): Some typo in the default config file
- [#914](https://github.com/influxdata/kapacitor/pull/914): Change |log() output to be in JSON format so its self documenting structure.
- [#915](https://github.com/influxdata/kapacitor/pull/915): Fix issue with TMax and the Holt-Winters method.
- [#927](https://github.com/influxdata/kapacitor/pull/927): Fix bug with TMax and group by time.

## v1.0.0 [2016-09-02]

Expand Down
23 changes: 13 additions & 10 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,23 +238,23 @@ func (b *QueryNode) Queries(start, stop time.Time) ([]*Query, error) {
}
// Crons are sensitive to timezones.
// Make sure we are using local time.
start = start.Local()
current := start.Local()
queries := make([]*Query, 0)
for {
start = b.ticker.Next(start)
if start.IsZero() || start.After(stop) {
current = b.ticker.Next(current)
if current.IsZero() || current.After(stop) {
break
}
qstart := start.Add(-1 * b.b.Offset)
qstop := current.Add(-1 * b.b.Offset)
if qstop.After(now) {
break
}

q, err := b.query.Clone()
if err != nil {
return nil, err
}
q.SetStartTime(qstart)
qstop := qstart.Add(b.b.Period)
if qstop.After(now) {
break
}
q.SetStartTime(qstop.Add(-1 * b.b.Period))
q.SetStopTime(qstop)
queries = append(queries, q)
}
Expand Down Expand Up @@ -343,7 +343,10 @@ func (b *QueryNode) doQuery() error {
continue
}
for _, bch := range batches {
bch.TMax = stop
// Set stop time based off query bounds
if bch.TMax.IsZero() || !b.query.IsGroupedByTime() {
bch.TMax = stop
}
b.batchesQueried.Add(1)
b.pointsQueried.Add(int64(len(bch.Points)))
b.timer.Pause()
Expand Down
20 changes: 16 additions & 4 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ import (
)

type Query struct {
startTL *influxql.TimeLiteral
stopTL *influxql.TimeLiteral
stmt *influxql.SelectStatement
startTL *influxql.TimeLiteral
stopTL *influxql.TimeLiteral
stmt *influxql.SelectStatement
isGroupedByTime bool
}

func NewQuery(queryString string) (*Query, error) {
Expand Down Expand Up @@ -105,7 +106,8 @@ func (q *Query) SetStopTime(s time.Time) {
// Deep clone this query
func (q *Query) Clone() (*Query, error) {
n := &Query{
stmt: q.stmt.Clone(),
stmt: q.stmt.Clone(),
isGroupedByTime: q.isGroupedByTime,
}
// Find the start/stop time literals
var err error
Expand Down Expand Up @@ -185,6 +187,11 @@ func (q *Query) Dimensions(dims []interface{}) error {
Expr: &influxql.Wildcard{},
})
case TimeDimension:
if hasTime {
return fmt.Errorf("groupBy cannot have more than one time dimension")
}
// Add time dimension
hasTime = true
q.stmt.Dimensions = append(q.stmt.Dimensions,
&influxql.Dimension{
Expr: &influxql.Call{
Expand All @@ -205,9 +212,14 @@ func (q *Query) Dimensions(dims []interface{}) error {
}
}

q.isGroupedByTime = hasTime
return nil
}

func (q *Query) IsGroupedByTime() bool {
return q.isGroupedByTime
}

func (q *Query) Fill(option influxql.FillOption, value interface{}) {
q.stmt.Fill = option
q.stmt.FillValue = value
Expand Down
139 changes: 139 additions & 0 deletions query_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package kapacitor_test

import (
"fmt"
"testing"
"time"

"github.com/influxdata/kapacitor"
)

func TestQuery_Clone(t *testing.T) {
testCases := []string{
"SELECT usage FROM telegraf.autogen.cpu",
"SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA'",
"SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND dc = 'slc'",
"SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND dc = 'slc' OR product = 'login'",
"SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND (dc = 'slc' OR product = 'login')",
}

equal := func(q0, q1 *kapacitor.Query) error {
if got, exp := q0.String(), q1.String(); got != exp {
return fmt.Errorf("unequal query string: got %s exp %s", got, exp)
}
if got, exp := q0.StartTime(), q1.StartTime(); got != exp {
return fmt.Errorf("unequal query start time: got %v exp %v", got, exp)
}
if got, exp := q0.StopTime(), q1.StopTime(); got != exp {
return fmt.Errorf("unequal query stop time: got %v exp %v", got, exp)
}
if got, exp := q0.IsGroupedByTime(), q1.IsGroupedByTime(); got != exp {
return fmt.Errorf("unequal query IsGroupedByTime: got %v exp %v", got, exp)
}
return nil
}
for _, query := range testCases {
q, err := kapacitor.NewQuery(query)
if err != nil {
t.Fatal(err)
}
clone, err := q.Clone()
if err != nil {
t.Fatal(err)
}
if err := equal(clone, q); err != nil {
t.Error(err)
}

// Modify original start time
start := time.Date(1975, 1, 1, 0, 0, 0, 0, time.UTC)
q.SetStartTime(start)

if err := equal(clone, q); err == nil {
t.Errorf("equal after modification: got %v", clone)
}

// Modify clone in the same way
clone.SetStartTime(start)
if err := equal(clone, q); err != nil {
t.Error(err)
}

// Re-clone
clone, err = q.Clone()
if err != nil {
t.Fatal(err)
}
if err := equal(clone, q); err != nil {
t.Error(err)
}

// Modify original stop time
stop := time.Date(1975, 1, 2, 0, 0, 0, 0, time.UTC)
q.SetStopTime(stop)

if err := equal(clone, q); err == nil {
t.Errorf("equal after modification: got %v", clone)
}

// Modify clone in the same way
clone.SetStopTime(stop)
if err := equal(clone, q); err != nil {
t.Error(err)
}

// Re-clone
clone, err = q.Clone()
if err != nil {
t.Fatal(err)
}
if err := equal(clone, q); err != nil {
t.Error(err)
}

// Set dimensions
q.Dimensions([]interface{}{time.Hour})
if err := equal(clone, q); err == nil {
t.Errorf("equal after modification: got %v", clone)
}
// Set dimesions on the clone in the same way
clone.Dimensions([]interface{}{time.Hour})
if err := equal(clone, q); err != nil {
t.Error(err)
}
// Re-clone
clone, err = q.Clone()
if err != nil {
t.Fatal(err)
}
if err := equal(clone, q); err != nil {
t.Error(err)
}
}
}
func TestQuery_IsGroupedByTime(t *testing.T) {
q, err := kapacitor.NewQuery("SELECT usage FROM telegraf.autogen.cpu")
if err != nil {
t.Fatal(err)
}

q.Dimensions([]interface{}{time.Hour})
if !q.IsGroupedByTime() {
t.Error("expected query to be grouped by time")
}

q, err = kapacitor.NewQuery("SELECT usage FROM telegraf.autogen.cpu")
if err != nil {
t.Fatal(err)
}

q.Dimensions([]interface{}{kapacitor.TimeDimension{Length: time.Hour, Offset: time.Minute}})
if !q.IsGroupedByTime() {
t.Error("expected query to be grouped by time")
}

q.Dimensions([]interface{}{"host"})
if q.IsGroupedByTime() {
t.Error("expected query to not be grouped by time")
}
}
4 changes: 2 additions & 2 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2403,11 +2403,11 @@ func TestServer_BatchTask(t *testing.T) {
Columns: []string{"time", "value"},
Values: [][]interface{}{
{
stopTime.Add(-time.Millisecond).Format(time.RFC3339Nano),
stopTime.Add(-2 * time.Millisecond).Format(time.RFC3339Nano),
1.0,
},
{
stopTime.Add(-2 * time.Millisecond).Format(time.RFC3339Nano),
stopTime.Add(-1 * time.Millisecond).Format(time.RFC3339Nano),
1.0,
},
},
Expand Down
4 changes: 3 additions & 1 deletion services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,9 @@ func (s *Service) startRecordBatch(t *kapacitor.Task, start, stop time.Time) ([]
}
for _, b := range batches {
// Set stop time based off query bounds
b.TMax = q.StopTime()
if b.TMax.IsZero() || !q.IsGroupedByTime() {
b.TMax = q.StopTime()
}
source <- b
}
}
Expand Down

0 comments on commit 9ea0f90

Please sign in to comment.