Skip to content

Commit

Permalink
fix: drop followers only when scaling in (apecloud#3130)
Browse files Browse the repository at this point in the history
  • Loading branch information
lynnleelhl authored May 8, 2023
1 parent fd77429 commit 8b24b66
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 141 deletions.
9 changes: 0 additions & 9 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,15 +393,6 @@ func main() {
os.Exit(1)
}

if err = (&components.PodReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Recorder: mgr.GetEventRecorderFor("pod-controller"),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Pod")
os.Exit(1)
}

if err = (&appscontrollers.ComponentClassReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Expand Down
17 changes: 16 additions & 1 deletion controllers/apps/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

snapshotv1 "github.com/kubernetes-csi/external-snapshotter/client/v6/apis/volumesnapshot/v1"
"github.com/spf13/viper"
Expand Down Expand Up @@ -411,6 +413,7 @@ var _ = Describe("Cluster Controller", func() {
Name: stsName + "-" + strconv.Itoa(i),
Namespace: testCtx.DefaultNamespace,
Labels: map[string]string{
constant.AppManagedByLabelKey: constant.AppName,
constant.AppInstanceLabelKey: clusterName,
constant.KBAppComponentLabelKey: componentName,
appsv1.ControllerRevisionHashLabelKey: "mock-version",
Expand Down Expand Up @@ -942,9 +945,10 @@ var _ = Describe("Cluster Controller", func() {
sts = &stsList.Items[0]
}).Should(Succeed())

By("Creating mock pods in StatefulSet")
By("Creating mock pods in StatefulSet, and set controller reference")
pods := mockPodsForConsensusTest(clusterObj, replicas)
for _, pod := range pods {
Expect(controllerutil.SetControllerReference(sts, &pod, scheme.Scheme)).Should(Succeed())
Expect(testCtx.CreateObj(testCtx.Ctx, &pod)).Should(Succeed())
// mock the status to pass the isReady(pod) check in consensus_set
pod.Status.Conditions = []corev1.PodCondition{{
Expand Down Expand Up @@ -982,6 +986,17 @@ var _ = Describe("Cluster Controller", func() {
g.Expect(followerCount).Should(Equal(2))
}).Should(Succeed())

By("Checking pods' annotations")
Eventually(func(g Gomega) {
pods, err := util.GetPodListByStatefulSet(ctx, k8sClient, sts)
g.Expect(err).ShouldNot(HaveOccurred())
g.Expect(len(pods)).Should(Equal(int(*sts.Spec.Replicas)))
for _, pod := range pods {
g.Expect(pod.Annotations).ShouldNot(BeNil())
g.Expect(pod.Annotations[constant.ComponentReplicasAnnotationKey]).Should(Equal(strconv.Itoa(int(*sts.Spec.Replicas))))
}
}, time.Second*100).Should(Succeed())

By("Updating StatefulSet's status")
sts.Status.UpdateRevision = "mock-version"
sts.Status.Replicas = int32(replicas)
Expand Down
33 changes: 33 additions & 0 deletions controllers/apps/components/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ package components

import (
"context"
"strconv"
"time"

"golang.org/x/exp/slices"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -184,3 +186,34 @@ func patchWorkloadCustomLabel(
}
return nil
}

// updateComponentInfoToPods writes the component's desired replica count into
// the constant.ComponentReplicasAnnotationKey annotation of every pod that
// belongs to the given component. Pods consume this value (e.g. via the
// downward API, see the cluster definition templates) to decide scale-in
// behavior such as dropping consensus followers only when replicas shrink.
//
// Pods whose annotation already matches the desired count are skipped, so
// repeated reconciles do not issue redundant PATCH requests.
func updateComponentInfoToPods(
	ctx context.Context,
	cli client.Client,
	cluster *appsv1alpha1.Cluster,
	componentSpec *appsv1alpha1.ClusterComponentSpec) error {
	ml := client.MatchingLabels{
		constant.AppInstanceLabelKey:    cluster.GetName(),
		constant.KBAppComponentLabelKey: componentSpec.Name,
	}
	podList := corev1.PodList{}
	// Restrict the listing to the cluster's namespace: label selectors alone
	// would also match pods of identically named clusters in other namespaces.
	if err := cli.List(ctx, &podList, client.InNamespace(cluster.Namespace), ml); err != nil {
		return err
	}
	replicasStr := strconv.Itoa(int(componentSpec.Replicas))
	for i := range podList.Items {
		// Index into the slice instead of ranging by value to avoid copying
		// the full Pod struct on every iteration.
		pod := &podList.Items[i]
		if pod.Annotations != nil &&
			pod.Annotations[constant.ComponentReplicasAnnotationKey] == replicasStr {
			continue // annotation already up to date; nothing to patch
		}
		patch := client.MergeFrom(pod.DeepCopy())
		if pod.Annotations == nil {
			pod.Annotations = make(map[string]string)
		}
		pod.Annotations[constant.ComponentReplicasAnnotationKey] = replicasStr
		if err := cli.Patch(ctx, pod, patch); err != nil {
			return err
		}
	}
	return nil
}
6 changes: 6 additions & 0 deletions controllers/apps/components/deployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (r *DeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return workloadCompClusterReconcile(reqCtx, r.Client, deploy,
func(cluster *appsv1alpha1.Cluster, componentSpec *appsv1alpha1.ClusterComponentSpec, component types.Component) (ctrl.Result, error) {
compCtx := newComponentContext(reqCtx, r.Client, r.Recorder, component, deploy, componentSpec)
// update component info to pods' annotations
if err := updateComponentInfoToPods(reqCtx.Ctx, r.Client, cluster, componentSpec); err != nil {
reqCtx.Recorder.Event(cluster, corev1.EventTypeWarning, "StatefulSet Deploy updateComponentInfoToPods Failed", err.Error())
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
// patch the current componentSpec workload's custom labels
if err := patchWorkloadCustomLabel(reqCtx.Ctx, r.Client, cluster, componentSpec); err != nil {
reqCtx.Recorder.Event(cluster, corev1.EventTypeWarning, "Deployment Controller PatchWorkloadCustomLabelFailed", err.Error())
Expand All @@ -96,6 +101,7 @@ func (r *DeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&appsv1.Deployment{}).
Owns(&appsv1.ReplicaSet{}).
Owns(&corev1.Pod{}).
WithEventFilter(predicate.NewPredicateFuncs(intctrlutil.WorkloadFilterPredicate)).
Named("deployment-watcher").
Complete(r)
Expand Down
124 changes: 0 additions & 124 deletions controllers/apps/components/pod_controller.go

This file was deleted.

5 changes: 5 additions & 0 deletions controllers/apps/components/stateful_set_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (r *StatefulSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return workloadCompClusterReconcile(reqCtx, r.Client, sts,
func(cluster *appsv1alpha1.Cluster, componentSpec *appsv1alpha1.ClusterComponentSpec, component types.Component) (ctrl.Result, error) {
compCtx := newComponentContext(reqCtx, r.Client, r.Recorder, component, sts, componentSpec)
// update component info to pods' annotations
if err := updateComponentInfoToPods(reqCtx.Ctx, r.Client, cluster, componentSpec); err != nil {
reqCtx.Recorder.Event(cluster, corev1.EventTypeWarning, "StatefulSet Controller updateComponentInfoToPods Failed", err.Error())
return intctrlutil.CheckedRequeueWithError(err, reqCtx.Log, "")
}
// patch the current componentSpec workload's custom labels
if err := patchWorkloadCustomLabel(reqCtx.Ctx, r.Client, cluster, componentSpec); err != nil {
reqCtx.Recorder.Event(cluster, corev1.EventTypeWarning, "StatefulSet Controller PatchWorkloadCustomLabelFailed", err.Error())
Expand Down
3 changes: 3 additions & 0 deletions deploy/apecloud-mysql/templates/clusterdefinition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,9 @@ spec:
- path: "leader"
fieldRef:
fieldPath: metadata.annotations['cs.apps.kubeblocks.io/leader']
- path: "component-replicas"
fieldRef:
fieldPath: metadata.annotations['apps.kubeblocks.io/component-replicas']
systemAccounts:
cmdExecutorConfig:
image: {{ .Values.image.registry | default "docker.io" }}/{{ .Values.image.repository }}:{{ .Values.image.tag }}
Expand Down
32 changes: 25 additions & 7 deletions deploy/apecloud-mysql/templates/scripts.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ data:
idx=${KB_POD_NAME##*-}
host=$(eval echo \$KB_MYSQL_"$idx"_HOSTNAME)
echo "host=$host"
# update replicas to persistent file
component_replicas_path=/data/mysql/.kb_component_replicas
current_component_replicas=`cat /etc/annotations/component-replicas`
echo $current_component_replicas > $component_replicas_path
if [ -z "$leader" -o "$KB_POD_NAME" = "$leader" ]; then
echo "no leader or self is leader, no need to call add."
else
Expand Down Expand Up @@ -70,7 +74,7 @@ data:
mkdir -p /data/mysql/data /data/mysql/log
chmod +777 -R /data/mysql;
echo "KB_MYSQL_CLUSTER_UID=$KB_MYSQL_CLUSTER_UID"
cluster_uid_path=/data/mysql/data/.kb_cluster_uid
cluster_uid_path=/data/mysql/.kb_cluster_uid
if [ -f $cluster_uid_path ] && [ ! -f /data/mysql/data/.restore_new_cluster ]; then
last_cluster_uid=`cat $cluster_uid_path`
if [ "$last_cluster_uid" != "$KB_MYSQL_CLUSTER_UID" ]; then
Expand Down Expand Up @@ -155,16 +159,17 @@ data:
done
pre-stop.sh: |
#!/bin/bash
drop_followers() {
leader=`cat /etc/annotations/leader`
echo "leader=$leader"
echo "KB_POD_NAME=$KB_POD_NAME"
echo "leader=$leader" >> /data/mysql/.kb_pre_stop.log
echo "KB_POD_NAME=$KB_POD_NAME" >> /data/mysql/.kb_pre_stop.log
if [ -z "$leader" -o "$KB_POD_NAME" = "$leader" ]; then
echo "no leader or self is leader, exit"
echo "no leader or self is leader, exit" >> /data/mysql/.kb_pre_stop.log
exit 0
fi
idx=${KB_POD_NAME##*-}
host=$(eval echo \$KB_MYSQL_"$idx"_HOSTNAME)
echo "host=$host"
echo "host=$host" >> /data/mysql/.kb_pre_stop.log
leader_idx=${leader##*-}
leader_host=$(eval echo \$KB_MYSQL_"$leader_idx"_HOSTNAME)
if [ ! -z $leader_host ]; then
Expand All @@ -173,7 +178,20 @@ data:
if [ ! -z $MYSQL_ROOT_PASSWORD ]; then
password_flag="-p$MYSQL_ROOT_PASSWORD"
fi
echo "mysql $host_flag -uroot $password_flag -e \"call dbms_consensus.downgrade_follower('$host:13306');\" 2>&1 "
echo "mysql $host_flag -uroot $password_flag -e \"call dbms_consensus.downgrade_follower('$host:13306');\" 2>&1 " >> /data/mysql/.kb_pre_stop.log
mysql $host_flag -uroot $password_flag -e "call dbms_consensus.downgrade_follower('$host:13306');" 2>&1
echo "mysql $host_flag -uroot $password_flag -e \"call dbms_consensus.drop_learner('$host:13306');\" 2>&1 "
echo "mysql $host_flag -uroot $password_flag -e \"call dbms_consensus.drop_learner('$host:13306');\" 2>&1 " >> /data/mysql/.kb_pre_stop.log
mysql $host_flag -uroot $password_flag -e "call dbms_consensus.drop_learner('$host:13306');" 2>&1
}
component_replicas_path=/data/mysql/.kb_component_replicas
current_component_replicas=`cat /etc/annotations/component-replicas`
if [ -f $component_replicas_path ]; then
last_component_replicas=`cat $component_replicas_path`
# check is scaling in but not scaling in to 0
if [ "$last_component_replicas" -gt "$current_component_replicas" ] && [ $current_component_replicas -ne 0 ]; then
# only scaling in need to drop followers
drop_followers
else
echo "no need to drop followers"
fi
fi
1 change: 1 addition & 0 deletions internal/constant/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ const (
RestoreFromBackUpAnnotationKey = "kubeblocks.io/restore-from-backup" // RestoreFromBackUpAnnotationKey specifies the component to recover from the backup.
ClusterSnapshotAnnotationKey = "kubeblocks.io/cluster-snapshot" // ClusterSnapshotAnnotationKey saves the snapshot of cluster.
LeaderAnnotationKey = "cs.apps.kubeblocks.io/leader"
ComponentReplicasAnnotationKey = "apps.kubeblocks.io/component-replicas" // ComponentReplicasAnnotationKey specifies the number of pods in replicas
DefaultBackupPolicyAnnotationKey = "dataprotection.kubeblocks.io/is-default-policy" // DefaultBackupPolicyAnnotationKey specifies the default backup policy.
DefaultBackupPolicyTemplateAnnotationKey = "dataprotection.kubeblocks.io/is-default-policy-template" // DefaultBackupPolicyTemplateAnnotationKey specifies the default backup policy template.
BackupDataPathPrefixAnnotationKey = "dataprotection.kubeblocks.io/path-prefix" // BackupDataPathPrefixAnnotationKey specifies the backup data path prefix.
Expand Down

0 comments on commit 8b24b66

Please sign in to comment.