Skip to content

Commit

Permalink
Fix DaemonSet surging with minReadySeconds (openkruise#1014)
Browse files Browse the repository at this point in the history
Signed-off-by: FillZpp <[email protected]>
  • Loading branch information
FillZpp authored Jul 2, 2022
1 parent 69f33a1 commit fb5fc11
Show file tree
Hide file tree
Showing 7 changed files with 155 additions and 12 deletions.
4 changes: 1 addition & 3 deletions apis/apps/defaults/v1alpha1.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,11 +283,9 @@ func SetDefaultsDaemonSet(obj *v1alpha1.DaemonSet) {
obj.Spec.UpdateStrategy.RollingUpdate.Type = v1alpha1.StandardRollingUpdateType
}

if obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil {
if obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable == nil && obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge == nil {
maxUnavailable := intstr.FromInt(1)
obj.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable = &maxUnavailable
}
if obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge == nil {
MaxSurge := intstr.FromInt(0)
obj.Spec.UpdateStrategy.RollingUpdate.MaxSurge = &MaxSurge
}
Expand Down
8 changes: 5 additions & 3 deletions apis/apps/v1alpha1/daemonset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,11 @@ type DaemonSetStatus struct {
// +kubebuilder:object:root=true
// +kubebuilder:subresource:status
// +kubebuilder:resource:shortName=daemon;ads
// +kubebuilder:printcolumn:name="DesiredNumber",type="integer",JSONPath=".status.desiredNumberScheduled",description="The desired number of pods."
// +kubebuilder:printcolumn:name="CurrentNumber",type="integer",JSONPath=".status.currentNumberScheduled",description="The current number of pods."
// +kubebuilder:printcolumn:name="UpdatedNumberScheduled",type="integer",JSONPath=".status.updatedNumberScheduled",description="The updated number of pods."
// +kubebuilder:printcolumn:name="DESIRED",type="integer",JSONPath=".status.desiredNumberScheduled",description="The desired number of pods."
// +kubebuilder:printcolumn:name="CURRENT",type="integer",JSONPath=".status.currentNumberScheduled",description="The current number of pods."
// +kubebuilder:printcolumn:name="READY",type="integer",JSONPath=".status.numberReady",description="The ready number of pods."
// +kubebuilder:printcolumn:name="UP-TO-DATE",type="integer",JSONPath=".status.updatedNumberScheduled",description="The updated number of pods."
// +kubebuilder:printcolumn:name="AVAILABLE",type="integer",JSONPath=".status.numberAvailable",description="The available number of pods."
// +kubebuilder:printcolumn:name="AGE",type="date",JSONPath=".metadata.creationTimestamp",description="CreationTimestamp is a timestamp representing the server time when this object was created. It is not guaranteed to be set in happens-before order across separate operations. Clients may not set this value. It is represented in RFC3339 form and is in UTC."
// +kubebuilder:printcolumn:name="CONTAINERS",type="string",priority=1,JSONPath=".spec.template.spec.containers[*].name",description="The containers of the current daemonset."
// +kubebuilder:printcolumn:name="IMAGES",type="string",priority=1,JSONPath=".spec.template.spec.containers[*].image",description="The images of the current advanced daemonset."
Expand Down
14 changes: 11 additions & 3 deletions config/crd/bases/apps.kruise.io_daemonsets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,23 @@ spec:
- additionalPrinterColumns:
- description: The desired number of pods.
jsonPath: .status.desiredNumberScheduled
name: DesiredNumber
name: DESIRED
type: integer
- description: The current number of pods.
jsonPath: .status.currentNumberScheduled
name: CurrentNumber
name: CURRENT
type: integer
- description: The ready number of pods.
jsonPath: .status.numberReady
name: READY
type: integer
- description: The updated number of pods.
jsonPath: .status.updatedNumberScheduled
name: UpdatedNumberScheduled
name: UP-TO-DATE
type: integer
- description: The available number of pods.
jsonPath: .status.numberAvailable
name: AVAILABLE
type: integer
- description: CreationTimestamp is a timestamp representing the server time when
this object was created. It is not guaranteed to be set in happens-before
Expand Down
19 changes: 16 additions & 3 deletions pkg/controller/daemonset/daemonset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,13 +290,23 @@ type ReconcileDaemonSet struct {

// Reconcile reads that state of the cluster for a DaemonSet object and makes changes based on the state read
// and what is in the DaemonSet.Spec
func (dsc *ReconcileDaemonSet) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
func (dsc *ReconcileDaemonSet) Reconcile(ctx context.Context, request reconcile.Request) (res reconcile.Result, retErr error) {
onceBackoffGC.Do(func() {
go wait.Until(dsc.failedPodsBackoff.GC, BackoffGCInterval, ctx.Done())
})
startTime := dsc.failedPodsBackoff.Clock.Now()
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing DaemonSet %q (%v)", request.String(), dsc.failedPodsBackoff.Clock.Now().Sub(startTime))
if retErr == nil {
if res.Requeue || res.RequeueAfter > 0 {
klog.Infof("Finished syncing DaemonSet %s, cost %v, result: %v", request, time.Since(startTime), res)
} else {
klog.Infof("Finished syncing DaemonSet %s, cost %v", request, time.Since(startTime))
}
} else {
klog.Errorf("Failed syncing DaemonSet %s: %v", request, retErr)
}
// clean the duration store
_ = durationStore.Pop(request.String())
}()

err := dsc.syncDaemonSet(request)
Expand Down Expand Up @@ -561,6 +571,7 @@ func (dsc *ReconcileDaemonSet) storeDaemonSetStatus(ds *appsv1alpha1.DaemonSet,
toUpdate.Status.DaemonSetHash = hash

if _, updateErr = dsClient.UpdateStatus(context.TODO(), toUpdate, metav1.UpdateOptions{}); updateErr == nil {
klog.Infof("Updated DaemonSet %s/%s status to %v", ds.Namespace, ds.Name, kruiseutil.DumpJSON(toUpdate.Status))
return nil
}

Expand Down Expand Up @@ -888,6 +899,8 @@ func (dsc *ReconcileDaemonSet) podsShouldBeOnNode(
case podutil.IsPodAvailable(oldestNewPod, ds.Spec.MinReadySeconds, metav1.Time{Time: dsc.failedPodsBackoff.Clock.Now()}):
klog.V(5).Infof("Pod %s/%s from daemonset %s is now ready and will replace older pod %s", oldestNewPod.Namespace, oldestNewPod.Name, ds.Name, oldestOldPod.Name)
podsToDelete = append(podsToDelete, oldestOldPod.Name)
case podutil.IsPodReady(oldestNewPod) && ds.Spec.MinReadySeconds > 0:
durationStore.Push(keyFunc(ds), podAvailableWaitingTime(oldestNewPod, ds.Spec.MinReadySeconds, dsc.failedPodsBackoff.Clock.Now()))
}
}

Expand Down
10 changes: 10 additions & 0 deletions pkg/controller/daemonset/daemonset_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"sort"
"sync"
"time"

appspub "github.com/openkruise/kruise/apis/apps/pub"
appsv1alpha1 "github.com/openkruise/kruise/apis/apps/v1alpha1"
Expand Down Expand Up @@ -357,3 +358,12 @@ func isPodPreDeleting(pod *corev1.Pod) bool {
// isPodNilOrPreDeleting reports whether the pod is absent (nil) or is
// already considered pre-deleting by isPodPreDeleting.
func isPodNilOrPreDeleting(pod *corev1.Pod) bool {
	if pod == nil {
		return true
	}
	return isPodPreDeleting(pod)
}

// podAvailableWaitingTime returns how much longer the pod must stay Ready
// before it counts as available, i.e. minReadySeconds minus the time elapsed
// since the pod's Ready condition last transitioned. If the Ready condition
// is missing or carries no transition timestamp, the full minReadySeconds
// duration is returned. Note the result may be negative when the pod has
// already been Ready long enough.
func podAvailableWaitingTime(pod *corev1.Pod, minReadySeconds int32, now time.Time) time.Duration {
	fullWait := time.Duration(minReadySeconds) * time.Second
	cond := podutil.GetPodReadyCondition(pod.Status)
	if cond == nil || cond.LastTransitionTime.IsZero() {
		// No usable Ready transition time; wait the whole period.
		return fullWait
	}
	elapsed := now.Sub(cond.LastTransitionTime.Time)
	return fullWait - elapsed
}
91 changes: 91 additions & 0 deletions test/e2e/apps/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
daemonutil "k8s.io/kubernetes/pkg/controller/daemon/util"
)

Expand Down Expand Up @@ -406,5 +408,94 @@ var _ = SIGDescribe("DaemonSet", func() {
}, time.Second*60, time.Second).Should(gomega.Equal(1))

})

// Verifies that a surging rolling update (maxSurge=100%, maxUnavailable=0)
// with MinReadySeconds set keeps every old pod alive until its surged
// replacement has been Ready for at least MinReadySeconds.
framework.ConformanceIt("should successfully surging update daemonset with minReadySeconds", func() {
	label := map[string]string{framework.DaemonSetNameLabel: dsName}

	ginkgo.By(fmt.Sprintf("Creating DaemonSet %q", dsName))
	maxSurge := intstr.FromString("100%")
	maxUnavailable := intstr.FromInt(0)
	ds := tester.NewDaemonSet(dsName, label, WebserverImage, appsv1alpha1.DaemonSetUpdateStrategy{
		Type: appsv1alpha1.RollingUpdateDaemonSetStrategyType,
		RollingUpdate: &appsv1alpha1.RollingUpdateDaemonSet{
			MaxSurge:       &maxSurge,
			MaxUnavailable: &maxUnavailable,
		},
	})
	ds.Spec.MinReadySeconds = 10
	ds, err := tester.CreateDaemonSet(ds)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	ginkgo.By("Check that daemon pods launch on every node of the cluster.")
	err = wait.PollImmediate(framework.DaemonSetRetryPeriod, framework.DaemonSetRetryTimeout, tester.CheckRunningOnAllNodes(ds))

	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for daemon pod to start")
	err = tester.CheckDaemonStatus(dsName)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	ds, err = tester.GetDaemonSet(dsName)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	ginkgo.By("Get all old daemon pods on nodes")
	oldNodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())
	gomega.Expect(oldNodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

	// Every node must start with exactly one ready pod of the old revision.
	nodeNameList := sets.NewString()
	for nodeName, pods := range oldNodeToDaemonPods {
		nodeNameList.Insert(nodeName)
		gomega.Expect(pods).To(gomega.HaveLen(1))
		gomega.Expect(podutil.IsPodReady(pods[0])).To(gomega.BeTrue())
	}

	//change pods container image
	err = tester.UpdateDaemonSet(ds.Name, func(ds *appsv1alpha1.DaemonSet) {
		ds.Spec.Template.Spec.Containers[0].Image = NewWebserverImage
	})
	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error to update daemon")

	ginkgo.By("Check all surging Pods created")
	err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
		nodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
		gomega.Expect(err).NotTo(gomega.HaveOccurred())
		gomega.Expect(nodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

		for _, nodeName := range nodeNameList.List() {
			pods := nodeToDaemonPods[nodeName]
			if len(pods) < 2 {
				// The surged replacement has not been created on this node yet.
				continue
			}

			// While the new pod is surging, the old pod must not be deleted.
			for _, pod := range pods {
				if pod.Name == oldNodeToDaemonPods[nodeName][0].Name {
					gomega.Expect(pod.DeletionTimestamp).To(gomega.BeNil())
				}
			}
			nodeNameList.Delete(nodeName)
		}
		return nodeNameList.Len() == 0, nil
	})
	// BUG FIX: the poll result was previously ignored, so a timeout would
	// silently pass this stage instead of failing the test.
	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for all surging pods to be created")

	ginkgo.By("Check all old Pods deleted")
	err = wait.PollImmediate(time.Second, time.Minute, func() (done bool, err error) {
		nodeToDaemonPods, err := tester.GetNodesToDaemonPods(label)
		gomega.Expect(err).NotTo(gomega.HaveOccurred())
		gomega.Expect(nodeToDaemonPods).To(gomega.HaveLen(int(ds.Status.DesiredNumberScheduled)))

		finished := true
		for nodeName, pods := range nodeToDaemonPods {
			if len(pods) != 1 {
				// Old and new pods still coexist on this node.
				finished = false
				continue
			}

			// The surviving pod must be the new one, ready, and must have
			// been ready for longer than MinReadySeconds before the old
			// pod was removed.
			gomega.Expect(pods[0].Name).NotTo(gomega.Equal(oldNodeToDaemonPods[nodeName][0].Name))
			gomega.Expect(podutil.IsPodReady(pods[0])).To(gomega.BeTrue())
			c := podutil.GetPodReadyCondition(pods[0].Status)
			gomega.Expect(int32(time.Since(c.LastTransitionTime.Time) / time.Second)).To(gomega.BeNumerically(">", ds.Spec.MinReadySeconds))
		}
		return finished, nil
	})
	// BUG FIX: assert the poll result so a timeout fails the test.
	gomega.Expect(err).NotTo(gomega.HaveOccurred(), "error waiting for all old pods to be deleted")
})
})
})
21 changes: 21 additions & 0 deletions test/e2e/framework/daemonset_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package framework
import (
"context"
"fmt"

"reflect"
"strings"
"time"
Expand All @@ -22,6 +23,7 @@ import (
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/util/retry"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
daemonutil "k8s.io/kubernetes/pkg/controller/daemon/util"
utilpointer "k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -419,3 +421,22 @@ func (t *DaemonSetTester) SortPodNames(podList *v1.PodList) []string {
}
return names.List()
}

// GetNodesToDaemonPods lists the daemon pods matching label and groups them
// by the name of the node they target. Pods whose target node cannot be
// resolved are silently skipped (best-effort grouping).
func (t *DaemonSetTester) GetNodesToDaemonPods(label map[string]string) (map[string][]*v1.Pod, error) {
	podList, err := t.ListDaemonPods(label)
	if err != nil {
		return nil, err
	}

	byNode := make(map[string][]*v1.Pod, len(podList.Items))
	for i := range podList.Items {
		pod := &podList.Items[i]
		nodeName, nodeErr := daemonutil.GetTargetNodeName(pod)
		if nodeErr != nil {
			// Best-effort: ignore pods without a resolvable target node.
			continue
		}
		byNode[nodeName] = append(byNode[nodeName], pod)
	}
	return byNode, nil
}

0 comments on commit fb5fc11

Please sign in to comment.