Skip to content

Commit

Permalink
Fix CSI FUSE Recovery (fluid-cloudnative#2747)
Browse files Browse the repository at this point in the history
* Avoid checking container status before FUSE recovery

Signed-off-by: dongyun.xzh <[email protected]>

* Fix unit tests

Signed-off-by: dongyun.xzh <[email protected]>

* Remove container stats

Signed-off-by: dongyun.xzh <[email protected]>

* Fix FUSE Recover test

Signed-off-by: dongyun.xzh <[email protected]>

---------

Signed-off-by: dongyun.xzh <[email protected]>
  • Loading branch information
TrafalgarZZZ authored Mar 20, 2023
1 parent b8870fb commit 09fabaf
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 334 deletions.
92 changes: 5 additions & 87 deletions pkg/csi/recover/recover.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package recover

import (
"context"
"fmt"
"os"
"strconv"
"strings"
Expand All @@ -32,11 +31,9 @@ import (
"github.com/golang/glog"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
k8sexec "k8s.io/utils/exec"
"k8s.io/utils/mount"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -54,24 +51,14 @@ var _ manager.Runnable = &FuseRecover{}

type FuseRecover struct {
mount.SafeFormatAndMount
KubeClient client.Client
ApiReader client.Reader
KubeletClient *kubelet.KubeletClient
Recorder record.EventRecorder

containers map[string]*containerStat // key: <containerName>-<daemonSetName>-<namespace>
KubeClient client.Client
ApiReader client.Reader
// KubeletClient *kubelet.KubeletClient
Recorder record.EventRecorder

recoverFusePeriod time.Duration
}

type containerStat struct {
name string
podName string
namespace string
daemonSetName string
startAt metav1.Time
}

func initializeKubeletClient() (*kubelet.KubeletClient, error) {
// get CSI sa token
tokenByte, err := os.ReadFile(serviceAccountTokenFile)
Expand Down Expand Up @@ -124,11 +111,6 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api
return nil, errors.Wrap(err, "got error when creating kubelet client")
}

kubeletClient, err := initializeKubeletClient()
if err != nil {
return nil, errors.Wrap(err, "failed to initialize kubelet")
}

recoverFusePeriod := defaultFuseRecoveryPeriod
if os.Getenv(FuseRecoveryPeriod) != "" {
recoverFusePeriod, err = time.ParseDuration(os.Getenv(FuseRecoveryPeriod))
Expand All @@ -143,9 +125,7 @@ func NewFuseRecover(kubeClient client.Client, recorder record.EventRecorder, api
},
KubeClient: kubeClient,
ApiReader: apiReader,
KubeletClient: kubeletClient,
Recorder: recorder,
containers: make(map[string]*containerStat),
recoverFusePeriod: recoverFusePeriod,
}, nil
}
Expand All @@ -166,27 +146,7 @@ func (r *FuseRecover) run(stopCh <-chan struct{}) {
}

func (r *FuseRecover) runOnce() {
pods, err := r.KubeletClient.GetNodeRunningPods()
glog.V(6).Info("get pods from kubelet")
if err != nil {
glog.Error(err)
return
}
for _, pod := range pods.Items {
glog.V(6).Infof("get pod: %s, namespace: %s", pod.Name, pod.Namespace)
if !utils.IsFusePod(pod) {
continue
}
if !podutil.IsPodReady(&pod) {
continue
}
glog.V(6).Infof("get fluid fuse pod: %s, namespace: %s", pod.Name, pod.Namespace)
if isRestarted := r.compareOrRecordContainerStat(pod); isRestarted {
glog.V(3).Infof("fuse pod restarted: %s, namespace: %s", pod.Name, pod.Namespace)
r.recover()
return
}
}
r.recover()
}

func (r FuseRecover) recover() {
Expand Down Expand Up @@ -234,48 +194,6 @@ func (r *FuseRecover) umountDuplicate(point mountinfo.MountPoint) {
}
}

func (r *FuseRecover) compareOrRecordContainerStat(pod corev1.Pod) (restarted bool) {
if pod.Status.ContainerStatuses == nil || len(pod.OwnerReferences) == 0 {
return
}
var dsName string
for _, obj := range pod.OwnerReferences {
if obj.Kind == "DaemonSet" {
dsName = obj.Name
}
}
if dsName == "" {
return
}
for _, cn := range pod.Status.ContainerStatuses {
if cn.State.Running == nil {
continue
}
key := fmt.Sprintf("%s-%s-%s", cn.Name, dsName, pod.Namespace)
cs, ok := r.containers[key]
if !ok {
cs = &containerStat{
name: cn.Name,
podName: pod.Name,
namespace: pod.Namespace,
daemonSetName: dsName,
startAt: cn.State.Running.StartedAt,
}
r.containers[key] = cs
continue
}

if cs.startAt.Before(&cn.State.Running.StartedAt) {
glog.Infof("Container %s of pod %s in namespace %s start time is %s, but record %s",
cn.Name, pod.Name, pod.Namespace, cn.State.Running.StartedAt.String(), cs.startAt.String())
r.containers[key].startAt = cn.State.Running.StartedAt
restarted = true
return
}
}
return
}

func (r *FuseRecover) eventRecord(point mountinfo.MountPoint, eventType, eventReason string) {
namespacedName := point.NamespacedDatasetName
strs := strings.Split(namespacedName, "-")
Expand Down
Loading

0 comments on commit 09fabaf

Please sign in to comment.