Skip to content

Commit

Permalink
Merge pull request kubernetes#22384 from janetkuo/skip-not-found-label
Browse files Browse the repository at this point in the history
When retrying updating RSes and pods, ignore the not found error
  • Loading branch information
bgrant0607 committed Mar 3, 2016
2 parents de72b6b + 75e5708 commit ea579d5
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 70 deletions.
110 changes: 40 additions & 70 deletions pkg/util/deployment/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/integer"
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
"k8s.io/kubernetes/pkg/util/wait"
)

Expand Down Expand Up @@ -186,21 +185,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf
ObjectMeta: meta,
Spec: rs.Spec.Template.Spec,
}))
rsUpdated := false
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
})
if err != nil {
return nil, fmt.Errorf("error updating rs %s pod template label with template hash: %v", updatedRS.Name, err)
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for rs %s generation %d observed by controller: %v", updatedRS.Name, updatedRS.Generation, err)
if rsUpdated {
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
}
}
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error.
// Return here and retry in the next sync loop.
return &rs, nil
}
glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash)
}

// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
Expand All @@ -213,20 +219,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
}
if err = labelPodsWithHash(podList, c, namespace, hash); err != nil {
allPodsLabeled := false
if allPodsLabeled, err = labelPodsWithHash(podList, updatedRS, c, namespace, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
}
glog.V(4).Infof("Labeled rs %s's pods with hash %s.", rs.Name, hash)
// If not all pods are labeled but didn't return error in step 2, we've hit at least one pod not found error.
// Return here and retry in the next sync loop.
if !allPodsLabeled {
return updatedRS, nil
}

// 3. Update rs label and selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return nil, fmt.Errorf("error updating rs %s label and selector with template hash: %v", updatedRS.Name, err)
return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
if rsUpdated {
glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
}
glog.V(4).Infof("Updated rs %s's selector and label with hash %s.", rs.Name, hash)
// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.

// TODO: look for orphaned pods and label them in the background somewhere else periodically

Expand All @@ -244,70 +258,26 @@ func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na
}

// labelPodsWithHash labels all pods in the given podList with the new hash label.
func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error {
// The returned bool value can be used to tell if all pods are actually labeled.
func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) {
allPodsLabeled := true
for _, pod := range podList.Items {
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) {
if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) {
podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
} else if podUpdated {
glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error.
// Then we can't say all pods are labeled
allPodsLabeled = false
}
glog.V(4).Infof("Labeled pod %s with hash %s.", pod.Name, hash)
}
}
return nil
}

// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet)

func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) {
var err error
oldRs := rs
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
rs, err = rsClient.Get(oldRs.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(rs)
if rs, err = rsClient.Update(rs); err == nil {
// Update successful.
return true, nil
}
// Update could have failed due to conflict error. Try again.
return false, nil
})
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
// controller contains the applied update.
return rs, err
}

type updatePodFunc func(pod *api.Pod)

func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
var err error
oldPod := pod
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
pod, err = podClient.Get(oldPod.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(pod)
if pod, err = podClient.Update(pod); err == nil {
// Update successful.
return true, nil
}
// Update could have failed due to conflict error. Try again.
return false, nil
})
if err == wait.ErrWaitTimeout {
return nil, fmt.Errorf("timed out trying to update pod: %+v", oldPod)
}
return pod, err
return allPodsLabeled, nil
}

// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
Expand Down
47 changes: 47 additions & 0 deletions pkg/util/pod/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,61 @@ limitations under the License.
package pod

import (
"fmt"
"hash/adler32"
"time"

"github.com/golang/glog"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
hashutil "k8s.io/kubernetes/pkg/util/hash"
"k8s.io/kubernetes/pkg/util/wait"
)

func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 {
podTemplateSpecHasher := adler32.New()
hashutil.DeepHashObject(podTemplateSpecHasher, template)
return podTemplateSpecHasher.Sum32()
}

// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updatePodFunc func(pod *api.Pod)

// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored.
// The returned bool value can be used to tell if the pod is actually updated.
func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, bool, error) {
var err error
var podUpdated bool
oldPod := pod
if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
pod, err = podClient.Get(oldPod.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
// TODO: add precondition for update
applyUpdate(pod)
if pod, err = podClient.Update(pod); err == nil {
// Update successful.
return true, nil
}
// TODO: don't retry on perm-failed errors and handle them gracefully
// Update could have failed due to conflict error. Try again.
return false, nil
}); err == nil {
// When there's no error, we've updated this pod.
podUpdated = true
}

if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update pod: %+v", oldPod)
}
if errors.IsNotFound(err) {
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name)
err = nil
}
return pod, podUpdated, err
}
71 changes: 71 additions & 0 deletions pkg/util/replicaset/replicaset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package replicaset

import (
"fmt"
"time"

"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/apis/extensions"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/util/wait"
)

// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet)

// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored.
// The returned bool value can be used to tell if the RS is actually updated.
func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) {
var err error
var rsUpdated bool
oldRs := rs
if err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
rs, err = rsClient.Get(oldRs.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
// TODO: add precondition for update
applyUpdate(rs)
if rs, err = rsClient.Update(rs); err == nil {
// Update successful.
return true, nil
}
// TODO: don't retry on perm-failed errors and handle them gracefully
// Update could have failed due to conflict error. Try again.
return false, nil
}); err == nil {
// When there's no error, we've updated this RS.
rsUpdated = true
}

if err == wait.ErrWaitTimeout {
err = fmt.Errorf("timed out trying to update RS: %+v", oldRs)
}
// Ignore the RS not found error, but the RS isn't updated.
if errors.IsNotFound(err) {
glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name)
err = nil
}
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
// controller contains the applied update.
return rs, rsUpdated, err
}

0 comments on commit ea579d5

Please sign in to comment.