Skip to content

Commit

Permalink
Add Difference, Mode and MovingAverage functions (influxdata#834)
Browse files Browse the repository at this point in the history
* update vendor.sh

* git subrepo clone --force https://github.com/influxdata/influxdb.git vendor/github.com/influxdata/influxdb

subrepo:
  subdir:   "vendor/github.com/influxdata/influxdb"
  merged:   "a30f9b6"
upstream:
  origin:   "https://github.com/influxdata/influxdb.git"
  branch:   "master"
  commit:   "a30f9b6"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* git subrepo clone --force https://github.com/gogo/protobuf.git vendor/github.com/gogo/protobuf

subrepo:
  subdir:   "vendor/github.com/gogo/protobuf"
  merged:   "eef57b9"
upstream:
  origin:   "https://github.com/gogo/protobuf.git"
  branch:   "master"
  commit:   "eef57b9"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* add difference func, update code for new influxdb package

* add moving average

* vendor 1.0 of influxdb

* git subrepo clone --force --branch=1.0 https://github.com/influxdata/influxdb.git vendor/github.com/influxdata/influxdb

subrepo:
  subdir:   "vendor/github.com/influxdata/influxdb"
  merged:   "5130cd7"
upstream:
  origin:   "https://github.com/influxdata/influxdb.git"
  branch:   "1.0"
  commit:   "5130cd7"
git-subrepo:
  version:  "0.3.0"
  origin:   "???"
  commit:   "???"

* add mode function

* add test for batches

* CHANGELOG.md
  • Loading branch information
Nathaniel Cook authored Aug 25, 2016
1 parent 4604e64 commit b8be8ac
Show file tree
Hide file tree
Showing 609 changed files with 50,189 additions and 38,650 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

### Features

- [#827](https://github.com/influxdata/kapacitor/issues/827): Bring Kapacitor up to parody with available InfluxQL functions in 1.0

### Bugfixes

Expand Down
49 changes: 41 additions & 8 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
dimensions: b.PointDimensions(),
tags: b.Tags,
time: b.TMax,
pointTimes: n.n.PointTimes,
pointTimes: n.n.PointTimes || n.isStreamTransformation,
}
if len(b.Points) == 0 {
if !n.n.ReduceCreater.IsEmptyOK {
Expand All @@ -167,13 +167,46 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
}

context := createFn(c)
err = context.AggregateBatch(&b)
if err != nil {
n.logger.Println("E! failed to aggregate batch:", err)
}
err = n.emit(context)
if err != nil {
n.logger.Println("E! failed to emit batch:", err)
if n.isStreamTransformation {
// We have a stream transformation, so treat the batch as if it were a stream
// Create a new batch for emitting
eb := b
eb.Points = make([]models.BatchPoint, 0, len(b.Points))
for _, bp := range b.Points {
p := models.Point{
Name: b.Name,
Time: bp.Time,
Fields: bp.Fields,
Tags: bp.Tags,
}
if err := context.AggregatePoint(&p); err != nil {
n.logger.Println("E! failed to aggregate batch point:", err)
}
if ep, err := context.EmitPoint(); err != nil && err != ErrEmptyEmit {
n.logger.Println("E! failed to emit batch point:", err)
} else if err != ErrEmptyEmit {
eb.Points = append(eb.Points, models.BatchPoint{
Time: ep.Time,
Fields: ep.Fields,
Tags: ep.Tags,
})
}
}
// Emit the complete batch
n.timer.Pause()
for _, out := range n.outs {
if err := out.CollectBatch(eb); err != nil {
n.logger.Println("E! failed to emit batch points:", err)
}
}
n.timer.Resume()
} else {
if err := context.AggregateBatch(&b); err != nil {
n.logger.Println("E! failed to aggregate batch:", err)
}
if err := n.emit(context); err != nil {
n.logger.Println("E! failed to emit batch:", err)
}
}
n.timer.Stop()
}
Expand Down
103 changes: 103 additions & 0 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,18 @@ batch
Tags: nil,
Columns: []string{"time", "elapsed"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
2000.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
2000.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
2000.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
2000.0,
Expand All @@ -272,6 +284,97 @@ batch
testBatcherWithOutput(t, "TestBatch_Elapsed", script, 21*time.Second, er, false)
}

func TestBatch_Difference(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|difference('value')
|log()
|httpOut('TestBatch_Difference')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "difference"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
5.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
3.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
1.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
-5.0,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_Difference", script, 21*time.Second, er, false)
}

func TestBatch_MovingAverage(t *testing.T) {

var script = `
batch
|query('''
SELECT "value"
FROM "telegraf"."default".packets
''')
.period(10s)
.every(10s)
|movingAverage('value', 2)
|httpOut('TestBatch_MovingAverage')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "packets",
Tags: nil,
Columns: []string{"time", "movingAverage"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
1002.5,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
1006.5,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
1008.5,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
1006.5,
},
},
},
},
}

testBatcherWithOutput(t, "TestBatch_MovingAverage", script, 21*time.Second, er, false)
}

func TestBatch_SimpleMR(t *testing.T) {

var script = `
Expand Down
25 changes: 25 additions & 0 deletions integrations/data/TestBatch_Difference.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name":"packets",
"points":[
{
"fields":{"value":1000},
"time":"2015-10-18T00:00:00Z"
},
{
"fields":{"value":1005},
"time":"2015-10-18T00:00:02Z"
},
{
"fields":{"value":1008},
"time":"2015-10-18T00:00:04Z"
},
{
"fields":{"value":1009},
"time":"2015-10-18T00:00:06Z"
},
{
"fields":{"value":1004},
"time":"2015-10-18T00:00:08Z"
}
]
}
26 changes: 25 additions & 1 deletion integrations/data/TestBatch_Elapsed.0.brpl
Original file line number Diff line number Diff line change
@@ -1 +1,25 @@
{"name":"packets","points":[{"fields":{"value":1000},"time":"2015-10-18T00:00:00Z"},{"fields":{"value":1001},"time":"2015-10-18T00:00:02Z"},{"fields":{"value":1002},"time":"2015-10-18T00:00:04Z"},{"fields":{"value":1003},"time":"2015-10-18T00:00:06Z"},{"fields":{"value":1004},"time":"2015-10-18T00:00:08Z"}]}
{
"name":"packets",
"points":[
{
"fields":{"value":1000},
"time":"2015-10-18T00:00:00Z"
},
{
"fields":{"value":1001},
"time":"2015-10-18T00:00:02Z"
},
{
"fields":{"value":1002},
"time":"2015-10-18T00:00:04Z"
},
{
"fields":{"value":1003},
"time":"2015-10-18T00:00:06Z"
},
{
"fields":{"value":1004},
"time":"2015-10-18T00:00:08Z"
}
]
}
25 changes: 25 additions & 0 deletions integrations/data/TestBatch_MovingAverage.0.brpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"name":"packets",
"points":[
{
"fields":{"value":1000},
"time":"2015-10-18T00:00:00Z"
},
{
"fields":{"value":1005},
"time":"2015-10-18T00:00:02Z"
},
{
"fields":{"value":1008},
"time":"2015-10-18T00:00:04Z"
},
{
"fields":{"value":1009},
"time":"2015-10-18T00:00:06Z"
},
{
"fields":{"value":1004},
"time":"2015-10-18T00:00:08Z"
}
]
}
18 changes: 18 additions & 0 deletions integrations/data/TestStream_Difference.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
dbname
rpname
packets value=1000 0000000001
dbname
rpname
packets value=1001 0000000002
dbname
rpname
packets value=1006 0000000003
dbname
rpname
packets value=1009 0000000010
dbname
rpname
packets value=1010 0000000011
dbname
rpname
packets value=1011 0000000012
45 changes: 45 additions & 0 deletions integrations/data/TestStream_MovingAverage.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
dbname
rpname
packets value=1000 0000000001
dbname
rpname
packets value=1001 0000000002
dbname
rpname
packets value=1002 0000000003
dbname
rpname
packets value=1003 0000000004
dbname
rpname
packets value=1014 0000000005
dbname
rpname
packets value=1015 0000000006
dbname
rpname
packets value=1016 0000000007
dbname
rpname
packets value=1017 0000000008
dbname
rpname
packets value=1018 0000000009
dbname
rpname
packets value=1019 0000000010
dbname
rpname
packets value=1020 0000000011
dbname
rpname
packets value=1021 0000000012
dbname
rpname
packets value=1022 0000000013
dbname
rpname
packets value=1023 0000000014
dbname
rpname
packets value=1024 0000000015
Loading

0 comments on commit b8be8ac

Please sign in to comment.