diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 0b7ca80a..6061addf 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -52,7 +52,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" @@ -582,13 +581,10 @@ func (qjm *XController) getAppWrapperCompletionStatus(caw *arbv1.AppWrapper) arb for i, genericItem := range caw.Spec.AggrResources.GenericItems { if len(genericItem.CompletionStatus) > 0 { objectName := genericItem.GenericTemplate - var unstruct unstructured.Unstructured - unstruct.Object = make(map[string]interface{}) - var blob interface{} - if err := jsons.Unmarshal(objectName.Raw, &blob); err != nil { - klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling, err=%#v", err) + unstruct, err := genericresource.UnmarshalToUnstructured(objectName.Raw) + if err != nil { + klog.Errorf("[getAppWrapperCompletionStatus] Error unmarshalling appwrapper: %v", caw.Name) } - unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling name := "" if md, ok := unstruct.Object["metadata"]; ok { metadata := md.(map[string]interface{}) diff --git a/pkg/controller/queuejobresources/genericresource/genericresource.go b/pkg/controller/queuejobresources/genericresource/genericresource.go index b9686153..362228f4 100644 --- a/pkg/controller/queuejobresources/genericresource/genericresource.go +++ b/pkg/controller/queuejobresources/genericresource/genericresource.go @@ -18,9 +18,10 @@ package genericresource import ( "context" - "encoding/json" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/api/errors" "fmt" - "math" 
"runtime/debug" "strings" "time" @@ -28,19 +29,13 @@ import ( arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" v1 "k8s.io/api/core/v1" "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" - "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/restmapper" ) var appwrapperJobName = "appwrapper.mcad.ibm.com" @@ -61,16 +56,6 @@ func NewAppWrapperGenericResource(config *rest.Config) *GenericResources { } } -func join(strs ...string) string { - var result string - if strs[0] == "" { - return strs[len(strs)-1] - } - for _, str := range strs { - result += str - } - return result -} func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperGenericResource) (genericResourceName string, groupversionkind *schema.GroupVersionKind, erro error) { var err error @@ -84,38 +69,17 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG } // Default generic resource name name := "" - namespaced := true - // todo:DELETEME dd := common.KubeClient.Discovery() + dd := gr.clients.Discovery() - apigroups, err := restmapper.GetAPIGroupResources(dd) - if err != nil { - klog.Errorf("[Cleanup] Error getting API resources, err=%#v", err) - return name, default_gvk, err - } ext := awr.GenericTemplate - restmapper := restmapper.NewDiscoveryRESTMapper(apigroups) - _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, default_gvk, nil) + gvk, mapping, err := getResourceMapping(dd, ext.Raw, default_gvk) if err != nil { - klog.Errorf("Decoding error, please check your CR! 
Aborting handling the resource creation, err: `%v`", err) - return name, gvk, err + return name, gvk, err } - mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version) + dclient, err := createDynamicClient(gr, mapping) if err != nil { - klog.Errorf("mapping error from raw object: `%v`", err) - return name, gvk, err - } - - // todo:DELETEME restconfig := common.KubeConfig - restconfig := gr.kubeClientConfig - restconfig.GroupVersion = &schema.GroupVersion{ - Group: mapping.GroupVersionKind.Group, - Version: mapping.GroupVersionKind.Version, - } - dclient, err := dynamic.NewForConfig(restconfig) - if err != nil { - klog.Errorf("[Cleanup] Error creating new dynamic client, err=%#v.", err) return name, gvk, err } @@ -140,31 +104,19 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG } } - // Unmarshal generic item raw object - var unstruct unstructured.Unstructured - unstruct.Object = make(map[string]interface{}) - var blob interface{} - if err = json.Unmarshal(ext.Raw, &blob); err != nil { - klog.Errorf("[Cleanup] Error unmarshalling, err=%#v", err) - return name, gvk, err + unstruct, err := UnmarshalToUnstructured(ext.Raw) + if err != nil { + return name, gvk, err } - unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling namespace := aw.Namespace // only delete resources from AppWrapper namespace - if md, ok := unstruct.Object["metadata"]; ok { - metadata := md.(map[string]interface{}) - if objectName, ok := metadata["name"]; ok { - name = objectName.(string) - } - if objectns, ok := metadata["namespace"]; ok { - if objectns.(string) != namespace { - err := fmt.Errorf("[Cleanup] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", objectns.(string), namespace) - return name, gvk, err - } - } + name, err = retrieveName(aw.Namespace, unstruct, "Cleanup") + if err != nil { + return name, gvk, err } + // Get the resource to see if it exists in the 
AppWrapper namespace labelSelector := fmt.Sprintf("%s=%s, %s=%s", appwrapperJobName, aw.Name, resourceName, unstruct.GetName()) inEtcd, err := dclient.Resource(rsrc).Namespace(aw.Namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector}) @@ -174,11 +126,7 @@ func (gr *GenericResources) Cleanup(aw *arbv1.AppWrapper, awr *arbv1.AppWrapperG // Check to see if object already exists in etcd, if not, create the object. if inEtcd != nil || len(inEtcd.Items) > 0 { - newName := name - if len(newName) > 63 { - newName = newName[:63] - } - + newName := truncateName(name) err = deleteObject(namespaced, namespace, newName, rsrc, dclient) if err != nil { if !errors.IsNotFound(err) { @@ -206,91 +154,36 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra }() namespaced := true - // todo:DELETEME dd := common.KubeClient.Discovery() + name := "" + dd := gr.clients.Discovery() - apigroups, err := restmapper.GetAPIGroupResources(dd) - if err != nil { - klog.Errorf("Error getting API resources, err=%#v", err) - return []*v1.Pod{}, err - } ext := awr.GenericTemplate - restmapper := restmapper.NewDiscoveryRESTMapper(apigroups) - // versions := &unstructured.Unstructured{} - // _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, nil, versions) - _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(ext.Raw, nil, nil) + _, mapping, err := getResourceMapping(dd, ext.Raw, nil) if err != nil { - klog.Errorf("Decoding error, please check your CR! 
Aborting handling the resource creation, err: `%v`", err) - return []*v1.Pod{}, err - } - mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - klog.Errorf("mapping error from raw object: `%v`", err) - return []*v1.Pod{}, err + return []*v1.Pod{}, err } - // todo:DELETEME restconfig := common.KubeConfig - restconfig := gr.kubeClientConfig - restconfig.GroupVersion = &schema.GroupVersion{ - Group: mapping.GroupVersionKind.Group, - Version: mapping.GroupVersionKind.Version, - } - dclient, err := dynamic.NewForConfig(restconfig) + dclient, err := createDynamicClient(gr, mapping) if err != nil { - klog.Errorf("Error creating new dynamic client, err=%#v", err) return []*v1.Pod{}, err } - //TODO: Simplified apiresourcelist discovery, the assumption is we will always deploy namespaced objects - //We dont intend to install CRDs like KubeRay, Spark-Operator etc through MCAD, I think such objects are typically - //cluster scoped. May be for Multi-Cluster or inference use case we need such deep discovery, so for now commenting code. 
- - // _, apiresourcelist, err := dd.ServerGroupsAndResources() - // if err != nil { - // if derr, ok := err.(*discovery.ErrGroupDiscoveryFailed); ok { - // klog.Warning("Discovery failed for some groups, %d failing: %v", len(derr.Groups), err) - // } else { - // klog.Errorf("Error getting supported groups and resources, err=%#v", err) - // return []*v1.Pod{}, err - // } - // } - rsrc := mapping.Resource - // for _, apiresourcegroup := range apiresourcelist { - // if apiresourcegroup.GroupVersion == join(mapping.GroupVersionKind.Group, "/", mapping.GroupVersionKind.Version) { - // for _, apiresource := range apiresourcegroup.APIResources { - // if apiresource.Name == mapping.Resource.Resource && apiresource.Kind == mapping.GroupVersionKind.Kind { - // rsrc = mapping.Resource - // namespaced = apiresource.Namespaced - // } - // } - // } - // } - var unstruct unstructured.Unstructured - unstruct.Object = make(map[string]interface{}) - var blob interface{} - if err = json.Unmarshal(ext.Raw, &blob); err != nil { - klog.Errorf("Error unmarshalling, err=%#v", err) - return []*v1.Pod{}, err + unstruct, err := UnmarshalToUnstructured(ext.Raw) + if err != nil { + return []*v1.Pod{}, err } + ownerRef := metav1.NewControllerRef(aw, appWrapperKind) - unstruct.Object = blob.(map[string]interface{}) // set object to the content of the blob after Unmarshalling unstruct.SetOwnerReferences(append(unstruct.GetOwnerReferences(), *ownerRef)) namespace := aw.Namespace // only create resources in AppWrapper namespace - name := "" - if md, ok := unstruct.Object["metadata"]; ok { - metadata := md.(map[string]interface{}) - if objectName, ok := metadata["name"]; ok { - name = objectName.(string) - } - if objectns, ok := metadata["namespace"]; ok { - if objectns.(string) != namespace { - err := fmt.Errorf("[SyncQueueJob] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", objectns.(string), namespace) - return []*v1.Pod{}, err - } - } + name, err = 
retrieveName(aw.Namespace, unstruct, "SyncQueueJob") + if err != nil { + return []*v1.Pod{}, err } + labels := map[string]string{} if unstruct.GetLabels() == nil { unstruct.SetLabels(labels) @@ -316,13 +209,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra // Check to see if object already exists in etcd, if not, create the object. if inEtcd == nil || len(inEtcd.Items) < 1 { - newName := name - if len(newName) > 63 { - newName = newName[:63] - } + newName := truncateName(name) unstruct.SetName(newName) //Asumption object is always namespaced - //Refer to comment on line 238 namespaced = true err = createObject(namespaced, namespace, newName, rsrc, unstruct, dclient) if err != nil { @@ -335,177 +224,9 @@ func (gr *GenericResources) SyncQueueJob(aw *arbv1.AppWrapper, awr *arbv1.AppWra } } - // Get the related resources of created object - // var thisObj *unstructured.Unstructured - // var err1 error - // if namespaced { - // thisObj, err1 = dclient.Resource(rsrc).Namespace(namespace).Get(context.Background(), name, metav1.GetOptions{}) - // } else { - // thisObj, err1 = dclient.Resource(rsrc).Get(context.Background(), name, metav1.GetOptions{}) - // } - // if err1 != nil { - // klog.Errorf("Could not get created resource with error %v", err1) - // return []*v1.Pod{}, err1 - // } - // thisOwnerRef := metav1.NewControllerRef(thisObj, thisObj.GroupVersionKind()) - - // podL, _ := gr.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{}) - // pods := []*v1.Pod{} - // for _, pod := range (*podL).Items { - // parent := metav1.GetControllerOf(&pod) - // if reflect.DeepEqual(thisOwnerRef, parent) { - // pods = append(pods, &pod) - // } - // klog.V(10).Infof("[SyncQueueJob] pod %s created from a Generic Item\n", pod.Name) - // } - // return pods, nil return []*v1.Pod{}, nil } -// checks if object has pod template spec and add new labels -func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels 
map[string]string) (hasFields bool) { - spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") - if !isFound { - klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec' field not found.") - return false - } - template, isFound, _ := unstructured.NestedMap(spec, "template") - if !isFound { - klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template' field not found.") - return false - } - - marshal, _ := json.Marshal(template) - unmarshal := v1.PodTemplateSpec{} - if err := json.Unmarshal(marshal, &unmarshal); err != nil { - klog.Warning(err) - return false - } - existingLabels, isFound, _ := unstructured.NestedStringMap(template, "metadata", "labels") - if !isFound { - klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template.metadata.labels' field not found.") - return false - } - newLength := len(existingLabels) + len(labels) - m := make(map[string]string, newLength) // convert map[string]string into map[string]interface{} - for k, v := range existingLabels { - m[k] = v - } - - for k, v := range labels { - m[k] = v - } - - if err := unstructured.SetNestedStringMap(unstruct.Object, m, "spec", "template", "metadata", "labels"); err != nil { - klog.Warning(err) - return false - } - - return isFound -} - -// checks if object has replicas and containers field -func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, containers []v1.Container) { - var unstruct unstructured.Unstructured - unstruct.Object = make(map[string]interface{}) - var blob interface{} - if err := json.Unmarshal(obj.Raw, &blob); err != nil { - klog.Errorf("Error unmarshalling, err=%#v", err) - return false, 0, nil - } - unstruct.Object = blob.(map[string]interface{}) - spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec") - if !isFound { - klog.Warningf("[hasFields] No spec field found in raw object: %#v", unstruct.UnstructuredContent()) - } - - replicas, isFound, _ := unstructured.NestedFloat64(spec, "replicas") - 
// Set default to 1 if no replicas field is found (handles the case of a single pod creation without replicaset. - if !isFound { - replicas = 1 - } - - template, isFound, _ := unstructured.NestedMap(spec, "template") - // If spec does not contain a podtemplate, check for pod singletons - var subspec map[string]interface{} - if !isFound { - subspec = spec - klog.V(6).Infof("[hasFields] No template field found in raw object: %#v", spec) - } else { - subspec, isFound, _ = unstructured.NestedMap(template, "spec") - } - - containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers") - if !isFound { - klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec) - return false, 0, nil - } - objContainers := make([]v1.Container, len(containerList)) - for _, container := range containerList { - marshal, _ := json.Marshal(container) - unmarshal := v1.Container{} - _ = json.Unmarshal(marshal, &unmarshal) - objContainers = append(objContainers, unmarshal) - } - return isFound, replicas, objContainers -} - -func createObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, unstruct unstructured.Unstructured, dclient dynamic.Interface) (erro error) { - var err error - if !namespaced { - res := dclient.Resource(rsrc) - _, err = res.Create(context.Background(), &unstruct, metav1.CreateOptions{}) - if err != nil { - if errors.IsAlreadyExists(err) { - klog.Errorf("%v\n", err.Error()) - return nil - } else { - klog.Errorf("Error creating the object `%v`, the error is `%v`", name, errors.ReasonForError(err)) - return err - } - } else { - klog.V(4).Infof("Resource `%v` created\n", name) - return nil - } - } else { - res := dclient.Resource(rsrc).Namespace(namespace) - _, err = res.Create(context.Background(), &unstruct, metav1.CreateOptions{}) - if err != nil { - if errors.IsAlreadyExists(err) { - klog.Errorf("%v\n", err.Error()) - return nil - } else { - klog.Errorf("Error creating the object `%v`, the error is 
`%v`", name, errors.ReasonForError(err)) - return err - } - } else { - klog.V(4).Infof("Resource `%v` created\n", name) - return nil - - } - } -} - -func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) error { - var err error - backGround := metav1.DeletePropagationBackground - delOptions := metav1.DeleteOptions{PropagationPolicy: &backGround} - if !namespaced { - res := dclient.Resource(rsrc) - err = res.Delete(context.Background(), name, delOptions) - } else { - res := dclient.Resource(rsrc).Namespace(namespace) - err = res.Delete(context.Background(), name, delOptions) - } - - if err != nil && !errors.IsNotFound(err) { - klog.Errorf("[deleteObject] Error deleting the object `%v`, the error is `%v`.", name, errors.ReasonForError(err)) - return err - } else { - klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name) - return nil - } -} func GetListOfPodResourcesFromOneGenericItem(awr *arbv1.AppWrapperGenericResource) (resource []*clusterstateapi.Resource, er error) { var podResourcesList []*clusterstateapi.Resource @@ -573,87 +294,25 @@ func GetResources(awr *arbv1.AppWrapperGenericResource) (resource *clusterstatea return totalresource, err } -func getPodResources(pod arbv1.CustomPodResourceTemplate) (resource *clusterstateapi.Resource) { - replicas := pod.Replicas - req := clusterstateapi.NewResource(pod.Requests) - limit := clusterstateapi.NewResource(pod.Limits) - tolerance := 0.001 - - // Use limit if request is 0 - if diff := math.Abs(req.MilliCPU - float64(0.0)); diff < tolerance { - req.MilliCPU = limit.MilliCPU - } - - if diff := math.Abs(req.Memory - float64(0.0)); diff < tolerance { - req.Memory = limit.Memory - } - - if req.GPU <= 0 { - req.GPU = limit.GPU - } - req.MilliCPU = req.MilliCPU * float64(replicas) - req.Memory = req.Memory * float64(replicas) - req.GPU = req.GPU * int64(replicas) - return req -} - -func getContainerResources(container v1.Container, replicas 
float64) *clusterstateapi.Resource { - req := clusterstateapi.NewResource(container.Resources.Requests) - limit := clusterstateapi.NewResource(container.Resources.Limits) - - tolerance := 0.001 - // Use limit if request is 0 - if diff := math.Abs(req.MilliCPU - float64(0.0)); diff < tolerance { - req.MilliCPU = limit.MilliCPU - } - - if diff := math.Abs(req.Memory - float64(0.0)); diff < tolerance { - req.Memory = limit.Memory - } - if req.GPU <= 0 { - req.GPU = limit.GPU - } - req.MilliCPU = req.MilliCPU * float64(replicas) - req.Memory = req.Memory * float64(replicas) - req.GPU = req.GPU * int64(replicas) - return req -} // returns status of an item present in etcd func (gr *GenericResources) IsItemCompleted(awgr *arbv1.AppWrapperGenericResource, namespace string, appwrapperName string, genericItemName string) (completed bool) { dd := gr.clients.Discovery() - apigroups, err := restmapper.GetAPIGroupResources(dd) - if err != nil { - klog.Errorf("[IsItemCompleted] Error getting API resources, err=%#v", err) - return false - } - restmapper := restmapper.NewDiscoveryRESTMapper(apigroups) - _, gvk, err := unstructured.UnstructuredJSONScheme.Decode(awgr.GenericTemplate.Raw, nil, nil) - if err != nil { - klog.Errorf("[IsItemCompleted] Decoding error, please check your CR! 
Aborting handling the resource creation, err: `%v`", err) - return false - } - mapping, err := restmapper.RESTMapping(gvk.GroupKind(), gvk.Version) + _, mapping, err := getResourceMapping(dd, awgr.GenericTemplate.Raw, nil) if err != nil { - klog.Errorf("[IsItemCompleted] mapping error from raw object: `%v`", err) - return false + return false } - restconfig := gr.kubeClientConfig - restconfig.GroupVersion = &schema.GroupVersion{ - Group: mapping.GroupVersionKind.Group, - Version: mapping.GroupVersionKind.Version, - } - rsrc := mapping.Resource - dclient, err := dynamic.NewForConfig(restconfig) + + dclient, err := createDynamicClient(gr, mapping) if err != nil { - klog.Errorf("[IsItemCompleted] Error creating new dynamic client, err %v", err) return false } + rsrc := mapping.Resource labelSelector := fmt.Sprintf("%s=%s", appwrapperJobName, appwrapperName) inEtcd, err := dclient.Resource(rsrc).Namespace(namespace).List(context.Background(), metav1.ListOptions{LabelSelector: labelSelector}) if err != nil { diff --git a/pkg/controller/queuejobresources/genericresource/helper.go b/pkg/controller/queuejobresources/genericresource/helper.go new file mode 100644 index 00000000..f19d9599 --- /dev/null +++ b/pkg/controller/queuejobresources/genericresource/helper.go @@ -0,0 +1,253 @@ +package genericresource + +import ( + "math" + "fmt" + "encoding/json" + "k8s.io/apimachinery/pkg/runtime" + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/api/errors" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" + v1 "k8s.io/api/core/v1" + + clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api" + "k8s.io/klog/v2" + + "k8s.io/client-go/restmapper" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/dynamic" + +) + 
+// getPodResources returns the aggregate resources requested by pod: the
+// per-replica requests (with the limit substituted for any request that is
+// zero) multiplied by pod.Replicas.
+func getPodResources(pod arbv1.CustomPodResourceTemplate) *clusterstateapi.Resource {
+	req := clusterstateapi.NewResource(pod.Requests)
+	limit := clusterstateapi.NewResource(pod.Limits)
+
+	calculateResources(req, limit, float64(pod.Replicas))
+
+	return req
+}
+
+// getContainerResources returns the aggregate resources requested by
+// container: its requests (with the limit substituted for any request that is
+// zero) multiplied by replicas.
+func getContainerResources(container v1.Container, replicas float64) *clusterstateapi.Resource {
+	req := clusterstateapi.NewResource(container.Resources.Requests)
+	limit := clusterstateapi.NewResource(container.Resources.Limits)
+
+	calculateResources(req, limit, replicas)
+
+	return req
+}
+
+// calculateResources updates req in place. Each of MilliCPU, Memory and GPU
+// that is zero in req is replaced by the corresponding value from limit, and
+// all three are then multiplied by replicas.
+func calculateResources(req *clusterstateapi.Resource, limit *clusterstateapi.Resource, replicas float64) {
+	// MilliCPU and Memory are floating point, so "zero" is tested against a
+	// small tolerance instead of with ==.
+	const tolerance = 0.001
+
+	// Use limit if request is 0
+	if math.Abs(req.MilliCPU) < tolerance {
+		req.MilliCPU = limit.MilliCPU
+	}
+
+	if math.Abs(req.Memory) < tolerance {
+		req.Memory = limit.Memory
+	}
+
+	if req.GPU <= 0 {
+		req.GPU = limit.GPU
+	}
+
+	req.MilliCPU *= replicas
+	req.Memory *= replicas
+	// The conversion truncates a fractional replica count toward zero.
+	req.GPU *= int64(replicas)
+}
+
+// retrieveName extracts metadata.name from unstruct and verifies that
+// metadata.namespace, when present, equals awNamespace.
+//
+// An absent metadata section or an absent name yields an empty name and a nil
+// error. An error is returned when metadata, name or namespace has an
+// unexpected type, or when the object declares a namespace other than
+// awNamespace; logContext becomes the bracketed prefix of the error message.
+// On a namespace error the name that was read is still returned so callers can
+// report which resource was rejected.
+func retrieveName(awNamespace string, unstruct unstructured.Unstructured, logContext string) (string, error) {
+	// NestedString reports a type mismatch as an error, where a bare type
+	// assertion on the decoded JSON would panic.
+	name, _, err := unstructured.NestedString(unstruct.Object, "metadata", "name")
+	if err != nil {
+		return "", fmt.Errorf("[%s] invalid resource name: %w", logContext, err)
+	}
+
+	objectNS, found, err := unstructured.NestedString(unstruct.Object, "metadata", "namespace")
+	if err != nil {
+		return name, fmt.Errorf("[%s] invalid resource namespace: %w", logContext, err)
+	}
+	if found && objectNS != awNamespace {
+		return name, fmt.Errorf("[%s] resource namespace \"%s\" is different from AppWrapper namespace \"%s\"", logContext, objectNS, awNamespace)
+	}
+
+	return name, nil
+}
+
+// createDynamicClient builds a dynamic client from gr.kubeClientConfig after
+// setting the config's GroupVersion to the group and version of mapping. A
+// failure to build the client is logged and returned.
+//
+// NOTE(review): restconfig is a plain assignment from gr.kubeClientConfig. If
+// that field is a pointer, the GroupVersion write below mutates the
+// configuration shared by every caller; copying the config first (for example
+// with rest.CopyConfig) would avoid that. Confirm against the GenericResources
+// definition.
+func createDynamicClient(gr *GenericResources, mapping *meta.RESTMapping) (dynamic.Interface, error) {
+	restconfig := gr.kubeClientConfig
+	restconfig.GroupVersion = &schema.GroupVersion{
+		Group:   mapping.GroupVersionKind.Group,
+		Version: mapping.GroupVersionKind.Version,
+	}
+
+	dclient, err := dynamic.NewForConfig(restconfig)
+	if err != nil {
+		klog.Errorf("Error creating new dynamic client, err: %v", err)
+		return nil, err
+	}
+	return dclient, nil
+}
+
+// getResourceMapping decodes raw to determine its GroupVersionKind, using
+// defaultGVK (which may be nil) as the decoder default, and resolves that kind
+// to a REST mapping through the API groups reported by dd.
+//
+// The API groups are fetched from dd on every call. On any failure the error
+// is logged and (nil, nil, err) is returned.
+func getResourceMapping(dd discovery.DiscoveryInterface, raw []byte, defaultGVK *schema.GroupVersionKind) (*schema.GroupVersionKind, *meta.RESTMapping, error) {
+	apigroups, err := restmapper.GetAPIGroupResources(dd)
+	if err != nil {
+		klog.Errorf("Error getting API resources, err=%#v", err)
+		return nil, nil, err
+	}
+
+	// Named mapper so the local does not shadow the restmapper package.
+	mapper := restmapper.NewDiscoveryRESTMapper(apigroups)
+	_, gvk, err := unstructured.UnstructuredJSONScheme.Decode(raw, defaultGVK, nil)
+	if err != nil {
+		klog.Errorf("Decoding error, please check your CR! err: `%v`", err)
+		return nil, nil, err
+	}
+
+	mapping, err := mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
+	if err != nil {
+		klog.Errorf("Mapping error from raw object: `%v`", err)
+		return nil, nil, err
+	}
+
+	return gvk, mapping, nil
+}
+
+// hasFields reports whether obj carries a containers list, looked up at
+// spec.template.spec.containers or, when spec has no template, at
+// spec.containers. It also returns spec.replicas (1 when the field is absent)
+// and the decoded containers.
+//
+// It returns (false, 0, nil) when obj cannot be unmarshalled or when no
+// containers field is found. A container entry that cannot be decoded is
+// logged and left out of the result.
+func hasFields(obj runtime.RawExtension) (hasFields bool, replica float64, containers []v1.Container) {
+	unstruct, err := UnmarshalToUnstructured(obj.Raw)
+	if err != nil {
+		klog.Errorf("[hasFields] Error unmarshalling, err=%v", err)
+		return false, 0, nil
+	}
+
+	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
+	if !isFound {
+		klog.Warningf("[hasFields] No spec field found in raw object: %#v", unstruct.UnstructuredContent())
+	}
+
+	replicas, isFound, _ := unstructured.NestedFloat64(spec, "replicas")
+	// Set default to 1 if no replicas field is found (handles the case of a
+	// single pod creation without replicaset).
+	if !isFound {
+		replicas = 1
+	}
+
+	template, isFound, _ := unstructured.NestedMap(spec, "template")
+	// If spec does not contain a podtemplate, check for pod singletons
+	var subspec map[string]interface{}
+	if !isFound {
+		subspec = spec
+		klog.V(6).Infof("[hasFields] No template field found in raw object: %#v", spec)
+	} else {
+		subspec, _, _ = unstructured.NestedMap(template, "spec")
+	}
+
+	containerList, isFound, _ := unstructured.NestedSlice(subspec, "containers")
+	if !isFound {
+		klog.Warningf("[hasFields] No containers field found in raw object: %#v", subspec)
+		return false, 0, nil
+	}
+
+	// Length 0 with reserved capacity: appending to a slice created with a
+	// non-zero length would leave that many zero-value containers in front of
+	// the decoded ones.
+	objContainers := make([]v1.Container, 0, len(containerList))
+	for _, container := range containerList {
+		marshal, err := json.Marshal(container)
+		if err != nil {
+			klog.Warningf("[hasFields] Skipping container that cannot be marshalled: %v", err)
+			continue
+		}
+		var decoded v1.Container
+		if err := json.Unmarshal(marshal, &decoded); err != nil {
+			klog.Warningf("[hasFields] Skipping container that cannot be decoded: %v", err)
+			continue
+		}
+		objContainers = append(objContainers, decoded)
+	}
+	return true, replicas, objContainers
+}
+
+// addLabelsToPodTemplateField merges labels into
+// spec.template.metadata.labels of unstruct and reports whether it did so. On
+// a key collision the value from labels wins.
+//
+// It returns false when spec or spec.template is missing, when spec.template
+// does not decode as a v1.PodTemplateSpec, when
+// spec.template.metadata.labels is absent, or when the merged labels cannot
+// be written back.
+//
+// NOTE(review): a pod template that has no labels yet is skipped rather than
+// labelled - confirm this is intended.
+func addLabelsToPodTemplateField(unstruct *unstructured.Unstructured, labels map[string]string) (hasFields bool) {
+	spec, isFound, _ := unstructured.NestedMap(unstruct.UnstructuredContent(), "spec")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec' field not found.")
+		return false
+	}
+	template, isFound, _ := unstructured.NestedMap(spec, "template")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template' field not found.")
+		return false
+	}
+
+	// Round-trip through JSON only to check that the template has the shape of
+	// a pod template; the decoded value itself is not used.
+	marshal, err := json.Marshal(template)
+	if err != nil {
+		klog.Warning(err)
+		return false
+	}
+	unmarshal := v1.PodTemplateSpec{}
+	if err := json.Unmarshal(marshal, &unmarshal); err != nil {
+		klog.Warning(err)
+		return false
+	}
+	existingLabels, isFound, _ := unstructured.NestedStringMap(template, "metadata", "labels")
+	if !isFound {
+		klog.V(10).Infof("[addLabelsToPodTemplateField] 'spec.template.metadata.labels' field not found.")
+		return false
+	}
+
+	// Merge into a fresh map so neither input map is modified; labels is
+	// copied last so that its values take precedence.
+	m := make(map[string]string, len(existingLabels)+len(labels))
+	for k, v := range existingLabels {
+		m[k] = v
+	}
+	for k, v := range labels {
+		m[k] = v
+	}
+
+	if err := unstructured.SetNestedStringMap(unstruct.Object, m, "spec", "template", "metadata", "labels"); err != nil {
+		klog.Warning(err)
+		return false
+	}
+
+	return true
+}
+
+// createObject creates unstruct through dclient, in namespace when namespaced
+// is true and at cluster scope otherwise. name is used for log messages only.
+//
+// An "already exists" response is logged and treated as success (nil is
+// returned); any other failure is logged and returned.
+func createObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, unstruct unstructured.Unstructured, dclient dynamic.Interface) error {
+	res := getResourceInterface(namespaced, namespace, rsrc, dclient)
+
+	_, err := res.Create(context.Background(), &unstruct, metav1.CreateOptions{})
+	if err != nil {
+		if errors.IsAlreadyExists(err) {
+			klog.Errorf("[createObject] Object `%v` already exists: %v", name, err)
+			return nil
+		}
+		klog.Errorf("[createObject] Error creating the object `%v`: %v", name, err)
+		return err
+	}
+
+	klog.V(4).Infof("[createObject] Resource `%v` created\n", name)
+	return nil
+}
+
+// deleteObject deletes the object called name through dclient, in namespace
+// when namespaced is true and at cluster scope otherwise, using background
+// propagation.
+//
+// A "not found" response is treated as success (nil is returned); any other
+// failure is logged and returned.
+func deleteObject(namespaced bool, namespace string, name string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) error {
+	backGround := metav1.DeletePropagationBackground
+	delOptions := metav1.DeleteOptions{PropagationPolicy: &backGround}
+
+	res := getResourceInterface(namespaced, namespace, rsrc, dclient)
+
+	err := res.Delete(context.Background(), name, delOptions)
+	if err != nil {
+		if errors.IsNotFound(err) {
+			klog.V(4).Infof("[deleteObject] object `%v` not found. No action taken.", name)
+			return nil
+		}
+		klog.Errorf("[deleteObject] Error deleting the object `%v`: %v", name, err)
+		return err
+	}
+
+	klog.V(4).Infof("[deleteObject] Resource `%v` deleted.\n", name)
+	return nil
+}
+
+// getResourceInterface returns the dynamic resource interface for rsrc,
+// scoped to namespace when namespaced is true and unscoped otherwise.
+func getResourceInterface(namespaced bool, namespace string, rsrc schema.GroupVersionResource, dclient dynamic.Interface) dynamic.ResourceInterface {
+	if namespaced {
+		return dclient.Resource(rsrc).Namespace(namespace)
+	}
+	return dclient.Resource(rsrc)
+}
diff --git a/pkg/controller/queuejobresources/genericresource/utils.go b/pkg/controller/queuejobresources/genericresource/utils.go
new file mode 100644
index 00000000..a4790fed
--- /dev/null
+++ b/pkg/controller/queuejobresources/genericresource/utils.go
@@ -0,0 +1,55 @@
+package genericresource
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+)
+
+// maxNameLength is the longest name, in bytes, that truncateName returns.
+const maxNameLength = 63
+
+// UnmarshalToUnstructured decodes raw, which must hold a JSON object, into an
+// Unstructured value.
+//
+// An error is returned when raw is not valid JSON or when it is valid JSON that
+// is not an object (an array, a scalar or null). In both cases the returned
+// Unstructured has an empty, non-nil Object map.
+func UnmarshalToUnstructured(raw []byte) (unstruct unstructured.Unstructured, err error) {
+	unstruct.Object = make(map[string]interface{})
+	var blob interface{}
+	if err = json.Unmarshal(raw, &blob); err != nil {
+		return unstruct, err
+	}
+	// Use the checked form: a bare type assertion panics on non-object JSON.
+	obj, ok := blob.(map[string]interface{})
+	if !ok {
+		return unstruct, fmt.Errorf("expected a JSON object, got %T", blob)
+	}
+	unstruct.Object = obj
+	return unstruct, nil
+}
+
+// truncateName returns name cut to at most maxNameLength bytes. Names that are
+// already short enough are returned unchanged.
+func truncateName(name string) string {
+	if len(name) > maxNameLength {
+		return name[:maxNameLength]
+	}
+	return name
+}
+
+// join concatenates strs in order, except that when the first element is empty
+// only the last element is returned. It returns "" when called with no
+// arguments.
+func join(strs ...string) string {
+	if len(strs) == 0 {
+		return ""
+	}
+	if strs[0] == "" {
+		return strs[len(strs)-1]
+	}
+	return strings.Join(strs, "")
+}