
chore: support replicationSet rolling update with role strategy (apec…
Y-Rookie authored Jun 27, 2023
1 parent 4e32760 commit 8fc7de7
Showing 8 changed files with 173 additions and 135 deletions.
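For orientation, the "role strategy" introduced here boils down to mapping each pod's role label to a numeric priority and restarting pods in ascending priority order, so the primary (or leader) is always handled last. A minimal, self-contained sketch of that idea follows; the label key and helper names are illustrative stand-ins for the repo's constant.RoleLabelKey and util.SortPods, not the actual implementation, and the role strings are assumed to match the constants used in the diff.

package main

import (
	"fmt"
	"sort"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// roleLabelKey stands in for constant.RoleLabelKey; the real value may differ.
const roleLabelKey = "kubeblocks.io/role"

// replicationRolePriorities mirrors ComposeReplicationRolePriorityMap:
// pods without a role restart first, the primary restarts last.
var replicationRolePriorities = map[string]int{
	"":          0, // emptyPriority
	"secondary": 1, // secondaryPriority
	"primary":   2, // primaryPriority
}

// sortPodsByRolePriority orders pods by ascending role priority,
// roughly what util.SortPods does with a role priority map.
func sortPodsByRolePriority(pods []corev1.Pod, priorities map[string]int) {
	sort.SliceStable(pods, func(i, j int) bool {
		return priorities[pods[i].Labels[roleLabelKey]] < priorities[pods[j].Labels[roleLabelKey]]
	})
}

func main() {
	pods := []corev1.Pod{
		{ObjectMeta: metav1.ObjectMeta{Name: "redis-0", Labels: map[string]string{roleLabelKey: "primary"}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "redis-1", Labels: map[string]string{roleLabelKey: "secondary"}}},
		{ObjectMeta: metav1.ObjectMeta{Name: "redis-2", Labels: map[string]string{}}},
	}
	sortPodsByRolePriority(pods, replicationRolePriorities)
	for _, p := range pods {
		fmt.Println(p.Name) // prints redis-2, redis-1, redis-0
	}
}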
1 change: 0 additions & 1 deletion apis/apps/v1alpha1/clusterdefinition_types.go
@@ -815,7 +815,6 @@ func (r *ReplicationSetSpec) FinalStsUpdateStrategy() (appsv1.PodManagementPolic
return r.LLPodManagementPolicy, *r.LLUpdateStrategy
}
_, s := r.StatefulSetSpec.finalStsUpdateStrategy()
// TODO(xingran): The update of the replicationSet needs to generate a plan according to the role
s.Type = appsv1.OnDeleteStatefulSetStrategyType
s.RollingUpdate = nil
return appsv1.ParallelPodManagement, s
4 changes: 2 additions & 2 deletions controllers/apps/components/component_status_test.go
@@ -353,12 +353,12 @@ var _ = Describe("ComponentStatusSynchronizer", func() {
BeforeEach(func() {
clusterDef = testapps.NewClusterDefFactory(clusterDefName).
AddComponentDef(testapps.ReplicationRedisComponent, compDefName).
GetObject()
Create(&testCtx).GetObject()

cluster = testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName, clusterDefName, clusterVersionName).
AddComponent(compName, compDefName).
SetReplicas(2).
GetObject()
Create(&testCtx).GetObject()

reqCtx = &intctrlutil.RequestCtx{
Ctx: ctx,
43 changes: 3 additions & 40 deletions controllers/apps/components/consensus/consensus.go
@@ -33,7 +33,6 @@ import (
"github.com/apecloud/kubeblocks/controllers/apps/components/util"
"github.com/apecloud/kubeblocks/internal/constant"
"github.com/apecloud/kubeblocks/internal/controller/graph"
ictrltypes "github.com/apecloud/kubeblocks/internal/controller/types"
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

@@ -202,46 +201,10 @@ func (r *ConsensusSet) HandleRestart(ctx context.Context, obj client.Object) ([]
if r.getWorkloadType() != appsv1alpha1.Consensus {
return nil, nil
}

stsObj := util.ConvertToStatefulSet(obj)
pods, err := util.GetPodListByStatefulSet(ctx, r.Cli, stsObj)
if err != nil {
return nil, err
}

// prepare to do pods Deletion, that's the only thing we should do,
// the statefulset reconciler will do the rest.
// to simplify the process, we do pods Deletion after statefulset reconciliation done,
// it is when stsObj.Generation == stsObj.Status.ObservedGeneration
if stsObj.Generation != stsObj.Status.ObservedGeneration {
return nil, nil
}

// then we wait for all pods' presence when len(pods) == stsObj.Spec.Replicas
// at that point, we have enough info about the previous pods before deleting the current one
if len(pods) != int(*stsObj.Spec.Replicas) {
return nil, nil
}

// we don't check whether pod role label is present: prefer stateful set's Update done than role probing ready

// generate the pods Deletion plan
podsToDelete := make([]*corev1.Pod, 0)
plan := generateRestartPodPlan(ctx, r.Cli, stsObj, pods, r.getConsensusSpec(), &podsToDelete)
// execute plan
if _, err := plan.WalkOneStep(); err != nil {
return nil, err
}

vertexes := make([]graph.Vertex, 0)
for _, pod := range podsToDelete {
vertexes = append(vertexes, &ictrltypes.LifecycleVertex{
Obj: pod,
Action: ictrltypes.ActionDeletePtr(),
Orphan: true,
})
priorityMapperFn := func(component *appsv1alpha1.ClusterComponentDefinition) map[string]int {
return ComposeRolePriorityMap(component.ConsensusSpec)
}
return vertexes, nil
return r.HandleUpdateWithStrategy(ctx, obj, nil, priorityMapperFn, generateConsensusSerialPlan, generateConsensusBestEffortParallelPlan, generateConsensusParallelPlan)
}

// HandleRoleChange is the implementation of the type Component interface method, which is used to handle the role change of the Consensus workload.
50 changes: 0 additions & 50 deletions controllers/apps/components/consensus/consensus_utils.go
@@ -21,11 +21,9 @@ package consensus

import (
"context"
"errors"
"sort"
"strings"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"

@@ -64,54 +62,6 @@ const (
// unknownPriority = 0
)

// generateRestartPodPlan generates update plan to restart pods based on UpdateStrategy
func generateRestartPodPlan(ctx context.Context, cli client.Client, stsObj *appsv1.StatefulSet, pods []corev1.Pod,
consensusSpec *appsv1alpha1.ConsensusSetSpec, podsToDelete *[]*corev1.Pod) *util.Plan {
restartPod := func(obj interface{}) (bool, error) {
pod, ok := obj.(corev1.Pod)
if !ok {
return false, errors.New("wrong type: obj not Pod")
}
// if DeletionTimestamp is not nil, it is terminating.
if pod.DeletionTimestamp != nil {
return true, nil
}
// if pod is the latest version, we do nothing
if intctrlutil.GetPodRevision(&pod) == stsObj.Status.UpdateRevision {
// wait until ready
return !intctrlutil.PodIsReadyWithLabel(pod), nil
}

// delete the pod to trigger associate StatefulSet to re-create it
*podsToDelete = append(*podsToDelete, &pod)

return true, nil
}
return generateConsensusUpdatePlanLow(ctx, cli, stsObj, pods, consensusSpec, restartPod)
}

// generateConsensusUpdatePlanLow generates Update plan based on UpdateStrategy
func generateConsensusUpdatePlanLow(ctx context.Context, cli client.Client, stsObj *appsv1.StatefulSet, pods []corev1.Pod,
consensusSpec *appsv1alpha1.ConsensusSetSpec, restartPod func(obj any) (bool, error)) *util.Plan {
plan := &util.Plan{}
plan.Start = &util.Step{}
plan.WalkFunc = restartPod

rolePriorityMap := ComposeRolePriorityMap(consensusSpec)
util.SortPods(pods, rolePriorityMap, constant.RoleLabelKey)

// generate plan by UpdateStrategy
switch consensusSpec.UpdateStrategy {
case appsv1alpha1.SerialStrategy:
generateConsensusSerialPlan(plan, pods, rolePriorityMap)
case appsv1alpha1.ParallelStrategy:
generateConsensusParallelPlan(plan, pods, rolePriorityMap)
case appsv1alpha1.BestEffortParallelStrategy:
generateConsensusBestEffortParallelPlan(plan, pods, rolePriorityMap)
}
return plan
}

// unknown & empty & learner & 1/2 followers -> 1/2 followers -> leader
func generateConsensusBestEffortParallelPlan(plan *util.Plan, pods []corev1.Pod, rolePriorityMap map[string]int) {
start := plan.Start
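The strategy switch removed from this file is the crux of what the shared handler now performs for both consensus and replication workloads. For reference, here is a compact restatement of that dispatch as a standalone helper; it is an illustrative stand-in only, since the actual shared logic lives in HandleUpdateWithStrategy, whose implementation is not part of this diff, and the UpdateStrategy type name is assumed from the constants used above.

package components

import (
	corev1 "k8s.io/api/core/v1"

	appsv1alpha1 "github.com/apecloud/kubeblocks/apis/apps/v1alpha1"
	"github.com/apecloud/kubeblocks/controllers/apps/components/util"
)

// planGenerator matches the shape of generateConsensusSerialPlan and friends.
type planGenerator func(plan *util.Plan, pods []corev1.Pod, rolePriorityMap map[string]int)

// buildPlanByStrategy dispatches to one of the three generators passed in,
// mirroring the switch that generateConsensusUpdatePlanLow used to contain.
func buildPlanByStrategy(strategy appsv1alpha1.UpdateStrategy, plan *util.Plan,
	pods []corev1.Pod, rolePriorityMap map[string]int,
	serial, bestEffortParallel, parallel planGenerator) {
	switch strategy {
	case appsv1alpha1.SerialStrategy:
		serial(plan, pods, rolePriorityMap)
	case appsv1alpha1.BestEffortParallelStrategy:
		bestEffortParallel(plan, pods, rolePriorityMap)
	case appsv1alpha1.ParallelStrategy:
		parallel(plan, pods, rolePriorityMap)
	}
}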
25 changes: 10 additions & 15 deletions controllers/apps/components/replication/replication.go
@@ -37,6 +37,12 @@ import (
intctrlutil "github.com/apecloud/kubeblocks/internal/controllerutil"
)

const (
emptyPriority = iota
secondaryPriority
primaryPriority
)

// ReplicationSet is a component object used by Cluster, ClusterComponentDefinition and ClusterComponentSpec
type ReplicationSet struct {
stateful.Stateful
@@ -155,25 +161,14 @@ func (r *ReplicationSet) GetPhaseWhenPodsNotReady(ctx context.Context,
}

// HandleRestart is the implementation of the type Component interface method, which is used to handle the restart of the Replication workload.
// TODO(xingran): handle the restart of the Replication workload with rolling update by Pod role.
func (r *ReplicationSet) HandleRestart(ctx context.Context, obj client.Object) ([]graph.Vertex, error) {
sts := util.ConvertToStatefulSet(obj)
if sts.Generation != sts.Status.ObservedGeneration {
if r.getWorkloadType() != appsv1alpha1.Replication {
return nil, nil
}
vertexes := make([]graph.Vertex, 0)
pods, err := util.GetPods4Delete(ctx, r.Cli, sts)
if err != nil {
return nil, err
}
for _, pod := range pods {
vertexes = append(vertexes, &ictrltypes.LifecycleVertex{
Obj: pod,
Action: ictrltypes.ActionDeletePtr(),
Orphan: true,
})
priorityMapperFn := func(component *appsv1alpha1.ClusterComponentDefinition) map[string]int {
return ComposeReplicationRolePriorityMap()
}
return vertexes, nil
return r.HandleUpdateWithStrategy(ctx, obj, nil, priorityMapperFn, generateReplicationSerialPlan, generateReplicationBestEffortParallelPlan, generateReplicationParallelPlan)
}

// HandleRoleChange is the implementation of the type Component interface method, which is used to handle the role change of the Replication workload.
42 changes: 40 additions & 2 deletions controllers/apps/components/replication/replication_test.go
@@ -77,8 +77,14 @@ var _ = Describe("Replication Component", func() {
It("Replication Component test", func() {

By("Create a clusterDefinition obj with replication workloadType.")
replicationSpec := &appsv1alpha1.ReplicationSetSpec{
StatefulSetSpec: appsv1alpha1.StatefulSetSpec{
UpdateStrategy: appsv1alpha1.SerialStrategy,
},
}
clusterDefObj = testapps.NewClusterDefFactory(clusterDefName).
AddComponentDef(testapps.ReplicationRedisComponent, testapps.DefaultRedisCompDefName).
AddReplicationSpec(replicationSpec).
Create(&testCtx).GetObject()

By("Create a clusterVersion obj with replication workloadType.")
@@ -207,13 +213,45 @@ var _ = Describe("Replication Component", func() {
Expect(len(pods)).To(Equal(int(replicas)))
Expect(util.IsStsAndPodsRevisionConsistent(ctx, k8sClient, replicationSetSts)).Should(BeTrue())

By("Checking if the pod is deleted when statefulSet is updated")
By("Checking if the pod is deleted when statefulSet is updated and UpdateStrategy is SerialStrategy")
status.UpdateRevision = "new-mock-revision"
testk8s.PatchStatefulSetStatus(&testCtx, replicationSetSts.Name, status)
Expect(testCtx.Cli.Get(testCtx.Ctx, stsObjectKey, replicationSetSts)).Should(Succeed())
vertexes, err = replicationComponent.HandleRestart(ctx, replicationSetSts)
Expect(err).To(Succeed())
Expect(len(vertexes)).To(Equal(int(replicas)))
Expect(len(vertexes)).To(Equal(1))
Expect(*vertexes[0].(*ictrltypes.LifecycleVertex).Action == ictrltypes.DELETE).To(BeTrue())

By("Checking if the pod is deleted when statefulSet is updated and UpdateStrategy is BestEffortParallelStrategy")
Expect(testapps.ChangeObj(&testCtx, clusterDefObj, func(clusterDef *appsv1alpha1.ClusterDefinition) {
clusterDef.Spec.ComponentDefs[0].ReplicationSpec = &appsv1alpha1.ReplicationSetSpec{
StatefulSetSpec: appsv1alpha1.StatefulSetSpec{
UpdateStrategy: appsv1alpha1.BestEffortParallelStrategy,
},
}
})).Should(Succeed())
status.UpdateRevision = "new-mock-revision-2"
testk8s.PatchStatefulSetStatus(&testCtx, replicationSetSts.Name, status)
Expect(testCtx.Cli.Get(testCtx.Ctx, stsObjectKey, replicationSetSts)).Should(Succeed())
vertexes, err = replicationComponent.HandleRestart(ctx, replicationSetSts)
Expect(err).To(Succeed())
Expect(len(vertexes)).To(Equal(1))
Expect(*vertexes[0].(*ictrltypes.LifecycleVertex).Action == ictrltypes.DELETE).To(BeTrue())

By("Checking if the pod is deleted when statefulSet is updated and UpdateStrategy is ParallelStrategy")
Expect(testapps.ChangeObj(&testCtx, clusterDefObj, func(clusterDef *appsv1alpha1.ClusterDefinition) {
clusterDef.Spec.ComponentDefs[0].ReplicationSpec = &appsv1alpha1.ReplicationSetSpec{
StatefulSetSpec: appsv1alpha1.StatefulSetSpec{
UpdateStrategy: appsv1alpha1.ParallelStrategy,
},
}
})).Should(Succeed())
status.UpdateRevision = "new-mock-revision-2"
testk8s.PatchStatefulSetStatus(&testCtx, replicationSetSts.Name, status)
Expect(testCtx.Cli.Get(testCtx.Ctx, stsObjectKey, replicationSetSts)).Should(Succeed())
vertexes, err = replicationComponent.HandleRestart(ctx, replicationSetSts)
Expect(err).To(Succeed())
Expect(len(vertexes)).To(Equal(2))
Expect(*vertexes[0].(*ictrltypes.LifecycleVertex).Action == ictrltypes.DELETE).To(BeTrue())

By("Test handleRoleChange when statefulSet Pod with role label but without primary annotation")
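A note on the expectations above, which use a two-replica set (one primary, one secondary): the plan generators place only the lowest-priority pods directly under plan.Start, and a single reconcile pass appears to visit just that first layer (judging from the WalkOneStep usage removed from the consensus code in this commit). The first-layer sizes therefore explain the 1/1/2 vertex counts:

Serial:              start -> secondary -> primary            (first layer: 1 pod)
BestEffortParallel:  start -> secondary -> primary            (with a single secondary the halves collapse; first layer: 1 pod)
Parallel:            start -> {secondary, primary}            (first layer: 2 pods)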
89 changes: 89 additions & 0 deletions controllers/apps/components/replication/replication_utils.go
@@ -167,3 +167,92 @@ func HandleReplicationSetRoleChangeEvent(cli client.Client,

return nil
}

// ComposeReplicationRolePriorityMap generates a priority map based on roles.
func ComposeReplicationRolePriorityMap() map[string]int {
rolePriorityMap := make(map[string]int, 0)
rolePriorityMap[""] = emptyPriority
rolePriorityMap[constant.Primary] = primaryPriority
rolePriorityMap[constant.Secondary] = secondaryPriority
return rolePriorityMap
}

// generateReplicationParallelPlan generates a parallel plan for the replication workload.
// unknown & empty & secondary & primary
func generateReplicationParallelPlan(plan *util.Plan, pods []corev1.Pod, rolePriorityMap map[string]int) {
start := plan.Start
for _, pod := range pods {
nextStep := &util.Step{}
nextStep.Obj = pod
start.NextSteps = append(start.NextSteps, nextStep)
}
}

// generateReplicationSerialPlan generates a serial plan for the replication workload.
// unknown -> empty -> secondary -> primary
func generateReplicationSerialPlan(plan *util.Plan, pods []corev1.Pod, rolePriorityMap map[string]int) {
start := plan.Start
for _, pod := range pods {
nextStep := &util.Step{}
nextStep.Obj = pod
start.NextSteps = append(start.NextSteps, nextStep)
start = nextStep
}
}

// generateReplicationBestEffortParallelPlan generates a best effort parallel plan for the replication workload.
// unknown & empty & 1/2 secondaries -> 1/2 secondaries -> primary
func generateReplicationBestEffortParallelPlan(plan *util.Plan, pods []corev1.Pod, rolePriorityMap map[string]int) {
start := plan.Start
// append unknown, empty
index := 0
for _, pod := range pods {
role := pod.Labels[constant.RoleLabelKey]
if rolePriorityMap[role] <= emptyPriority {
nextStep := &util.Step{}
nextStep.Obj = pod
start.NextSteps = append(start.NextSteps, nextStep)
index++
}
}
if len(start.NextSteps) > 0 {
start = start.NextSteps[0]
}
// append 1/2 secondaries
podList := pods[index:]
secondaryCount := 0
for _, pod := range podList {
if rolePriorityMap[pod.Labels[constant.RoleLabelKey]] < primaryPriority {
secondaryCount++
}
}
end := secondaryCount / 2
for i := 0; i < end; i++ {
nextStep := &util.Step{}
nextStep.Obj = podList[i]
start.NextSteps = append(start.NextSteps, nextStep)
}

if len(start.NextSteps) > 0 {
start = start.NextSteps[0]
}
// append the other 1/2 secondaries
podList = podList[end:]
end = secondaryCount - end
for i := 0; i < end; i++ {
nextStep := &util.Step{}
nextStep.Obj = podList[i]
start.NextSteps = append(start.NextSteps, nextStep)
}

if len(start.NextSteps) > 0 {
start = start.NextSteps[0]
}
// append primary
podList = podList[end:]
for _, pod := range podList {
nextStep := &util.Step{}
nextStep.Obj = pod
start.NextSteps = append(start.NextSteps, nextStep)
}
}
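To make the plan generators above concrete, here is a minimal, self-contained sketch of the plan-walking idea. The types only mimic the shape of util.Plan and util.Step used in this file; the real WalkOneStep and WalkFunc semantics live in the pre-existing util package and are not part of this diff, so treat this as an illustration rather than the actual behavior.

package main

import "fmt"

// step and plan mirror the shape of util.Step / util.Plan used above:
// a plan is a tree of steps, and one walk pass visits the first layer only.
type step struct {
	obj       string // stands in for the pod carried in Step.Obj
	nextSteps []*step
}

type plan struct {
	start    *step
	walkFunc func(obj string) (bool, error)
}

// walkOneStep applies walkFunc to every step directly under start,
// roughly what one reconcile pass does per the removed consensus code.
func (p *plan) walkOneStep() error {
	for _, s := range p.start.nextSteps {
		if _, err := p.walkFunc(s.obj); err != nil {
			return err
		}
	}
	return nil
}

// buildSerialPlan chains pods one after another, lowest role priority first,
// like generateReplicationSerialPlan.
func buildSerialPlan(p *plan, pods []string) {
	cur := p.start
	for _, pod := range pods {
		next := &step{obj: pod}
		cur.nextSteps = append(cur.nextSteps, next)
		cur = next
	}
}

func main() {
	var toDelete []string
	p := &plan{
		start: &step{},
		walkFunc: func(obj string) (bool, error) {
			toDelete = append(toDelete, obj) // a real WalkFunc would delete the pod
			return true, nil
		},
	}
	// pods pre-sorted by role priority: no-role pod, secondary, primary
	buildSerialPlan(p, []string{"redis-2", "redis-1 (secondary)", "redis-0 (primary)"})
	_ = p.walkOneStep()
	fmt.Println(toDelete) // only the head of the chain: [redis-2]
}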
