Skip to content

Commit

Permalink
when pod mounting no dataset, change the affinity info from pod AntiP…
Browse files Browse the repository at this point in the history
…refer to node Prefer. (fluid-cloudnative#827)

* edit the webhook plugins

Signed-off-by: yangyuliufeng <[email protected]>

* edit doc

Signed-off-by: yangyuliufeng <[email protected]>

* fix ci

Signed-off-by: yangyuliufeng <[email protected]>

* remove unused object

Signed-off-by: yangyuliufeng <[email protected]>

* merge conflict

Signed-off-by: yangyuliufeng <[email protected]>
  • Loading branch information
yangyuliufeng authored Jun 2, 2021
1 parent 3f07c51 commit 53c0c27
Show file tree
Hide file tree
Showing 14 changed files with 308 additions and 240 deletions.
36 changes: 12 additions & 24 deletions docs/zh/operation/pod_schedule.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,19 +114,12 @@ $ kubectl create -f nginx.yaml
```yaml
spec:
affinity:
nodeAffinity: {}
podAffinity: {}
podAntiAffinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
matchExpressions:
- key: role
operator: In
values:
- alluxio-worker
- jindofs-worker
topologyKey: kubernetes.io/hostname
- preference:
matchExpressions:
- key: fluid.io/dataset-num
operator: DoesNotExist
weight: 50
```
正如亲和性所影响的,Pod调度到了没有缓存的cn-beijing.192.168.1.147节点。
Expand Down Expand Up @@ -166,20 +159,15 @@ $ kubectl create -f nginx.yaml
```yaml
spec:
affinity:
nodeAffinity: {}
podAffinity: {}
podAntiAffinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- podAffinityTerm:
labelSelector:
- preference:
matchExpressions:
- key: role
operator: In
values:
- alluxio-worker
- jindofs-worker
topologyKey: kubernetes.io/hostname
weight: 50
- key: fluid.io/s-default-hbase
operator: In
values:
- "true"
weight: 50
```
正如亲和性所影响的,Pod调度到了有缓存的cn-beijing.192.168.1.146节点。
```shell
Expand Down
4 changes: 4 additions & 0 deletions pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ const (
LabelAnnotationDatasetNum = LabelAnnotationPrefix + "dataset-num"
)

func GetDatasetNumLabelName() string {
return LabelAnnotationDatasetNum
}

//Reason for Fluid events
const (
ErrorProcessDatasetReason = "ErrorProcessDataset"
Expand Down
12 changes: 6 additions & 6 deletions pkg/ddc/alluxio/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ func (e *AlluxioEngine) destroyWorkers(expectedWorkers int32) (currentWorkers in
var (
nodeList = &corev1.NodeList{}
labelExclusiveName = utils.GetExclusiveKey()
labelName = runtimeInfo.GetRuntimeLabelname()
labelCommonName = runtimeInfo.GetCommonLabelname()
labelMemoryName = runtimeInfo.GetLabelnameForMemory()
labelDiskName = runtimeInfo.GetLabelnameForDisk()
labelTotalname = runtimeInfo.GetLabelnameForTotal()
labelDatasetNum = runtimeInfo.GetDatasetNumLabelname()
labelName = runtimeInfo.GetRuntimeLabelName()
labelCommonName = runtimeInfo.GetCommonLabelName()
labelMemoryName = runtimeInfo.GetLabelNameForMemory()
labelDiskName = runtimeInfo.GetLabelNameForDisk()
labelTotalname = runtimeInfo.GetLabelNameForTotal()
labelDatasetNum = runtimeInfo.GetDatasetNumLabelName()
)

labelNames := []string{labelName, labelTotalname, labelDiskName, labelMemoryName, labelCommonName}
Expand Down
23 changes: 12 additions & 11 deletions pkg/ddc/base/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/fluid-cloudnative/fluid/pkg/common/deprecated"
)

func (info *RuntimeInfo) getStoragetLabelname(read common.ReadType, storage common.StorageType) string {
func (info *RuntimeInfo) getStoragetLabelName(read common.ReadType, storage common.StorageType) string {
prefix := common.LabelAnnotationStorageCapacityPrefix
if info.IsDeprecatedNodeLabel() {
prefix = deprecated.LabelAnnotationStorageCapacityPrefix
Expand All @@ -35,37 +35,37 @@ func (info *RuntimeInfo) getStoragetLabelname(read common.ReadType, storage comm
info.name
}

func (info *RuntimeInfo) GetLabelnameForMemory() string {
func (info *RuntimeInfo) GetLabelNameForMemory() string {
read := common.HumanReadType
storage := common.MemoryStorageType
if info.IsDeprecatedNodeLabel() {
read = deprecated.HumanReadType
storage = deprecated.MemoryStorageType
}
return info.getStoragetLabelname(read, storage)
return info.getStoragetLabelName(read, storage)
}

func (info *RuntimeInfo) GetLabelnameForDisk() string {
func (info *RuntimeInfo) GetLabelNameForDisk() string {
read := common.HumanReadType
storage := common.DiskStorageType
if info.IsDeprecatedNodeLabel() {
read = deprecated.HumanReadType
storage = deprecated.DiskStorageType
}
return info.getStoragetLabelname(read, storage)
return info.getStoragetLabelName(read, storage)
}

func (info *RuntimeInfo) GetLabelnameForTotal() string {
func (info *RuntimeInfo) GetLabelNameForTotal() string {
read := common.HumanReadType
storage := common.TotalStorageType
if info.IsDeprecatedNodeLabel() {
read = deprecated.HumanReadType
storage = deprecated.TotalStorageType
}
return info.getStoragetLabelname(read, storage)
return info.getStoragetLabelName(read, storage)
}

func (info *RuntimeInfo) GetCommonLabelname() string {
func (info *RuntimeInfo) GetCommonLabelName() string {
prefix := common.LabelAnnotationStorageCapacityPrefix
if info.IsDeprecatedNodeLabel() {
prefix = deprecated.LabelAnnotationStorageCapacityPrefix
Expand All @@ -74,7 +74,7 @@ func (info *RuntimeInfo) GetCommonLabelname() string {
return prefix + info.namespace + "-" + info.name
}

func (info *RuntimeInfo) GetRuntimeLabelname() string {
func (info *RuntimeInfo) GetRuntimeLabelName() string {
prefix := common.LabelAnnotationStorageCapacityPrefix
if info.IsDeprecatedNodeLabel() {
prefix = deprecated.LabelAnnotationStorageCapacityPrefix
Expand All @@ -83,6 +83,7 @@ func (info *RuntimeInfo) GetRuntimeLabelname() string {
return prefix + info.runtimeType + "-" + info.namespace + "-" + info.name
}

func (info *RuntimeInfo) GetDatasetNumLabelname() string {
return common.LabelAnnotationDatasetNum
// GetDatasetNumLabelname get the label to record how much datasets on a node
func (info *RuntimeInfo) GetDatasetNumLabelName() string {
return common.GetDatasetNumLabelName()
}
12 changes: 6 additions & 6 deletions pkg/ddc/base/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,17 @@ type RuntimeInfoInterface interface {

// GetStoragetLabelname(read common.ReadType, storage common.StorageType) string

GetLabelnameForMemory() string
GetLabelNameForMemory() string

GetLabelnameForDisk() string
GetLabelNameForDisk() string

GetLabelnameForTotal() string
GetLabelNameForTotal() string

GetCommonLabelname() string
GetCommonLabelName() string

GetRuntimeLabelname() string
GetRuntimeLabelName() string

GetDatasetNumLabelname() string
GetDatasetNumLabelName() string

GetPersistentVolumeName() string

Expand Down
12 changes: 6 additions & 6 deletions pkg/ddc/jindo/shutdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,12 @@ func (e *JindoEngine) destroyWorkers(expectedWorkers int32) (currentWorkers int3
var (
nodeList = &corev1.NodeList{}
labelExclusiveName = utils.GetExclusiveKey()
labelName = runtimeInfo.GetRuntimeLabelname()
labelCommonName = runtimeInfo.GetCommonLabelname()
labelMemoryName = runtimeInfo.GetLabelnameForMemory()
labelDiskName = runtimeInfo.GetLabelnameForDisk()
labelTotalname = runtimeInfo.GetLabelnameForTotal()
labelDatasetNum = runtimeInfo.GetDatasetNumLabelname()
labelName = runtimeInfo.GetRuntimeLabelName()
labelCommonName = runtimeInfo.GetCommonLabelName()
labelMemoryName = runtimeInfo.GetLabelNameForMemory()
labelDiskName = runtimeInfo.GetLabelNameForDisk()
labelTotalname = runtimeInfo.GetLabelNameForTotal()
labelDatasetNum = runtimeInfo.GetDatasetNumLabelName()
)

labelNames := []string{labelName, labelTotalname, labelDiskName, labelMemoryName, labelCommonName}
Expand Down
16 changes: 8 additions & 8 deletions pkg/utils/dataset/lifecycle/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func init() {
func AlreadyAssigned(runtimeInfo base.RuntimeInfoInterface, node v1.Node) (assigned bool) {
// label := e.getCommonLabelname()

label := runtimeInfo.GetCommonLabelname()
label := runtimeInfo.GetCommonLabelName()
log := rootLog.WithValues("runtime", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace())

if len(node.Labels) > 0 {
Expand Down Expand Up @@ -127,18 +127,18 @@ func LabelCacheNode(nodeToLabel v1.Node, runtimeInfo base.RuntimeInfoInterface,
var (
// runtimeLabel indicates the specific runtime pod is on the node
// e.g. fluid.io/s-alluxio-default-hbase=true
runtimeLabel = runtimeInfo.GetRuntimeLabelname()
runtimeLabel = runtimeInfo.GetRuntimeLabelName()

// commonLabel indicates that any of fluid supported runtime is on the node
// e.g. fluid.io/s-default-hbase=true
commonLabel = runtimeInfo.GetCommonLabelname()
commonLabel = runtimeInfo.GetCommonLabelName()

// exclusiveLabel is the label key indicates the node is exclusively assigned
// e.g. fluid_exclusive=default_hbase
exclusiveLabel string

// datasetLabel indicates the number of the dataset in specific node
datasetLabel = runtimeInfo.GetDatasetNumLabelname()
datasetLabel = runtimeInfo.GetDatasetNumLabelName()
)

log := rootLog.WithValues("runtime", runtimeInfo.GetName(), "namespace", runtimeInfo.GetNamespace())
Expand Down Expand Up @@ -229,15 +229,15 @@ func labelNodeWithCapacityInfo(toUpdate *v1.Node, runtimeInfo base.RuntimeInfoIn
var (
// memCapacityLabel indicates in-memory cache capacity assigned on the node
// e.g. fluid.io/s-h-alluxio-m-default-hbase=1GiB
memCapacityLabel = runtimeInfo.GetLabelnameForMemory()
memCapacityLabel = runtimeInfo.GetLabelNameForMemory()

// diskCapacityLabel indicates on-disk cache capacity assigned on the node
// e.g. fluid.io/s-h-alluxio-d-default-hbase=2GiB
diskCapacityLabel = runtimeInfo.GetLabelnameForDisk()
diskCapacityLabel = runtimeInfo.GetLabelNameForDisk()

// totalCapacityLabel indicates total cache capacity assigned on the node
// e.g. fluid.io/s-h-alluxio-t-default-hbase=3GiB
totalCapacityLabel = runtimeInfo.GetLabelnameForTotal()
totalCapacityLabel = runtimeInfo.GetLabelNameForTotal()
)

storageMap := tieredstore.GetLevelStorageMap(runtimeInfo)
Expand All @@ -261,7 +261,7 @@ func labelNodeWithCapacityInfo(toUpdate *v1.Node, runtimeInfo base.RuntimeInfoIn

// DecreaseDatasetNum deletes the datasetNum label or updates the number of the dataset in the specific node.
func DecreaseDatasetNum(toUpdate *v1.Node, runtimeInfo base.RuntimeInfoInterface) (isDeleted bool, err error) {
var labelDatasetNum = runtimeInfo.GetDatasetNumLabelname()
var labelDatasetNum = runtimeInfo.GetDatasetNumLabelName()
if val, exist := toUpdate.Labels[labelDatasetNum]; exist {
currentDataset, err := strconv.Atoi(val)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion pkg/utils/dataset/lifecycle/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func AssignDatasetToNodes(runtimeInfo base.RuntimeInfoInterface,
}

// 2. filters scheduled nodes and build a map for future use
datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeInfo.GetCommonLabelname()))
datasetLabels, err := labels.Parse(fmt.Sprintf("%s=true", runtimeInfo.GetCommonLabelName()))
if err != nil {
return currentScheduleNum, err
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/utils/dataset/volume/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func CreatePersistentVolumeForRuntime(client client.Client,
Name: pvName,
Namespace: runtime.GetNamespace(),
Labels: map[string]string{
runtime.GetCommonLabelname(): "true",
runtime.GetCommonLabelName(): "true",
},
Annotations: common.ExpectedFluidAnnotations,
},
Expand All @@ -80,7 +80,7 @@ func CreatePersistentVolumeForRuntime(client client.Client,
// {
// MatchExpressions: []v1.NodeSelectorRequirement{
// {
// Key: runtime.GetCommonLabelname(),
// Key: runtime.GetCommonLabelName(),
// Operator: v1.NodeSelectorOpIn,
// Values: []string{"true"},
// },
Expand Down Expand Up @@ -122,7 +122,7 @@ func CreatePersistentVolumeForRuntime(client client.Client,
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: runtime.GetCommonLabelname(),
Key: runtime.GetCommonLabelName(),
Operator: v1.NodeSelectorOpIn,
Values: []string{"true"},
},
Expand Down Expand Up @@ -164,14 +164,14 @@ func CreatePersistentVolumeClaimForRuntime(client client.Client,
Name: runtime.GetName(),
Namespace: runtime.GetNamespace(),
Labels: map[string]string{
runtime.GetCommonLabelname(): "true",
runtime.GetCommonLabelName(): "true",
},
Annotations: common.ExpectedFluidAnnotations,
},
Spec: v1.PersistentVolumeClaimSpec{
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
runtime.GetCommonLabelname(): "true",
runtime.GetCommonLabelName(): "true",
},
},
StorageClassName: &common.FLUID_STORAGECLASS,
Expand Down
42 changes: 42 additions & 0 deletions pkg/utils/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import corev1 "k8s.io/api/core/v1"

//InjectPreferredSchedulingTerms inject the preferredSchedulingTerms into a pod
func InjectPreferredSchedulingTerms(preferredSchedulingTerms []corev1.PreferredSchedulingTerm, pod *corev1.Pod) {
if len(preferredSchedulingTerms) == 0 {
return
}
if pod.Spec.Affinity != nil {
if pod.Spec.Affinity.NodeAffinity != nil {
pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
append(pod.Spec.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
preferredSchedulingTerms...)
} else {
pod.Spec.Affinity.NodeAffinity = &corev1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: preferredSchedulingTerms,
}
}
} else {
pod.Spec.Affinity = &corev1.Affinity{
NodeAffinity: &corev1.NodeAffinity{
PreferredDuringSchedulingIgnoredDuringExecution: preferredSchedulingTerms,
},
}
}
}
Loading

0 comments on commit 53c0c27

Please sign in to comment.