Commit

implement batch group by
nathanielc committed Feb 17, 2016
1 parent 407644c commit f4a42ac
Showing 6 changed files with 210 additions and 2 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -5,6 +5,7 @@
### Release Notes

### Features
- [#236](https://github.com/influxdata/kapacitor/issues/236): Implement batched group by
- [#231](https://github.com/influxdata/kapacitor/pull/231): Add ShiftNode so values can be shifted in time for joining/comparisons.
- [#190](https://github.com/influxdata/kapacitor/issues/190): BREAKING: Deadman's switch now triggers off emitted counts and is grouped by the original grouping of the data.
The breaking change is that the 'collected' stat is no longer output for `.stats` and has been replaced by `emitted`.
38 changes: 37 additions & 1 deletion group_by.go
@@ -41,7 +41,43 @@ func (g *GroupByNode) runGroupBy([]byte) error {
			}
		}
	default:
		panic("not implemented")
		for b, ok := g.ins[0].NextBatch(); ok; b, ok = g.ins[0].NextBatch() {
			// Group this batch's points by their group ID.
			groups := make(map[models.GroupID]*models.Batch)
			for _, p := range b.Points {
				var dims []string
				if g.allDimensions {
					dims = models.SortedKeys(p.Tags)
				} else {
					dims = g.dimensions
				}
				groupID := models.TagsToGroupID(dims, p.Tags)
				group, ok := groups[groupID]
				if !ok {
					tags := make(map[string]string, len(dims))
					for _, dim := range dims {
						tags[dim] = p.Tags[dim]
					}
					group = &models.Batch{
						Name:  b.Name,
						Group: groupID,
						TMax:  b.TMax,
						Tags:  tags,
					}
					groups[groupID] = group
				}
				group.Points = append(group.Points, p)
			}

			// Emit one batch per group to every child node.
			for _, group := range groups {
				for _, child := range g.outs {
					err := child.CollectBatch(*group)
					if err != nil {
						return err
					}
				}
			}
		}
	}
	return nil
}
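The new default case leans on models.TagsToGroupID and models.SortedKeys, neither of which appears in this diff. As a rough illustration only, the following self-contained sketch reproduces the same grouping idea with hypothetical Point and Batch types and an assumed comma-joined group key; it is not Kapacitor's actual implementation.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// Point and Batch are hypothetical stand-ins for Kapacitor's models types,
// used only to illustrate the grouping idea.
type Point struct {
	Tags  map[string]string
	Value float64
}

type Batch struct {
	Group  string
	Tags   map[string]string
	Points []Point
}

// groupID builds a deterministic group key from the chosen dimensions.
// models.TagsToGroupID is assumed to do something similar; the exact
// encoding here (comma-joined key=value pairs) is an illustration only.
func groupID(dims []string, tags map[string]string) string {
	parts := make([]string, 0, len(dims))
	for _, d := range dims {
		parts = append(parts, d+"="+tags[d])
	}
	return strings.Join(parts, ",")
}

// groupBatch splits one incoming batch of points into per-group batches,
// mirroring the shape of the new default case in runGroupBy.
func groupBatch(points []Point, dims []string) map[string]*Batch {
	sort.Strings(dims) // deterministic dimension order, like models.SortedKeys is assumed to provide
	groups := make(map[string]*Batch)
	for _, p := range points {
		id := groupID(dims, p.Tags)
		b, ok := groups[id]
		if !ok {
			tags := make(map[string]string, len(dims))
			for _, d := range dims {
				tags[d] = p.Tags[d]
			}
			b = &Batch{Group: id, Tags: tags}
			groups[id] = b
		}
		b.Points = append(b.Points, p)
	}
	return groups
}

func main() {
	points := []Point{
		{Tags: map[string]string{"host": "serverA"}, Value: 97.1},
		{Tags: map[string]string{"host": "serverB"}, Value: 97.1},
		{Tags: map[string]string{"host": "serverA"}, Value: 92.6},
	}
	// Prints one line per group, e.g. "host=serverA -> 2 points".
	for id, b := range groupBatch(points, []string{"host"}) {
		fmt.Printf("%s -> %d points\n", id, len(b.Points))
	}
}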
72 changes: 72 additions & 0 deletions integrations/data/TestStream_BatchGroupBy.srpl
@@ -0,0 +1,72 @@
dbname
rpname
cpu,type=idle,host=serverA value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverB value=97.1 0000000001
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000002
dbname
rpname
cpu,type=idle,host=serverA value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverB value=95.6 0000000003
dbname
rpname
cpu,type=idle,host=serverA value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverB value=93.1 0000000004
dbname
rpname
cpu,type=idle,host=serverA value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverB value=92.6 0000000005
dbname
rpname
cpu,type=idle,host=serverA value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverC value=95.8 0000000006
dbname
rpname
cpu,type=idle,host=serverA value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverB value=92.7 0000000007
dbname
rpname
cpu,type=idle,host=serverA value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverB value=96.0 0000000008
dbname
rpname
cpu,type=idle,host=serverA value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverB value=93.4 0000000009
dbname
rpname
cpu,type=idle,host=serverA value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverB value=95.3 0000000010
dbname
rpname
cpu,type=idle,host=serverA value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverB value=96.4 0000000011
dbname
rpname
cpu,type=idle,host=serverA value=95.1 0000000012
dbname
rpname
cpu,type=idle,host=serverB value=95.1 0000000012
5 changes: 5 additions & 0 deletions integrations/helpers_test.go
@@ -84,14 +84,19 @@ func compareResultsIgnoreSeriesOrder(exp, got kapacitor.Result) (bool, string) {
	for i := range exp.Series {
		// Find series with same name
		var j int
		found := false
		for j = range set {
			if exp.Series[i].Name == got.Series[j].Name &&
				reflect.DeepEqual(exp.Series[i].Tags, got.Series[j].Tags) {
				// found matching series
				delete(set, j)
				found = true
				break
			}
		}
		if !found {
			return false, fmt.Sprintf("could not find matching series: %s %v", exp.Series[i].Name, exp.Series[i].Tags)
		}
		if !reflect.DeepEqual(exp.Series[i].Columns, got.Series[j].Columns) {
			return false, fmt.Sprintf("unexpected series columns: i: %d \nexp %v \ngot %v", i, exp.Series[i].Columns, got.Series[j].Columns)
		}
94 changes: 94 additions & 0 deletions integrations/streamer_test.go
@@ -461,6 +461,100 @@ stream
testStreamerWithOutput(t, "TestStream_SimpleMR", script, 15*time.Second, er, nil, false)
}

func TestStream_BatchGroupBy(t *testing.T) {

	var script = `
stream
	.from().measurement('cpu')
	.window()
		.period(10s)
		.every(10s)
	.groupBy('host')
	.mapReduce(influxql.count('value'))
	.httpOut('TestStream_BatchGroupBy')
`
	er := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverA"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					10.0,
				}},
			},
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverB"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					9.0,
				}},
			},
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverC"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					1.0,
				}},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_BatchGroupBy", script, 15*time.Second, er, nil, true)
}

func TestStream_BatchGroupByAll(t *testing.T) {

	var script = `
stream
	.from().measurement('cpu')
	.window()
		.period(10s)
		.every(10s)
	.groupBy(*)
	.mapReduce(influxql.count('value'))
	.httpOut('TestStream_BatchGroupBy')
`
	er := kapacitor.Result{
		Series: imodels.Rows{
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverA", "type": "idle"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					10.0,
				}},
			},
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverB", "type": "idle"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					9.0,
				}},
			},
			{
				Name:    "cpu",
				Tags:    map[string]string{"host": "serverC", "type": "idle"},
				Columns: []string{"time", "count"},
				Values: [][]interface{}{[]interface{}{
					time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
					1.0,
				}},
			},
		},
	}

	testStreamerWithOutput(t, "TestStream_BatchGroupBy", script, 15*time.Second, er, nil, true)
}
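The expected counts in both tests follow from the replay data in TestStream_BatchGroupBy.srpl above: serverA reports once per second from 1s through 10s (10 points), serverB at every second in that range except 6s (9 points), and serverC only at 6s (1 point), so influxql.count('value') over the 10s window yields 10, 9, and 1 respectively.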

func TestStream_SimpleWhere(t *testing.T) {

var script = `
2 changes: 1 addition & 1 deletion models/batch.go
@@ -59,7 +59,7 @@ func (b Batch) PointTags() Tags {
}

func (b Batch) PointDimensions() []string {
	return nil
	return SortedKeys(b.Tags)
}

func BatchToRow(b Batch) (row *models.Row) {
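PointDimensions now derives the batch's dimensions from its tags via SortedKeys, which is not part of this diff. A minimal sketch of such a helper, assuming it simply returns the tag keys in ascending order:

package models

import "sort"

// SortedKeys is assumed to return the keys of a tag map in ascending order,
// keeping dimension order (and therefore group IDs) deterministic.
// This is a hypothetical reconstruction; the real helper is not shown in this diff.
func SortedKeys(tags map[string]string) []string {
	keys := make([]string, 0, len(tags))
	for k := range tags {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}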
