Skip to content

Commit

Permalink
Fix watches in StatefulSet e2e tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tnozicka committed Apr 23, 2020
1 parent 3e2ae63 commit 2256991
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions test/e2e/apps/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,19 @@ import (

"github.com/onsi/ginkgo"
"github.com/onsi/gomega"

appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
klabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
watchtools "k8s.io/client-go/tools/watch"
"k8s.io/kubernetes/test/e2e/framework"
e2enode "k8s.io/kubernetes/test/e2e/framework/node"
Expand Down Expand Up @@ -572,8 +576,14 @@ var _ = SIGDescribe("StatefulSet", func() {
*/
framework.ConformanceIt("Scaling should happen in predictable order and halt if any stateful pod is unhealthy [Slow]", func() {
psLabels := klabels.Set(labels)
w := &cache.ListWatch{
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.LabelSelector = psLabels.AsSelector().String()
return f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), options)
},
}
ginkgo.By("Initializing watcher for selector " + psLabels.String())
watcher, err := f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{
pl, err := f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: psLabels.AsSelector().String(),
})
framework.ExpectNoError(err)
Expand Down Expand Up @@ -602,7 +612,7 @@ var _ = SIGDescribe("StatefulSet", func() {
expectedOrder := []string{ssName + "-0", ssName + "-1", ssName + "-2"}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
_, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) {
if event.Type != watch.Added {
return false, nil
}
Expand All @@ -616,7 +626,7 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ExpectNoError(err)

ginkgo.By("Scale down will halt with unhealthy stateful pod")
watcher, err = f.ClientSet.CoreV1().Pods(ns).Watch(context.TODO(), metav1.ListOptions{
pl, err = f.ClientSet.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
LabelSelector: psLabels.AsSelector().String(),
})
framework.ExpectNoError(err)
Expand All @@ -635,7 +645,7 @@ var _ = SIGDescribe("StatefulSet", func() {
expectedOrder = []string{ssName + "-2", ssName + "-1", ssName + "-0"}
ctx, cancel = watchtools.ContextWithOptionalTimeout(context.Background(), statefulSetTimeout)
defer cancel()
_, err = watchtools.UntilWithoutRetry(ctx, watcher, func(event watch.Event) (bool, error) {
_, err = watchtools.Until(ctx, pl.ResourceVersion, w, func(event watch.Event) (bool, error) {
if event.Type != watch.Deleted {
return false, nil
}
Expand Down Expand Up @@ -738,12 +748,21 @@ var _ = SIGDescribe("StatefulSet", func() {

var initialStatefulPodUID types.UID
ginkgo.By("Waiting until stateful pod " + statefulPodName + " will be recreated and deleted at least once in namespace " + f.Namespace.Name)
w, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), metav1.SingleObject(metav1.ObjectMeta{Name: statefulPodName}))
framework.ExpectNoError(err)
fieldSelector := fields.OneTermEqualSelector("metadata.name", statefulPodName).String()
lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
options.FieldSelector = fieldSelector
return f.ClientSet.CoreV1().Pods(f.Namespace.Name).Watch(context.TODO(), options)
},
}
ctx, cancel := watchtools.ContextWithOptionalTimeout(context.Background(), statefulPodTimeout)
defer cancel()
// we need to get UID from pod in any state and wait until stateful set controller will remove pod at least once
_, err = watchtools.UntilWithoutRetry(ctx, w, func(event watch.Event) (bool, error) {
_, err = watchtools.ListWatchUntil(ctx, lw, func(event watch.Event) (bool, error) {
pod := event.Object.(*v1.Pod)
switch event.Type {
case watch.Deleted:
Expand Down

0 comments on commit 2256991

Please sign in to comment.