Skip to content

Commit

Permalink
Merge pull request carina-io#150 from fanhaouu/refactor-scheduler-inf…
Browse files Browse the repository at this point in the history
…ormer

refac: modify scheduler informer
  • Loading branch information
carina-ci-bot authored Nov 8, 2022
2 parents a1f80fe + 536843c commit fe6e318
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 28 deletions.
7 changes: 2 additions & 5 deletions pkg/devicemanager/partition/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,12 @@ func parseDiskString(diskString string) []*types.LocalDisk {

func filter(disklist []*types.LocalDisk) (diskList []*types.LocalDisk) {
for _, d := range disklist {
if d.ParentName != "" {
continue
}
if strings.Contains(d.Name, types.KEYWORD) {
continue
}

if d.Readonly || d.Size < 10<<30 || d.Filesystem != "" {
log.Debug("Mismatched disk:" + d.Name + ", filesystem:" + d.Filesystem + ", readonly:" + fmt.Sprintf("%t", d.Readonly) + ", size:" + fmt.Sprintf("%d", d.Size))
if d.Readonly || d.Size < 10<<30 || d.Filesystem != "" || d.MountPoint != "" {
log.Debug("Mismatched disk:" + d.Name + ", filesystem:" + d.Filesystem + ", mountpoint:" + d.MountPoint + ", readonly:" + fmt.Sprintf("%t", d.Readonly) + ", size:" + fmt.Sprintf("%d", d.Size))
continue
}
diskList = append(diskList, d)
Expand Down
2 changes: 2 additions & 0 deletions runners/devicecheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,9 +188,11 @@ func (dc *deviceCheck) addAndRemoveDevice() {
log.Infof("try to remove pv %s from vg %s", pv.PVName, v.VGName)
if err := dc.dm.VolumeManager.RemoveDiskInVg(pv.PVName, v.VGName); err != nil {
log.Errorf("remove pv %s error %v", pv.PVName, err)
continue
}
if err := dc.dm.VolumeManager.GetLv().PartProbe(); err != nil {
log.Errorf("failed partprobe error: %v", err)
continue
}
log.Infof("succeeded in removing pv %s from vg %s", pv.PVName, v.VGName)
}
Expand Down
2 changes: 1 addition & 1 deletion runners/nodestorageresource.go
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ func (r *nodeStorageResourceReconciler) generateDiskStatus(status *carinav1beta1
avail = fs[0].Size()
log.Info("Disk:", disk.Path, " size:", disk.Size, " avail:", avail, " free:", fs)
status.Capacity[fmt.Sprintf("%s%s/%s", carina.DeviceCapacityKeyPrefix, group, disk.Name)] = *resource.NewQuantity(int64(disk.Size>>30), resource.BinarySI)
status.Allocatable[fmt.Sprintf("%s%s/%s", carina.DeviceCapacityKeyPrefix, group, disk.Name)] = *resource.NewQuantity(int64(avail>>30+1), resource.BinarySI)
status.Allocatable[fmt.Sprintf("%s%s/%s", carina.DeviceCapacityKeyPrefix, group, disk.Name)] = *resource.NewQuantity(int64(avail>>30), resource.BinarySI)
}
}
}
Expand Down
80 changes: 61 additions & 19 deletions scheduler/schedulerplugin/localstorage/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package localstorage
import (
"context"
carina "github.com/carina-io/carina/scheduler"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
"path/filepath"

v1 "github.com/carina-io/carina-api/api/v1"
Expand All @@ -19,7 +22,6 @@ import (
)

func newDynamicClientFromConfig() dynamic.Interface {

var kubeconfig string
var config *rest.Config
var err error
Expand All @@ -44,51 +46,91 @@ func newDynamicClientFromConfig() dynamic.Interface {
return dynamicClient
}

func getNodeStorageResource(client dynamic.Interface, nodeName string) (*v1beta1.NodeStorageResource, error) {
func getNodeStorageResource(client dynamic.Interface, nsrLister cache.GenericLister, nodeName string) (*v1beta1.NodeStorageResource, error) {
var gvr = schema.GroupVersionResource{
Group: v1beta1.GroupVersion.Group,
Version: v1beta1.GroupVersion.Version,
Resource: "nodestorageresources",
}
unstructObj, err := client.Resource(gvr).Namespace("").Get(context.TODO(), nodeName, metav1.GetOptions{})
var workloadUnstructured *unstructured.Unstructured
workloadObj, err := nsrLister.Get(nodeName)
if err != nil {
return nil, err
// fall back to call api server in case the cache has not been synchronized yet
klog.Warningf("Failed to get nsr from cache, name: %s. Error: %v. Fall back to call api server", nodeName, err)
workloadUnstructured, err = client.Resource(gvr).Namespace("").Get(context.TODO(), nodeName, metav1.GetOptions{})
if err != nil {
klog.Errorf("Failed to get workload from api server, name: %s. Error: %v", nodeName, err)
return nil, err
}
} else {
workloadUnstructured, err = utils.ToUnstructured(workloadObj)
if err != nil {
klog.Errorf("Failed to convert unstructured from runtime object, name: %s. Error: %v", nodeName, err)
return nil, err
}
}
nsr := &v1beta1.NodeStorageResource{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructObj.UnstructuredContent(), nsr)
err = runtime.DefaultUnstructuredConverter.FromUnstructured(workloadUnstructured.UnstructuredContent(), nsr)
if err != nil {
return nil, err
}
return nsr, nil
}

func getLvExclusivityDisks(client dynamic.Interface, nodeName string) (lvs []string, err error) {
func getLvExclusivityDisks(client dynamic.Interface, lvLister cache.GenericLister, nodeName string) (lvDeviceGroups []string, err error) {
var gvr = schema.GroupVersionResource{
Group: v1.GroupVersion.Group,
Version: v1.GroupVersion.Version,
Resource: "logicvolumes",
}
unstrructObj, err := client.Resource(gvr).Namespace("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, err
}
lvlist := &v1.LogicVolumeList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstrructObj.UnstructuredContent(), lvlist)

workloadObjs, err := lvLister.List(labels.Everything())
var lvs []v1.LogicVolume
if err != nil {
return nil, err
// fall back to call api server in case the cache has not been synchronized yet
klog.Warningf("Failed to get lvs from cache, name: %s. Error: %v. Fall back to call api server", nodeName, err)
workloadUnstructureds, err := client.Resource(gvr).Namespace("").List(context.TODO(), metav1.ListOptions{})
if err != nil {
klog.Errorf("Failed to get workload from api server, name: %s. Error: %v", nodeName, err)
return nil, err
}

lvList := &v1.LogicVolumeList{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(workloadUnstructureds.UnstructuredContent(), lvList)
if err != nil {
return nil, err
}
lvs = lvList.Items
} else {
if workloadObjs == nil || len(workloadObjs) == 0 {
return nil, nil
}
for _, workloadObj := range workloadObjs {
unstructured, err := utils.ToUnstructured(workloadObj)
if err != nil {
klog.Errorf("Failed to convert unstructured from runtime object, name: %s. Error: %v", nodeName, err)
return nil, err
}
lv := &v1.LogicVolume{}
err = runtime.DefaultUnstructuredConverter.FromUnstructured(unstructured.UnstructuredContent(), lv)
if err != nil {
return nil, err
}
lvs = append(lvs, *lv)
}
}
klog.V(3).Infof("Get lvlist: %v", lvlist)
if len(lvlist.Items) == 0 {
return lvs, nil
klog.V(3).Infof("Get logic volumes: %v", lvs)
if lvs == nil || len(lvs) == 0 {
return nil, nil
}
for _, lv := range lvlist.Items {
for _, lv := range lvs {
if lv.Annotations == nil {
continue
}
klog.V(3).Infof("Get lv: %v, exclusivity: %s", lv.Spec.NodeName, lv.Annotations[carina.ExclusivityDisk])
if lv.Spec.NodeName == nodeName && lv.Annotations[carina.ExclusivityDisk] == "true" {
lvs = append(lvs, lv.Spec.DeviceGroup)
lvDeviceGroups = append(lvDeviceGroups, lv.Spec.DeviceGroup)
}
}
return lvs, nil
return lvDeviceGroups, nil
}
18 changes: 16 additions & 2 deletions scheduler/schedulerplugin/localstorage/storage-plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ package localstorage
import (
"context"
"errors"
carinav1 "github.com/carina-io/carina-api/api/v1"
carinav1beta1 "github.com/carina-io/carina-api/api/v1beta1"
carina "github.com/carina-io/carina/scheduler"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"sort"
"strconv"
"strings"
Expand All @@ -45,6 +49,8 @@ type LocalStorage struct {
scLister lstoragev1.StorageClassLister
pvcLister lcorev1.PersistentVolumeClaimLister
pvLister lcorev1.PersistentVolumeLister
lvLister cache.GenericLister
nsrLister cache.GenericLister
dynamicClient dynamic.Interface
}

Expand All @@ -62,11 +68,19 @@ func New(_ runtime.Object, handle framework.Handle) (framework.Plugin, error) {
pvcLister := handle.SharedInformerFactory().Core().V1().PersistentVolumeClaims().Lister()
pvLister := handle.SharedInformerFactory().Core().V1().PersistentVolumes().Lister()
dynamicClient := newDynamicClientFromConfig()
dynamicSharedInformerFactory := dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, 0, v1.NamespaceAll, nil)
lvLister := dynamicSharedInformerFactory.ForResource(carinav1.GroupVersion.WithResource("logicvolumes")).Lister()
nsrLister := dynamicSharedInformerFactory.ForResource(carinav1beta1.GroupVersion.WithResource("nodestorageresources")).Lister()
ctx := context.TODO()
dynamicSharedInformerFactory.Start(ctx.Done())
dynamicSharedInformerFactory.WaitForCacheSync(ctx.Done())
return &LocalStorage{
handle: handle,
pvcLister: pvcLister,
scLister: scLister,
pvLister: pvLister,
lvLister: lvLister,
nsrLister: nsrLister,
dynamicClient: dynamicClient,
}, nil
}
Expand Down Expand Up @@ -287,14 +301,14 @@ func (ls *LocalStorage) getAllocatableMap(useRaw bool, podName, nodeName string)
var err error
allocatableMap := map[string]int64{}
if useRaw {
lvExclusivityDisks, err = getLvExclusivityDisks(ls.dynamicClient, nodeName)
lvExclusivityDisks, err = getLvExclusivityDisks(ls.dynamicClient, ls.lvLister, nodeName)
if err != nil {
klog.V(3).Infof("Failed to obtain node lvs, pod: %s node: %s, err: %s", podName, nodeName, err.Error())
return allocatableMap, errors.New("failed to obtain node lvs, " + err.Error())
}
}

nsr, err := getNodeStorageResource(ls.dynamicClient, nodeName)
nsr, err := getNodeStorageResource(ls.dynamicClient, ls.nsrLister, nodeName)
if err != nil {
klog.V(3).Infof("Failed to obtain node storages, pod: %s node: %s, err: %s", podName, nodeName, err.Error())
return allocatableMap, errors.New("Failed to obtain node storages, " + err.Error())
Expand Down
15 changes: 14 additions & 1 deletion scheduler/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@

package utils

import "os"
import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"os"
)

func ContainsString(slice []string, s string) bool {
for _, item := range slice {
Expand All @@ -37,3 +41,12 @@ func Exists(path string) bool {
}
return true
}

// ToUnstructured converts a typed object to an unstructured object.
func ToUnstructured(obj interface{}) (*unstructured.Unstructured, error) {
uncastObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, err
}
return &unstructured.Unstructured{Object: uncastObj}, nil
}

0 comments on commit fe6e318

Please sign in to comment.