From f4a42ac51181a55a4c0b1e0ed77005992216db5b Mon Sep 17 00:00:00 2001
From: Nathaniel Cook
Date: Wed, 17 Feb 2016 12:15:16 -0700
Subject: [PATCH] implement batch group by

---
 CHANGELOG.md                          |  1 +
 group_by.go                           | 38 +++++++-
 .../data/TestStream_BatchGroupBy.srpl | 72 ++++++++++++++
 integrations/helpers_test.go          |  5 +
 integrations/streamer_test.go         | 94 +++++++++++++++++++
 models/batch.go                       |  2 +-
 6 files changed, 210 insertions(+), 2 deletions(-)
 create mode 100644 integrations/data/TestStream_BatchGroupBy.srpl

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 29d4f2400..135a17563 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@
 ### Release Notes
 
 ### Features
+- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
 - [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
 - [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by to original grouping of the data.
     The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.

diff --git a/group_by.go b/group_by.go
index aadc476eb..8bddfc356 100644
--- a/group_by.go
+++ b/group_by.go
@@ -41,7 +41,43 @@ func (g *GroupByNode) runGroupBy([]byte) error {
 			}
 		}
 	default:
-		panic("not implemented")
+		for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() {
+			groups := make(map[models.GroupID]*models.Batch)
+			for _, p := range b.Points {
+				var dims []string
+				if g.allDimensions {
+					dims = models.SortedKeys(p.Tags)
+				} else {
+					dims = g.dimensions
+				}
+				groupID := models.TagsToGroupID(dims, p.Tags)
+				group, ok := groups[groupID]
+				if !ok {
+					tags := make(map[string]string, len(dims))
+					for _, dim := range dims {
+						tags[dim] = p.Tags[dim]
+					}
+					group = &models.Batch{
+						Name:  b.Name,
+						Group: groupID,
+						TMax:  b.TMax,
+						Tags:  tags,
+					}
+					groups[groupID] = group
+				}
+				group.Points = append(group.Points, p)
+			}
+
+			for _, group := range groups {
+				for _, child := range g.outs {
+					err := child.CollectBatch(*group)
+					if err != nil {
+						return err
+					}
+				}
+			}
+
+		}
 	}
 	return nil
 }

diff --git a/integrations/data/TestStream_BatchGroupBy.srpl b/integrations/data/TestStream_BatchGroupBy.srpl
new file mode 100644
index 000000000..38c16a339
--- /dev/null
+++ b/integrations/data/TestStream_BatchGroupBy.srpl
@@ -0,0 +1,72 @@
+dbname
+rpname
+cpu,type=idle,host=serverA value=97.1 0000000001
+dbname
+rpname
+cpu,type=idle,host=serverB value=97.1 0000000001
+dbname
+rpname
+cpu,type=idle,host=serverA value=92.6 0000000002
+dbname
+rpname
+cpu,type=idle,host=serverB value=92.6 0000000002
+dbname
+rpname
+cpu,type=idle,host=serverA value=95.6 0000000003
+dbname
+rpname
+cpu,type=idle,host=serverB value=95.6 0000000003
+dbname
+rpname
+cpu,type=idle,host=serverA value=93.1 0000000004
+dbname
+rpname
+cpu,type=idle,host=serverB value=93.1 0000000004
+dbname
+rpname
+cpu,type=idle,host=serverA value=92.6 0000000005
+dbname
+rpname
+cpu,type=idle,host=serverB value=92.6 0000000005
+dbname
+rpname
+cpu,type=idle,host=serverA value=95.8 0000000006
+dbname
+rpname
+cpu,type=idle,host=serverC value=95.8 0000000006
+dbname
+rpname
+cpu,type=idle,host=serverA value=92.7 0000000007
+dbname
+rpname
+cpu,type=idle,host=serverB value=92.7 0000000007
+dbname
+rpname
+cpu,type=idle,host=serverA value=96.0 0000000008
+dbname
+rpname
+cpu,type=idle,host=serverB value=96.0 0000000008
+dbname
+rpname
+cpu,type=idle,host=serverA value=93.4 0000000009
+dbname
+rpname
+cpu,type=idle,host=serverB value=93.4 0000000009
+dbname
+rpname
+cpu,type=idle,host=serverA value=95.3 0000000010
+dbname
+rpname
+cpu,type=idle,host=serverB value=95.3 0000000010
+dbname
+rpname
+cpu,type=idle,host=serverA value=96.4 0000000011
+dbname
+rpname
+cpu,type=idle,host=serverB value=96.4 0000000011
+dbname
+rpname
+cpu,type=idle,host=serverA value=95.1 0000000012
+dbname
+rpname
+cpu,type=idle,host=serverB value=95.1 0000000012

diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go
index 3a664c74a..cc017e619 100644
--- a/integrations/helpers_test.go
+++ b/integrations/helpers_test.go
@@ -84,14 +84,19 @@ func compareResultsIgnoreSeriesOrder(exp, got kapacitor.Result) (bool, string) {
 	for i := range exp.Series {
 		// Find series with same name
 		var j int
+		found := false
 		for j = range set {
 			if exp.Series[i].Name == got.Series[j].Name &&
 				reflect.DeepEqual(exp.Series[i].Tags, got.Series[j].Tags) {
 				// found matching series
 				delete(set, j)
+				found = true
 				break
 			}
 		}
+		if !found {
+			return false, fmt.Sprintf("could not find matching series: %s %v", exp.Series[i].Name, exp.Series[i].Tags)
+		}
 		if !reflect.DeepEqual(exp.Series[i].Columns, got.Series[j].Columns) {
 			return false, fmt.Sprintf("unexpected series columns: i: %d \nexp %v \ngot %v", i, exp.Series[i].Columns, got.Series[j].Columns)
 		}

diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go
index f9cb121ee..16d3188c0 100644
--- a/integrations/streamer_test.go
+++ b/integrations/streamer_test.go
@@ -461,6 +461,100 @@ stream
 	testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
 }
 
+func TestStream_BatchGroupBy(t *testing.T) {
+
+	var script = `
+stream
+	.from().measurement('cpu')
+	.window()
+		.period(10s)
+		.every(10s)
+	.groupBy('host')
+	.mapReduce(influxql.count('value'))
+	.httpOut('TestStream_BatchGroupBy')
+`
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverA"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					10.0,
+				}},
+			},
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverB"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					9.0,
+				}},
+			},
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverC"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					1.0,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_BatchGroupBy", script, 15*time.Second, er, nil, true)
+}
+
+func TestStream_BatchGroupByAll(t *testing.T) {
+
+	var script = `
+stream
+	.from().measurement('cpu')
+	.window()
+		.period(10s)
+		.every(10s)
+	.groupBy(*)
+	.mapReduce(influxql.count('value'))
+	.httpOut('TestStream_BatchGroupBy')
+`
+	er := kapacitor.Result{
+		Series: imodels.Rows{
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverA", "type": "idle"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					10.0,
+				}},
+			},
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverB", "type": "idle"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					9.0,
+				}},
+			},
+			{
+				Name:    "cpu",
+				Tags:    map[string]string{"host": "serverC", "type": "idle"},
+				Columns: []string{"time", "count"},
+				Values: [][]interface{}{[]interface{}{
+					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
+					1.0,
+				}},
+			},
+		},
+	}
+
+	testStreamerWithOutput(t, "TestStream_BatchGroupBy", script, 15*time.Second, er, nil, true)
+}
+
 func TestStream_SimpleWhere(t *testing.T) {
 
 	var script = `

diff --git a/models/batch.go b/models/batch.go
index 3a93e01c8..18d5a3258 100644
--- a/models/batch.go
+++ b/models/batch.go
@@ -59,7 +59,7 @@ func (b Batch) PointTags() Tags {
 }
 
 func (b Batch) PointDimensions() []string {
-	return nil
+	return SortedKeys(b.Tags)
 }
 
 func BatchToRow(b Batch) (row *models.Row) {
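
Note (commentary, not part of the patch): the core of the change is the new default case in runGroupBy, which splits each incoming batch into one output batch per group before forwarding it to child nodes via CollectBatch. The following is a minimal, self-contained Go sketch of that grouping step only, for illustration. The Point and Batch types, the sortedKeys helper, and the string group ID below are simplified stand-ins, not the kapacitor models API; the real code uses models.Batch, models.GroupID, models.TagsToGroupID, and models.SortedKeys, and also carries TMax through to the emitted batches.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// Point and Batch are simplified stand-ins for the kapacitor models types.
type Point struct {
	Tags  map[string]string
	Value float64
}

type Batch struct {
	Name   string
	Group  string
	Tags   map[string]string
	Points []Point
}

// sortedKeys returns the tag keys in a deterministic order, playing the role
// of models.SortedKeys when grouping by all dimensions (groupBy(*)).
func sortedKeys(tags map[string]string) []string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// groupBatch splits one batch into per-group batches keyed by the chosen tag
// dimensions. It mirrors the shape of the new default case in runGroupBy:
// compute a group ID per point, lazily create that group's batch carrying only
// the grouping tags, and append the point to it.
func groupBatch(b Batch, dims []string, allDimensions bool) map[string]*Batch {
	groups := make(map[string]*Batch)
	for _, p := range b.Points {
		d := dims
		if allDimensions {
			d = sortedKeys(p.Tags)
		}
		// Stand-in for models.TagsToGroupID: join dim=value pairs.
		parts := make([]string, len(d))
		for i, dim := range d {
			parts[i] = dim + "=" + p.Tags[dim]
		}
		id := strings.Join(parts, ",")

		group, ok := groups[id]
		if !ok {
			tags := make(map[string]string, len(d))
			for _, dim := range d {
				tags[dim] = p.Tags[dim]
			}
			group = &Batch{Name: b.Name, Group: id, Tags: tags}
			groups[id] = group
		}
		group.Points = append(group.Points, p)
	}
	return groups
}

func main() {
	b := Batch{
		Name: "cpu",
		Points: []Point{
			{Tags: map[string]string{"host": "serverA", "type": "idle"}, Value: 97.1},
			{Tags: map[string]string{"host": "serverB", "type": "idle"}, Value: 97.1},
			{Tags: map[string]string{"host": "serverA", "type": "idle"}, Value: 92.6},
		},
	}
	for id, g := range groupBatch(b, []string{"host"}, false) {
		fmt.Printf("%s -> %d points\n", id, len(g.Points))
	}
}

Applied to the TestStream_BatchGroupBy.srpl data above, grouping by 'host' over the first 10s window (timestamps 1s through 10s) yields three groups with 10 (serverA), 9 (serverB), and 1 (serverC) points, which is what the new integration tests assert.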