From d3ae18d94cf093be48edf22540253e8e81c71494 Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 6 May 2016 16:32:18 -0600 Subject: [PATCH] Join can emit ready sets without waiting (#523) * join can emit ready sets without waiting * changelog --- CHANGELOG.md | 1 + .../data/TestStream_JoinTolerance.srpl | 20 +-- integrations/streamer_test.go | 125 +++++++++++++++--- join.go | 92 +++++++++---- 4 files changed, 183 insertions(+), 55 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1c117e651..e52325b88 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -120,6 +120,7 @@ For example, let's say we want to store all data that triggered an alert in Infl - [#425](https://github.com/influxdata/kapacitor/pull/425): BREAKING: Preserving tags on influxql simple selectors - first, last, max, min, percentile - [#423](https://github.com/influxdata/kapacitor/issues/423): Recording stream queries with group by now correctly saves data in time order not group by order. - [#331](https://github.com/influxdata/kapacitor/issues/331): Fix panic when missing `.as()` for JoinNode. +- [#523](https://github.com/influxdata/kapacitor/pull/523): JoinNode will now emit join sets as soon as they are ready. If multiple joinable sets arrive in the same tolerance window then each will be emitted (previously the first points were dropped). 
## v0.12.0 [2016-04-04] diff --git a/integrations/data/TestStream_JoinTolerance.srpl b/integrations/data/TestStream_JoinTolerance.srpl index 723e5b781..997b3e188 100644 --- a/integrations/data/TestStream_JoinTolerance.srpl +++ b/integrations/data/TestStream_JoinTolerance.srpl @@ -18,7 +18,7 @@ rpname errors,service=front,dc=A value=2 0000000001 dbname rpname -views,service=front,dc=A value=200 0000000002 +views,service=front,dc=A value=200 0000000001 dbname rpname errors,service=cartA,dc=B value=9 0000000002 @@ -27,13 +27,13 @@ rpname views,service=cartA,dc=B value=900 0000000002 dbname rpname -errors,service=login,dc=A value=5 0000000003 +errors,service=login,dc=A value=5 0000000002 dbname rpname -views,service=login,dc=A value=500 0000000003 +views,service=login,dc=A value=500 0000000002 dbname rpname -errors,service=front,dc=B value=9 0000000003 +errors,service=front,dc=B value=9 0000000002 dbname rpname views,service=front,dc=B value=900 0000000003 @@ -51,10 +51,10 @@ rpname views,service=login,dc=B value=900 0000000004 dbname rpname -errors,service=front,dc=A value=2 0000000005 +errors,service=front,dc=A value=7 0000000005 dbname rpname -views,service=front,dc=A value=200 0000000005 +views,service=front,dc=A value=700 0000000005 dbname rpname errors,service=login,dc=B value=2 0000000005 @@ -69,10 +69,10 @@ rpname views,service=front,dc=A value=500 0000000006 dbname rpname -errors,service=cartA,dc=B value=9 0000000006 +errors,service=cartA,dc=B value=11 0000000006 dbname rpname -views,service=cartA,dc=B value=900 0000000006 +views,service=cartA,dc=B value=1100 0000000006 dbname rpname errors,service=login,dc=C value=7 0000000006 @@ -87,10 +87,10 @@ rpname views,service=front,dc=A value=400 0000000007 dbname rpname -errors,service=cartA,dc=B value=8 0000000007 +errors,service=cartA,dc=B value=12 0000000007 dbname rpname -views,service=cartA,dc=B value=800 0000000007 +views,service=cartA,dc=B value=1200 0000000007 dbname rpname errors,service=front,dc=A value=6 
0000000008 diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index d80536893..6e01c23fb 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -1084,13 +1084,9 @@ errorCounts .as('errors', 'views') .tolerance(2s) .streamName('error_view') - |eval(lambda: "errors.value" / "views.value") - .as('error_percent') |window() .period(10s) .every(10s) - |mean('error_percent') - .as('error_percent') |httpOut('TestStream_JoinTolerance') ` @@ -1099,29 +1095,118 @@ errorCounts { Name: "error_view", Tags: map[string]string{"service": "cartA"}, - Columns: []string{"time", "error_percent"}, - Values: [][]interface{}{[]interface{}{ - time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), - 0.01, - }}, + Columns: []string{"time", "errors.value", "views.value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 7.0, + 700.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 9.0, + 900.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 3.0, + 300.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 11.0, + 1100.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 12.0, + 1200.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 6.0, + 600.0, + }, + }, }, { Name: "error_view", Tags: map[string]string{"service": "login"}, - Columns: []string{"time", "error_percent"}, - Values: [][]interface{}{[]interface{}{ - time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), - 0.01, - }}, + Columns: []string{"time", "errors.value", "views.value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 9.0, + 900.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 5.0, + 500.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 9.0, + 900.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 2.0, + 200.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 7.0, + 700.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 10.0, + 1000.0, 
+ }, + }, }, { Name: "error_view", Tags: map[string]string{"service": "front"}, - Columns: []string{"time", "error_percent"}, - Values: [][]interface{}{[]interface{}{ - time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC), - 0.01, - }}, + Columns: []string{"time", "errors.value", "views.value"}, + Values: [][]interface{}{ + { + time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC), + 2.0, + 200.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC), + 9.0, + 900.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), + 7.0, + 700.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 5.0, + 500.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC), + 4.0, + 400.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 6.0, + 600.0, + }, + { + time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC), + 4.0, + 400.0, + }, + }, }, }, } @@ -1226,7 +1311,7 @@ cpu Columns: []string{"time", "count"}, Values: [][]interface{}{[]interface{}{ time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC), - 9.0, + 10.0, }}, }, }, diff --git a/join.go b/join.go index 4fefd98bf..336ae76aa 100644 --- a/join.go +++ b/join.go @@ -233,7 +233,7 @@ type srcPoint struct { // handles emitting joined sets once enough data has arrived from parents. 
type group struct { - sets map[time.Time]*joinset + sets map[time.Time][]*joinset head []time.Time oldestTime time.Time j *JoinNode @@ -242,7 +242,7 @@ type group struct { func newGroup(i int, j *JoinNode) *group { return &group{ - sets: make(map[time.Time]*joinset), + sets: make(map[time.Time][]*joinset), head: make([]time.Time, i), j: j, points: make(chan srcPoint), @@ -269,44 +269,78 @@ func (g *group) collect(i int, p models.PointInterface) error { g.oldestTime = t } - set := g.sets[t] + var set *joinset + sets := g.sets[t] + if len(sets) == 0 { + set = newJoinset( + g.j.j.StreamName, + g.j.fill, + g.j.fillValue, + g.j.j.Names, + g.j.j.Tolerance, + t, + g.j.logger, + ) + sets = append(sets, set) + g.sets[t] = sets + } + for j := 0; j < len(sets); j++ { + if !sets[j].Has(i) { + set = sets[j] + break + } + } if set == nil { - set = newJoinset(g.j.j.StreamName, g.j.fill, g.j.fillValue, g.j.j.Names, g.j.j.Tolerance, t, g.j.logger) - g.sets[t] = set + set = newJoinset( + g.j.j.StreamName, + g.j.fill, + g.j.fillValue, + g.j.j.Names, + g.j.j.Tolerance, + t, + g.j.logger, + ) + sets = append(sets, set) + g.sets[t] = sets } - set.Add(i, p) + set.Set(i, p) // Update head g.head[i] = t - // Check if all parents have been read past the oldestTime. - // If so we can emit the set. - emit := true + onlyReadySets := false for _, t := range g.head { if !t.After(g.oldestTime) { - // Still posible to get more data - // need to wait for more points - emit = false + onlyReadySets = true break } } - if emit { - err := g.emit() - if err != nil { - return err - } + err := g.emit(onlyReadySets) + if err != nil { + return err } return nil } // emit a set and update the oldestTime. 
-func (g *group) emit() error { - set := g.sets[g.oldestTime] - err := g.emitJoinedSet(set) - if err != nil { - return err +func (g *group) emit(onlyReadySets bool) error { + sets := g.sets[g.oldestTime] + i := 0 + for ; i < len(sets); i++ { + if sets[i].Ready() || !onlyReadySets { + err := g.emitJoinedSet(sets[i]) + if err != nil { + return err + } + } else { + break + } + } + if i == len(sets) { + delete(g.sets, g.oldestTime) + } else { + g.sets[g.oldestTime] = sets[i:] } - delete(g.sets, g.oldestTime) g.oldestTime = time.Time{} for t := range g.sets { @@ -321,7 +355,7 @@ func (g *group) emit() error { func (g *group) emitAll() error { var lastErr error for len(g.sets) > 0 { - err := g.emit() + err := g.emit(false) if err != nil { lastErr = err } @@ -403,8 +437,16 @@ func newJoinset( } } +func (js *joinset) Ready() bool { + return js.size == js.expected +} + +func (js *joinset) Has(i int) bool { + return js.values[i] != nil +} + // add a point to the set from a given parent index. -func (js *joinset) Add(i int, v models.PointInterface) { +func (js *joinset) Set(i int, v models.PointInterface) { if i < js.first { js.first = i }