Skip to content

Commit

Permalink
Update BatchNode to allow for overlapping and offset windows
Browse files Browse the repository at this point in the history
as well as cron syntax for scheduling
  • Loading branch information
nathanielc committed Oct 28, 2015
1 parent 5d713b9 commit f6a0281
Show file tree
Hide file tree
Showing 9 changed files with 259 additions and 85 deletions.
141 changes: 135 additions & 6 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package kapacitor

import (
"encoding/json"
"errors"
"sync"
"time"

"github.com/gorhill/cronexpr"
"github.com/influxdb/influxdb/client"
"github.com/influxdb/kapacitor/models"
"github.com/influxdb/kapacitor/pipeline"
Expand All @@ -13,7 +16,8 @@ type BatchNode struct {
node
b *pipeline.BatchNode
query *Query
ticker *time.Ticker
ticker ticker
wg sync.WaitGroup
}

func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error) {
Expand All @@ -34,6 +38,22 @@ func newBatchNode(et *ExecutingTask, n *pipeline.BatchNode) (*BatchNode, error)
return nil, err
}

if n.Every != 0 && n.Cron != "" {
return nil, errors.New("must not set both 'every' and 'cron' properties")
}
switch {
case n.Every != 0:
bn.ticker = newTimeTicker(n.Every)
case n.Cron != "":
var err error
bn.ticker, err = newCronTicker(n.Cron)
if err != nil {
return nil, err
}
default:
return nil, errors.New("must define one of 'every' or 'cron'")
}

return bn, nil
}

Expand All @@ -43,16 +63,45 @@ func (b *BatchNode) DBRPs() ([]DBRP, error) {
return b.query.DBRPs()
}

// Start launches the batch query loop in a background goroutine, sending
// its results to batch. The goroutine is counted on b.wg before it is
// spawned; doQuery marks it done when the loop exits.
func (b *BatchNode) Start(batch BatchCollector) {
	b.wg.Add(1)
	go func() {
		b.doQuery(batch)
	}()
}

// NextQueries returns the query strings this node would issue for the
// scheduled times that follow start.
//
// Each scheduled time t produced by the node's ticker yields one query
// covering the window [t, t+Period]. Generation stops when:
//   - num queries have been produced (num == 0 means no limit),
//   - the ticker reports no further time (zero time.Time), or
//   - the end of the next window would lie after the current time.
//
// A negative num yields an empty, non-nil slice.
//
// Note that this method updates the start/stop times on b.query while it
// builds each query string.
func (b *BatchNode) NextQueries(start time.Time, num int) []string {
	now := time.Now()
	// Crons are sensitive to timezones.
	start = start.Local()

	// Only use num as a capacity hint when it is non-negative; passing a
	// negative capacity to make would panic.
	capHint := num
	if capHint < 0 {
		capHint = 0
	}
	queries := make([]string, 0, capHint)

	for i := 0; i < num || num == 0; i++ {
		start = b.ticker.Next(start)
		if start.IsZero() {
			// The schedule has no further activation times.
			break
		}
		b.query.Start(start)
		stop := start.Add(b.b.Period)
		if stop.After(now) {
			// The window would extend into the future; stop here.
			break
		}
		b.query.Stop(stop)
		queries = append(queries, b.query.String())
	}
	return queries
}

// Query InfluxDB return Edge with data
func (b *BatchNode) Query(batch BatchCollector) {
func (b *BatchNode) doQuery(batch BatchCollector) {
defer batch.Close()
defer b.wg.Done()

b.ticker = time.NewTicker(b.b.Period)
for now := range b.ticker.C {
tickC := b.ticker.Start()
for now := range tickC {

// Update times for query
b.query.Start(now.Add(-1 * b.b.Period))
b.query.Stop(now)
stop := now.Add(-1 * b.b.Offset)
b.query.Start(stop.Add(-1 * b.b.Period))
b.query.Stop(stop)

b.logger.Println("D! starting next batch query:", b.query.String())

Expand Down Expand Up @@ -143,6 +192,7 @@ func (b *BatchNode) stopBatch() {
if b.ticker != nil {
b.ticker.Stop()
}
b.wg.Wait()
}

func (b *BatchNode) runBatch() error {
Expand All @@ -156,3 +206,82 @@ func (b *BatchNode) runBatch() error {
}
return nil
}

// ticker is the scheduling abstraction used by BatchNode to decide when
// to run its query. It is implemented by timeTicker (fixed interval) and
// cronTicker (cron expression).
type ticker interface {
// Start begins the schedule and returns the channel on which
// tick times are delivered.
Start() <-chan time.Time
// Stop halts the schedule.
Stop()
// Return the next time the ticker will tick
// after now.
Next(now time.Time) time.Time
}

// timeTicker is a ticker that fires at a fixed interval. It wraps a
// time.Ticker that is only created once Start is called.
type timeTicker struct {
	// every is the interval between ticks.
	every time.Duration
	// ticker is the underlying time.Ticker; nil until Start is called.
	ticker *time.Ticker
}

// newTimeTicker returns a timeTicker that will tick once per every.
// No timer is allocated until Start is called.
func newTimeTicker(every time.Duration) *timeTicker {
	return &timeTicker{every: every}
}

// Start begins ticking and returns the channel on which tick times are
// delivered. If the ticker was already started, the previous underlying
// time.Ticker is stopped first so it is not left running with no way to
// reach it from Stop.
//
// time.NewTicker panics if the interval is not positive.
func (t *timeTicker) Start() <-chan time.Time {
	if t.ticker != nil {
		t.ticker.Stop()
	}
	t.ticker = time.NewTicker(t.every)
	return t.ticker.C
}

// Stop halts the ticker. It is a no-op if Start has not been called and
// is safe to call more than once. The tick channel is not closed.
func (t *timeTicker) Stop() {
	if t.ticker != nil {
		t.ticker.Stop()
	}
}

// Next returns the time of the tick that follows now, i.e. now plus the
// configured interval.
func (t *timeTicker) Next(now time.Time) time.Time {
	return now.Add(t.every)
}

// cronTicker is a ticker whose tick times are defined by a cron
// expression. A background goroutine started by Start sleeps until each
// activation time and then delivers it on the tick channel.
type cronTicker struct {
	// expr is the parsed cron expression that defines the schedule.
	expr *cronexpr.Expression
	// ticker is the unbuffered channel on which tick times are delivered.
	ticker chan time.Time
	// closing is closed by Stop to tell the goroutine to exit.
	closing chan struct{}
	// wg tracks the goroutine started by Start.
	wg sync.WaitGroup
	// stopOnce ensures closing is closed only once, so that Stop may be
	// called repeatedly, matching timeTicker.Stop.
	stopOnce sync.Once
}

// newCronTicker parses cronExpr and returns a cronTicker for that
// schedule. It returns the parse error if the expression is invalid.
func newCronTicker(cronExpr string) (*cronTicker, error) {
	expr, err := cronexpr.Parse(cronExpr)
	if err != nil {
		return nil, err
	}
	return &cronTicker{
		expr:    expr,
		ticker:  make(chan time.Time),
		closing: make(chan struct{}),
	}, nil
}

// Start launches the scheduling goroutine and returns the channel on
// which each activation time is delivered.
//
// The goroutine exits when Stop is called, whether it is waiting for the
// next activation or waiting for a receiver to accept a tick. It also
// exits if the expression has no further activation times. The channel
// is never closed.
func (c *cronTicker) Start() <-chan time.Time {
	c.wg.Add(1)
	go func() {
		defer c.wg.Done()
		for {
			now := time.Now()
			next := c.expr.Next(now)
			if next.IsZero() {
				// No further activation exists. Without this check the
				// wait below would be negative and the loop would spin,
				// sending zero times.
				return
			}

			// Use an explicit timer so it can be released on Stop
			// instead of lingering until a possibly distant activation.
			timer := time.NewTimer(next.Sub(now))
			select {
			case <-timer.C:
			case <-c.closing:
				timer.Stop()
				return
			}

			// Deliver the tick, but stay responsive to Stop: a bare send
			// would block forever if the receiver is gone, and Stop
			// would then hang in wg.Wait.
			select {
			case c.ticker <- next:
			case <-c.closing:
				return
			}
		}
	}()
	return c.ticker
}

// Stop signals the scheduling goroutine to exit and waits for it to
// finish. It is safe to call more than once.
func (c *cronTicker) Stop() {
	c.stopOnce.Do(func() { close(c.closing) })
	c.wg.Wait()
}

// Next returns the first activation time of the cron expression after
// now, as computed by the underlying expression.
func (c *cronTicker) Next(now time.Time) time.Time {
	return c.expr.Next(now)
}
21 changes: 16 additions & 5 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,9 @@ var (
recordFlags = flag.NewFlagSet("record", flag.ExitOnError)
rname = recordFlags.String("name", "", "the name of a task. If recording a batch or stream")

rstop = recordFlags.String("stop", "", "the stop time of a query if recording a batch. Defaults to now.")
rnum = recordFlags.Int("num", 1, "the number of periods to query. If recording a batch")
rstart = recordFlags.String("start", "", "the start time for the set of queries when recording a batch.")
rpast = recordFlags.Duration("past", 0, "set start time via 'now - past'.")
rnum = recordFlags.Int("num", 0, "the number of periods to query, if zero will query as many times as the schedule defines until the queries reach the present time. Applies only to recording a batch.")

rquery = recordFlags.String("query", "", "the query to record. If recording a query.")
rtype = recordFlags.String("type", "", "the type of the recording to save (stream|batch). If recording a query.")
Expand All @@ -219,10 +220,16 @@ Examples:
This records the live data stream for 1 minute using the databases and retention policies
from the named task.
$ kapacitor record batch -name cpu_idle -stop 2015-09-01T00:00:00Z -num 10
$ kapacitor record batch -name cpu_idle -start 2015-09-01T00:00:00Z -num 10
This records the result of the query defined in task 'cpu_idle' and runs the query 10 times
starting at time 'stop - num*period' and incrementing by the period defined in the task.
starting at time 'start' and incrementing by the schedule defined in the task.
$ kapacitor record batch -name cpu_idle -past 10h
This records the result of the query defined in task 'cpu_idle' and runs the query
as many times as defined by the schedule until the queries reaches the present time.
The starting time for the queries is 'now - 10h' and increments by the schedule defined in the task.
$ kapacitor record query -query "select value from cpu_idle where time > now() - 1h and time < now()" -type stream
Expand All @@ -243,8 +250,12 @@ func doRecord(args []string) error {
v.Add("name", *rname)
v.Add("duration", rdur.String())
case "batch":
if *rstart != "" && *rpast != 0 {
return errors.New("cannot set both start and past flags.")
}
v.Add("name", *rname)
v.Add("stop", *rstop)
v.Add("start", *rstart)
v.Add("past", (*rpast).String())
v.Add("num", strconv.FormatInt(int64(*rnum), 10))
case "query":
v.Add("qtype", *rtype)
Expand Down
1 change: 1 addition & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ batch
WHERE "host" = 'serverA'
''')
.period(10s)
.every(10s)
.groupBy(time(2s), 'cpu')
.mapReduce(influxql.count('value'))
.window()
Expand Down
35 changes: 29 additions & 6 deletions pipeline/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,47 @@ import (
// FROM "telegraf"."default".cpu_usage_idle
// WHERE "host" = 'serverA'
// ''')
// .period(10s)
// .groupBy(time(2s), 'cpu')
// .period(1m)
// .every(20s)
// .groupBy(time(10s), 'cpu')
// ...
//
// In the above example InfluxDB is queried every 10 seconds and the results
// are then processed by the rest of the pipeline.
// In the above example InfluxDB is queried every 20 seconds; the window of time returned
// spans 1 minute and is grouped into 10 second buckets.
type BatchNode struct {
chainnode
// The query to execute. Must not contain a time condition
// in the `WHERE` clause or contain a `GROUP BY` clause.
// The time conditions are added dynamically according to the period.
// The time conditions are added dynamically according to the period, offset and schedule.
// The `GROUP BY` clause is added dynamically according to the dimensions
// passed to the `groupBy` method.
Query string

// The period at which Kapacitor should query InfluxDB.
// The period or length of time that will be queried from InfluxDB
Period time.Duration

// How often to query InfluxDB.
//
// The Every property is mutually exclusive with the Cron property.
Every time.Duration

// Define a schedule using a cron syntax.
//
// The specific cron implementation is documented here:
// https://github.com/gorhill/cronexpr#implementation
//
// The Cron property is mutually exclusive with the Every property.
Cron string

// How far back in time to query from the current time
//
// For example, given an Offset of 2 hours and an Every of 5m,
// Kapacitor will query InfluxDB every 5 minutes for the window of data ending 2 hours ago.
//
// This applies to Cron schedules as well. If the cron specifies to run every Sunday at
// 1 AM and the Offset is 1 hour, then at 1 AM on Sunday the data from 12 AM will be queried.
Offset time.Duration

// The list of dimensions for the group-by clause.
//tick:ignore
Dimensions []interface{}
Expand Down
3 changes: 0 additions & 3 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ func (q *Query) Dimensions(dims []interface{}) error {
}
}

if !hasTime {
return fmt.Errorf("groupBy must have a time dimension.")
}
return nil
}

Expand Down
15 changes: 11 additions & 4 deletions replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,9 +155,10 @@ func (r *replaySource) ReplayBatch(batch BatchCollector) {
series.Tags,
),
Tags: series.Tags,
Points: make([]models.TimeFields, len(series.Values)),
Points: make([]models.TimeFields, 0, len(series.Values)),
}
for i, v := range series.Values {
for _, v := range series.Values {
var skip bool
tf := models.TimeFields{}
tf.Fields = make(models.Fields, len(series.Columns))
for i, c := range series.Columns {
Expand All @@ -169,11 +170,17 @@ func (r *replaySource) ReplayBatch(batch BatchCollector) {
}
tf.Time = st.Add(diff).UTC()
} else {
if v[i] == nil {
skip = true
break
}
tf.Fields[c] = v[i]
}
}
lastTime = tf.Time
b.Points[i] = tf
if !skip {
lastTime = tf.Time
b.Points = append(b.Points, tf)
}
}
r.clck.Until(lastTime)
batch.CollectBatch(b)
Expand Down
Loading

0 comments on commit f6a0281

Please sign in to comment.