
Commit

add DefaultNode
nathanielc committed May 5, 2016
1 parent da7c428 commit c087369
Showing 9 changed files with 304 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -105,6 +105,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#461](https://github.com/influxdata/kapacitor/pull/461): Make Alerta `event` property configurable.
- [#491](https://github.com/influxdata/kapacitor/pull/491): BREAKING: Rewriting stateful expression in order to improve performance, the only breaking change is: short circuit evaluation for booleans - for example: ``lambda: "bool_value" && (count() > 100)`` if "bool_value" is false, we won't evaluate "count".
- [#504](https://github.com/influxdata/kapacitor/pull/504): BREAKING: Many changes to the API and underlying storage system. This release requires a special upgrade process.
- [#511](https://github.com/influxdata/kapacitor/pull/511): Adds DefaultNode for providing default values for missing fields or tags.
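  For example, a minimal TICKscript sketch (taken from the doc comment added in this commit):

      stream
          |default()
              .field('value', 0.0)
              .tag('host', '')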

### Bugfixes

81 changes: 81 additions & 0 deletions default.go
@@ -0,0 +1,81 @@
package kapacitor

import (
"log"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
)

type DefaultNode struct {
node
d *pipeline.DefaultNode
}

// Create a new DefaultNode which sets default values for any fields or tags missing on each point.
func newDefaultNode(et *ExecutingTask, n *pipeline.DefaultNode, l *log.Logger) (*DefaultNode, error) {
dn := &DefaultNode{
node: node{Node: n, et: et, logger: l},
d: n,
}
dn.node.runF = dn.runDefault
return dn, nil
}

func (e *DefaultNode) runDefault(snapshot []byte) error {
switch e.Provides() {
case pipeline.StreamEdge:
for p, ok := e.ins[0].NextPoint(); ok; p, ok = e.ins[0].NextPoint() {
e.timer.Start()
p.Fields, p.Tags = e.setDefaults(p.Fields, p.Tags)
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectPoint(p)
if err != nil {
return err
}
}
}
case pipeline.BatchEdge:
for b, ok := e.ins[0].NextBatch(); ok; b, ok = e.ins[0].NextBatch() {
e.timer.Start()
for i := range b.Points {
b.Points[i].Fields, b.Points[i].Tags = e.setDefaults(b.Points[i].Fields, b.Points[i].Tags)
}
e.timer.Stop()
for _, child := range e.outs {
err := child.CollectBatch(b)
if err != nil {
return err
}
}
}
}
return nil
}

func (d *DefaultNode) setDefaults(fields models.Fields, tags models.Tags) (models.Fields, models.Tags) {
newFields := fields
fieldsCopied := false
for field, value := range d.d.Fields {
if _, ok := fields[field]; !ok {
if !fieldsCopied {
newFields = newFields.Copy()
fieldsCopied = true
}
newFields[field] = value
}
}
newTags := tags
tagsCopied := false
for tag, value := range d.d.Tags {
if _, ok := tags[tag]; !ok {
if !tagsCopied {
newTags = newTags.Copy()
tagsCopied = true
}
newTags[tag] = value
}
}
return newFields, newTags
}
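
setDefaults above is copy-on-write: the point's Fields and Tags maps are only copied when at least one default actually applies, so fully populated points pass through without extra allocations. A standalone sketch of the same pattern using plain maps (illustrative only, not the committed code):

package main

import "fmt"

// applyDefaults returns fields with any missing defaults filled in.
// The input map is copied only when at least one default is applied.
func applyDefaults(fields, defaults map[string]interface{}) map[string]interface{} {
	out := fields
	copied := false
	for k, v := range defaults {
		if _, ok := fields[k]; !ok {
			if !copied {
				out = make(map[string]interface{}, len(fields)+1)
				for fk, fv := range fields {
					out[fk] = fv
				}
				copied = true
			}
			out[k] = v
		}
	}
	return out
}

func main() {
	p := map[string]interface{}{"mean": 86.0}
	fmt.Println(applyDefaults(p, map[string]interface{}{"mean": 90.0, "dc": "sfc"}))
	// only the missing key "dc" is added; the existing "mean" value 86 is kept
}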
37 changes: 37 additions & 0 deletions integrations/batcher_test.go
@@ -289,6 +289,43 @@ batch
testBatcherWithOutput(t, "TestBatch_SimpleMR", script, 30*time.Second, er)
}

func TestBatch_Default(t *testing.T) {

var script = `
batch
|query('''
SELECT mean("value")
FROM "telegraf"."default".cpu_usage_idle
WHERE "host" = 'serverA' AND "cpu" = 'cpu-total'
''')
.period(10s)
.every(10s)
.groupBy(time(2s))
|default()
.field('mean', 90.0)
.tag('dc', 'sfc')
|groupBy('dc')
|sum('mean')
|httpOut('TestBatch_Default')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu_usage_idle",
Tags: map[string]string{"dc": "sfc"},
Columns: []string{"time", "sum"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
444.0,
}},
},
},
}

testBatcherWithOutput(t, "TestBatch_Default", script, 30*time.Second, er)
}

func TestBatch_DoubleGroupBy(t *testing.T) {

var script = `
2 changes: 2 additions & 0 deletions integrations/data/TestBatch_Default.0.brpl
@@ -0,0 +1,2 @@
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{},"time":"2015-10-30T17:14:12Z"},{"fields":{"mean":86},"time":"2015-10-30T17:14:14Z"},{"fields":{"mean":91},"time":"2015-10-30T17:14:16Z"},{"fields":{"mean":87},"time":"2015-10-30T17:14:18Z"},{"fields":{},"time":"2015-10-30T17:14:20Z"}]}
{"name":"cpu_usage_idle","tags":{"cpu":"cpu-total"},"points":[{"fields":{},"time":"2015-10-30T17:14:22Z"},{"fields":{"mean":86},"time":"2015-10-30T17:14:24Z"},{"fields":{"mean":91},"time":"2015-10-30T17:14:26Z"},{"fields":{"mean":85},"time":"2015-10-30T17:14:28Z"},{"fields":{"mean":89},"time":"2015-10-30T17:14:30Z"}]}
81 changes: 81 additions & 0 deletions integrations/data/TestStream_Default.srpl
@@ -0,0 +1,81 @@
dbname
rpname
cpu,type=idle,host=serverA value=9 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=9 0000000001
dbname
rpname
disk,type=sda,host=serverB value=3 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=4 0000000002
dbname
rpname
cpu,type=idle,host=serverB value=7 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=2 0000000003
dbname
rpname
cpu,type=idle,host=serverB value=1 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000004
dbname
rpname
cpu,type=idle,host=serverB value=4 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=6 0000000005
dbname
rpname
cpu,type=idle value=2 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=8 0000000006
dbname
rpname
cpu,type=idle,host=serverB value=8 0000000006
dbname
rpname
cpu,type=idle,host=serverC value=4 0000000006
dbname
rpname
cpu,type=idle,host=serverA anothervalue=9 0000000007
dbname
rpname
cpu,type=idle,host=serverB value=7 0000000007
dbname
rpname
cpu,type=idle anothervalue=3 0000000008
dbname
rpname
cpu,type=idle,host=serverB value=6 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=9 0000000009
dbname
rpname
cpu,type=idle,host=serverB value=9 0000000009
dbname
rpname
disk,type=sda,host=serverB value=4 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=7 0000000010
dbname
rpname
cpu,type=idle,host=serverB value=3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=2 0000000011
dbname
rpname
cpu,type=idle,host=serverB value=1 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=7 0000000012
dbname
rpname
cpu,type=idle,host=serverB value=9 0000000012
32 changes: 32 additions & 0 deletions integrations/streamer_test.go
@@ -598,6 +598,32 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_Default(t *testing.T) {
var script = `
stream
|from()
.measurement('cpu')
|default()
.field('value', 1.0)
.tag('host', 'serverA')
|where(lambda: "host" == 'serverA')
|window()
.period(10s)
.every(10s)
|sum('value')
|httpOut('TestStream_Default')
`
er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: nil,
Columns: []string{"time", "sum"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
57.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_Default", script, 15*time.Second, er, nil, false)
}
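
A similar check against the replay data in TestStream_Default.srpl above (assuming the first window covers seconds 1 through 10): every serverA cpu point contributes its value, the two points with no host tag are defaulted to host=serverA so they survive the where filter, and the two points that only carry anothervalue get the default value of 1.0:

    9 + 4 + 2 + 8 + 6 + 2 + 8 + 1.0 + 1.0 + 9 + 7 = 57.0

which matches the expected sum.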

func TestStream_AllMeasurements(t *testing.T) {

var script = `
61 changes: 61 additions & 0 deletions pipeline/default.go
@@ -0,0 +1,61 @@
package pipeline

import "fmt"

// Defaults fields and tags on data points.
//
// Example:
// stream
// |default()
// .field('value', 0.0)
// .tag('host', '')
//
// The above example will set the field `value` to float64(0) if it does not already exist.
// It will also set the tag `host` to string("") if it does not already exist.
type DefaultNode struct {
chainnode

// Set of fields to default
// tick:ignore
Fields map[string]interface{} `tick:"Field"`

// Set of tags to default
Tags map[string]string `tick:"Tag"`
}

func newDefaultNode(e EdgeType) *DefaultNode {
n := &DefaultNode{
chainnode: newBasicChainNode("default", e, e),
Fields: make(map[string]interface{}),
Tags: make(map[string]string),
}
return n
}

// Define a field default.
// tick:property
func (n *DefaultNode) Field(name string, value interface{}) *DefaultNode {
n.Fields[name] = value
return n
}

// Define a tag default.
// tick:property
func (n *DefaultNode) Tag(name string, value string) *DefaultNode {
n.Tags[name] = value
return n
}

func (n *DefaultNode) validate() error {
for field, value := range n.Fields {
switch value.(type) {
case float64:
case int64:
case bool:
case string:
default:
return fmt.Errorf("unsupported type %T for field %q, field default values must be float, int, string or bool", value, field)
}
}
return nil
}
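
validate restricts field defaults to float64, int64, bool, and string. A hypothetical in-package test sketch (not part of this commit; assumes imports of testing and time) illustrating the restriction:

func TestDefaultNode_ValidateFieldTypes(t *testing.T) {
	n := newDefaultNode(StreamEdge)
	n.Field("ok", 1.0)
	if err := n.validate(); err != nil {
		t.Fatalf("unexpected error: %v", err)
	}
	n.Field("bad", 10*time.Second) // time.Duration is not float, int, string, or bool
	if err := n.validate(); err == nil {
		t.Fatal("expected an error for the unsupported default type")
	}
}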
7 changes: 7 additions & 0 deletions pipeline/node.go
@@ -415,3 +415,10 @@ func (n *chainnode) Log() *LogNode {
n.linkChild(s)
return s
}

// Create a node that can set defaults for missing tags or fields.
func (n *chainnode) Default() *DefaultNode {
s := newDefaultNode(n.Provides())
n.linkChild(s)
return s
}
2 changes: 2 additions & 0 deletions task.go
@@ -483,6 +483,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (n Node, err
n, err = newInfluxQLNode(et, t, l)
case *pipeline.LogNode:
n, err = newLogNode(et, t, l)
case *pipeline.DefaultNode:
n, err = newDefaultNode(et, t, l)
default:
return nil, fmt.Errorf("unknown pipeline node type %T", p)
}
