Skip to content

Commit

Permalink
etcd3 store: retry w/live object on conflict
Browse files Browse the repository at this point in the history
In GuaranteedUpdate, if it was called with a suggestion (e.g. via the
watch cache), and the suggested object is stale, perform a live lookup
and then retry the update.

Signed-off-by: Andy Goldstein <[email protected]>
  • Loading branch information
ncdc committed Sep 15, 2017
1 parent 9aef242 commit bf33df1
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 10 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ go_test(
"//vendor/github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes:go_default_library",
"//vendor/github.com/coreos/etcd/integration:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/fields:go_default_library",
Expand Down
33 changes: 23 additions & 10 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/golang/glog"
"golang.org/x/net/context"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -271,6 +272,14 @@ func (s *store) GuaranteedUpdate(
}
key = path.Join(s.pathPrefix, key)

getCurrentState := func() (*objState, error) {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return nil, err
}
return s.getState(getResp, key, v, ignoreNotFound)
}

var origState *objState
var mustCheckData bool
if len(suggestion) == 1 && suggestion[0] != nil {
Expand All @@ -280,11 +289,7 @@ func (s *store) GuaranteedUpdate(
}
mustCheckData = true
} else {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
origState, err = s.getState(getResp, key, v, ignoreNotFound)
origState, err = getCurrentState()
if err != nil {
return err
}
Expand All @@ -299,6 +304,18 @@ func (s *store) GuaranteedUpdate(

ret, ttl, err := s.updateState(origState, tryUpdate)
if err != nil {
// It's possible we were working with stale data
if mustCheckData && apierrors.IsConflict(err) {
// Actually fetch
origState, err = getCurrentState()
if err != nil {
return err
}
mustCheckData = false
// Retry
continue
}

return err
}

Expand All @@ -311,11 +328,7 @@ func (s *store) GuaranteedUpdate(
// etcd in order to be sure the data in the store is equivalent to
// our desired serialization
if mustCheckData {
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
if err != nil {
return err
}
origState, err = s.getState(getResp, key, v, ignoreNotFound)
origState, err = getCurrentState()
if err != nil {
return err
}
Expand Down
49 changes: 49 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/storage/etcd3/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
Expand All @@ -29,6 +30,7 @@ import (
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
apierrors "k8s.io/apimachinery/pkg/api/errors"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -584,6 +586,53 @@ func TestGuaranteedUpdateWithConflict(t *testing.T) {
}
}

func TestGuaranteedUpdateWithSuggestionAndConflict(t *testing.T) {
ctx, store, cluster := testSetup(t)
defer cluster.Terminate(t)
key, originalPod := testPropogateStore(ctx, t, store, &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}})

// First, update without a suggestion so originalPod is outdated
updatedPod := &example.Pod{}
err := store.GuaranteedUpdate(ctx, key, updatedPod, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
pod.Name = "foo-2"
return pod, nil
}),
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

// Second, update using the outdated originalPod as the suggestion. Return a conflict error when
// passed originalPod, and make sure that SimpleUpdate is called a second time after a live lookup
// with the value of updatedPod.
sawConflict := false
updatedPod2 := &example.Pod{}
err = store.GuaranteedUpdate(ctx, key, updatedPod2, false, nil,
storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) {
pod := obj.(*example.Pod)
if pod.Name != "foo-2" {
if sawConflict {
t.Fatalf("unexpected second conflict")
}
sawConflict = true
// simulated stale object - return a conflict
return nil, apierrors.NewConflict(example.SchemeGroupVersion.WithResource("pods").GroupResource(), "name", errors.New("foo"))
}
pod.Name = "foo-3"
return pod, nil
}),
originalPod,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if updatedPod2.Name != "foo-3" {
t.Errorf("unexpected pod name: %q", updatedPod2.Name)
}
}

func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
Expand Down

0 comments on commit bf33df1

Please sign in to comment.