Skip to content

Commit

Permalink
refactor DataMigrateReconcile to use DataOperationReconcile (fluid-cl…
Browse files Browse the repository at this point in the history
…oudnative#2813)

* refactor datamigrate using dataoperation

Signed-off-by: xliu1992 <[email protected]>

* fix first test error

Signed-off-by: xliuqq <[email protected]>

* rename test name

Signed-off-by: xliuqq <[email protected]>

* fix update typo, add dataset info when not found

Signed-off-by: xliuqq <[email protected]>

* fix typo, fix update status when runtime not support operation

Signed-off-by: xliuqq <[email protected]>

---------

Signed-off-by: xliu1992 <[email protected]>
Signed-off-by: xliuqq <[email protected]>
  • Loading branch information
xliuqq authored Apr 13, 2023
1 parent 8ca3871 commit f3fd35e
Show file tree
Hide file tree
Showing 15 changed files with 244 additions and 515 deletions.
6 changes: 1 addition & 5 deletions api/v1alpha1/dataset_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,5 @@ func (dataset *Dataset) SetDataOperationInProgress(operationType string, name st

// RemoveDataOperationInProgress release Dataset for operation
func (dataset *Dataset) RemoveDataOperationInProgress(operationType string) {
if dataset.Status.OperationRef == nil {
return
}

dataset.Status.OperationRef[operationType] = ""
delete(dataset.Status.OperationRef, operationType)
}
16 changes: 6 additions & 10 deletions pkg/controllers/operation_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -80,7 +81,8 @@ func (o *OperationReconciler) ReconcileDeletion(ctx dataoperation.ReconcileReque

// 2. Release lock on target dataset if necessary
err = base.ReleaseTargetDataset(ctx.ReconcileRequestContext, object, o.implement)
if err != nil {
// ignore the not found error, as dataset can be deleted first, then the data operation will be deleted by owner reference.
if utils.IgnoreNotFound(err) != nil {
log.Error(err, "can't release lock on target dataset")
return utils.RequeueIfError(err)
}
Expand Down Expand Up @@ -116,17 +118,11 @@ func (o *OperationReconciler) ReconcileInternal(ctx dataoperation.ReconcileReque
}

// 2. set target dataset
targetDatasetNamespacedName, err := o.implement.GetTargetDatasetNamespacedName(object)
if err != nil {
ctx.Log.Error(err, "Failed to get the ddc dataset namespace and name")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
}

targetDataset, err := utils.GetDataset(o.Client, targetDatasetNamespacedName.Name, targetDatasetNamespacedName.Namespace)
targetDataset, err := o.implement.GetTargetDataset(object)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("The dataset is not found", "dataset", targetDatasetNamespacedName)
// no dataset means no metadata, not necessary to Reconcile
statusError := err.(*apierrors.StatusError)
ctx.Log.Info("The dataset is not found", "dataset", statusError.Status().Details.Name)
return utils.RequeueAfterInterval(20 * time.Second)
} else {
ctx.Log.Error(err, "Failed to get the ddc dataset")
Expand Down
7 changes: 2 additions & 5 deletions pkg/controllers/v1alpha1/databackup/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,15 @@ func (r *DataBackupReconciler) GetOperationType() dataoperation.OperationType {
return dataoperation.DataBackup
}

func (r *DataBackupReconciler) GetTargetDatasetNamespacedName(object client.Object) (*types.NamespacedName, error) {
func (r *DataBackupReconciler) GetTargetDataset(object client.Object) (*v1alpha1.Dataset, error) {
typeObject, ok := object.(*v1alpha1.DataBackup)
if !ok {
return nil, fmt.Errorf("object %v is not a DataBackup", object)
}

targetDataBackup := *typeObject

return &types.NamespacedName{
Name: targetDataBackup.Spec.Dataset,
Namespace: object.GetNamespace(),
}, nil
return utils.GetDataset(r.Client, targetDataBackup.Spec.Dataset, targetDataBackup.Namespace)
}

func (r *DataBackupReconciler) GetReleaseNameSpacedName(object client.Object) types.NamespacedName {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/v1alpha1/dataload/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (r *DataLoadReconcilerImplement) releaseLockOnTargetDataset(ctx cruntime.Re
}
datasetToUpdate := dataset.DeepCopy()
datasetToUpdate.RemoveDataOperationInProgress(cdataload.DataLoadLockName)
if !reflect.DeepEqual(datasetToUpdate.Status, dataset) {
if !reflect.DeepEqual(datasetToUpdate.Status, dataset.Status) {
if err := r.Status().Update(ctx, datasetToUpdate); err != nil {
return err
}
Expand Down
189 changes: 20 additions & 169 deletions pkg/controllers/v1alpha1/datamigrate/datamigrate_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,11 @@ package datamigrate

import (
"context"
"fmt"
"sync"
"time"

"github.com/fluid-cloudnative/fluid/pkg/controllers"
"github.com/fluid-cloudnative/fluid/pkg/dataoperation"
"github.com/go-logr/logr"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -36,21 +31,16 @@ import (
datav1alpha1 "github.com/fluid-cloudnative/fluid/api/v1alpha1"
"github.com/fluid-cloudnative/fluid/pkg/common"
cdatamigrate "github.com/fluid-cloudnative/fluid/pkg/datamigrate"
"github.com/fluid-cloudnative/fluid/pkg/ddc"
"github.com/fluid-cloudnative/fluid/pkg/ddc/base"
cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime"
"github.com/fluid-cloudnative/fluid/pkg/utils"
"github.com/fluid-cloudnative/fluid/pkg/utils/jindo"
)

const controllerName string = "DataMigrateReconciler"

// DataMigrateReconciler reconciles a DataMigrate object
type DataMigrateReconciler struct {
Scheme *runtime.Scheme
engines map[string]base.Engine
mutex *sync.Mutex
*DataMigrateReconcilerImplement
Scheme *runtime.Scheme
*controllers.OperationReconciler
}

// NewDataMigrateReconciler returns a DataMigrateReconciler
Expand All @@ -59,28 +49,29 @@ func NewDataMigrateReconciler(client client.Client,
scheme *runtime.Scheme,
recorder record.EventRecorder) *DataMigrateReconciler {
r := &DataMigrateReconciler{
Scheme: scheme,
mutex: &sync.Mutex{},
engines: map[string]base.Engine{},
Scheme: scheme,
}
r.DataMigrateReconcilerImplement = NewDataMigrateReconcilerImplement(client, log, recorder)
r.OperationReconciler = controllers.NewDataOperationReconciler(r, client, log, recorder)
return r
}

// +kubebuilder:rbac:groups=data.fluid.io,resources=datamigrates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=data.fluid.io,resources=datamigrates/status,verbs=get;update;patch
// Reconcile reconciles the DataMigrate object
func (r *DataMigrateReconciler) Reconcile(context context.Context, req ctrl.Request) (ctrl.Result, error) {
ctx := cruntime.ReconcileRequestContext{
Context: context,
Log: r.Log.WithValues("datamigrate", req.NamespacedName),
Recorder: r.Recorder,
Client: r.Client,
Category: common.AccelerateCategory,
ctx := dataoperation.ReconcileRequestContext{
// used for create engine
ReconcileRequestContext: cruntime.ReconcileRequestContext{
Context: context,
Log: r.Log.WithValues(string(r.GetOperationType()), req.NamespacedName),
Recorder: r.Recorder,
Client: r.Client,
Category: common.AccelerateCategory,
},
DataOpFinalizerName: cdatamigrate.DataMigrateFinalizer,
}

// 1. Get DataMigrate object
dataMigrate, err := utils.GetDataMigrate(r.Client, req.Name, req.Namespace)
targetDataMigrate, err := utils.GetDataMigrate(r.Client, req.Name, req.Namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("DataMigrate not found")
Expand All @@ -90,128 +81,10 @@ func (r *DataMigrateReconciler) Reconcile(context context.Context, req ctrl.Requ
return utils.RequeueIfError(errors.Wrap(err, "failed to get DataMigrate info"))
}
}
ctx.DataObject = targetDataMigrate
ctx.OpStatus = &targetDataMigrate.Status

targetDataMigrate := *dataMigrate
ctx.Log.V(1).Info("dataMigrate found", "detail", dataMigrate)

// 2. Reconcile deletion of the object if necessary
if utils.HasDeletionTimestamp(dataMigrate.ObjectMeta) {
return r.ReconcileDataMigrateDeletion(ctx, targetDataMigrate, r.engines, r.mutex)
}

// 3. get target dataset
targetDataset, err := utils.GetTargetDatasetOfMigrate(r.Client, targetDataMigrate)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("can't find target dataset", "dataMigrate", targetDataMigrate.Name)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.TargetDatasetNotFound,
"Target dataset not found")
return utils.RequeueAfterInterval(20 * time.Second)
}
// other error
ctx.Log.Error(err, "Failed to get the ddc dataset")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get dataset"))
}
ctx.Dataset = targetDataset
ctx.NamespacedName = types.NamespacedName{
Name: targetDataset.Name,
Namespace: targetDataset.Namespace,
}

// 4. get the runtime
index, boundedRuntime := utils.GetRuntimeByCategory(targetDataset.Status.Runtimes, common.AccelerateCategory)
if index == -1 {
ctx.Log.Info("bounded runtime with Accelerate Category is not found on the target dataset", "targetDataset", targetDataset)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.RuntimeNotReady,
"Bounded accelerate runtime not ready")
return utils.RequeueAfterInterval(20 * time.Second)
}
if targetDataMigrate.Spec.RuntimeType != "" && targetDataMigrate.Spec.RuntimeType != boundedRuntime.Type {
err = fmt.Errorf("the runtime type of the target dataset is %s, but the runtime type of the dataMigrate is %s", boundedRuntime.Type, targetDataMigrate.Spec.RuntimeType)
return utils.RequeueIfError(errors.Wrap(err, "Unable to get ddc runtime"))
}
ctx.RuntimeType = boundedRuntime.Type

var fluidRuntime client.Object
switch ctx.RuntimeType {
case common.AlluxioRuntime:
fluidRuntime, err = utils.GetAlluxioRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
case common.JindoRuntime:
fluidRuntime, err = utils.GetJindoRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
ctx.RuntimeType = jindo.GetRuntimeType()
case common.GooseFSRuntime:
fluidRuntime, err = utils.GetGooseFSRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
case common.JuiceFSRuntime:
fluidRuntime, err = utils.GetJuiceFSRuntime(ctx.Client, boundedRuntime.Name, boundedRuntime.Namespace)
default:
ctx.Log.Error(fmt.Errorf("RuntimeNotSupported"), "The runtime is not supported yet", "runtime", boundedRuntime)
r.Recorder.Eventf(&targetDataMigrate,
v1.EventTypeNormal,
common.RuntimeNotReady,
"Bounded accelerate runtime not supported")
}

if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.V(1).Info("The runtime is not found", "runtime", ctx.NamespacedName)
return ctrl.Result{}, nil
} else {
ctx.Log.Error(err, "Failed to get the ddc runtime")
return utils.RequeueIfError(errors.Wrap(err, "Unable to get ddc runtime"))
}
}
ctx.Runtime = fluidRuntime
ctx.Log.V(1).Info("get the runtime", "runtime", ctx.Runtime)

// 5. create or get engine
engine, err := r.GetOrCreateEngine(ctx)
if err != nil {
r.Recorder.Eventf(&targetDataMigrate, v1.EventTypeWarning, common.ErrorProcessDatasetReason, "Process dataMigrate error %v", err)
return utils.RequeueIfError(errors.Wrap(err, "Failed to create or get engine"))
}

// 6. add finalizer and requeue
if !utils.ContainsString(targetDataMigrate.ObjectMeta.GetFinalizers(), cdatamigrate.DataMigrateFinalizer) {
return r.addFinalizerAndRequeue(ctx, targetDataMigrate)
}

// 7. add owner and requeue
if !utils.ContainsOwners(targetDataMigrate.GetOwnerReferences(), targetDataset) {
return r.AddOwnerAndRequeue(ctx, targetDataMigrate, targetDataset)
}

return r.ReconcileDataMigrate(ctx, targetDataMigrate, engine)
}

// AddOwnerAndRequeue adds Owner and requeue
func (r *DataMigrateReconciler) AddOwnerAndRequeue(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate, targetDataset *datav1alpha1.Dataset) (ctrl.Result, error) {
targetDataMigrate.ObjectMeta.OwnerReferences = append(targetDataMigrate.GetOwnerReferences(), metav1.OwnerReference{
APIVersion: targetDataset.APIVersion,
Kind: targetDataset.Kind,
Name: targetDataset.Name,
UID: targetDataset.UID,
})
if err := r.Update(ctx, &targetDataMigrate); err != nil {
ctx.Log.Error(err, "Failed to add ownerreference", "StatusUpdateError", ctx)
return utils.RequeueIfError(err)
}

return utils.RequeueImmediately()
}

func (r *DataMigrateReconciler) addFinalizerAndRequeue(ctx cruntime.ReconcileRequestContext, targetDataMigrate datav1alpha1.DataMigrate) (ctrl.Result, error) {
targetDataMigrate.ObjectMeta.Finalizers = append(targetDataMigrate.ObjectMeta.Finalizers, cdatamigrate.DataMigrateFinalizer)
ctx.Log.Info("Add finalizer and requeue", "finalizer", cdatamigrate.DataMigrateFinalizer)
prevGeneration := targetDataMigrate.ObjectMeta.GetGeneration()
if err := r.Update(ctx, &targetDataMigrate); err != nil {
ctx.Log.Error(err, "failed to add finalizer to dataMigrate", "StatusUpdateError", err)
return utils.RequeueIfError(err)
}
return utils.RequeueImmediatelyUnlessGenerationChanged(prevGeneration, targetDataMigrate.ObjectMeta.GetGeneration())
return r.ReconcileInternal(ctx)
}

// SetupWithManager sets up the controller with the given controller manager
Expand All @@ -222,28 +95,6 @@ func (r *DataMigrateReconciler) SetupWithManager(mgr ctrl.Manager, options contr
Complete(r)
}

// GetOrCreateEngine gets the Engine
func (r *DataMigrateReconciler) GetOrCreateEngine(
ctx cruntime.ReconcileRequestContext) (engine base.Engine, err error) {
found := false
id := ddc.GenerateEngineID(ctx.NamespacedName)
r.mutex.Lock()
defer r.mutex.Unlock()
if engine, found = r.engines[id]; !found {
engine, err = ddc.CreateEngine(id,
ctx)
if err != nil {
return nil, err
}
r.engines[id] = engine
r.Log.V(1).Info("Put Engine to engine map")
} else {
r.Log.V(1).Info("Get Engine from engine map")
}

return engine, err
}

func (r *DataMigrateReconciler) ControllerName() string {
return controllerName
}
Loading

0 comments on commit f3fd35e

Please sign in to comment.