Skip to content

Commit

Permalink
add chain methods back on stream node
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Apr 11, 2016
1 parent 36c7d14 commit 36364b6
Show file tree
Hide file tree
Showing 3 changed files with 232 additions and 1 deletion.
111 changes: 111 additions & 0 deletions integrations/data/TestStream_GroupByWhere.srpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
dbname
rpname
cpu,cpu=cpu0,host=serverA value=97.1 0000000001
dbname
rpname
cpu,cpu=cpu0,host=serverB value=67.1 0000000001
dbname
rpname
cpu,cpu=cpu1,host=serverA value=87.1 0000000001
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=97.1 0000000001
dbname
rpname
cpu,cpu=cpu0,host=serverA value=91.6 0000000002
dbname
rpname
cpu,cpu=cpu1,host=serverA value=82.6 0000000002
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.6 0000000002
dbname
rpname
cpu,cpu=cpu0,host=serverA value=35.6 0000000003
dbname
rpname
cpu,cpu=cpu1,host=serverA value=85.6 0000000003
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.6 0000000003
dbname
rpname
cpu,cpu=cpu0,host=serverA value=73.1 0000000004
dbname
rpname
cpu,cpu=cpu1,host=serverA value=63.1 0000000004
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=93.1 0000000004
dbname
rpname
cpu,cpu=cpu0,host=serverA value=32.6 0000000005
dbname
rpname
cpu,cpu=cpu1,host=serverA value=72.6 0000000005
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.6 0000000005
dbname
rpname
cpu,cpu=cpu0,host=serverA value=94.8 0000000006
dbname
rpname
cpu,cpu=cpu1,host=serverA value=25.8 0000000006
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.8 0000000006
dbname
rpname
cpu,cpu=cpu0,host=serverC value=15.8 0000000007
dbname
rpname
cpu,cpu=cpu1,host=serverA value=82.7 0000000007
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=92.7 0000000007
dbname
rpname
cpu,cpu=cpu0,host=serverA value=82.7 0000000008
dbname
rpname
cpu,cpu=cpu1,host=serverA value=66.0 0000000008
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=96.0 0000000008
dbname
rpname
cpu,cpu=cpu0,host=serverA value=86.0 0000000009
dbname
rpname
cpu,cpu=cpu1,host=serverA value=73.4 0000000009
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=93.4 0000000009
dbname
rpname
cpu,cpu=cpu0,host=serverA value=73.4 0000000010
dbname
rpname
cpu,cpu=cpu1,host=serverA value=85.3 0000000010
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.3 0000000010
dbname
rpname
cpu,cpu=cpu0,host=serverA value=65.3 0000000011
dbname
rpname
cpu,cpu=cpu1,host=serverA value=93.4 0000000011
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=96.4 0000000011
dbname
rpname
cpu,cpu=cpu0,host=serverA value=15.1 0000000012
dbname
rpname
cpu,cpu=cpu1,host=serverA value=95.1 0000000012
dbname
rpname
cpu,cpu=cpu-total,host=serverA value=95.1 0000000012
63 changes: 63 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -867,6 +867,69 @@ stream
testStreamerWithOutput(t, "TestStream_GroupBy", script, 13*time.Second, er, nil, false)
}

func TestStream_GroupByWhere(t *testing.T) {

var script = `
var serverA = stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
var byCpu = serverA
|groupBy('host', 'cpu')
var total = serverA
|where(lambda: "cpu" == 'cpu-total')
byCpu
|join(total)
.on('host')
.as('cpu', 'total')
|eval(lambda: "cpu.value" / "total.value")
.as('cpu_percent')
|window()
.period(10s)
.every(10s)
|mean('cpu_percent')
|httpOut('TestStream_GroupByWhere')
`

er := kapacitor.Result{
Series: imodels.Rows{
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu0", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.7823116704593873,
}},
},
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu1", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.7676074281820646,
}},
},
{
Name: "cpu",
Tags: map[string]string{"cpu": "cpu-total", "host": "serverA"},
Columns: []string{"time", "mean"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
1.0,
}},
},
},
}

testStreamerWithOutput(t, "TestStream_GroupByWhere", script, 13*time.Second, er, nil, false)
}

func TestStream_Join(t *testing.T) {

var script = `
Expand Down
59 changes: 58 additions & 1 deletion pipeline/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package pipeline

import (
"fmt"
"time"

"github.com/influxdata/kapacitor/tick"
Expand Down Expand Up @@ -72,6 +73,10 @@ func (s *SourceStreamNode) From() *StreamNode {
// the tag `host` matches the regex `logger\d+`
type StreamNode struct {
chainnode

// self describer
describer *tick.ReflectionDescriber

// An expression to filter the data stream.
// tick:ignore
Expression tick.Node `tick:"Where"`
Expand Down Expand Up @@ -105,9 +110,12 @@ type StreamNode struct {
}

func newStreamNode() *StreamNode {
return &StreamNode{
s := &StreamNode{
chainnode: newBasicChainNode("stream", StreamEdge, StreamEdge),
}
s.describer, _ = tick.NewReflectionDescriber(s)
return s

}

// Creates a new stream node that can be further
Expand Down Expand Up @@ -225,3 +233,52 @@ func (s *StreamNode) GroupBy(tag ...interface{}) *StreamNode {
s.Dimensions = tag
return s
}

// Tick Describer methods

//tick:ignore
func (s *StreamNode) Desc() string {
return s.describer.Desc()
}

//tick:ignore
func (s *StreamNode) HasChainMethod(name string) bool {
if name == "groupBy" || name == "where" {
return true
}
return s.describer.HasChainMethod(name)
}

//tick:ignore
func (s *StreamNode) CallChainMethod(name string, args ...interface{}) (interface{}, error) {
switch name {
case "groupBy":
return s.chainnode.GroupBy(args...), nil
case "where":
if len(args) != 1 {
return nil, fmt.Errorf("invalid number of args to |where() got %d exp 1", len(args))
}
expr, ok := args[0].(tick.Node)
if !ok {
return nil, fmt.Errorf("invalid arg to |where() got %T exp tick.Node", args[0])
}
return s.chainnode.Where(expr), nil
default:
return s.describer.CallChainMethod(name, args...)
}
}

//tick:ignore
func (s *StreamNode) HasProperty(name string) bool {
return s.describer.HasProperty(name)
}

//tick:ignore
func (s *StreamNode) Property(name string) interface{} {
return s.describer.Property(name)
}

//tick:ignore
func (s *StreamNode) SetProperty(name string, args ...interface{}) (interface{}, error) {
return s.describer.SetProperty(name, args...)
}

0 comments on commit 36364b6

Please sign in to comment.