diff --git a/integrations/data/TestStream_GroupByWhere.srpl b/integrations/data/TestStream_GroupByWhere.srpl new file mode 100644 index 000000000..78bcb1dca --- /dev/null +++ b/integrations/data/TestStream_GroupByWhere.srpl @@ -0,0 +1,111 @@ +dbname +rpname +cpu,cpu=cpu0,host=serverA value=97.1 0000000001 +dbname +rpname +cpu,cpu=cpu0,host=serverB value=67.1 0000000001 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=87.1 0000000001 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=97.1 0000000001 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=91.6 0000000002 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=82.6 0000000002 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=92.6 0000000002 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=35.6 0000000003 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=85.6 0000000003 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=95.6 0000000003 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=73.1 0000000004 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=63.1 0000000004 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=93.1 0000000004 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=32.6 0000000005 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=72.6 0000000005 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=92.6 0000000005 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=94.8 0000000006 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=25.8 0000000006 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=95.8 0000000006 +dbname +rpname +cpu,cpu=cpu0,host=serverC value=15.8 0000000007 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=82.7 0000000007 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=92.7 0000000007 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=82.7 0000000008 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=66.0 0000000008 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=96.0 0000000008 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=86.0 0000000009 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=73.4 0000000009 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=93.4 0000000009 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=73.4 0000000010 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=85.3 0000000010 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=95.3 0000000010 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=65.3 0000000011 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=93.4 0000000011 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=96.4 0000000011 +dbname +rpname +cpu,cpu=cpu0,host=serverA value=15.1 0000000012 +dbname +rpname +cpu,cpu=cpu1,host=serverA value=95.1 0000000012 +dbname +rpname +cpu,cpu=cpu-total,host=serverA value=95.1 0000000012 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index a1fc275d8..aa0d9642c 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -867,6 +867,69 @@ stream testStreamerWithOutput(t, "TestStream_GroupBy", script, 13*time.Second, er, nil, false) } +func TestStream_GroupByWhere(t *testing.T) { + + var script = ` +var serverA = stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + +var byCpu = serverA + |groupBy('host', 'cpu') + +var total = serverA + |where(lambda: "cpu" == 'cpu-total') + +byCpu + |join(total) + .on('host') + .as('cpu', 'total') + |eval(lambda: "cpu.value" / "total.value") + .as('cpu_percent') + |window() + .period(10s) + .every(10s) + |mean('cpu_percent') + |httpOut('TestStream_GroupByWhere') +` + + er := kapacitor.Result{ + Series: imodels.Rows{ + { + Name: "cpu", + Tags: map[string]string{"cpu": "cpu0", "host": "serverA"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.7823116704593873, + }}, + }, + { + Name: "cpu", + Tags: map[string]string{"cpu": "cpu1", "host": "serverA"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 0.7676074281820646, + }}, + }, + { + Name: "cpu", + Tags: map[string]string{"cpu": "cpu-total", "host": "serverA"}, + Columns: []string{"time", "mean"}, + Values: [][]interface{}{[]interface{}{ + time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), + 1.0, + }}, + }, + }, + } + + testStreamerWithOutput(t, "TestStream_GroupByWhere", script, 13*time.Second, er, nil, false) +} + func TestStream_Join(t *testing.T) { var script = ` diff --git a/pipeline/stream.go b/pipeline/stream.go index ef0811dc5..a2405b302 100644 --- a/pipeline/stream.go +++ b/pipeline/stream.go @@ -1,6 +1,7 @@ package pipeline import ( + "fmt" "time" "github.com/influxdata/kapacitor/tick" @@ -72,6 +73,10 @@ func (s *SourceStreamNode) From() *StreamNode { // the tag `host` matches the regex `logger\d+` type StreamNode struct { chainnode + + // self describer + describer *tick.ReflectionDescriber + // An expression to filter the data stream. // tick:ignore Expression tick.Node `tick:"Where"` @@ -105,9 +110,12 @@ type StreamNode struct { } func newStreamNode() *StreamNode { - return &StreamNode{ + s := &StreamNode{ chainnode: newBasicChainNode("stream", StreamEdge, StreamEdge), } + s.describer, _ = tick.NewReflectionDescriber(s) + return s + } // Creates a new stream node that can be further @@ -225,3 +233,52 @@ func (s *StreamNode) GroupBy(tag ...interface{}) *StreamNode { s.Dimensions = tag return s } + +// Tick Describer methods + +//tick:ignore +func (s *StreamNode) Desc() string { + return s.describer.Desc() +} + +//tick:ignore +func (s *StreamNode) HasChainMethod(name string) bool { + if name == "groupBy" || name == "where" { + return true + } + return s.describer.HasChainMethod(name) +} + +//tick:ignore +func (s *StreamNode) CallChainMethod(name string, args ...interface{}) (interface{}, error) { + switch name { + case "groupBy": + return s.chainnode.GroupBy(args...), nil + case "where": + if len(args) != 1 { + return nil, fmt.Errorf("invalid number of args to |where() got %d exp 1", len(args)) + } + expr, ok := args[0].(tick.Node) + if !ok { + return nil, fmt.Errorf("invalid arg to |where() got %T exp tick.Node", args[0]) + } + return s.chainnode.Where(expr), nil + default: + return s.describer.CallChainMethod(name, args...) + } +} + +//tick:ignore +func (s *StreamNode) HasProperty(name string) bool { + return s.describer.HasProperty(name) +} + +//tick:ignore +func (s *StreamNode) Property(name string) interface{} { + return s.describer.Property(name) +} + +//tick:ignore +func (s *StreamNode) SetProperty(name string, args ...interface{}) (interface{}, error) { + return s.describer.SetProperty(name, args...) +}