fix: h-scale ops failed but pod is created successfully (apecloud#7564)
wangyelei authored Jun 19, 2024
1 parent 58ef663 commit 825515c
Showing 6 changed files with 88 additions and 69 deletions.
4 changes: 1 addition & 3 deletions controllers/apps/operations/horizontal_scaling.go
@@ -140,9 +140,7 @@ func (hs horizontalScalingOpsHandler) ReconcileAction(reqCtx intctrlutil.Request
lastCompConfiguration := opsRes.OpsRequest.Status.LastConfiguration.Components[pgRes.compOps.GetComponentName()]
horizontalScaling := pgRes.compOps.(appsv1alpha1.HorizontalScaling)
pgRes.createdPodSet, pgRes.deletedPodSet = hs.getCreateAndDeletePodSet(opsRes, lastCompConfiguration, *pgRes.clusterComponent, horizontalScaling, pgRes.fullComponentName)
if horizontalScaling.Replicas == nil {
pgRes.noWaitComponentCompleted = true
}
pgRes.noWaitComponentCompleted = true
return handleComponentProgressForScalingReplicas(reqCtx, cli, opsRes, pgRes, compStatus)
}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
55 changes: 32 additions & 23 deletions controllers/apps/operations/ops_comp_helper.go
@@ -26,11 +26,11 @@ import (
"slices"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
"github.com/apecloud/kubeblocks/pkg/controller/component"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

@@ -134,6 +134,15 @@ func (c componentOpsHelper) cancelComponentOps(ctx context.Context,
return cli.Update(ctx, opsRes.Cluster)
}

func (c componentOpsHelper) existFailure(ops *appsv1alpha1.OpsRequest, componentName string) bool {
for _, v := range ops.Status.Components[componentName].ProgressDetails {
if v.Status == appsv1alpha1.FailedProgressStatus {
return true
}
}
return false
}

// reconcileActionWithComponentOps will be performed when action is done and loops till OpsRequest.status.phase is Succeed/Failed.
// the common function to reconcile opsRequest status when the opsRequest will affect the lifecycle of the components.
func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.RequestCtx,
@@ -148,7 +157,6 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
var (
opsRequestPhase = appsv1alpha1.OpsRunningPhase
opsRequest = opsRes.OpsRequest
isFailed bool
expectProgressCount int32
completedProgressCount int32
requeueTimeAfterFailed time.Duration
@@ -225,7 +233,8 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
}
}
}
var waitComponentCompleted bool
opsIsCompleted := true
existFailure := false
for i := range progressResources {
pgResource := progressResources[i]
opsCompStatus := opsRequest.Status.Components[pgResource.compOps.GetComponentName()]
@@ -235,29 +244,29 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
}
expectProgressCount += expectCount
completedProgressCount += completedCount
if c.existFailure(opsRes.OpsRequest, pgResource.compOps.GetComponentName()) {
existFailure = true
}
componentPhase := opsRes.Cluster.Status.Components[pgResource.compOps.GetComponentName()].Phase
if !pgResource.isShardingComponent {
lastFailedTime := opsCompStatus.LastFailedTime
componentPhase := opsRes.Cluster.Status.Components[pgResource.compOps.GetComponentName()].Phase
if isFailedOrAbnormal(componentPhase) {
isFailed = true
if lastFailedTime.IsZero() {
lastFailedTime = metav1.Now()
}
if time.Now().Before(lastFailedTime.Add(componentFailedTimeout)) {
requeueTimeAfterFailed = componentFailedTimeout - time.Since(lastFailedTime.Time)
}
} else if !lastFailedTime.IsZero() {
// reset lastFailedTime if component is not failed
lastFailedTime = metav1.Time{}
}
if opsCompStatus.Phase != componentPhase {
opsCompStatus.Phase = componentPhase
opsCompStatus.LastFailedTime = lastFailedTime
}
// wait the component to complete
if !pgResource.noWaitComponentCompleted && !slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), componentPhase) {
waitComponentCompleted = true
} else {
compObj, err := component.GetComponentByName(reqCtx.Ctx, cli, opsRes.Cluster.Namespace,
constant.GenerateClusterComponentName(opsRes.Cluster.Name, pgResource.fullComponentName))
if err != nil {
return opsRequestPhase, 0, err
}
componentPhase = compObj.Status.Phase
}
// conditions whether ops is running:
// 1. completedProgressCount is not equal to expectProgressCount when the ops do not need to wait component phase to a terminal phase.
// 2. the component phase is not a terminal phase.
// 3. no completed progresses
if (pgResource.noWaitComponentCompleted && expectCount != completedCount) ||
!slices.Contains(appsv1alpha1.GetComponentTerminalPhases(), componentPhase) || completedCount == 0 {
opsIsCompleted = false
}
opsRequest.Status.Components[pgResource.compOps.GetComponentName()] = opsCompStatus
}
@@ -268,10 +277,10 @@ func (c componentOpsHelper) reconcileActionWithComponentOps(reqCtx intctrlutil.R
return opsRequestPhase, 0, err
}
}
if waitComponentCompleted || completedProgressCount != expectProgressCount {
if !opsIsCompleted {
return opsRequestPhase, 0, nil
}
if isFailed {
if existFailure {
if requeueTimeAfterFailed != 0 {
// component failure may be temporary, waiting for component failure timeout.
return opsRequestPhase, requeueTimeAfterFailed, nil
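The net effect of the changes in this file is that failure is now derived from per-pod progress details (the new existFailure helper) and completion from opsIsCompleted, instead of from the component phase combined with the 30-second componentFailedTimeout. Below is a minimal sketch, not the actual code, of the resulting phase decision at the end of reconcileActionWithComponentOps; the OpsPhase type and the OpsSucceedPhase constant are assumed from the v1alpha1 API (only OpsRunningPhase and OpsFailedPhase appear in this diff), and the progress-count bookkeeping is simplified away.

// Sketch only: an approximation of the phase decision after this change.
package operations

import (
	"time"

	appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
)

func decideOpsPhase(opsIsCompleted, existFailure bool,
	requeueTimeAfterFailed time.Duration) (appsv1alpha1.OpsPhase, time.Duration) {
	if !opsIsCompleted {
		// some component still has pending progress or a non-terminal phase
		return appsv1alpha1.OpsRunningPhase, 0
	}
	if existFailure {
		if requeueTimeAfterFailed != 0 {
			// the component failure may be temporary; requeue and re-evaluate later
			return appsv1alpha1.OpsRunningPhase, requeueTimeAfterFailed
		}
		return appsv1alpha1.OpsFailedPhase, 0
	}
	// assumed constant; the success branch lies outside the visible hunks
	return appsv1alpha1.OpsSucceedPhase, 0
}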
22 changes: 2 additions & 20 deletions controllers/apps/operations/ops_progress_util.go
@@ -320,13 +320,9 @@ func handleFailedOrProcessingProgressDetail(opsRes *OpsResource,
progressDetail appsv1alpha1.ProgressStatusDetail,
pod *corev1.Pod) (completedCount int32) {
componentName := pgRes.clusterComponent.Name
opsStartTime := opsRes.OpsRequest.Status.StartTimestamp
if podIsFailedDuringOperation(opsStartTime, pod, compStatus.Phase) {
isFailed, isTimeout, _ := intctrlutil.IsPodFailedAndTimedOut(pod)
if isFailed && isTimeout {
podMessage := getFailedPodMessage(opsRes.Cluster, componentName, pod)
// if the pod is not failed, return
if len(podMessage) == 0 {
return
}
message := getProgressFailedMessage(pgRes.opsMessageKey, progressDetail.ObjectKey, componentName, podMessage)
progressDetail.SetStatusAndMessage(appsv1alpha1.FailedProgressStatus, message)
completedCount = 1
@@ -344,20 +340,6 @@ func podIsPendingDuringOperation(opsStartTime metav1.Time, pod *corev1.Pod) bool
return pod.CreationTimestamp.Before(&opsStartTime) && pod.DeletionTimestamp.IsZero()
}

// podIsFailedDuringOperation checks if pod is failed during operation.
func podIsFailedDuringOperation(
opsStartTime metav1.Time,
pod *corev1.Pod,
componentPhase appsv1alpha1.ClusterComponentPhase) bool {
if !isFailedOrAbnormal(componentPhase) {
return false
}
// When the component is running and the pod has been created after opsStartTime,
// but it does not meet the success condition, it indicates that the changes made
// to the operations have been overwritten, resulting in a failed status.
return !pod.CreationTimestamp.Before(&opsStartTime) && componentPhase == appsv1alpha1.RunningClusterCompPhase
}

// podProcessedSuccessful checks if the pod has been processed successfully:
// 1. the pod is recreated after OpsRequest.status.startTime and pod is available.
// 2. the component is running and pod is available.
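With podIsFailedDuringOperation removed, per-pod failure is now detected through intctrlutil.IsPodFailedAndTimedOut(pod). Its implementation is not part of this commit; judging from the MockPodIsFailed test helper added below (a ContainersReady=False condition plus a container terminated with a non-zero exit code), a check of roughly the following shape is what is being exercised. This is a hypothetical sketch only, and the real helper also returns a third value, likely a message.

// Hypothetical sketch; the actual intctrlutil.IsPodFailedAndTimedOut may differ.
import (
	"time"

	corev1 "k8s.io/api/core/v1"
)

func isPodFailedAndTimedOut(pod *corev1.Pod, timeout time.Duration) (isFailed, isTimedOut bool) {
	for _, cond := range pod.Status.Conditions {
		if cond.Type != corev1.ContainersReady || cond.Status != corev1.ConditionFalse {
			continue
		}
		for _, cs := range pod.Status.ContainerStatuses {
			// a container terminated with a non-zero exit code marks the pod as failed
			if cs.State.Terminated != nil && cs.State.Terminated.ExitCode != 0 {
				isFailed = true
			}
		}
		// the failure only counts once it has persisted past the timeout
		isTimedOut = time.Since(cond.LastTransitionTime.Time) >= timeout
		return isFailed, isTimedOut
	}
	return false, false
}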
11 changes: 0 additions & 11 deletions controllers/apps/operations/ops_util.go
@@ -37,11 +37,6 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
)

const (
// componentFailedTimeout when the duration of component failure exceeds this threshold, it is determined that opsRequest has failed
componentFailedTimeout = 30 * time.Second
)

var _ error = &WaitForClusterPhaseErr{}

type WaitForClusterPhaseErr struct {
@@ -62,12 +57,6 @@ type handleStatusProgressWithComponent func(reqCtx intctrlutil.RequestCtx,

type handleReconfigureOpsStatus func(cmStatus *appsv1alpha1.ConfigurationItemStatus) error

func isFailedOrAbnormal(phase appsv1alpha1.ClusterComponentPhase) bool {
return slices.Index([]appsv1alpha1.ClusterComponentPhase{
appsv1alpha1.FailedClusterCompPhase,
appsv1alpha1.AbnormalClusterCompPhase}, phase) != -1
}

// getClusterDefByName gets the ClusterDefinition object by the name.
func getClusterDefByName(ctx context.Context, cli client.Client, clusterDefName string) (*appsv1alpha1.ClusterDefinition, error) {
clusterDef := &appsv1alpha1.ClusterDefinition{}
42 changes: 30 additions & 12 deletions controllers/apps/operations/ops_util_test.go
@@ -35,6 +35,7 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
"github.com/apecloud/kubeblocks/pkg/generics"
testapps "github.com/apecloud/kubeblocks/pkg/testutil/apps"
testk8s "github.com/apecloud/kubeblocks/pkg/testutil/k8s"
)

var _ = Describe("OpsUtil functions", func() {
@@ -62,6 +63,7 @@ var _ = Describe("OpsUtil functions", func() {
// namespaced
testapps.ClearResourcesWithRemoveFinalizerOption(&testCtx, generics.InstanceSetSignature, true, inNS, ml)
testapps.ClearResources(&testCtx, generics.ConfigMapSignature, inNS, ml)
testapps.ClearResources(&testCtx, generics.OpsRequestSignature, inNS, ml)
}

BeforeEach(cleanEnv)
@@ -89,33 +91,49 @@ var _ = Describe("OpsUtil functions", func() {
By("init operations resources ")
opsRes, _, _ := initOperationsResources(clusterDefinitionName, clusterVersionName, clusterName)
testapps.MockInstanceSetComponent(&testCtx, clusterName, consensusComp)

pods := testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, consensusComp)
time.Sleep(time.Second)
By("Test the functions in ops_util.go")
opsRes.OpsRequest = createHorizontalScaling(clusterName, appsv1alpha1.HorizontalScaling{
ComponentOps: appsv1alpha1.ComponentOps{ComponentName: consensusComp},
Replicas: pointer.Int32(1),
})
ops := testapps.NewOpsRequestObj("restart-ops-"+randomStr, testCtx.DefaultNamespace,
clusterName, appsv1alpha1.RestartType)
ops.Spec.RestartList = []appsv1alpha1.ComponentOps{{ComponentName: consensusComp}}
opsRes.OpsRequest = testapps.CreateOpsRequest(ctx, testCtx, ops)
opsRes.OpsRequest.Status.Phase = appsv1alpha1.OpsRunningPhase
opsRes.OpsRequest.Status.StartTimestamp = metav1.Now()

By("mock component failed")
clusterComp := opsRes.Cluster.Status.Components[consensusComp]
clusterComp.Phase = appsv1alpha1.FailedClusterCompPhase
opsRes.Cluster.Status.SetComponentStatus(consensusComp, clusterComp)

By("expect for opsRequest is running")
handleRestartProgress := func(reqCtx intctrlutil.RequestCtx,
cli client.Client,
opsRes *OpsResource,
pgRes *progressResource,
compStatus *appsv1alpha1.OpsRequestComponentStatus) (expectProgressCount int32, completedCount int32, err error) {
return handleComponentStatusProgress(reqCtx, cli, opsRes, pgRes, compStatus,
func(pod *corev1.Pod, inteface ComponentOpsInteface, opsStartTime metav1.Time, s string) bool {
return !pod.CreationTimestamp.Before(&opsStartTime)
})
}

reqCtx := intctrlutil.RequestCtx{Ctx: ctx}
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.HorizontalScalingList)
compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.RestartList)

opsPhase, _, err := compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes,
"test", handleComponentProgressForScalingReplicas)
"test", handleRestartProgress)
Expect(err).Should(BeNil())
Expect(opsPhase).Should(Equal(appsv1alpha1.OpsRunningPhase))

By("mock component failed time reaches the threshold, expect for opsRequest is Failed")
compStatus := opsRes.OpsRequest.Status.Components[consensusComp]
compStatus.LastFailedTime = metav1.Time{Time: compStatus.LastFailedTime.Add(-1 * componentFailedTimeout).Add(-1 * time.Second)}
opsRes.OpsRequest.Status.Components[consensusComp] = compStatus
opsPhase, _, err = compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes, "test", handleComponentProgressForScalingReplicas)
By("mock one pod recreates failed, expect for opsRequest is Failed")
testk8s.MockPodIsTerminating(ctx, testCtx, pods[2])
testk8s.RemovePodFinalizer(ctx, testCtx, pods[2])
// recreate it
pod := testapps.MockInstanceSetPod(&testCtx, nil, clusterName, consensusComp, pods[2].Name, "follower", "Readonly")
// mock pod is failed
testk8s.MockPodIsFailed(ctx, testCtx, pod)
opsPhase, _, err = compOpsHelper.reconcileActionWithComponentOps(reqCtx, k8sClient, opsRes, "test", handleRestartProgress)
Expect(err).Should(BeNil())
Expect(opsPhase).Should(Equal(appsv1alpha1.OpsFailedPhase))
})
23 changes: 23 additions & 0 deletions pkg/testutil/k8s/pod_util.go
@@ -22,6 +22,7 @@ package testutil
import (
"context"
"fmt"
"time"

"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
@@ -68,6 +69,28 @@ func MockPodIsTerminating(ctx context.Context, testCtx testutil.TestContext, pod
}).Should(gomega.Succeed())
}

func MockPodIsFailed(ctx context.Context, testCtx testutil.TestContext, pod *corev1.Pod) {
patch := client.MergeFrom(pod.DeepCopy())
pod.Status.Conditions = []corev1.PodCondition{
{
Type: corev1.ContainersReady,
Status: corev1.ConditionFalse,
LastTransitionTime: metav1.Time{Time: time.Now().Add(-20 * time.Second)},
},
}
pod.Status.ContainerStatuses = []corev1.ContainerStatus{
{
Name: pod.Spec.Containers[0].Name,
State: corev1.ContainerState{
Terminated: &corev1.ContainerStateTerminated{
ExitCode: 1,
},
},
},
}
gomega.Expect(testCtx.Cli.Status().Patch(ctx, pod, patch)).Should(gomega.Succeed())
}

// RemovePodFinalizer removes the pod finalizer to delete the pod finally.
func RemovePodFinalizer(ctx context.Context, testCtx testutil.TestContext, pod *corev1.Pod) {
patch := client.MergeFrom(pod.DeepCopy())
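For reference, the new MockPodIsFailed helper is exercised in ops_util_test.go above: a pod is recreated after the OpsRequest starts and then marked failed, which the reconcile loop should surface as a failed progress detail and, in turn, as a failed OpsRequest. A condensed usage sketch, assuming the same test fixtures as that test:

// Condensed from the restart test above; assumes the usual ops test fixtures.
pods := testapps.MockInstanceSetPods(&testCtx, nil, opsRes.Cluster, consensusComp)

// recreate one pod after the ops start time ...
testk8s.MockPodIsTerminating(ctx, testCtx, pods[2])
testk8s.RemovePodFinalizer(ctx, testCtx, pods[2])
pod := testapps.MockInstanceSetPod(&testCtx, nil, clusterName, consensusComp, pods[2].Name, "follower", "Readonly")

// ... and mark it failed so the OpsRequest is expected to reach OpsFailedPhase
testk8s.MockPodIsFailed(ctx, testCtx, pod)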
