diff --git a/pkg/csi/recover/recover.go b/pkg/csi/recover/recover.go index 238bcf22973..fad14cd8166 100644 --- a/pkg/csi/recover/recover.go +++ b/pkg/csi/recover/recover.go @@ -18,7 +18,6 @@ package recover import ( "context" - "fmt" "os" "strconv" "strings" @@ -32,11 +31,9 @@ import ( "github.com/golang/glog" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/rest" "k8s.io/client-go/tools/record" - podutil "k8s.io/kubernetes/pkg/api/v1/pod" k8sexec "k8s.io/utils/exec" "k8s.io/utils/mount" "sigs.k8s.io/controller-runtime/pkg/client" @@ -54,24 +51,14 @@ var _ manager.Runnable = &FuseRecover{} type FuseRecover struct { mount.SafeFormatAndMount - KubeClient client.Client - ApiReader client.Reader - KubeletClient *kubelet.KubeletClient - Recorder record.EventRecorder - - containers map[string]*containerStat // key: -- + KubeClient client.Client + ApiReader client.Reader + // KubeletClient *kubelet.KubeletClient + Recorder record.EventRecorder recoverFusePeriod time.Duration } -type containerStat struct { - name string - podName string - namespace string - daemonSetName string - startAt metav1.Time -} - func initializeKubeletClient() (*kubelet.KubeletClient, error) { // get CSI sa token tokenByte, err := os.ReadFile(serviceAccountTokenFile) @@ -124,11 +111,6 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api return nil, errors.Wrap(err, "got error when creating kubelet client") } - kubeletClient, err := initializeKubeletClient() - if err != nil { - return nil, errors.Wrap(err, "failed to initialize kubelet") - } - recoverFusePeriod := defaultFuseRecoveryPeriod if os.Getenv(FuseRecoveryPeriod) != "" { recoverFusePeriod, err = time.ParseDuration(os.Getenv(FuseRecoveryPeriod)) @@ -143,9 +125,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api }, KubeClient: kubeClient, ApiReader: apiReader, - KubeletClient: kubeletClient, Recorder: recorder, - containers: make(map[string]*containerStat), recoverFusePeriod: recoverFusePeriod, }, nil } @@ -166,27 +146,7 @@ func (r *FuseRecover) run(stopCh <-chan struct{}) { } func (r *FuseRecover) runOnce() { - pods, err := r.KubeletClient.GetNodeRunningPods() - glog.V(6).Info("get pods from kubelet") - if err != nil { - glog.Error(err) - return - } - for _, pod := range pods.Items { - glog.V(6).Infof("get pod: %s, namespace: %s", pod.Name, pod.Namespace) - if !utils.IsFusePod(pod) { - continue - } - if !podutil.IsPodReady(&pod) { - continue - } - glog.V(6).Infof("get fluid fuse pod: %s, namespace: %s", pod.Name, pod.Namespace) - if isRestarted := r.compareOrRecordContainerStat(pod); isRestarted { - glog.V(3).Infof("fuse pod restarted: %s, namespace: %s", pod.Name, pod.Namespace) - r.recover() - return - } - } + r.recover() } func (r FuseRecover) recover() { @@ -234,48 +194,6 @@ func (r *FuseRecover) umountDuplicate(point mountinfo.MountPoint) { } } -func (r *FuseRecover) compareOrRecordContainerStat(pod corev1.Pod) (restarted bool) { - if pod.Status.ContainerStatuses == nil || len(pod.OwnerReferences) == 0 { - return - } - var dsName string - for _, obj := range pod.OwnerReferences { - if obj.Kind == "DaemonSet" { - dsName = obj.Name - } - } - if dsName == "" { - return - } - for _, cn := range pod.Status.ContainerStatuses { - if cn.State.Running == nil { - continue - } - key := fmt.Sprintf("%s-%s-%s", cn.Name, dsName, pod.Namespace) - cs, ok := r.containers[key] - if !ok { - cs = &containerStat{ - name: cn.Name, - podName: pod.Name, - namespace: pod.Namespace, - daemonSetName: dsName, - startAt: cn.State.Running.StartedAt, - } - r.containers[key] = cs - continue - } - - if cs.startAt.Before(&cn.State.Running.StartedAt) { - glog.Infof("Container %s of pod %s in namespace %s start time is %s, but record %s", - cn.Name, pod.Name, pod.Namespace, cn.State.Running.StartedAt.String(), cs.startAt.String()) - r.containers[key].startAt = cn.State.Running.StartedAt - restarted = true - return - } - } - return -} - func (r *FuseRecover) eventRecord(point mountinfo.MountPoint, eventType, eventReason string) { namespacedName := point.NamespacedDatasetName strs := strings.Split(namespacedName, "-") diff --git a/pkg/csi/recover/recover_test.go b/pkg/csi/recover/recover_test.go index ea62630bf9f..c9f4ff0a690 100644 --- a/pkg/csi/recover/recover_test.go +++ b/pkg/csi/recover/recover_test.go @@ -17,23 +17,22 @@ limitations under the License. package recover import ( - "errors" "os" "reflect" "testing" - "time" . "github.com/agiledragon/gomonkey/v2" "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/dataset/volume" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" - "github.com/fluid-cloudnative/fluid/pkg/utils/kubelet" "github.com/fluid-cloudnative/fluid/pkg/utils/mountinfo" . "github.com/smartystreets/goconvey/convey" - v1 "k8s.io/api/core/v1" + corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" apimachineryRuntime "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" k8sexec "k8s.io/utils/exec" "k8s.io/utils/mount" @@ -42,31 +41,6 @@ import ( const testfuseRecoverPeriod = 30 -var mockPod = v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"role": "juicefs-fuse"}, - Name: "test-pod", - Namespace: "default", - OwnerReferences: []metav1.OwnerReference{{ - Kind: "DaemonSet", - Name: "test-juicefs-fuse", - }}, - }, - Spec: v1.PodSpec{}, - Status: v1.PodStatus{ - Conditions: []v1.PodCondition{{ - Type: v1.PodReady, - Status: v1.ConditionTrue, - }}, - ContainerStatuses: []v1.ContainerStatus{{ - Name: "test-container", - State: v1.ContainerState{Running: &v1.ContainerStateRunning{ - StartedAt: metav1.Time{Time: time.Now()}, - }}, - }}, - }, -} - func Test_initializeKubeletClient(t *testing.T) { Convey("Test_initializeKubeletClient", t, func() { Convey("initialize success with non-default kubelet timeout", func() { @@ -97,223 +71,64 @@ func Test_initializeKubeletClient(t *testing.T) { func TestRecover_run(t *testing.T) { Convey("TestRecover_run", t, func() { Convey("run success", func() { - kubeclient := &kubelet.KubeletClient{} - patch1 := ApplyMethod(reflect.TypeOf(kubeclient), "GetNodeRunningPods", func(_ *kubelet.KubeletClient) (*v1.PodList, error) { - return &v1.PodList{Items: []v1.Pod{mockPod}}, nil - }) - defer patch1.Reset() - patch2 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { - return []mountinfo.MountPoint{{ - SourcePath: "/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse", - MountPath: "/var/lib/kubelet/pods/1140aa96-18c2-4896-a14f-7e3965a51406/volumes/kubernetes.io~csi/default-jfsdemo/mount", - FilesystemType: "fuse.juicefs", - ReadOnly: false, - Count: 0, - NamespacedDatasetName: "default-jfsdemo", - }}, nil - }) - defer patch2.Reset() + dataset := &v1alpha1.Dataset{ + ObjectMeta: metav1.ObjectMeta{ + Name: "jfsdemo", + Namespace: "default", + }, + } + + s := apimachineryRuntime.NewScheme() + _ = v1alpha1.AddToScheme(s) + _ = corev1.AddToScheme(s) + fakeClient := fake.NewFakeClientWithScheme(s, dataset) + mockedFsMounts := map[string]string{} + + sourcePath := "/runtime-mnt/juicefs/default/jfsdemo/juicefs-fuse" + targetPath := "/var/lib/kubelet/pods/1140aa96-18c2-4896-a14f-7e3965a51406/volumes/kubernetes.io~csi/default-jfsdemo/mount" + + fakeMounter := &mount.FakeMounter{} r := &FuseRecover{ SafeFormatAndMount: mount.SafeFormatAndMount{ - Interface: &mount.FakeMounter{}, + Interface: fakeMounter, }, - KubeClient: fake.NewFakeClient(), - KubeletClient: kubeclient, + KubeClient: fakeClient, + ApiReader: fakeClient, Recorder: record.NewFakeRecorder(1), - containers: make(map[string]*containerStat), recoverFusePeriod: testfuseRecoverPeriod, } - r.runOnce() - }) - Convey("GetNodeRunningPods error", func() { - kubeclient := &kubelet.KubeletClient{} - patch1 := ApplyMethod(reflect.TypeOf(kubeclient), "GetNodeRunningPods", func(_ *kubelet.KubeletClient) (*v1.PodList, error) { - return &v1.PodList{}, errors.New("test") - }) - defer patch1.Reset() - patch2 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { - return []mountinfo.MountPoint{}, nil - }) - defer patch2.Reset() - r := FuseRecover{ - SafeFormatAndMount: mount.SafeFormatAndMount{}, - KubeClient: fake.NewFakeClient(), - KubeletClient: &kubelet.KubeletClient{}, - Recorder: record.NewFakeRecorder(1), - } - r.runOnce() - }) - Convey("container restart", func() { - kubeclient := &kubelet.KubeletClient{} - patch1 := ApplyMethod(reflect.TypeOf(kubeclient), "GetNodeRunningPods", func(_ *kubelet.KubeletClient) (*v1.PodList, error) { - return &v1.PodList{Items: []v1.Pod{mockPod}}, nil + patch1 := ApplyMethod(reflect.TypeOf(fakeMounter), "Mount", func(_ *mount.FakeMounter, source string, target string, _ string, _ []string) error { + mockedFsMounts[source] = target + return nil }) defer patch1.Reset() + patch2 := ApplyFunc(mountinfo.GetBrokenMountPoints, func() ([]mountinfo.MountPoint, error) { - return []mountinfo.MountPoint{}, nil + return []mountinfo.MountPoint{{ + SourcePath: sourcePath, + MountPath: targetPath, + FilesystemType: "fuse.juicefs", + ReadOnly: false, + Count: 0, + NamespacedDatasetName: "default-jfsdemo", + }}, nil }) defer patch2.Reset() - r := &FuseRecover{ - SafeFormatAndMount: mount.SafeFormatAndMount{ - Interface: &mount.FakeMounter{}, - }, - KubeClient: fake.NewFakeClient(), - KubeletClient: kubeclient, - Recorder: record.NewFakeRecorder(1), - containers: make(map[string]*containerStat), - recoverFusePeriod: testfuseRecoverPeriod, - } + patch3 := ApplyFunc(volume.GetNamespacedNameByVolumeId, func(client client.Reader, volumeId string) (namespace, name string, err error) { + return "default", "jfsdemo", nil + }) + defer patch3.Reset() - r.containers = map[string]*containerStat{ - "test-container-test-juicefs-fuse-default": { - name: "test-container", - podName: "test-pod", - namespace: "default", - daemonSetName: "test-juicefs-fuse", - startAt: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - } r.runOnce() - }) - }) -} -func TestFuseRecover_compareOrRecordContainerStat(t *testing.T) { - type fields struct { - key string - container *containerStat - } - type args struct { - pod v1.Pod - } - tests := []struct { - name string - fields fields - args args - wantRestarted bool - }{ - { - name: "test1", - fields: fields{ - key: "test-container-test-juicefs-fuse-default", - container: &containerStat{ - name: "test-container", - podName: "test-pod", - namespace: "default", - daemonSetName: "test-juicefs-fuse", - startAt: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - }, - args: args{ - pod: mockPod, - }, - wantRestarted: true, - }, - { - name: "test2", - fields: fields{ - key: "test-container-test-juicefs-fuse-default", - container: &containerStat{ - name: "test-container", - podName: "test-pod", - namespace: "default", - daemonSetName: "test-juicefs-fuse", - startAt: metav1.Time{ - Time: time.Now(), - }, - }, - }, - args: args{ - pod: mockPod, - }, - wantRestarted: false, - }, - { - name: "test-nods", - fields: fields{}, - args: args{ - pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "test", - }, - }, - }, - wantRestarted: false, - }, - { - name: "test-cn-not-running", - fields: fields{ - key: "test-container-test-juicefs-fuse-default", - container: &containerStat{ - name: "test-container", - podName: "test-pod", - namespace: "default", - daemonSetName: "test-juicefs-fuse", - startAt: metav1.Time{ - Time: time.Now().Add(-1 * time.Minute), - }, - }, - }, - args: args{ - pod: v1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Labels: map[string]string{"role": "juicefs-fuse"}, - Name: "test-pod", - Namespace: "default", - OwnerReferences: []metav1.OwnerReference{{ - Kind: "DaemonSet", - Name: "test-juicefs-fuse", - }}, - }, - Spec: v1.PodSpec{}, - Status: v1.PodStatus{ - ContainerStatuses: []v1.ContainerStatus{{ - Name: "test-container", - State: v1.ContainerState{Terminated: &v1.ContainerStateTerminated{ - StartedAt: metav1.Time{Time: time.Now()}, - }}, - }}, - }}, - }, - wantRestarted: false, - }, - { - name: "test-no-container-record", - fields: fields{}, - args: args{ - pod: mockPod, - }, - wantRestarted: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - kubeletClient := &kubelet.KubeletClient{} - r := &FuseRecover{ - SafeFormatAndMount: mount.SafeFormatAndMount{ - Interface: &mount.FakeMounter{}, - }, - KubeClient: fake.NewFakeClient(), - KubeletClient: kubeletClient, - Recorder: record.NewFakeRecorder(1), - containers: make(map[string]*containerStat), - recoverFusePeriod: testfuseRecoverPeriod, - } - if tt.fields.container != nil { - r.containers[tt.fields.key] = tt.fields.container - } - if gotRestarted := r.compareOrRecordContainerStat(tt.args.pod); gotRestarted != tt.wantRestarted { - t.Errorf("compareOrRecordContainerStat() = %v, want %v", gotRestarted, tt.wantRestarted) + if target, exists := mockedFsMounts[sourcePath]; !exists || target != targetPath { + t.Errorf("failed to recover mount point") } }) - } + }) } func TestFuseRecover_umountDuplicate(t *testing.T) { @@ -386,8 +201,8 @@ func TestFuseRecover_recoverBrokenMount(t *testing.T) { func TestFuseRecover_eventRecord(t *testing.T) { type fields struct { - containers map[string]*containerStat - dataset *v1alpha1.Dataset + dataset *v1alpha1.Dataset + pv *corev1.PersistentVolume } type args struct { point mountinfo.MountPoint @@ -408,6 +223,11 @@ func TestFuseRecover_eventRecord(t *testing.T) { Namespace: "default", }, }, + pv: &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "default-jfsdemo", + }, + }, }, args: args{ point: mountinfo.MountPoint{ @@ -418,7 +238,7 @@ func TestFuseRecover_eventRecord(t *testing.T) { Count: 0, NamespacedDatasetName: "default-jfsdemo", }, - eventType: v1.EventTypeNormal, + eventType: corev1.EventTypeNormal, eventReason: common.FuseRecoverSucceed, }, }, @@ -431,6 +251,11 @@ func TestFuseRecover_eventRecord(t *testing.T) { Namespace: "default", }, }, + pv: &corev1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: "other-pv", + }, + }, }, args: args{ point: mountinfo.MountPoint{ @@ -441,7 +266,7 @@ func TestFuseRecover_eventRecord(t *testing.T) { Count: 0, NamespacedDatasetName: "jfsdemo", }, - eventType: v1.EventTypeNormal, + eventType: corev1.EventTypeNormal, eventReason: common.FuseRecoverSucceed, }, }, @@ -449,15 +274,15 @@ func TestFuseRecover_eventRecord(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { s := apimachineryRuntime.NewScheme() - s.AddKnownTypes(v1alpha1.GroupVersion, tt.fields.dataset) - fakeClient := fake.NewFakeClientWithScheme(s, tt.fields.dataset) + _ = v1alpha1.AddToScheme(s) + _ = scheme.AddToScheme(s) + fakeClient := fake.NewFakeClientWithScheme(s, tt.fields.dataset, tt.fields.pv) r := &FuseRecover{ - KubeClient: fakeClient, - ApiReader: fakeClient, - KubeletClient: nil, - Recorder: record.NewFakeRecorder(1), - containers: tt.fields.containers, + KubeClient: fakeClient, + ApiReader: fakeClient, + Recorder: record.NewFakeRecorder(1), } + r.eventRecord(tt.args.point, tt.args.eventType, tt.args.eventReason) }) } @@ -472,8 +297,6 @@ func TestNewFuseRecover(t *testing.T) { fakeClient := fake.NewFakeClient() fakeRecorder := record.NewFakeRecorder(1) - fakeKubeletClient := &kubelet.KubeletClient{} - fakeContainersMap := make(map[string]*containerStat) tests := []struct { name string @@ -495,9 +318,7 @@ func TestNewFuseRecover(t *testing.T) { }, KubeClient: fakeClient, ApiReader: fakeClient, - KubeletClient: fakeKubeletClient, Recorder: fakeRecorder, - containers: fakeContainersMap, recoverFusePeriod: defaultFuseRecoveryPeriod, }, wantErr: false, @@ -508,11 +329,6 @@ func TestNewFuseRecover(t *testing.T) { t.Setenv(utils.MountRoot, "/runtime-mnt") t.Setenv(FuseRecoveryPeriod, tt.args.recoverFusePeriod) - patch := ApplyFunc(initializeKubeletClient, func() (*kubelet.KubeletClient, error) { - return fakeKubeletClient, nil - }) - defer patch.Reset() - got, err := NewFuseRecover(tt.args.kubeClient, tt.args.recorder, tt.args.kubeClient) if (err != nil) != tt.wantErr { t.Errorf("NewFuseRecover() error = %v, wantErr %v", err, tt.wantErr)