Skip to content

Commit

Permalink
Join can emit ready sets without waiting (influxdata#523)
Browse files Browse the repository at this point in the history
* join can emit ready sets without waiting

* changelog
  • Loading branch information
Nathaniel Cook committed May 6, 2016
1 parent a4bff55 commit d3ae18d
Show file tree
Hide file tree
Showing 4 changed files with 183 additions and 55 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ For example, let's say we want to store all data that triggered an alert in Infl
- [#425](https://github.com/influxdata/kapacitor/pull/425): BREAKING: Preserving tags on influxql simple selectors - first, last, max, min, percentile
- [#423](https://github.com/influxdata/kapacitor/issues/423): Recording stream queries with group by now correctly saves data in time order not group by order.
- [#331](https://github.com/influxdata/kapacitor/issues/331): Fix panic when missing `.as()` for JoinNode.
- [#523](https://github.com/influxdata/kapacitor/pull/523): JoinNode will now emit join sets as soon as they are ready. If multiple joinable sets arrive in the same tolerance window then each will be emitted (previously the first points were dropped).

## v0.12.0 [2016-04-04]

Expand Down
20 changes: 10 additions & 10 deletions integrations/data/TestStream_JoinTolerance.srpl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ rpname
errors,service=front,dc=A value=2 0000000001
dbname
rpname
views,service=front,dc=A value=200 0000000002
views,service=front,dc=A value=200 0000000001
dbname
rpname
errors,service=cartA,dc=B value=9 0000000002
Expand All @@ -27,13 +27,13 @@ rpname
views,service=cartA,dc=B value=900 0000000002
dbname
rpname
errors,service=login,dc=A value=5 0000000003
errors,service=login,dc=A value=5 0000000002
dbname
rpname
views,service=login,dc=A value=500 0000000003
views,service=login,dc=A value=500 0000000002
dbname
rpname
errors,service=front,dc=B value=9 0000000003
errors,service=front,dc=B value=9 0000000002
dbname
rpname
views,service=front,dc=B value=900 0000000003
Expand All @@ -51,10 +51,10 @@ rpname
views,service=login,dc=B value=900 0000000004
dbname
rpname
errors,service=front,dc=A value=2 0000000005
errors,service=front,dc=A value=7 0000000005
dbname
rpname
views,service=front,dc=A value=200 0000000005
views,service=front,dc=A value=700 0000000005
dbname
rpname
errors,service=login,dc=B value=2 0000000005
Expand All @@ -69,10 +69,10 @@ rpname
views,service=front,dc=A value=500 0000000006
dbname
rpname
errors,service=cartA,dc=B value=9 0000000006
errors,service=cartA,dc=B value=11 0000000006
dbname
rpname
views,service=cartA,dc=B value=900 0000000006
views,service=cartA,dc=B value=1100 0000000006
dbname
rpname
errors,service=login,dc=C value=7 0000000006
Expand All @@ -87,10 +87,10 @@ rpname
views,service=front,dc=A value=400 0000000007
dbname
rpname
errors,service=cartA,dc=B value=8 0000000007
errors,service=cartA,dc=B value=12 0000000007
dbname
rpname
views,service=cartA,dc=B value=800 0000000007
views,service=cartA,dc=B value=1200 0000000007
dbname
rpname
errors,service=front,dc=A value=6 0000000008
Expand Down
125 changes: 105 additions & 20 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1084,13 +1084,9 @@ errorCounts
.as('errors', 'views')
.tolerance(2s)
.streamName('error_view')
|eval(lambda: "errors.value" / "views.value")
.as('error_percent')
|window()
.period(10s)
.every(10s)
|mean('error_percent')
.as('error_percent')
|httpOut('TestStream_JoinTolerance')
`

Expand All @@ -1099,29 +1095,118 @@ errorCounts
{
Name: "error_view",
Tags: map[string]string{"service": "cartA"},
Columns: []string{"time", "error_percent"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.01,
}},
Columns: []string{"time", "errors.value", "views.value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
7.0,
700.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
9.0,
900.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
3.0,
300.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
11.0,
1100.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
12.0,
1200.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
6.0,
600.0,
},
},
},
{
Name: "error_view",
Tags: map[string]string{"service": "login"},
Columns: []string{"time", "error_percent"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
0.01,
}},
Columns: []string{"time", "errors.value", "views.value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
9.0,
900.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
5.0,
500.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
9.0,
900.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
2.0,
200.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
7.0,
700.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
10.0,
1000.0,
},
},
},
{
Name: "error_view",
Tags: map[string]string{"service": "front"},
Columns: []string{"time", "error_percent"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 12, 0, time.UTC),
0.01,
}},
Columns: []string{"time", "errors.value", "views.value"},
Values: [][]interface{}{
{
time.Date(1971, 1, 1, 0, 0, 0, 0, time.UTC),
2.0,
200.0,
},
{
time.Date(1971, 1, 1, 0, 0, 2, 0, time.UTC),
9.0,
900.0,
},
{
time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC),
7.0,
700.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
5.0,
500.0,
},
{
time.Date(1971, 1, 1, 0, 0, 6, 0, time.UTC),
4.0,
400.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
6.0,
600.0,
},
{
time.Date(1971, 1, 1, 0, 0, 8, 0, time.UTC),
4.0,
400.0,
},
},
},
},
}
Expand Down Expand Up @@ -1226,7 +1311,7 @@ cpu
Columns: []string{"time", "count"},
Values: [][]interface{}{[]interface{}{
time.Date(1971, 1, 1, 0, 0, 10, 0, time.UTC),
9.0,
10.0,
}},
},
},
Expand Down
92 changes: 67 additions & 25 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ type srcPoint struct {

// handles emitting joined sets once enough data has arrived from parents.
type group struct {
sets map[time.Time]*joinset
sets map[time.Time][]*joinset
head []time.Time
oldestTime time.Time
j *JoinNode
Expand All @@ -242,7 +242,7 @@ type group struct {

func newGroup(i int, j *JoinNode) *group {
return &group{
sets: make(map[time.Time]*joinset),
sets: make(map[time.Time][]*joinset),
head: make([]time.Time, i),
j: j,
points: make(chan srcPoint),
Expand All @@ -269,44 +269,78 @@ func (g *group) collect(i int, p models.PointInterface) error {
g.oldestTime = t
}

set := g.sets[t]
var set *joinset
sets := g.sets[t]
if len(sets) == 0 {
set = newJoinset(
g.j.j.StreamName,
g.j.fill,
g.j.fillValue,
g.j.j.Names,
g.j.j.Tolerance,
t,
g.j.logger,
)
sets = append(sets, set)
g.sets[t] = sets
}
for j := 0; j < len(sets); j++ {
if !sets[j].Has(i) {
set = sets[j]
break
}
}
if set == nil {
set = newJoinset(g.j.j.StreamName, g.j.fill, g.j.fillValue, g.j.j.Names, g.j.j.Tolerance, t, g.j.logger)
g.sets[t] = set
set = newJoinset(
g.j.j.StreamName,
g.j.fill,
g.j.fillValue,
g.j.j.Names,
g.j.j.Tolerance,
t,
g.j.logger,
)
sets = append(sets, set)
g.sets[t] = sets
}
set.Add(i, p)
set.Set(i, p)

// Update head
g.head[i] = t

// Check if all parents have been read past the oldestTime.
// If so we can emit the set.
emit := true
onlyReadySets := false
for _, t := range g.head {
if !t.After(g.oldestTime) {
// Still possible to get more data
// need to wait for more points
emit = false
onlyReadySets = true
break
}
}
if emit {
err := g.emit()
if err != nil {
return err
}
err := g.emit(onlyReadySets)
if err != nil {
return err
}
return nil
}

// emit a set and update the oldestTime.
func (g *group) emit() error {
set := g.sets[g.oldestTime]
err := g.emitJoinedSet(set)
if err != nil {
return err
func (g *group) emit(onlyReadySets bool) error {
sets := g.sets[g.oldestTime]
i := 0
for ; i < len(sets); i++ {
if sets[i].Ready() || !onlyReadySets {
err := g.emitJoinedSet(sets[i])
if err != nil {
return err
}
} else {
break
}
}
if i == len(sets) {
delete(g.sets, g.oldestTime)
} else {
g.sets[g.oldestTime] = sets[i:]
}
delete(g.sets, g.oldestTime)

g.oldestTime = time.Time{}
for t := range g.sets {
Expand All @@ -321,7 +355,7 @@ func (g *group) emit() error {
func (g *group) emitAll() error {
var lastErr error
for len(g.sets) > 0 {
err := g.emit()
err := g.emit(false)
if err != nil {
lastErr = err
}
Expand Down Expand Up @@ -403,8 +437,16 @@ func newJoinset(
}
}

// Ready reports whether this joinset already contains its expected
// number of points and can therefore be emitted.
func (js *joinset) Ready() bool {
	return js.expected == js.size
}

// Has reports whether a point for parent index i is already present
// in the set.
func (js *joinset) Has(i int) bool {
	v := js.values[i]
	return v != nil
}

// add a point to the set from a given parent index.
func (js *joinset) Add(i int, v models.PointInterface) {
func (js *joinset) Set(i int, v models.PointInterface) {
if i < js.first {
js.first = i
}
Expand Down

0 comments on commit d3ae18d

Please sign in to comment.