```go
func (sched *Scheduler) ScheduleOne(ctx context.Context) {
// ...
schedulingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
// Try to schedule a single pod; see schedulingCycle below for details
// On success it returns the scheduling result, including the suggested node and the binding target for the pod
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
// If scheduling failed, return
if !status.IsSuccess() {
sched.FailureHandler(schedulingCycleCtx, fwk, assumedPodInfo, status, scheduleResult.nominatingInfo, start)
return
}

// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
metrics.Goroutines.WithLabelValues(metrics.Binding).Inc()
defer metrics.Goroutines.WithLabelValues(metrics.Binding).Dec()

// Actually bind the pod to the node; see bindingCycle below
status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
return
}
// Remove the pod from the scheduling queue. This is only a second-line safeguard; normally the queue's own loop takes care of it
// Usually, DonePod is called inside the scheduling queue,
// but in this case, we need to call it here because this Pod won't go back to the scheduling queue.
sched.SchedulingQueue.Done(assumedPodInfo.Pod.UID)
}()
}
```
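The detail worth pausing on in ScheduleOne is the split above: the scheduling cycle runs synchronously, so only one pod is scheduled at a time, while the binding cycle is handed off to a goroutine so the main loop does not wait for apiserver round trips. A minimal, self-contained sketch of that pattern (the types and helpers below are made up for illustration and are not the scheduler's own):
```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Simplified stand-ins for the real scheduler types (hypothetical).
type pod struct{ name string }
type scheduleResult struct{ suggestedHost string }

// scheduleOne mirrors the shape of ScheduleOne: pick a node synchronously,
// then bind asynchronously so the next pod does not wait on API calls.
func scheduleOne(ctx context.Context, p pod, done chan<- struct{}) {
	res, ok := schedulingCycle(p) // synchronous: choose a node
	if !ok {
		fmt.Println("scheduling failed for", p.name)
		close(done)
		return
	}
	go func() { // asynchronous: the binding cycle runs in its own goroutine
		defer close(done)
		bindingCtx, cancel := context.WithCancel(ctx)
		defer cancel()
		bindingCycle(bindingCtx, p, res)
	}()
}

func schedulingCycle(p pod) (scheduleResult, bool) {
	return scheduleResult{suggestedHost: "node-a"}, true // placeholder decision
}

func bindingCycle(ctx context.Context, p pod, r scheduleResult) {
	time.Sleep(10 * time.Millisecond) // stands in for the apiserver round trip
	fmt.Printf("bound %s to %s\n", p.name, r.suggestedHost)
}

func main() {
	done := make(chan struct{})
	scheduleOne(context.Background(), pod{name: "nginx-0"}, done)
	<-done // wait for the binding goroutine only for this demo
}
```
Serializing the scheduling cycle is what keeps the cache "assume" step below consistent; binding is safe to run concurrently because it only talks to the apiserver.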
#### schedulingCycle
Schedules a single pod.
```go
func (sched *Scheduler) schedulingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
logger := klog.FromContext(ctx)
pod := podInfo.Pod
// See SchedulePod below
// At this point the most suitable node has already been chosen through filtering and scoring, along with the number of nodes evaluated and the number that passed the filter and score plugins
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
defer func() {
metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
}()
}()
// If the error is that no nodes are available, return immediately
if err == ErrNoNodesAvailable {
status := framework.NewStatus(framework.UnschedulableAndUnresolvable).WithError(err)
return ScheduleResult{nominatingInfo: clearNominatedNode}, podInfo, status
}

// If no node fits the pod, return as well
fitError, ok := err.(*framework.FitError)
if !ok {
logger.Error(err, "Error selecting node for pod", "pod", klog.KObj(pod))
// ...
return ScheduleResult{}, podInfo, framework.NewStatus(framework.Unschedulable).WithError(err)
}

// If this scheduling attempt failed but PostFilter plugins are registered, try running them; the nominated node they produce is preferred in the next scheduling cycle
// Run PostFilter plugins to attempt to make the pod schedulable in a future scheduling cycle.
result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
msg := status.Message()
// ...
}

metrics.SchedulingAlgorithmLatency.Observe(metrics.SinceInSeconds(start))
// Update the cache, assuming the pod is already running on the suggested node
// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
// This allows us to keep scheduling without waiting on binding to occur.
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// Set the NodeName on assumedPod
// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
err = sched.assume(logger, assumedPod, scheduleResult.SuggestedHost)
if err != nil {
// ...
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, framework.AsStatus(err)
}

// Reserve plugins are notified once resources have been set aside for the pod
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
// If a reserve plugin fails, release the resources reserved so far
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
// ...
return ScheduleResult{nominatingInfo: clearNominatedNode}, assumedPodInfo, sts
}

// Permit plugins
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
// If the permit plugins reject the pod, release the previously reserved resources
// trigger un-reserve to clean up state associated with the reserved Pod
fwk.RunReservePluginsUnreserve(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if forgetErr := sched.Cache.ForgetPod(logger, assumedPod); forgetErr != nil {
// ...

return scheduleResult, assumedPodInfo, nil
}
```
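A pattern that repeats through schedulingCycle is optimistic assumption plus rollback: the pod is first assumed onto the suggested node in the cache, and every later failure (Reserve, Permit) has to trigger un-reserve and ForgetPod to undo it. Here is a rough sketch of that discipline with a toy cache standing in for the real scheduler cache; all names are illustrative, not the framework's API:
```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// assumeCache is a tiny stand-in for the scheduler cache: it optimistically
// records "pod X runs on node Y" before the bind happens, so the next
// scheduling cycle already sees the node's resources as consumed.
type assumeCache struct {
	mu      sync.Mutex
	assumed map[string]string // pod -> node
}

func (c *assumeCache) Assume(pod, node string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.assumed[pod] = node
}

// Forget rolls the optimistic assumption back, like Cache.ForgetPod.
func (c *assumeCache) Forget(pod string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	delete(c.assumed, pod)
}

// reserveThenPermit shows the rollback discipline: every failure after
// Assume must undo the reservation (un-reserve) and then forget the pod.
func reserveThenPermit(c *assumeCache, pod, node string, reserve, unreserve, permit func() error) error {
	c.Assume(pod, node)
	if err := reserve(); err != nil {
		unreserve() // clean up whatever was partially reserved
		c.Forget(pod)
		return err
	}
	if err := permit(); err != nil {
		unreserve()
		c.Forget(pod)
		return err
	}
	return nil
}

func main() {
	cache := &assumeCache{assumed: map[string]string{}}
	err := reserveThenPermit(cache, "web-0", "node-a",
		func() error { return nil },                                  // reserve succeeds
		func() error { return nil },                                  // unreserve (no-op in this demo)
		func() error { return errors.New("permit plugin rejected") }, // permit fails
	)
	fmt.Println("assumed after failure:", cache.assumed, "err:", err)
}
```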
#### SchedulePod
```go
// Tries to schedule the given pod onto one of the nodes in the node list. On success it returns a scheduling result; otherwise it returns an error
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
trace := utiltrace.New("Scheduling", utiltrace.Field{Key: "namespace", Value: pod.Namespace}, utiltrace.Field{Key: "name", Value: pod.Name})
defer trace.LogIfLong(100 * time.Millisecond)
// Refresh the snapshot of node and pod scheduling info from the cache
if err := sched.Cache.UpdateSnapshot(klog.FromContext(ctx), sched.nodeInfoSnapshot); err != nil {
return result, err
}
trace.Step("Snapshotting scheduler cache and node infos done")

// If the snapshot contains zero nodes, return an error
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}

// Find the nodes that fit the pod; see findNodesThatFitPod below
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
if err != nil {
return result, err
}
trace.Step("Computing predicates done")

// If no feasible node was found, return an error right away
if len(feasibleNodes) == 0 {
return result, &framework.FitError{
Pod: pod,
NumAllNodes: sched.nodeInfoSnapshot.NumNodes(),
Diagnosis: diagnosis,
}
}
// If only one node is feasible, use it directly
// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Node().Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
// Run the score plugins on the nodes that passed filtering. A node may be scored by several plugins; their weighted scores are combined into a priority list
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
if err != nil {
return result, err
}
// From those scores pick the best node (plus the set of top-scoring nodes); in effect the priority list is ranked and the node at the top is selected
host, _, err := selectHost(priorityList, numberOfHighestScoredNodesToReport)
trace.Step("Prioritizing done")

return ScheduleResult{
SuggestedHost: host,
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
}
```
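selectHost is not expanded here, but conceptually it has to return the node with the highest total weighted score, breaking ties randomly so that equally scored nodes share the load instead of the first one always winning. A small sketch of that idea with simplified types (this is not the framework's actual NodePluginScores or selectHost signature):
```go
package main

import (
	"fmt"
	"math/rand"
)

// nodeScore is a simplified stand-in for a node's total weighted score.
type nodeScore struct {
	Name  string
	Score int64
}

// pickHost keeps the highest-scoring node and breaks ties uniformly at
// random using reservoir sampling over the tied candidates.
func pickHost(scores []nodeScore) (string, error) {
	if len(scores) == 0 {
		return "", fmt.Errorf("empty priority list")
	}
	best := scores[0]
	ties := 1
	for _, s := range scores[1:] {
		switch {
		case s.Score > best.Score:
			best, ties = s, 1
		case s.Score == best.Score:
			ties++
			if rand.Intn(ties) == 0 { // each tied node wins with equal probability
				best = s
			}
		}
	}
	return best.Name, nil
}

func main() {
	host, _ := pickHost([]nodeScore{
		{Name: "node-a", Score: 85},
		{Name: "node-b", Score: 92},
		{Name: "node-c", Score: 92},
	})
	fmt.Println("suggested host:", host) // node-b or node-c, chosen at random
}
```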
#### findNodesThatFitPod
```go
func (sched *Scheduler) findNodesThatFitPod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*framework.NodeInfo, framework.Diagnosis, error) {
logger := klog.FromContext(ctx)
// Get the list of all nodes
allNodes, err := sched.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap),
}, err
}
// Initialize diagnosis, which records the scheduling outcome for each node
diagnosis := framework.Diagnosis{
NodeToStatusMap: make(framework.NodeToStatusMap, len(allNodes)),
}
// PreFilter phase: run the prefilter plugins and return the pre-filter result and a status. If a plugin returns anything other than Success or Skip, the status code becomes non-success
// When a plugin returns Skip, its result and the other status fields are ignored and its coupled Filter plugin is skipped for this scheduling cycle; any other non-success status aborts scheduling
// There are too many plugins to walk through individually here
// Run "prefilter" plugins.
preRes, s := fwk.RunPreFilterPlugins(ctx, state, pod)
if !s.IsSuccess() {
if !s.IsRejected() {
return nil, diagnosis, s.AsError()
}
// All nodes in NodeToStatusMap will have the same status so that they can be handled in the preemption.
// Some non trivial refactoring is needed to avoid this copy.
for _, n := range allNodes {
diagnosis.NodeToStatusMap[n.Node().Name] = s
}

// Record the messages from PreFilter in Diagnosis.PreFilterMsg.
msg := s.Message()
diagnosis.PreFilterMsg = msg
logger.V(5).Info("Status after running PreFilter plugins for pod", "pod", klog.KObj(pod), "status", msg)
// Record the plugin status. A pod that stays Pending after creation usually traces back to here, typically because a required dependency is not ready, e.g. no usable node
diagnosis.AddPluginStatus(s)
return nil, diagnosis, nil
}

// A nominated node may have been set in a previous scheduling cycle, so try it first and check whether it still fits
// "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
// This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
if len(pod.Status.NominatedNodeName) > 0 {
feasibleNodes, err := sched.evaluateNominatedNode(ctx, pod, fwk, state, diagnosis)
if err != nil {
logger.Error(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
}
// Nominated node passes all the filters, scheduler is good to assign this node to the pod.
if len(feasibleNodes) != 0 {
return feasibleNodes, diagnosis, nil
}
}

// Start from all nodes; if the PreFilter result does not cover every node, keep only the allowed ones and record why the excluded nodes were rejected
nodes := allNodes
if !preRes.AllNodes() {
nodes = make([]*framework.NodeInfo, 0, len(preRes.NodeNames))
for _, n := range allNodes {
if !preRes.NodeNames.Has(n.Node().Name) {
// We consider Nodes that are filtered out by PreFilterResult as rejected via UnschedulableAndUnresolvable.
// We have to record them in NodeToStatusMap so that they won't be considered as candidates in the preemption.
diagnosis.NodeToStatusMap[n.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "node is filtered out by the prefilter result")
continue
}
nodes = append(nodes, n)
}
}
// Not every node is filtered and scored: only a proportion of the cluster is considered. For example, if three nodes are enough for scoring, the remaining nodes are not even examined. In large clusters, setting a sensible percentageOfNodesToScore improves scheduling throughput
// Filters out nodes that cannot run the pod. The order in which filters run is configurable. If no node passes all filters, the pod is marked unschedulable
feasibleNodes, err := sched.findNodesThatPassFilters(ctx, fwk, state, pod, &diagnosis, nodes)
// Record how many nodes were processed in this cycle so that, over time, every node gets an even chance of being evaluated
// always try to update the sched.nextStartNodeIndex regardless of whether an error has occurred
// this is helpful to make sure that all the nodes have a chance to be searched
processedNodes := len(feasibleNodes) + len(diagnosis.NodeToStatusMap)
sched.nextStartNodeIndex = (sched.nextStartNodeIndex + processedNodes) % len(nodes)
if err != nil {
return nil, diagnosis, err
}
// As above, run the extenders (extra plugins) to filter out nodes that do not qualify
feasibleNodesAfterExtender, err := findNodesThatPassExtenders(ctx, sched.Extenders, pod, feasibleNodes, diagnosis.NodeToStatusMap)
if err != nil {
return nil, diagnosis, err
}
// If the extenders filtered out some nodes (the two counts differ), add the extender's name to the set of unschedulable plugins in diagnosis
if len(feasibleNodesAfterExtender) != len(feasibleNodes) {
// Extenders filtered out some nodes.
//
// Extender doesn't support any kind of requeueing feature like EnqueueExtensions in the scheduling framework.
// When Extenders reject some Nodes and the pod ends up being unschedulable,
// we put framework.ExtenderName to pInfo.UnschedulablePlugins.
// This Pod will be requeued from unschedulable pod pool to activeQ/backoffQ
// by any kind of cluster events.
// https://github.com/kubernetes/kubernetes/issues/122019
if diagnosis.UnschedulablePlugins == nil {
diagnosis.UnschedulablePlugins = sets.New[string]()
}
diagnosis.UnschedulablePlugins.Insert(framework.ExtenderName)
}

return feasibleNodesAfterExtender, diagnosis, nil
}
```
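Two of the comments above are worth making concrete: the scheduler does not filter every node in a large cluster (the fraction is governed by percentageOfNodesToScore), and it advances nextStartNodeIndex after each pod so the search starts from a different offset each cycle. The sketch below reproduces the idea; the constants and the 50 - n/125 heuristic reflect my reading of the upstream defaults and should be treated as assumptions rather than a definitive copy:
```go
package main

import "fmt"

// numNodesToScore sketches the sampling behind findNodesThatPassFilters:
// stop filtering once "enough" feasible nodes have been found. Assumed
// defaults: always consider at least 100 nodes, never drop below 5%, and
// shrink the percentage as the cluster grows (50 - n/125).
func numNodesToScore(numAllNodes, percentageOfNodesToScore int32) int32 {
	const minFeasibleNodes, minPercentage = 100, 5
	if numAllNodes < minFeasibleNodes {
		return numAllNodes
	}
	p := percentageOfNodesToScore
	if p == 0 { // adaptive mode when the option is left unset
		p = 50 - numAllNodes/125
		if p < minPercentage {
			p = minPercentage
		}
	}
	n := numAllNodes * p / 100
	if n < minFeasibleNodes {
		return minFeasibleNodes
	}
	return n
}

// nextStart shows why sched.nextStartNodeIndex is advanced every cycle:
// the next pod's filtering starts at a different offset, so over time all
// nodes get searched even though each cycle only looks at a subset.
func nextStart(current, processed, totalNodes int) int {
	return (current + processed) % totalNodes
}

func main() {
	fmt.Println(numNodesToScore(5000, 0)) // only ~500 of 5000 nodes are filtered
	fmt.Println(nextStart(4200, 550, 5000))
}
```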
#### bindingCycle
Attempts to bind the pod to the selected node.
```go
func (sched *Scheduler) bindingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *framework.Status {
logger := klog.FromContext(ctx)

assumedPod := assumedPodInfo.Pod

// Wait on the permit plugins that were started in schedulingCycle, read their result, and decide whether to continue
// Run "permit" plugins.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
if status.IsRejected() {
fitErr := &framework.FitError{
NumAllNodes: 1,
Pod: assumedPodInfo.Pod,
Diagnosis: framework.Diagnosis{
NodeToStatusMap: framework.NodeToStatusMap{scheduleResult.SuggestedHost: status},
UnschedulablePlugins: sets.New(status.Plugin()),
},
}
return framework.NewStatus(status.Code()).WithError(fitErr)
}
return status
}
// Pre-bind
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}

// Bind: client-go is used to update the pod with its node assignment
// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}

// Calculating nodeResourceString can be heavy. Avoid it if klog verbosity is below 2.
logger.V(2).Info("Successfully bound pod to node", "pod", klog.KObj(assumedPod), "node", scheduleResult.SuggestedHost, "evaluatedNodes", scheduleResult.EvaluatedNodes, "feasibleNodes", scheduleResult.FeasibleNodes)
metrics.PodScheduled(fwk.ProfileName(), metrics.SinceInSeconds(start))
metrics.PodSchedulingAttempts.Observe(float64(assumedPodInfo.Attempts))
if assumedPodInfo.InitialAttemptTimestamp != nil {
metrics.PodSchedulingDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
metrics.PodSchedulingSLIDuration.WithLabelValues(getAttemptsLabel(assumedPodInfo)).Observe(metrics.SinceInSeconds(*assumedPodInfo.InitialAttemptTimestamp))
}
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(logger, podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}

return nil
}
```
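As the comment on sched.bind says, the actual binding goes through client-go: the default Bind plugin does not patch pod.Spec.NodeName directly, it posts a Binding object to the pod's binding subresource. A library-style sketch of that call, assuming a clientset supplied by the caller; the function name and wiring here are mine, not the scheduler's:
```go
package bindsketch

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPodToNode posts a Binding to the pod's "binding" subresource, which is
// what makes the assignment appear in pod.Spec.NodeName. The clientset is
// assumed to be built elsewhere (e.g. from a kubeconfig).
func bindPodToNode(ctx context.Context, cs kubernetes.Interface, namespace, podName, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: podName},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	if err := cs.CoreV1().Pods(namespace).Bind(ctx, binding, metav1.CreateOptions{}); err != nil {
		return fmt.Errorf("binding %s/%s to %s: %w", namespace, podName, nodeName, err)
	}
	return nil
}
```
Once the Binding is accepted, the kubelet on the target node sees the pod through its watch and starts it; the scheduler's remaining work here (PostBind plugins, metrics, activating queued pods) is bookkeeping.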
