Skip to content

Commit

Permalink
Merge pull request kubernetes#85272 from mm4tt/pager_fix
Browse files Browse the repository at this point in the history
pager.go: don't set ResourceVersion on subsequent List calls
  • Loading branch information
k8s-ci-robot authored Nov 15, 2019
2 parents 8548a25 + 977ca43 commit 452c8c9
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 25 deletions.
51 changes: 27 additions & 24 deletions staging/src/k8s.io/client-go/tools/cache/reflector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,27 +572,26 @@ func TestReflectorFullListIfExpired(t *testing.T) {
for i := 0; i < 8; i++ {
pods[i] = v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("pod-%d", i), ResourceVersion: fmt.Sprintf("%d", i)}}
}
switch options.ResourceVersion {
case "0":
rvContinueLimit := func(rv, c string, l int64) metav1.ListOptions {
return metav1.ListOptions{ResourceVersion: rv, Continue: c, Limit: l}
}
switch rvContinueLimit(options.ResourceVersion, options.Continue, options.Limit) {
// initial limited list
case rvContinueLimit("0", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "10"}, Items: pods[0:4]}, nil
case "10":
switch options.Limit {
case 4:
switch options.Continue {
case "":
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
case "C1":
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
default:
t.Fatalf("Unrecognized Continue: %s", options.Continue)
}
case 0:
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized Limit: %d", options.Limit)
}
// first page of the rv=10 list
case rvContinueLimit("10", "", 4):
return &v1.PodList{ListMeta: metav1.ListMeta{Continue: "C1", ResourceVersion: "11"}, Items: pods[0:4]}, nil
// second page of the above list
case rvContinueLimit("", "C1", 4):
return nil, apierrs.NewResourceExpired("The resourceVersion for the provided watch is too old.")
// rv=10 unlimited list
case rvContinueLimit("10", "", 0):
return &v1.PodList{ListMeta: metav1.ListMeta{ResourceVersion: "11"}, Items: pods[0:8]}, nil
default:
t.Fatalf("Unrecognized ResourceVersion: %s", options.ResourceVersion)
err := fmt.Errorf("unexpected list options: %#v", options)
t.Error(err)
return nil, err
}
return nil, nil
},
Expand All @@ -601,25 +600,29 @@ func TestReflectorFullListIfExpired(t *testing.T) {
r.WatchListPageSize = 4

// Initial list should use RV=0
r.ListAndWatch(stopCh)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}

results := s.List()
if len(results) != 4 {
t.Errorf("Expected 4 results, got %d", len(results))
}

// relist should use lastSyncResourceVersions (RV=10) and since second page of RV=10 is expired, it should full list with RV=10
// relist should use lastSyncResourceVersions (RV=10) and since second page of that expired, it should full list with RV=10
stopCh = make(chan struct{})
r.ListAndWatch(stopCh)
if err := r.ListAndWatch(stopCh); err != nil {
t.Fatal(err)
}

results = s.List()
if len(results) != 8 {
t.Errorf("Expected 8 results, got %d", len(results))
}

expectedRVs := []string{"0", "10", "10", "10"}
expectedRVs := []string{"0", "10", "", "10"}
if !reflect.DeepEqual(listCallRVs, expectedRVs) {
t.Errorf("Expected series of list calls with resource versiosn of %v but got: %v", expectedRVs, listCallRVs)
t.Errorf("Expected series of list calls with resource versiosn of %#v but got: %#v", expectedRVs, listCallRVs)
}
}

Expand Down
9 changes: 8 additions & 1 deletion staging/src/k8s.io/client-go/tools/pager/pager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
if options.Limit == 0 {
options.Limit = p.PageSize
}
requestedResourceVersion := options.ResourceVersion
var list *metainternalversion.List
for {
select {
Expand All @@ -94,9 +95,11 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti
if !errors.IsResourceExpired(err) || !p.FullListIfExpired || options.Continue == "" {
return nil, err
}
// the list expired while we were processing, fall back to a full list
// the list expired while we were processing, fall back to a full list at
// the requested ResourceVersion.
options.Limit = 0
options.Continue = ""
options.ResourceVersion = requestedResourceVersion
return p.PageFn(ctx, options)
}
m, err := meta.ListAccessor(obj)
Expand Down Expand Up @@ -129,6 +132,10 @@ func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runti

// set the next loop up
options.Continue = m.GetContinue()
// Clear the ResourceVersion on the subsequent List calls to avoid the
// `specifying resource version is not allowed when using continue` error.
// See https://github.com/kubernetes/kubernetes/issues/85221#issuecomment-553748143.
options.ResourceVersion = ""
}
}

Expand Down
10 changes: 10 additions & 0 deletions staging/src/k8s.io/client-go/tools/pager/pager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (
p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options)
return nil, fmt.Errorf("invariant violated")
}
if options.Continue != "" && options.ResourceVersion != "" {
p.t.Errorf("invariant violated, specifying resource version (%s) is not allowed when using continue (%s).", options.ResourceVersion, options.Continue)
return nil, fmt.Errorf("invariant violated")
}
var list metainternalversion.List
total := options.Limit
if total == 0 {
Expand Down Expand Up @@ -181,6 +185,12 @@ func TestListPager_List(t *testing.T) {
args: args{},
want: list(21, "rv:20"),
},
{
name: "two pages with resourceVersion",
fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList},
args: args{options: metav1.ListOptions{ResourceVersion: "rv:10"}},
want: list(11, "rv:20"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down

0 comments on commit 452c8c9

Please sign in to comment.