Skip to content

Commit

Permalink
feat: h-scale policy auto failover with backup type (apecloud#3817)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnleelhl authored Jun 16, 2023
1 parent c75dab3 commit 4a26fe6
Show file tree
Hide file tree
Showing 12 changed files with 152 additions and 100 deletions.
13 changes: 6 additions & 7 deletions apis/apps/v1alpha1/clusterdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,13 +549,12 @@ func (r *ServicePort) toSVCPort() corev1.ServicePort {

type HorizontalScalePolicy struct {
// type controls what kind of data synchronization do when component scale out.
// Policy is in enum of {None, Snapshot}. The default policy is `None`.
// None: Default policy, do nothing.
// Snapshot: Do native volume snapshot before scaling and restore to newly scaled pods.
// Prefer backup job to create snapshot if can find a backupPolicy from 'BackupPolicyTemplateName'.
// Notice that 'Snapshot' policy will only take snapshot on one volumeMount, default is
// the first volumeMount of first container (i.e. clusterdefinition.spec.components.podSpec.containers[0].volumeMounts[0]),
// since take multiple snapshots at one time might cause consistency problem.
// Policy is in enum of {None, CloneVolume}. The default policy is `None`.
// None: Default policy, create empty volume and no data clone.
// CloneVolume: Do data clone to newly scaled pods. Prefer to use volume snapshot first,
// and will try backup tool if volume snapshot is not enabled, finally
// report error if both above cannot work.
// Snapshot: Deprecated, alias for CloneVolume.
// +kubebuilder:default=None
// +optional
Type HScaleDataClonePolicyType `json:"type,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions apis/apps/v1alpha1/clusterdefinition_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ func (r *ClusterDefinition) Default() {
probes.RoleProbeTimeoutAfterPodsReady = 0
}
}
// set to CloneVolume if deprecated value used
if r.Spec.ComponentDefs[i].HorizontalScalePolicy != nil &&
r.Spec.ComponentDefs[i].HorizontalScalePolicy.Type == HScaleDataClonePolicyFromSnapshot {
r.Spec.ComponentDefs[i].HorizontalScalePolicy.Type = HScaleDataClonePolicyCloneVolume
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions apis/apps/v1alpha1/clusterdefinition_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,14 @@ var _ = Describe("clusterDefinition webhook", func() {
Expect(k8sClient.Update(ctx, clusterDef)).Should(Succeed())
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: clusterDef.Name}, clusterDef)).Should(Succeed())
Expect(clusterDef.Spec.ComponentDefs[0].Probes.RoleProbeTimeoutAfterPodsReady).Should(Equal(int32(0)))

By("set h-scale policy type to Snapshot")
clusterDef.Spec.ComponentDefs[0].HorizontalScalePolicy = &HorizontalScalePolicy{
Type: HScaleDataClonePolicyFromSnapshot,
}
Expect(k8sClient.Update(ctx, clusterDef)).Should(Succeed())
Expect(k8sClient.Get(ctx, client.ObjectKey{Name: clusterDef.Name}, clusterDef)).Should(Succeed())
Expect(clusterDef.Spec.ComponentDefs[0].HorizontalScalePolicy.Type).Should(Equal(HScaleDataClonePolicyCloneVolume))
})
})

Expand Down
4 changes: 2 additions & 2 deletions apis/apps/v1alpha1/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,13 +279,13 @@ const (

// HScaleDataClonePolicyType defines data clone policy when horizontal scaling.
// +enum
// +kubebuilder:validation:Enum={None,Snapshot,Backup}
// +kubebuilder:validation:Enum={None,CloneVolume,Snapshot}
type HScaleDataClonePolicyType string

// Values of HScaleDataClonePolicyType, i.e. the data clone policies that can be
// set on HorizontalScalePolicy.Type.
const (
// HScaleDataClonePolicyNone is the default policy: newly scaled pods get an
// empty volume and no data is cloned.
HScaleDataClonePolicyNone HScaleDataClonePolicyType = "None"
// HScaleDataClonePolicyCloneVolume clones data to newly scaled pods. Per the
// HorizontalScalePolicy.Type documentation, volume snapshot is preferred,
// the backup tool is tried if volume snapshot is not enabled, and an error
// is reported if neither can work.
HScaleDataClonePolicyCloneVolume HScaleDataClonePolicyType = "CloneVolume"
// HScaleDataClonePolicyFromSnapshot is the deprecated "Snapshot" value, an
// alias for CloneVolume; the ClusterDefinition defaulting webhook rewrites
// it to HScaleDataClonePolicyCloneVolume.
HScaleDataClonePolicyFromSnapshot HScaleDataClonePolicyType = "Snapshot"
// HScaleDataClonePolicyFromBackup is the "Backup" value.
// NOTE(review): "Backup" is missing from the {None,CloneVolume,Snapshot}
// Enum marker on the type above — confirm whether this constant is still
// meant to exist.
HScaleDataClonePolicyFromBackup HScaleDataClonePolicyType = "Backup"
)

// PodAntiAffinity defines pod anti-affinity strategy.
Expand Down
17 changes: 7 additions & 10 deletions config/crd/bases/apps.kubeblocks.io_clusterdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -379,19 +379,16 @@ spec:
default: None
description: 'type controls what kind of data synchronization
do when component scale out. Policy is in enum of {None,
Snapshot}. The default policy is `None`. None: Default
policy, do nothing. Snapshot: Do native volume snapshot
before scaling and restore to newly scaled pods. Prefer
backup job to create snapshot if can find a backupPolicy
from ''BackupPolicyTemplateName''. Notice that ''Snapshot''
policy will only take snapshot on one volumeMount, default
is the first volumeMount of first container (i.e. clusterdefinition.spec.components.podSpec.containers[0].volumeMounts[0]),
since take multiple snapshots at one time might cause
consistency problem.'
CloneVolume}. The default policy is `None`. None: Default
policy, create empty volume and no data clone. CloneVolume:
Do data clone to newly scaled pods. Prefer to use volume
snapshot first, and will try backup tool if volume snapshot
is not enabled, finally report error if both above cannot
work. Snapshot: Deprecated, alias for CloneVolume.'
enum:
- None
- CloneVolume
- Snapshot
- Backup
type: string
volumeMountsName:
description: volumeMountsName defines which volumeMount
Expand Down
88 changes: 77 additions & 11 deletions controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ var _ = Describe("Cluster Controller", func() {
backup.Status.PersistentVolumeClaimName = "backup-data"
})()).Should(Succeed())

if policy.Type == appsv1alpha1.HScaleDataClonePolicyFromSnapshot {
if viper.GetBool("VOLUMESNAPSHOT") {
By("Mocking VolumeSnapshot and set it as ReadyToUse")
pvcName := getPVCName(comp.Name, 0)
volumeSnapshot := &snapshotv1.VolumeSnapshot{
Expand Down Expand Up @@ -566,7 +566,7 @@ var _ = Describe("Cluster Controller", func() {
constant.KBAppComponentLabelKey: comp.Name,
}, client.InNamespace(clusterKey.Namespace))).Should(HaveLen(updatedReplicas))

if policy.Type == appsv1alpha1.HScaleDataClonePolicyFromBackup {
if !viper.GetBool("VOLUMESNAPSHOT") && len(viper.GetString(constant.CfgKeyBackupPVCName)) > 0 {
By("Checking restore job created")
Eventually(testapps.List(&testCtx, generics.JobSignature,
client.MatchingLabels{
Expand Down Expand Up @@ -610,7 +610,7 @@ var _ = Describe("Cluster Controller", func() {
}, client.InNamespace(clusterKey.Namespace))).Should(HaveLen(0))
Eventually(testapps.CheckObjExists(&testCtx, backupKey, &snapshotv1.VolumeSnapshot{}, false)).Should(Succeed())

if policy.Type == appsv1alpha1.HScaleDataClonePolicyFromBackup {
if !viper.GetBool("VOLUMESNAPSHOT") && len(viper.GetString(constant.CfgKeyBackupPVCName)) > 0 {
By("Checking restore job cleanup")
Eventually(testapps.List(&testCtx, generics.JobSignature,
client.MatchingLabels{
Expand Down Expand Up @@ -708,7 +708,6 @@ var _ = Describe("Cluster Controller", func() {
// @argument componentDefsWithHScalePolicy assign ClusterDefinition.spec.componentDefs[].horizontalScalePolicy for
// the matching names. If not provided, will set 1st ClusterDefinition.spec.componentDefs[0].horizontalScalePolicy.
horizontalScale := func(updatedReplicas int, policyType appsv1alpha1.HScaleDataClonePolicyType, componentDefsWithHScalePolicy ...string) {
viper.Set("VOLUMESNAPSHOT", true)
cluster := &appsv1alpha1.Cluster{}
Expect(testCtx.Cli.Get(testCtx.Ctx, clusterKey, cluster)).Should(Succeed())
initialGeneration := int(cluster.Status.ObservedGeneration)
Expand Down Expand Up @@ -736,7 +735,7 @@ var _ = Describe("Cluster Controller", func() {
Eventually(testapps.CheckObjExists(&testCtx, client.ObjectKey{Name: policyName, Namespace: clusterKey.Namespace},
&dataprotectionv1alpha1.BackupPolicy{}, true)).Should(Succeed())

if policyType == appsv1alpha1.HScaleDataClonePolicyFromBackup {
if policyType == appsv1alpha1.HScaleDataClonePolicyCloneVolume {
By("creating backup tool if backup policy is backup")
backupTool := &dataprotectionv1alpha1.BackupTool{
ObjectMeta: metav1.ObjectMeta{
Expand Down Expand Up @@ -820,7 +819,9 @@ var _ = Describe("Cluster Controller", func() {
waitForCreatingResourceCompletely(clusterKey, compName)

// REVIEW: this test flow, wait for running phase?
horizontalScale(int(updatedReplicas), appsv1alpha1.HScaleDataClonePolicyFromSnapshot, compDefName)
viper.Set("VOLUMESNAPSHOT", true)
viper.Set(constant.CfgKeyBackupPVCName, "")
horizontalScale(int(updatedReplicas), appsv1alpha1.HScaleDataClonePolicyCloneVolume, compDefName)
}

testStorageExpansion := func(compName, compDefName string) {
Expand Down Expand Up @@ -1401,6 +1402,61 @@ var _ = Describe("Cluster Controller", func() {
Should(Equal(appsv1alpha1.RunningClusterCompPhase))
}

// testHScaleError scales a component out under the CloneVolume policy while the
// "VOLUMESNAPSHOT" setting is false and the backup PVC name setting is empty,
// then waits until one of the cluster's status conditions carries the h-scale
// error message.
testHScaleError := func(compName, compDefName string) {
	// Both settings are switched off before any object is created.
	viper.Set("VOLUMESNAPSHOT", false)
	viper.Set(constant.CfgKeyBackupPVCName, "")

	var (
		replicasBefore = int32(1)
		replicasAfter  = int32(3)
	)

	By("Set HorizontalScalePolicy")
	clusterDefKey := client.ObjectKeyFromObject(clusterDefObj)
	Expect(testapps.GetAndChangeObj(&testCtx, clusterDefKey,
		func(clusterDef *appsv1alpha1.ClusterDefinition) {
			// Only component definitions named compDefName receive the policy.
			defs := clusterDef.Spec.ComponentDefs
			for idx := range defs {
				if defs[idx].Name == compDefName {
					defs[idx].HorizontalScalePolicy = &appsv1alpha1.HorizontalScalePolicy{
						Type:                     appsv1alpha1.HScaleDataClonePolicyCloneVolume,
						BackupPolicyTemplateName: backupPolicyTPLName,
					}
				}
			}
		})()).ShouldNot(HaveOccurred())

	By("Creating a cluster with VolumeClaimTemplate")
	dataPVCSpec := testapps.NewPVCSpec("1Gi")
	clusterObj = testapps.NewClusterFactory(testCtx.DefaultNamespace, clusterName,
		clusterDefObj.Name, clusterVersionObj.Name).
		WithRandomName().
		AddComponent(compName, compDefName).
		AddVolumeClaimTemplate(testapps.DataVolumeName, dataPVCSpec).
		SetReplicas(replicasBefore).
		Create(&testCtx).
		GetObject()
	clusterKey = client.ObjectKeyFromObject(clusterObj)

	By("Waiting for the cluster controller to create resources completely")
	waitForCreatingResourceCompletely(clusterKey, compName)
	Eventually(testapps.GetClusterObservedGeneration(&testCtx, clusterKey)).Should(BeEquivalentTo(1))

	By(fmt.Sprintf("Changing replicas to %d", replicasAfter))
	changeCompReplicas(clusterKey, replicasAfter, &clusterObj.Spec.ComponentSpecs[0])

	By("Checking h-scale failed cluster status failed with backup error")
	const wantMessage = "h-scale policy is Backup but neither snapshot nor backup tool is enabled"
	Eventually(testapps.CheckObj(&testCtx, clusterKey, func(g Gomega, cluster *appsv1alpha1.Cluster) {
		conditions := cluster.Status.Conditions
		g.Expect(conditions).ShouldNot(BeEmpty())

		// A non-nil hscaleErr marks that a condition with the expected message was found.
		var hscaleErr error
		for idx := range conditions {
			if !strings.Contains(conditions[idx].Message, wantMessage) {
				continue
			}
			hscaleErr = errors.New("has h-scale error")
			break
		}

		if hscaleErr == nil {
			// Deliberately failing expectation: its only purpose is to print
			// every entry of cluster.Status.Conditions in the failure output.
			g.Expect(conditions).Should(BeEmpty())
		}
		g.Expect(hscaleErr).Should(HaveOccurred())
	})).Should(Succeed())
}

testBackupError := func(compName, compDefName string) {
initialReplicas := int32(1)
updatedReplicas := int32(3)
Expand All @@ -1414,7 +1470,7 @@ var _ = Describe("Cluster Controller", func() {
continue
}
clusterDef.Spec.ComponentDefs[i].HorizontalScalePolicy =
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyFromSnapshot,
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyCloneVolume,
BackupPolicyTemplateName: backupPolicyTPLName}
}
})()).ShouldNot(HaveOccurred())
Expand Down Expand Up @@ -1459,7 +1515,7 @@ var _ = Describe("Cluster Controller", func() {
Expect(testapps.GetAndChangeObj(&testCtx, client.ObjectKeyFromObject(clusterDefObj),
func(clusterDef *appsv1alpha1.ClusterDefinition) {
clusterDef.Spec.ComponentDefs[0].HorizontalScalePolicy =
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyFromSnapshot,
&appsv1alpha1.HorizontalScalePolicy{Type: appsv1alpha1.HScaleDataClonePolicyCloneVolume,
BackupPolicyTemplateName: backupPolicyTPLName}
})()).ShouldNot(HaveOccurred())

Expand Down Expand Up @@ -1789,11 +1845,15 @@ var _ = Describe("Cluster Controller", func() {
})

It("should successfully h-scale with multiple components", func() {
testMultiCompHScale(appsv1alpha1.HScaleDataClonePolicyFromSnapshot)
viper.Set("VOLUMESNAPSHOT", true)
viper.Set(constant.CfgKeyBackupPVCName, "")
testMultiCompHScale(appsv1alpha1.HScaleDataClonePolicyCloneVolume)
})

It("should successfully h-scale with multiple components by backup tool", func() {
testMultiCompHScale(appsv1alpha1.HScaleDataClonePolicyFromBackup)
viper.Set("VOLUMESNAPSHOT", false)
viper.Set(constant.CfgKeyBackupPVCName, "test-backup-pvc")
testMultiCompHScale(appsv1alpha1.HScaleDataClonePolicyCloneVolume)
})
})

Expand Down Expand Up @@ -1889,10 +1949,16 @@ var _ = Describe("Cluster Controller", func() {
testBackupError(compName, compDefName)
})

It(fmt.Sprintf("[comp: %s] should report h-scale error", compName), func() {
testHScaleError(compName, compDefName)
})

Context(fmt.Sprintf("[comp: %s] with horizontal scale after storage expansion", compName), func() {
It("should succeed with horizontal scale to 5 replicas", func() {
testStorageExpansion(compName, compDefName)
horizontalScale(5, appsv1alpha1.HScaleDataClonePolicyFromSnapshot, compDefName)
viper.Set("VOLUMESNAPSHOT", true)
viper.Set(constant.CfgKeyBackupPVCName, "")
horizontalScale(5, appsv1alpha1.HScaleDataClonePolicyCloneVolume, compDefName)
})
})

Expand Down
18 changes: 12 additions & 6 deletions controllers/apps/components/internal/component_base_stateful.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,13 +572,15 @@ func (c *StatefulComponentBase) scaleOut(reqCtx intctrlutil.RequestCtx, cli clie

c.WorkloadVertex.Immutable = true
stsProto := c.WorkloadVertex.Obj.(*appsv1.StatefulSet)
dataClone := newDataClone(reqCtx, cli, c.Cluster, c.Component, stsObj, stsProto, backupKey)
d, err := newDataClone(reqCtx, cli, c.Cluster, c.Component, stsObj, stsProto, backupKey)
if err != nil {
return err
}
var succeed bool
var err error
if dataClone == nil {
if d == nil {
succeed = true
} else {
succeed, err = dataClone.succeed()
succeed, err = d.succeed()
if err != nil {
return err
}
Expand All @@ -590,7 +592,7 @@ func (c *StatefulComponentBase) scaleOut(reqCtx intctrlutil.RequestCtx, cli clie
} else {
c.WorkloadVertex.Immutable = true
// update objs will trigger cluster reconcile, no need to requeue error
objs, err := dataClone.cloneData(dataClone)
objs, err := d.cloneData(d)
if err != nil {
return err
}
Expand All @@ -609,7 +611,11 @@ func (c *StatefulComponentBase) postScaleOut(reqCtx intctrlutil.RequestCtx, cli
}
)

if d := newDataClone(reqCtx, cli, c.Cluster, c.Component, stsObj, stsObj, snapshotKey); d != nil {
d, err := newDataClone(reqCtx, cli, c.Cluster, c.Component, stsObj, stsObj, snapshotKey)
if err != nil {
return err
}
if d != nil {
// clean backup resources.
// there will not be any backup resources other than scale out.
tmpObjs, err := d.clearTmpResources()
Expand Down

This file was deleted.

Loading

0 comments on commit 4a26fe6

Please sign in to comment.