Skip to content

Commit

Permalink
fix Join not catching up after a data pause
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Sep 5, 2018
1 parent e58f95a commit 92c615e
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 7 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## unreleased

### Bugfixes

- [#2048](https://github.com/influxdata/kapacitor/pull/2048): Fix join not catching up fast enough after a pause in the data stream.

## v1.5.1 [2018-08-06]

### Bugfixes
Expand Down
33 changes: 26 additions & 7 deletions join.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,7 @@ func (g *joinGroup) Collect(src int, p timeMessage) error {
// Update head
g.head[src] = t

onlyReadySets := false
for _, t := range g.head {
if !t.After(g.oldestTime) {
onlyReadySets = true
break
}
}
onlyReadySets := g.checkOnlyReadSets()
err := g.emit(onlyReadySets)
if err != nil {
return err
Expand All @@ -367,6 +361,9 @@ func (g *joinGroup) newJoinset(t time.Time) *joinset {

// emit a set and update the oldestTime.
func (g *joinGroup) emit(onlyReadySets bool) error {
if len(g.sets) == 0 {
return nil
}
sets := g.sets[g.oldestTime]
i := 0
for ; i < len(sets); i++ {
Expand All @@ -391,9 +388,31 @@ func (g *joinGroup) emit(onlyReadySets bool) error {
g.oldestTime = t
}
}
// Check if there are more non ready sets we can emit.
// This occurs when one of the parents missed a section of data
// while the other parents continued on.
// We need to emit all the buffered sets as soon as all the parent heads have passed the oldesttime.
if !onlyReadySets {
onlyReadySets = g.checkOnlyReadSets()
return g.emit(onlyReadySets)
}
return nil
}

// checkOnlyReadSets reports if all heads are past the oldesttime,
// indicated whether its ok to emit non ready sets.
func (g *joinGroup) checkOnlyReadSets() bool {
onlyReadySets := false
// Check if heads are past oldest time
for _, t := range g.head {
if !t.After(g.oldestTime) {
onlyReadySets = true
break
}
}
return onlyReadySets
}

// emit sets until we have none left.
func (g *joinGroup) emitAll() error {
var lastErr error
Expand Down

0 comments on commit 92c615e

Please sign in to comment.