Skip to content

Commit

Permalink
Test & fix queries return multiple streams. (grafana#341)
Browse files Browse the repository at this point in the history
Signed-off-by: Tom Wilkie <[email protected]>
  • Loading branch information
tomwilkie authored Feb 18, 2019
1 parent 02ff6bc commit b115835
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 11 deletions.
46 changes: 44 additions & 2 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
"google.golang.org/grpc"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
Expand All @@ -36,7 +37,10 @@ func TestIngester(t *testing.T) {
req := logproto.PushRequest{
Streams: []*logproto.Stream{
{
Labels: `{foo="bar"}`,
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}
Expand All @@ -45,10 +49,33 @@ func TestIngester(t *testing.T) {
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: fmt.Sprintf("line %d", i),
})
}

_, err = i.Push(user.InjectOrgID(context.Background(), "test"), &req)
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)

fmt.Println("hehe")

result := mockQuerierServer{
ctx: ctx,
}
err = i.Query(&logproto.QueryRequest{
Query: `{foo="bar"}`,
Limit: 100,
Start: time.Unix(0, 0),
End: time.Unix(1, 0),
}, &result)
require.NoError(t, err)

require.Len(t, result.resps, 1)
require.Len(t, result.resps[0].Streams, 2)
require.Equal(t, `{foo="bar", bar="baz1"}`, result.resps[0].Streams[0].Labels)
require.Equal(t, `{foo="bar", bar="baz2"}`, result.resps[0].Streams[1].Labels)
}

type mockStore struct {
Expand All @@ -68,3 +95,18 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.chunks[userid] = append(s.chunks[userid], chunks...)
return nil
}

type mockQuerierServer struct {
ctx context.Context
resps []*logproto.QueryResponse
grpc.ServerStream
}

func (m *mockQuerierServer) Send(resp *logproto.QueryResponse) error {
m.resps = append(m.resps, resp)
return nil
}

func (m *mockQuerierServer) Context() context.Context {
return m.ctx
}
22 changes: 14 additions & 8 deletions pkg/iter/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,23 @@ type iteratorMinHeap struct {
}

func (h iteratorMinHeap) Less(i, j int) bool {
return h.iteratorHeap[i].Entry().Timestamp.Before(h.iteratorHeap[j].Entry().Timestamp)
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.Before(t2)
}
return h.iteratorHeap[i].Labels() < h.iteratorHeap[j].Labels()
}

type iteratorMaxHeap struct {
iteratorHeap
}

func (h iteratorMaxHeap) Less(i, j int) bool {
return h.iteratorHeap[i].Entry().Timestamp.After(h.iteratorHeap[j].Entry().Timestamp)
t1, t2 := h.iteratorHeap[i].Entry().Timestamp, h.iteratorHeap[j].Entry().Timestamp
if !t1.Equal(t2) {
return t1.After(t2)
}
return h.iteratorHeap[i].Labels() > h.iteratorHeap[j].Labels()
}

// heapIterator iterates over a heap of iterators.
Expand Down Expand Up @@ -119,10 +127,6 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) EntryIter
result.requeue(i, false)
}

if len(is) > 0 {
result.currLabels = is[0].Labels()
}

return result
}

Expand Down Expand Up @@ -157,7 +161,7 @@ func (i *heapIterator) Next() bool {
for i.heap.Len() > 0 {
next := i.heap.Peek()
entry := next.Entry()
if len(tuples) > 0 && !tuples[0].Timestamp.Equal(entry.Timestamp) {
if len(tuples) > 0 && (tuples[0].Labels() != next.Labels() || !tuples[0].Timestamp.Equal(entry.Timestamp)) {
break
}

Expand All @@ -170,7 +174,9 @@ func (i *heapIterator) Next() bool {

// Find in entry which occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
i.currEntry = mostCommon(tuples).Entry
t := mostCommon(tuples)
i.currEntry = t.Entry
i.currLabels = t.Labels()

// Requeue the iterators, advancing them if they were consumed.
for j := range tuples {
Expand Down
59 changes: 58 additions & 1 deletion pkg/iter/iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,63 @@ func TestIterator(t *testing.T) {
}
}

func TestIteratorMultipleLabels(t *testing.T) {
for i, tc := range []struct {
iterator EntryIterator
generator generator
length int64
labels func(int64) string
}{
// Test merging with differing labels but same timestamps and values.
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, identity, "{foobar: \"baz1\"}"),
mkStreamIterator(testSize, identity, "{foobar: \"baz2\"}"),
}, logproto.FORWARD),
generator: func(i int64) logproto.Entry {
return identity(i / 2)
},
length: testSize * 2,
labels: func(i int64) string {
if i%2 == 0 {
return "{foobar: \"baz1\"}"
}
return "{foobar: \"baz2\"}"
},
},

// Test merging with differing labels but all the same timestamps and different values.
{
iterator: NewHeapIterator([]EntryIterator{
mkStreamIterator(testSize, constant(0), "{foobar: \"baz1\"}"),
mkStreamIterator(testSize, constant(0), "{foobar: \"baz2\"}"),
}, logproto.FORWARD),
generator: func(i int64) logproto.Entry {
return constant(0)(i % testSize)
},
length: testSize * 2,
labels: func(i int64) string {
if i/testSize == 0 {
return "{foobar: \"baz1\"}"
}
return "{foobar: \"baz2\"}"
},
},
} {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
for i := int64(0); i < tc.length; i++ {
assert.Equal(t, true, tc.iterator.Next())
assert.Equal(t, tc.generator(i), tc.iterator.Entry(), fmt.Sprintln("iteration", i))
assert.Equal(t, tc.labels(i), tc.iterator.Labels(), fmt.Sprintln("iteration", i))
}

assert.Equal(t, false, tc.iterator.Next())
assert.Equal(t, nil, tc.iterator.Error())
assert.NoError(t, tc.iterator.Close())
})
}
}

type generator func(i int64) logproto.Entry

func mkStreamIterator(numEntries int64, f generator, labels string) EntryIterator {
Expand Down Expand Up @@ -136,7 +193,7 @@ func inverse(g generator) generator {
}
}

func TestMostCommont(t *testing.T) {
func TestMostCommon(t *testing.T) {
// First is most common.
tuples := []tuple{
{Entry: logproto.Entry{Line: "a"}},
Expand Down

0 comments on commit b115835

Please sign in to comment.