Skip to content

Commit

Permalink
reconcile/statefulset: fail fast on pod revision mismatch
Browse files Browse the repository at this point in the history
Previously, operator didn't check of pod-revision-hash value after Pod re-creation during StatefulSet rolling upgrade.
It's responsobility of Kubernetes controller-manager to set proper revision-hash based on Revision of StatefulSet.
But sometimes it may fail, due to network delays or performance issues at controller-manager.

 This commit adds revision check and fail fast in this case.
It also removes StatefulSet Update of CurrentRevision field. It must be handeled by controller-manager and operator shouldn't change it.

Related issue:
VictoriaMetrics#1227

Signed-off-by: f41gh7 <[email protected]>
  • Loading branch information
f41gh7 committed Jan 30, 2025
1 parent eb72da1 commit ddafd4a
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 24 deletions.
2 changes: 2 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ aliases:

* Dependency: [vmoperator](https://docs.victoriametrics.com/operator/): Updated default versions for VM apps to v1.110.0 version

* BUGFIX: [vmoperator](https://docs.victoriametrics.com/operator/): properly check `Pod` state during `StatefulSet` rolling upgrade procedure. See [this issue](https://github.com/VictoriaMetrics/operator/issues/1227) for details.

## [v0.52.0](https://github.com/VictoriaMetrics/operator/releases/tag/v0.52.0)

**Release date:** 21 Jan 2025
Expand Down
37 changes: 16 additions & 21 deletions internal/controller/operator/factory/reconcile/statefulset.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,9 @@ func performRollingUpdateOnSts(ctx context.Context, podMustRecreate bool, rclien
}

stsVersion := sts.Status.UpdateRevision
if stsVersion == "" {
return fmt.Errorf("sts.Status.UpdateRevision is empty. Update cannot be performed. Please check logs of Kubernetes controller-manager or change rollingUpdateStrategy to RollingUpdate")
}
l := logger.WithContext(ctx)
// fast path
if neededPodCount < 1 {
Expand Down Expand Up @@ -218,7 +221,7 @@ func performRollingUpdateOnSts(ctx context.Context, podMustRecreate bool, rclien
// or kubernetes for some reason cannot create pod
// it's better to fail fast
case len(podList.Items) < neededPodCount:
return fmt.Errorf("actual pod count: %d less then needed: %d, possible statefulset misconfiguration", len(podList.Items), neededPodCount)
return fmt.Errorf("actual pod count: %d less than needed: %d, possible statefulset misconfiguration", len(podList.Items), neededPodCount)
}

// first we must ensure, that already updated pods in ready status
Expand Down Expand Up @@ -256,21 +259,15 @@ func performRollingUpdateOnSts(ctx context.Context, podMustRecreate bool, rclien

if !updatedNeeded {
l.Info("no pod needs to be updated")
if sts.Status.UpdateRevision != sts.Status.CurrentRevision {
logger.WithContext(ctx).Info(fmt.Sprintf("update statefulSet.Status.CurrentRevision from revision=%q to desired revision=%q", sts.Status.CurrentRevision, sts.Status.UpdateRevision))
sts.Status.CurrentRevision = sts.Status.UpdateRevision
if err := rclient.Status().Update(ctx, sts); err != nil {
return fmt.Errorf("cannot update sts currentRevision after sts updated finished, err: %w", err)
}
}
return nil
}

l.Info(fmt.Sprintf("discovered already updated pods=%d, pods needed to be update=%d", len(updatedPods), len(podsForUpdate)))
// check updated, by not ready pods
for _, pod := range updatedPods {
l.Info(fmt.Sprintf("checking ready status for already updated pod %s to revision version=%q", pod.Name, stsVersion))
err := waitForPodReady(ctx, rclient, ns, pod.Name, sts.Spec.MinReadySeconds)
podNsn := types.NamespacedName{Namespace: ns, Name: pod.Name}
err := waitForPodReady(ctx, rclient, podNsn, stsVersion, sts.Spec.MinReadySeconds)
if err != nil {
return fmt.Errorf("cannot wait for pod ready state for already updated pod: %w", err)
}
Expand All @@ -284,20 +281,14 @@ func performRollingUpdateOnSts(ctx context.Context, podMustRecreate bool, rclien
if err != nil {
return err
}
err = waitForPodReady(ctx, rclient, ns, pod.Name, sts.Spec.MinReadySeconds)
if err != nil {
podNsn := types.NamespacedName{Namespace: ns, Name: pod.Name}
if err = waitForPodReady(ctx, rclient, podNsn, stsVersion, sts.Spec.MinReadySeconds); err != nil {
return fmt.Errorf("cannot wait for pod ready state during re-creation: %w", err)
}
l.Info(fmt.Sprintf("pod %s was updated successfully", pod.Name))
}

if sts.Status.CurrentRevision != sts.Status.UpdateRevision {
l.Info(fmt.Sprintf("finishing statefulset update by changing status from revision=%q to desired revision=%q", sts.Status.CurrentRevision, sts.Status.UpdateRevision))
sts.Status.CurrentRevision = sts.Status.UpdateRevision
if err := rclient.Status().Update(ctx, sts); err != nil {
return fmt.Errorf("cannot update sts currentRevision after sts updated finished, err: %w", err)
}
}
l.Info(fmt.Sprintf("finished statefulset update from revision=%q to revision=%q", sts.Status.CurrentRevision, stsVersion))

return nil
}
Expand All @@ -322,13 +313,17 @@ func PodIsReady(pod *corev1.Pod, minReadySeconds int32) bool {
return false
}

func waitForPodReady(ctx context.Context, rclient client.Client, ns, podName string, minReadySeconds int32) error {
func waitForPodReady(ctx context.Context, rclient client.Client, nsn types.NamespacedName, desiredRevision string, minReadySeconds int32) error {
var pod *corev1.Pod
if err := wait.PollUntilContextTimeout(ctx, podWaitReadyIntervalCheck, podWaitReadyTimeout, false, func(_ context.Context) (done bool, err error) {
pod = &corev1.Pod{}
err = rclient.Get(ctx, types.NamespacedName{Namespace: ns, Name: podName}, pod)
err = rclient.Get(ctx, nsn, pod)
if err != nil {
return false, fmt.Errorf("cannot get pod: %q: %w", podName, err)
return false, fmt.Errorf("cannot get pod: %q: %w", nsn, err)
}
revision := pod.Labels[podRevisionLabel]
if revision != desiredRevision {
return true, fmt.Errorf("unexpected pod label %s=%s, want revision=%s", podRevisionLabel, revision, desiredRevision)
}
if PodIsReady(pod, minReadySeconds) {
return true, nil
Expand Down
77 changes: 74 additions & 3 deletions internal/controller/operator/factory/reconcile/statefulset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ import (

func Test_waitForPodReady(t *testing.T) {
type args struct {
ns string
podName string
ns string
podName string
desiredVersion string
}
tests := []struct {
name string
Expand Down Expand Up @@ -91,12 +92,82 @@ func Test_waitForPodReady(t *testing.T) {
},
wantErr: false,
},
{
name: "with desiredVersion",
args: args{
ns: "default",
podName: "vmselect-example-0",
desiredVersion: "some-version",
},
predefinedObjects: []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "vmselect-example-0",
Namespace: "default",
Labels: map[string]string{
podRevisionLabel: "some-version",
},
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{Status: "True", Type: corev1.PodReady},
},
Phase: corev1.PodRunning,
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "vmselect-example-1",
Namespace: "default",
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{},
Phase: corev1.PodPending,
},
},
},
wantErr: false,
},
{
name: "with missing desiredVersion",
args: args{
ns: "default",
podName: "vmselect-example-0",
desiredVersion: "some-version",
},
predefinedObjects: []runtime.Object{
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "vmselect-example-0",
Namespace: "default",
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{Status: "True", Type: corev1.PodReady},
},
Phase: corev1.PodRunning,
},
},
&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "vmselect-example-1",
Namespace: "default",
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{},
Phase: corev1.PodPending,
},
},
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
fclient := k8stools.GetTestClientWithObjects(tt.predefinedObjects)

if err := waitForPodReady(context.Background(), fclient, tt.args.ns, tt.args.podName, 0); (err != nil) != tt.wantErr {
nsn := types.NamespacedName{Namespace: tt.args.ns, Name: tt.args.podName}
if err := waitForPodReady(context.Background(), fclient, nsn, tt.args.desiredVersion, 0); (err != nil) != tt.wantErr {
t.Errorf("waitForPodReady() error = %v, wantErr %v", err, tt.wantErr)
}
})
Expand Down

0 comments on commit ddafd4a

Please sign in to comment.