Skip to content

Commit

Permalink
Merge pull request fluid-cloudnative#41 from TrafalgarZZZ/dataload-co…
Browse files Browse the repository at this point in the history
…ntroller

Implement DataLoad controller
  • Loading branch information
cheyang authored Aug 14, 2020
2 parents 9f7ad2e + cb94f77 commit 6808079
Show file tree
Hide file tree
Showing 14 changed files with 880 additions and 76 deletions.
36 changes: 20 additions & 16 deletions charts/fluid-dataloader/templates/dataloader.yaml
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
# .Release.Name will be used to decide which dataset will be preload
# .Release.Name should be like `<pvc-name>-load`(e.g. hbase-load for a PersistentVolumeClaim named `hbase`)
# TODO: the length of .Release.Name won't exceed 53(limited by Helm), which means length of `<pvc-name>` can't exceed 48. This might be a problem.
{{ $datasetName := "" -}}
{{- if hasSuffix "-load" .Release.Name -}}
{{- $datasetName = .Release.Name | trimSuffix "-load" -}}
{{- else -}}
{{- $datasetName = .Release.Name -}}
{{- end }}

{{ $datasetName := "" -}}
{{- $randomSuffix := "" -}}
{{- if regexMatch "^[A-Za-z0-9._-]+-load-[A-Za-z0-9]{5}$" .Release.Name -}}
{{- $arr := regexSplit "-load-" .Release.Name -1 -}}
{{- $datasetName = first $arr -}}
{{- $randomSuffix = last $arr -}}
{{- else -}}
{{- printf "Illegal release name. Should be like <dataset-name>-load-<suffix-length-5>. Current name: %s" .Release.Name | fail -}}
{{- end }}
apiVersion: batch/v1
kind: Job
metadata:
name: {{ $datasetName | printf "%s-loader" }}
name: {{ printf "%s-loader-job-%s" $datasetName $randomSuffix }}
labels:
role: dataload-job
release: {{ .Release.Name }}
role: dataloader-job
dataset: {{ $datasetName }}
spec:
backoffLimit: {{ .Values.dataloader.backoffLimit | default "6" }}
Expand All @@ -22,8 +25,9 @@ spec:
parallelism: {{ required "Num of Alluxio Workers should be set" .Values.dataloader.numWorker }}
template:
metadata:
name: {{ $datasetName | printf "%s-loader" }}
name: {{ printf "%s-loader-%s" $datasetName $randomSuffix }}
labels:
release: {{ .Release.Name }}
role: alluxio-dataloader
dataset: {{ $datasetName }}
spec:
Expand Down Expand Up @@ -64,12 +68,12 @@ spec:
- name: THREADS
value: {{ .Values.dataloader.threads | default "2" | quote }}
- name: DATA_PATH
value:
{{- if .Values.dataloader.mountPath -}}
{{- .Values.dataloader.mountPath | trimAll "/" | printf "/data/%s/*" | quote | indent 1 -}}
{{- else -}}
{{- quote "/data/*" | indent 1 -}}
{{- end }}
value:
{{- if .Values.dataloader.mountPath -}}
{{- .Values.dataloader.mountPath | trimAll "/" | printf "/data/%s/*" | quote | indent 1 -}}
{{- else -}}
{{- quote "/data/*" | indent 1 -}}
{{- end }}
volumeMounts:
- mountPath: /data
name: vol
Expand Down
20 changes: 14 additions & 6 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

datav1alpha1 "github.com/cloudnativefluid/fluid/api/v1alpha1"
alluxioctl "github.com/cloudnativefluid/fluid/pkg/controllers/v1alpha1/alluxio"
dataloadctl "github.com/cloudnativefluid/fluid/pkg/controllers/v1alpha1/dataload"
datasetctl "github.com/cloudnativefluid/fluid/pkg/controllers/v1alpha1/dataset"
"github.com/cloudnativefluid/fluid/pkg/ddc/alluxio"
"github.com/cloudnativefluid/fluid/pkg/ddc/base"
Expand Down Expand Up @@ -104,15 +105,22 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "AlluxioRuntime")
os.Exit(1)
}

if err = (&alluxioctl.DataLoadReconciler{
Client: mgr.GetClient(),
Log: ctrl.Log.WithName("alluxioctl").WithName("AlluxioDataLoad"),
Scheme: mgr.GetScheme(),
}).SetupWithManager(mgr); err != nil {
if err = (dataloadctl.NewDataLoadReconciler(mgr.GetClient(),
ctrl.Log.WithName("alluxioctl").WithName("AlluxioDataLoad"),
mgr.GetScheme(),
mgr.GetEventRecorderFor("AlluxioDataLoad"),
)).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "AlluxioDataLoad")
os.Exit(1)
}
//if err = (&dataload.DataLoadReconciler{
// Client: mgr.GetClient(),
// Log: ctrl.Log.WithName("alluxioctl").WithName("AlluxioDataLoad"),
// Scheme: mgr.GetScheme(),
//}).SetupWithManager(mgr); err != nil {
// setupLog.Error(err, "unable to create controller", "controller", "AlluxioDataLoad")
// os.Exit(1)
//}
// +kubebuilder:scaffold:builder

setupLog.Info("starting manager")
Expand Down
16 changes: 16 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ const (
ErrorProcessDatasetReason = "ErrorProcessDataset"

ErrorProcessRuntimeReason = "ErrorProcessRuntime"

ErrorHelmInstall = "ErrorHelmInstall"

DatasetNotReady = "DatasetNotReady"

RuntimeNotReady = "RuntimeNotReady"

DataLoadCollision = "DataLoadCollision"

PrefetchJobStarted = "Prefetch Started"

PrefetchJobInterrupted = "PrefetchJobInterrupted"

PrefetchJobComplete = "Prefetch Complete"

PrefetchJobFailed = "Prefetch Failed"
)

// Runtime for Alluxio
Expand Down
8 changes: 8 additions & 0 deletions pkg/common/dataload.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,11 @@ const (
// DataloadFailed means the Dataload has failed its execution.
DataloadFailed DataloadConditionType = "Failed"
)

const (
DATALOAD_FINALIZER = "fluid-dataload-controller-finalizer"
DATALOAD_CHART = "fluid-dataloader"
DATALOAD_DEFAULT_IMAGE = "registry.cn-hangzhou.aliyuncs.com/fluid-namespace/coco-perf"
DATALOAD_SUFFIX_LENGTH = 5
ENV_DATALOADER_IMG = "DATALOADER_IMG"
)
54 changes: 0 additions & 54 deletions pkg/controllers/v1alpha1/alluxio/dataload_controller.go

This file was deleted.

117 changes: 117 additions & 0 deletions pkg/controllers/v1alpha1/dataload/dataload_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package dataload

import (
"context"
datav1alpha1 "github.com/cloudnativefluid/fluid/api/v1alpha1"
"github.com/cloudnativefluid/fluid/pkg/common"
cdataload "github.com/cloudnativefluid/fluid/pkg/dataload"
"github.com/cloudnativefluid/fluid/pkg/utils"
"github.com/go-logr/logr"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// DataLoadReconciler reconciles a AlluxioDataLoad object
type DataLoadReconciler struct {
Scheme *runtime.Scheme
*ReconcilerImplement
}

// Return a new DataLoad Reconciler
func NewDataLoadReconciler(client client.Client,
log logr.Logger,
scheme *runtime.Scheme,
recorder record.EventRecorder) *DataLoadReconciler {
r := &DataLoadReconciler{
Scheme: scheme,
}
r.ReconcilerImplement = NewReconcilerImplement(client, log, recorder)
r.Setup()
return r
}

// Reconcile reconciles the AlluxioDataLoad Object
// +kubebuilder:rbac:groups=data.fluid.io,resources=alluxiodataloads,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=data.fluid.io,resources=alluxiodataloads/status,verbs=get;update;patch

func (r *DataLoadReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := cdataload.ReconcileRequestContext{
Context: context.Background(),
Log: r.Log.WithValues("alluxiodataload", req.NamespacedName),
Recorder: r.Recorder,
NamespacedName: req.NamespacedName,
}

ctx.Log.V(1).Info("Reconciling dataload request", "request", req)

/*
1. Load the DataLoad resource object
*/
dataload, err := utils.GetDataLoad(r, req.Name, req.Namespace)
if err != nil {
if utils.IgnoreNotFound(err) == nil {
ctx.Log.Info("Dataload not found", "dataload", ctx.NamespacedName)
return ctrl.Result{}, nil
} else {
ctx.Log.Error(err, "Failed to get dataload info")
return utils.RequeueIfError(errors.Wrap(err, "Failed to get dataload info"))
}
}
ctx.DataLoad = *dataload
ctx.Log.Info("Dataload found.", "dataload", ctx.DataLoad)

/*
2. delete dataload if necessary
*/
if utils.HasDeletionTimestamp(ctx.DataLoad.ObjectMeta) {
return r.ReconcileDataloadDeletion(ctx)
}

/*
3. Add finalizer
*/
if !utils.ContainsString(ctx.DataLoad.ObjectMeta.GetFinalizers(), common.DATALOAD_FINALIZER) {
return r.addFinalizerAndRequeue(ctx)
}

/*
4. Do dataload reconciling
*/
return r.ReconcileDataload(ctx)
}

func (r *DataLoadReconciler) addFinalizerAndRequeue(ctx cdataload.ReconcileRequestContext) (ctrl.Result, error) {
ctx.DataLoad.ObjectMeta.Finalizers = append(ctx.DataLoad.ObjectMeta.Finalizers, common.DATALOAD_FINALIZER)
ctx.Log.Info("Add finalizer and Requeue", "finalizer", common.DATALOAD_FINALIZER)
prevGeneration := ctx.DataLoad.ObjectMeta.GetGeneration()
if err := r.Update(ctx, &ctx.DataLoad); err != nil {
ctx.Log.Error(err, "Failed to add finalizer to dataload", "StatusUpdateError", err)
return utils.RequeueIfError(err)
}
return utils.RequeueImmediatelyUnlessGenerationChanged(prevGeneration, ctx.DataLoad.ObjectMeta.GetGeneration())
}

//SetupWithManager setups the manager with AlluxioDataLoad
func (r *DataLoadReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&datav1alpha1.AlluxioDataLoad{}).
Complete(r)
}
Loading

0 comments on commit 6808079

Please sign in to comment.