Skip to content

Commit

Permalink
Update heap iterator to allow for entries with duplicate timestamps.
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wilkie <[email protected]>
  • Loading branch information
tomwilkie committed Feb 7, 2019
1 parent 26f3b52 commit db36b3c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 28 deletions.
69 changes: 50 additions & 19 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"io"
"regexp"
"sort"
"time"

"github.com/grafana/loki/pkg/helpers"
Expand Down Expand Up @@ -95,8 +96,9 @@ type heapIterator struct {
heap.Interface
Peek() EntryIterator
}
curr EntryIterator
errs []error
currEntry logproto.Entry
currLabels string
errs []error
}

// NewHeapIterator returns a new iterator which uses a heap to merge together
Expand All @@ -114,14 +116,14 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIter

// pre-next each iterator, drop empty.
for _, i := range is {
result.requeue(i)
result.requeue(i, false)
}

return result
}

func (i *heapIterator) requeue(ei EntryIterator) {
if ei.Next() {
func (i *heapIterator) requeue(ei EntryIterator, advanced bool) {
if advanced || ei.Next() {
heap.Push(i.heap, ei)
return
}
Expand All @@ -133,38 +135,67 @@ func (i *heapIterator) requeue(ei EntryIterator) {
}

func (i *heapIterator) Next() bool {
if i.curr != nil {
i.requeue(i.curr)
}

if i.heap.Len() == 0 {
return false
}

i.curr = heap.Pop(i.heap).(EntryIterator)
currEntry := i.curr.Entry()

// keep popping entries off if they match, to dedupe
// We support multiple entries with the same timestamp, and we want to
// preserve their original order. We look at all the top entries in the
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
type tuple struct {
logproto.Entry
EntryIterator
}
tuples := make([]tuple, 0, i.heap.Len())
for i.heap.Len() > 0 {
next := i.heap.Peek()
nextEntry := next.Entry()
if !currEntry.Equal(nextEntry) {
entry := next.Entry()
if len(tuples) > 0 && !tuples[0].Timestamp.Equal(entry.Timestamp) {
break
}

next = heap.Pop(i.heap).(EntryIterator)
i.requeue(next)
heap.Pop(i.heap)
tuples = append(tuples, tuple{
Entry: entry,
EntryIterator: next,
})
}

// Find in entry which occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
sort.Slice(tuples, func(i, j int) bool {
return tuples[i].Line < tuples[j].Line
})
i.currEntry = tuples[0].Entry
count, max := 1, 1
for j := 1; j < len(tuples); j++ {
if tuples[j].Equal(tuples[j-1]) {
count++
continue
}
if count > max {
i.currEntry = tuples[j-1].Entry
max = count
}
count++
}

// Requeue the iterators, only advancing them if they were not the
// correct pick.
for j := range tuples {
i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line)
}

return true
}

func (i *heapIterator) Entry() logproto.Entry {
return i.curr.Entry()
return i.currEntry
}

func (i *heapIterator) Labels() string {
return i.curr.Labels()
return i.currLabels
}

func (i *heapIterator) Error() error {
Expand Down
35 changes: 26 additions & 9 deletions pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator.
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, offset(0)),
mkStreamIterator(testSize, offset(testSize/2)),
mkStreamIterator(testSize, offset(testSize)),
mkStreamIterator(testSize, offset(0, identity)),
mkStreamIterator(testSize, offset(testSize/2, identity)),
mkStreamIterator(testSize, offset(testSize, identity)),
}, logproto.FORWARD),
generator: identity,
length: 2 * testSize,
Expand All @@ -45,13 +45,24 @@ func TestIterator(t *testing.T) {
// Test dedupe of overlapping iterators with the heap iterator (backward).
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, inverse(offset(0))),
mkStreamIterator(testSize, inverse(offset(-testSize/2))),
mkStreamIterator(testSize, inverse(offset(-testSize))),
mkStreamIterator(testSize, inverse(offset(0, identity))),
mkStreamIterator(testSize, inverse(offset(-testSize/2, identity))),
mkStreamIterator(testSize, inverse(offset(-testSize, identity))),
}, logproto.BACKWARD),
generator: inverse(identity),
length: 2 * testSize,
},

// Test dedupe of entries with the same timestamp but different entries.
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, offset(0, constant(0))),
mkStreamIterator(testSize, offset(0, constant(0))),
mkStreamIterator(testSize, offset(testSize, constant(0))),
}, logproto.FORWARD),
generator: constant(0),
length: 2 * testSize,
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
for i := int64(0); i < tc.length; i++ {
Expand Down Expand Up @@ -85,11 +96,17 @@ func identity(i int64) logproto.Entry {
}
}

func offset(j int64) generator {
func offset(j int64, g generator) generator {
return func(i int64) logproto.Entry {
return g(i + j)
}
}

func constant(t int64) generator {
return func(i int64) logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(i+j, 0),
Line: fmt.Sprintf("%d", i+j),
Timestamp: time.Unix(t, 0),
Line: fmt.Sprintf("%d", i),
}
}
}
Expand Down

0 comments on commit db36b3c

Please sign in to comment.