Commit
Merge branch from 'develop_v2' into 'master'
StarCoral authored and justin0u0 committed Aug 10, 2022
1 parent f7bac1f commit 6eeaa09
Showing 8 changed files with 24 additions and 54 deletions.
24 changes: 8 additions & 16 deletions pkg/scheduler/config.go
@@ -1,14 +1,13 @@
package scheduler

import (
"KubeShare/pkg/lib/queue"
"io/ioutil"
"os"
"reflect"
"strconv"

"github.com/fsnotify/fsnotify"
"KubeShare/pkg/lib/queue"

"github.com/fsnotify/fsnotify"
"github.com/spf13/viper"
"gopkg.in/yaml.v2"
)
@@ -36,24 +35,23 @@ type CellSpec struct {
}

func (kss *KubeShareScheduler) initRawConfig() *Config {
c := Config{}
var c Config

configPath := kss.args.kubeShareConfig
// convert raw data to yaml
yamlBytes, err := ioutil.ReadFile(configPath)
yamlBytes, err := os.ReadFile(configPath)
if err != nil {
kss.ksl.Errorf("Faild to read config file: %v, %v", configPath, err)
kss.ksl.Errorf("Failed to read config file: %v, %v", configPath, err)
}

err = yaml.Unmarshal(yamlBytes, &c)
if err != nil {
if err := yaml.Unmarshal(yamlBytes, &c); err != nil {
kss.ksl.Errorf("Failed to unmarshal YAML %#v to object: %v", string(yamlBytes), err)
}

if c.CellTypes == nil || c.Cells == nil {
kss.ksl.Warn("The cellTypes and cells in kubeshare config file is nil")

}

kss.checkPhysicalCells(&c)
return &c
}
@@ -76,12 +74,7 @@ func (kss *KubeShareScheduler) checkPhysicalCells(c *Config) {
}

// infer the child's configuration from the parent's configuration
func inferCellSpec(
spec *CellSpec,
cellTypes map[string]CellTypeSpec,
cellType string,
defaultID int) {

func inferCellSpec(spec *CellSpec, cellTypes map[string]CellTypeSpec, cellType string, defaultID int) {
idQueue := queue.NewQueue()
q := queue.NewQueue()
q.Enqueue(spec)
@@ -140,5 +133,4 @@ func (kss *KubeShareScheduler) watchConfig(c *Config) {
os.Exit(0)
}
})

}
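
The config.go hunks above swap the deprecated ioutil.ReadFile for os.ReadFile (a drop-in replacement since Go 1.16) and fold the yaml.Unmarshal error check into the if statement. A minimal standalone sketch of that read-and-unmarshal pattern; the file name and config struct here are illustrative, not KubeShare's real ones:

package main

import (
	"log"
	"os"

	"gopkg.in/yaml.v2"
)

// config is a stand-in for KubeShare's Config; only one field for illustration.
type config struct {
	CellTypes map[string]string `yaml:"cellTypes"`
}

func main() {
	// os.ReadFile replaces the deprecated ioutil.ReadFile (Go 1.16+), same signature.
	raw, err := os.ReadFile("kubeshare-config.yaml")
	if err != nil {
		log.Fatalf("failed to read config file: %v", err)
	}

	var c config
	if err := yaml.Unmarshal(raw, &c); err != nil {
		log.Fatalf("failed to unmarshal YAML %#v to object: %v", string(raw), err)
	}
	log.Printf("parsed config: %+v", c)
}
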
1 change: 0 additions & 1 deletion pkg/scheduler/gpu.go
@@ -50,5 +50,4 @@ func (kss *KubeShareScheduler) getGPUByNode(nodeName string) {
})
}
kss.gpuInfos[nodeName] = gpuInfos

}
1 change: 0 additions & 1 deletion pkg/scheduler/node.go
@@ -49,7 +49,6 @@ func (kss *KubeShareScheduler) addNode(obj interface{}) {
} else {
kss.setNodeStatus(name, false)
}

}

func (kss *KubeShareScheduler) updateNode(oldObj, newObj interface{}) {
2 changes: 0 additions & 2 deletions pkg/scheduler/pod.go
@@ -74,7 +74,6 @@ func (kss *KubeShareScheduler) addPod(obj interface{}) {
}
kss.ksl.Infof("[Sync Resource] add pod %v/%v(%v) to bound queue", pod.Namespace, pod.Name, pod.UID)
kss.boundPodQueue[pod.Spec.NodeName].Enqueue(pod)

}
}

@@ -325,7 +324,6 @@ func (kss *KubeShareScheduler) getPodLabels(pod *v1.Pod) (string, bool, *PodStat

kss.podStatus[key] = ps
return "", true, ps

}

// delete pod status by namespace/name of pod
27 changes: 10 additions & 17 deletions pkg/scheduler/pod_group.go
@@ -2,12 +2,11 @@ package scheduler

import (
"fmt"
"math"
"strconv"
"time"
"math"

v1 "k8s.io/api/core/v1"
//podutil "k8s.io/kubernetes/pkg/api/v1/pod"
)

type PodGroupInfo struct {
@@ -39,7 +38,7 @@ type PodGroupInfo struct {
// => it stores the created PodGroup in PodGroupInfo
// => it also returns the pod's PodGroupMinAvailable (0 if not specified).
func (kss *KubeShareScheduler) getOrCreatePodGroupInfo(pod *v1.Pod, ts time.Time) *PodGroupInfo {
podGroupName, PodGroupHeadcount, PodGroupThreshold, podMinAvailable := kss.getPodGroupLabels(pod)
podGroupName, podGroupHeadcount, podGroupThreshold, podMinAvailable := kss.getPodGroupLabels(pod)

var pgKey string
if len(PodGroupName) > 0 && podMinAvailable > 0 {
@@ -49,10 +48,8 @@ func (kss *KubeShareScheduler) getOrCreatePodGroupInfo(pod *v1.Pod, ts time.Time
kss.podGroupMutex.Lock()
defer kss.podGroupMutex.Unlock()
// If it is a PodGroup and present in PodGroupInfos, return it.
if len(pgKey) != 0 {

pgInfo, exist := kss.podGroupInfos[pgKey]
if exist {
if pgKey != "" {
if pgInfo, ok := kss.podGroupInfos[pgKey]; ok {
// If the deleteTimestamp isn't nil,
// it means that the PodGroup is marked as expired before.
// So we need to set the deleteTimestamp as nil again to mark the PodGroup active.
@@ -73,20 +70,20 @@ func (kss *KubeShareScheduler) getOrCreatePodGroupInfo(pod *v1.Pod, ts time.Time
priority: priority, // podutil.GetPodPriority(pod) + kss.getPodPrioriy(pod)
timestamp: ts,
minAvailable: podMinAvailable,
headCount: PodGroupHeadcount,
threshold: PodGroupThreshold,
headCount: podGroupHeadcount,
threshold: podGroupThreshold,
}
// If it's not a regular Pod, store the PodGroup in PodGroupInfos
if len(pgKey) > 0 {
if pgKey != "" {
kss.podGroupInfos[pgKey] = pgInfo
}
return pgInfo
}

// checks if the pod belongs to a PodGroup.
// If so, it will return the podGroupName, headcount, threshold, minAvailable.
// If so, it will return the podGroupName, headcount, threshold, minAvailable.
// If not, it will return "" and 0.
func (kss *KubeShareScheduler) getPodGroupLabels(pod *v1.Pod) (string, int, float64, int) {
func (kss *KubeShareScheduler) getPodGroupLabels(pod *v1.Pod) (string, int, float64, int) {
podGroupName, ok := pod.Labels[PodGroupName]
if !ok || len(podGroupName) == 0 {
return "", 0, 0.0, 0
@@ -114,8 +111,7 @@ func (kss *KubeShareScheduler) getPodGroupLabels(pod *v1.Pod) (string, int, flo
return "", 0, 0.0, 0
}


minAvailable := int(math.Floor(thres*float64(headcnt)+0.5))
minAvailable := int(math.Floor(thres*float64(headcnt) + 0.5))

return podGroupName, headcnt, thres, minAvailable
}
@@ -126,11 +122,8 @@ func (kss *KubeShareScheduler) podGroupInfoGC() {

for key, pgInfo := range kss.podGroupInfos {
if pgInfo.deletionTimestamp != nil && pgInfo.deletionTimestamp.Add(time.Duration(kss.args.PodGroupExpirationTimeSeconds)*time.Second).Before(kss.clock.Now()) {

kss.ksl.Warn(key, " is out of date and has been deleted in PodGroup GarbegeCollection")
delete(kss.podGroupInfos, key)

}
}

}
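
The only behavioral constant in the pod_group.go hunks is the minAvailable computation, reformatted here but still round-half-up: minAvailable = floor(threshold*headcount + 0.5). A tiny self-contained check of that arithmetic, with made-up label values:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical pod-group label values; the real ones come from pod labels.
	headcount := 6
	threshold := 0.75

	// Round-half-up, as in getPodGroupLabels: floor(threshold*headcount + 0.5).
	minAvailable := int(math.Floor(threshold*float64(headcount) + 0.5))
	fmt.Println(minAvailable) // 0.75*6 = 4.5, which rounds up to 5
}
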
18 changes: 3 additions & 15 deletions pkg/scheduler/scheduler.go
@@ -3,7 +3,6 @@ package scheduler
import (
"context"
"fmt"

"math"
"os"
"sync"
@@ -365,16 +364,12 @@ func (kss *KubeShareScheduler) Filter(ctx context.Context, state *framework.Cycl
memory := ps.memory

gpuModelInfos := kss.gpuInfos[nodeName]
model := ps.model
assignedGPU := false
if model != "" {
assignedGPU = true
}

kss.cellMutex.RLock()
defer kss.cellMutex.RUnlock()

// check if the node has the specified gpu or not
if assignedGPU {
if model := ps.model; model != "" {
kss.ksl.Infof("[Filter] Pod %v/%v(%v) specified gpu %v", pod.Namespace, pod.Name, pod.UID, model)
if _, ok := gpuModelInfos[model]; !ok {
msg := fmt.Sprintf("[Filter] Node %v without the specified gpu %v of pod %v/%v(%v)", nodeName, model, pod.Namespace, pod.Name, pod.UID)
@@ -383,24 +378,21 @@ func (kss *KubeShareScheduler) Filter(ctx context.Context, state *framework.Cycl
}

// check the specified gpu has sufficient gpu resource
fit, _, _ := kss.filterNode(nodeName, model, request, memory)
if fit {
if fit, _, _ := kss.filterNode(nodeName, model, request, memory); fit {
kss.ksl.Infof("[Filter] Node %v meet the gpu requirement of pod %v/%v(%v)", nodeName, pod.Namespace, pod.Name, pod.UID)
return framework.NewStatus(framework.Success, "")
} else {
msg := fmt.Sprintf("[Filter] Node %v doesn't meet the gpu request of pod %v/%v(%v)", nodeName, pod.Namespace, pod.Name, pod.UID)
kss.ksl.Infof(msg)
return framework.NewStatus(framework.Unschedulable, msg)
}

}

// filter the node according to its gpu resource
ok := false
available := 0.0
freeMemory := int64(0)
for model := range gpuModelInfos {

fit, currentAvailable, currentMemory := kss.filterNode(nodeName, model, request, memory)
available += currentAvailable
freeMemory += currentMemory
@@ -444,7 +436,6 @@ func (kss *KubeShareScheduler) Score(ctx context.Context, state *framework.Cycle
}

func (kss *KubeShareScheduler) ScoreExtensions() framework.ScoreExtensions {

kss.ksl.Infof("[ScoreExtensions]")
return kss
}
@@ -473,7 +464,6 @@ func (kss *KubeShareScheduler) NormalizeScore(ctx context.Context, state *framew
}
maxScore += reverse
minScore = 0

}

if maxScore <= 100 && maxScore >= 0 && minScore <= 100 && minScore >= 0 {
Expand Down Expand Up @@ -518,7 +508,6 @@ func (kss *KubeShareScheduler) Reserve(ctx context.Context, state *framework.Cyc
var podCopy *v1.Pod
if multiGPU {
podCopy = kss.newAssumedMultiGPUPod(pod, nodeName)

} else {
podCopy = kss.newAssumedSharedGPUPod(pod, nodeName)
}
@@ -557,7 +546,6 @@ func (kss *KubeShareScheduler) Unreserve(ctx context.Context, state *framework.C
waitingPod.Reject(kss.Name())
}
})

}

func (kss *KubeShareScheduler) Permit(ctx context.Context, state *framework.CycleState, pod *v1.Pod, nodeName string) (*framework.Status, time.Duration) {
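
Several scheduler.go hunks replace a temporary flag (assignedGPU) and a separate fit variable with Go's if-with-init form, which scopes the value to the conditional. A small sketch of that idiom outside the plugin; filterNode here is a stand-in with made-up logic, not the scheduler's method:

package main

import "fmt"

// filterNode is a stand-in for kss.filterNode: it reports whether a node can
// satisfy the request for a given GPU model (the logic here is illustrative).
func filterNode(nodeName, model string, request float64, memory int64) (bool, float64, int64) {
	return model == "example-gpu", 1.0, 8 << 30
}

func main() {
	// Instead of `assignedGPU := false; if model != "" { assignedGPU = true }`,
	// bind and test in one statement:
	if model := "example-gpu"; model != "" {
		// Likewise, `fit, _, _ := filterNode(...)` followed by `if fit` collapses to:
		if fit, _, _ := filterNode("node1", model, 0.5, 2<<30); fit {
			fmt.Println("node meets the gpu requirement")
		} else {
			fmt.Println("node doesn't meet the gpu request")
		}
	}
}
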
2 changes: 1 addition & 1 deletion pkg/scheduler/score.go
@@ -79,7 +79,7 @@ func (kss *KubeShareScheduler) calculateGuaranteePodScore(nodeName string, podSt
return score
}

// socre = ( cell priority(computation power)
// score = ( cell priority(computation power)
// - gpu resource usage(defragmentation)
// - average locality(placement sensitivity) ) / # of cell
func (kss *KubeShareScheduler) calculateGuaranteePodNodeScore(cellList CellList, podGroup string) float64 {
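
The score.go change only fixes the spelling of "score" in the comment on the guarantee-pod node score. Reading that comment as a formula gives roughly score = (cell priority - gpu resource usage - average locality) / number of cells; the sketch below merely restates that arithmetic and is not KubeShare's actual implementation:

package main

import "fmt"

// nodeScore restates the comment's formula:
// (cell priority - gpu resource usage - average locality) / number of cells.
func nodeScore(priority, gpuUsage, avgLocality float64, numCells int) float64 {
	if numCells == 0 {
		return 0
	}
	return (priority - gpuUsage - avgLocality) / float64(numCells)
}

func main() {
	// Illustrative numbers only.
	fmt.Println(nodeScore(100, 30, 10, 2)) // (100-30-10)/2 = 30
}
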
3 changes: 2 additions & 1 deletion pkg/scheduler/util.go
@@ -44,14 +44,15 @@ func (kss *KubeShareScheduler) printPodStatus(ps *PodStatus) {
// return len(pods)
// }

// calculateTotalPods calculate the number of pods in the given pod group
func (kss *KubeShareScheduler) caculateTotalPods(namespace, podGroupName string) int {
pods, err := kss.podLister.Pods(namespace).List(labels.Set{PodGroupName: podGroupName}.AsSelector())
if err != nil {
kss.ksl.Error(err)
return 0
}
podSet := map[string]bool{}

podSet := map[string]bool{}
for _, pod := range pods {
if pod.Status.Phase == v1.PodFailed {
continue
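
caculateTotalPods (the misspelling is in the source) lists pods by the pod-group label, skips failed pods, and counts distinct names through a set. A self-contained sketch of that counting pattern with a plain slice in place of the pod lister; the label key and pod data below are assumptions for illustration:

package main

import "fmt"

// pod is a minimal stand-in for v1.Pod with just the fields the count needs.
type pod struct {
	Name   string
	Phase  string // stand-in for v1.PodPhase
	Labels map[string]string
}

// countGroupPods mirrors caculateTotalPods: filter by the pod-group label,
// skip failed pods, and de-duplicate names through a set.
func countGroupPods(pods []pod, group string) int {
	podSet := map[string]bool{}
	for _, p := range pods {
		if p.Labels["podgroup-name"] != group { // label key is an assumption
			continue
		}
		if p.Phase == "Failed" {
			continue
		}
		podSet[p.Name] = true
	}
	return len(podSet)
}

func main() {
	pods := []pod{
		{Name: "worker-0", Phase: "Running", Labels: map[string]string{"podgroup-name": "pg1"}},
		{Name: "worker-1", Phase: "Failed", Labels: map[string]string{"podgroup-name": "pg1"}},
		{Name: "worker-2", Phase: "Running", Labels: map[string]string{"podgroup-name": "pg2"}},
	}
	fmt.Println(countGroupPods(pods, "pg1")) // 1: the failed pod and the other group are skipped
}
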
