Skip to content

Commit

Permalink
datastore: completely overhaul query execution
Browse files Browse the repository at this point in the history
This change unifies and simplifies the logic for GetAll, Count and
Run/Next. Added integration tests to check the correctness of queries
across large datasets, including those with Limit and/or Offset
specified, as well as cursors.

Deletes the ugly callNext func. Phew.

Fixes googleapis#268.

Change-Id: I39cb89ef198f676a6f4987afa72442c6760257a9
Reviewed-on: https://code-review.googlesource.com/4911
Reviewed-by: Michael McGreevy <[email protected]>
  • Loading branch information
okdave committed Jun 6, 2016
1 parent f44df7a commit 0bf7a07
Show file tree
Hide file tree
Showing 2 changed files with 226 additions and 124 deletions.
144 changes: 122 additions & 22 deletions datastore/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -378,7 +379,7 @@ func TestLargeQuery(t *testing.T) {
keys := make([]*Key, 0, n)
for i := 0; i < n; i++ {
children = append(children, &SQChild{I: i, T: now, U: now})
keys = append(keys, NewIncompleteKey(ctx, "LQChild", parent))
keys = append(keys, NewIncompleteKey(ctx, "SQChild", parent))
}

// Store using PutMulti in batches.
Expand All @@ -400,31 +401,130 @@ func TestLargeQuery(t *testing.T) {
}()
}

// Load all the entities.
q := NewQuery("LQChild").Ancestor(parent).Filter("T=", now)
var got []SQChild
_, err := client.GetAll(ctx, q, &got)
if err != nil {
t.Fatalf("client.GetAll: %v", err)
}
q := NewQuery("SQChild").Ancestor(parent).Filter("T=", now).Order("I")

if len(got) != n {
t.Errorf("GetAll returned %d entities, want %d", len(got), n)
}
sort.Sort(byI(got))
for i, child := range got {
if child.I != i {
// Only print first mismatch.
t.Fatalf("got[%d].I = %d; want %d", i, child.I, i)
}
// Wait group to allow us to run query tests in parallel below.
var wg sync.WaitGroup

// Check we get the expected count and results for various limits/offsets.
queryTests := []struct {
limit, offset, want int
}{
// Just limit.
{limit: 0, want: 0},
{limit: 100, want: 100},
{limit: 501, want: 501},
{limit: n, want: n},
{limit: n * 2, want: n},
{limit: -1, want: n},
// Just offset.
{limit: -1, offset: 100, want: n - 100},
{limit: -1, offset: 500, want: n - 500},
{limit: -1, offset: n, want: 0},
// Limit and offset.
{limit: 100, offset: 100, want: 100},
{limit: 1000, offset: 100, want: n - 100},
{limit: 500, offset: 500, want: n - 500},
}
for _, tt := range queryTests {
q := q.Limit(tt.limit).Offset(tt.offset)
wg.Add(1)

go func(limit, offset, want int) {
defer wg.Done()
// Check Count returns the expected number of results.
count, err := client.Count(ctx, q)
if err != nil {
t.Errorf("client.Count(limit=%d offset=%d): %v", limit, offset, err)
return
}
if count != want {
t.Errorf("Count(limit=%d offset=%d) returned %d, want %d", limit, offset, count, want)
}

var got []SQChild
_, err = client.GetAll(ctx, q, &got)
if err != nil {
t.Errorf("client.GetAll(limit=%d offset=%d): %v", limit, offset, err)
return
}
if len(got) != want {
t.Errorf("GetAll(limit=%d offset=%d) returned %d, want %d", limit, offset, len(got), want)
}
for i, child := range got {
if got, want := child.I, i+offset; got != want {
t.Errorf("GetAll(limit=%d offset=%d) got[%d].I == %d; want %d", limit, got, want)
break
}
}
}(tt.limit, tt.offset, tt.want)
}
}

type byI []SQChild
// Also check iterator cursor behaviour.
cursorTests := []struct {
limit, offset int // Query limit and offset.
count int // The number of times to call "next"
want int // The I value of the desired element, -1 for "Done".
}{
// No limits.
{count: 5, limit: -1, want: 5},
{count: 500, limit: -1, want: 500},
{count: 1000, limit: -1, want: -1}, // No more results.
// Limits.
{count: 5, limit: 5, want: 5},
{count: 500, limit: 5, want: 5},
{count: 1000, limit: 1000, want: -1}, // No more results.
// Offsets.
{count: 0, offset: 5, limit: -1, want: 5},
{count: 5, offset: 5, limit: -1, want: 10},
{count: 200, offset: 500, limit: -1, want: 700},
{count: 200, offset: 1000, limit: -1, want: -1}, // No more results.
}
for _, tt := range cursorTests {
wg.Add(1)

go func(count, limit, offset, want int) {
defer wg.Done()

// Run iterator through count calls to Next.
it := client.Run(ctx, q.Limit(limit).Offset(offset).KeysOnly())
for i := 0; i < count; i++ {
_, err := it.Next(nil)
if err == Done {
break
}
if err != nil {
t.Errorf("count=%d, limit=%d, offset=%d: it.Next failed at i=%d", count, limit, offset, i)
return
}
}

// Grab the cursor.
cursor, err := it.Cursor()
if err != nil {
t.Errorf("count=%d, limit=%d, offset=%d: it.Cursor: %v", count, limit, offset, err)
return
}

func (b byI) Len() int { return len(b) }
func (b byI) Less(x, y int) bool { return b[x].I < b[y].I }
func (b byI) Swap(x, y int) { b[x], b[y] = b[y], b[x] }
// Make a request for the next element.
it = client.Run(ctx, q.Limit(1).Start(cursor))
var entity SQChild
_, err = it.Next(&entity)
switch {
case want == -1:
if err != Done {
t.Errorf("count=%d, limit=%d, offset=%d: it.Next from cursor %v, want Done", count, limit, offset, err)
}
case err != nil:
t.Errorf("count=%d, limit=%d, offset=%d: it.Next from cursor: %v, want nil", count, limit, offset, err)
case entity.I != want:
t.Errorf("count=%d, limit=%d, offset=%d: got.I = %d, want %d", count, limit, offset, entity.I, want)
}
}(tt.count, tt.limit, tt.offset, tt.want)
}

wg.Wait()
}

func TestEventualConsistency(t *testing.T) {
if testing.Short() {
Expand Down
Loading

0 comments on commit 0bf7a07

Please sign in to comment.