Implement graceful shutdown based on Pod priority
wzshiming committed Nov 17, 2021
1 parent d82f606 commit 545313b
Showing 5 changed files with 384 additions and 90 deletions.
2 changes: 2 additions & 0 deletions pkg/kubelet/apis/config/helpers_test.go
@@ -267,6 +267,8 @@ var (
"SeccompDefault",
"SerializeImagePulls",
"ShowHiddenMetricsForVersion",
"ShutdownGracePeriodByPodPriority[*].Priority",
"ShutdownGracePeriodByPodPriority[*].ShutdownGracePeriodSeconds",
"StreamingConnectionIdleTimeout.Duration",
"SyncFrequency.Duration",
"SystemCgroups",
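The two new accepted fields refer to a configuration type introduced by this commit; its definition in pkg/kubelet/apis/config/types.go is not part of this excerpt, so the following is a sketch inferred from how the fields are used in the diff below (Priority is compared against pod.Spec.Priority, an int32; ShutdownGracePeriodSeconds is an int64 converted to a time.Duration):

// Inferred shape of the new kubelet config type (sketch, not the committed definition).
type ShutdownGracePeriodByPodPriority struct {
	// Priority is the pod priority value this grace period applies to.
	Priority int32
	// ShutdownGracePeriodSeconds is the shutdown grace period, in seconds,
	// granted to pods grouped at this priority.
	ShutdownGracePeriodSeconds int64
}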
17 changes: 9 additions & 8 deletions pkg/kubelet/kubelet.go
@@ -867,14 +867,15 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,

// setup node shutdown manager
shutdownManager, shutdownAdmitHandler := nodeshutdown.NewManager(&nodeshutdown.Config{
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,
KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder),
SyncNodeStatusFunc: klet.syncNodeStatus,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
ProbeManager: klet.probeManager,
Recorder: kubeDeps.Recorder,
NodeRef: nodeRef,
GetPodsFunc: klet.GetActivePods,
KillPodFunc: killPodNow(klet.podWorkers, kubeDeps.Recorder),
SyncNodeStatusFunc: klet.syncNodeStatus,
ShutdownGracePeriodRequested: kubeCfg.ShutdownGracePeriod.Duration,
ShutdownGracePeriodCriticalPods: kubeCfg.ShutdownGracePeriodCriticalPods.Duration,
ShutdownGracePeriodByPodPriority: kubeCfg.ShutdownGracePeriodByPodPriority,
})
klet.shutdownManager = shutdownManager
klet.admitHandlers.AddPodAdmitHandler(shutdownAdmitHandler)
20 changes: 11 additions & 9 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager.go
@@ -21,6 +21,7 @@ import (

v1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/record"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/prober"
@@ -36,15 +37,16 @@ type Manager interface {

// Config represents Manager configuration
type Config struct {
ProbeManager prober.Manager
Recorder record.EventRecorder
NodeRef *v1.ObjectReference
GetPodsFunc eviction.ActivePodsFunc
KillPodFunc eviction.KillPodFunc
SyncNodeStatusFunc func()
ShutdownGracePeriodRequested time.Duration
ShutdownGracePeriodCriticalPods time.Duration
Clock clock.Clock
ProbeManager prober.Manager
Recorder record.EventRecorder
NodeRef *v1.ObjectReference
GetPodsFunc eviction.ActivePodsFunc
KillPodFunc eviction.KillPodFunc
SyncNodeStatusFunc func()
ShutdownGracePeriodRequested time.Duration
ShutdownGracePeriodCriticalPods time.Duration
ShutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority
Clock clock.Clock
}

// managerStub is a fake node shutdown managerImpl.
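For context, wiring the manager with the new field looks roughly like this; a minimal sketch in which every identifier other than the nodeshutdown and kubeletconfig APIs is a hypothetical stand-in for the kubelet dependencies shown in kubelet.go above:

// Sketch: construct a node shutdown manager with a two-tier priority config.
// Assumes imports of "k8s.io/kubernetes/pkg/kubelet/nodeshutdown",
// "k8s.io/kubernetes/pkg/kubelet/lifecycle" and
// kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config".
// The priority cutoff and seconds values are illustrative only.
func newShutdownManager(conf *nodeshutdown.Config) (nodeshutdown.Manager, lifecycle.PodAdmitHandler) {
	conf.ShutdownGracePeriodByPodPriority = []kubeletconfig.ShutdownGracePeriodByPodPriority{
		{Priority: 0, ShutdownGracePeriodSeconds: 60},          // ordinary pods: up to 60s
		{Priority: 2000000000, ShutdownGracePeriodSeconds: 20}, // system-critical pods: up to 20s
	}
	return nodeshutdown.NewManager(conf)
}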
214 changes: 154 additions & 60 deletions pkg/kubelet/nodeshutdown/nodeshutdown_manager_linux.go
@@ -22,20 +22,22 @@ package nodeshutdown

import (
"fmt"
"sort"
"sync"
"time"

v1 "k8s.io/api/core/v1"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/apis/scheduling"
"k8s.io/kubernetes/pkg/features"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
kubeletevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/nodeshutdown/systemd"
"k8s.io/kubernetes/pkg/kubelet/prober"
kubelettypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/utils/clock"
)

@@ -66,8 +68,7 @@ type managerImpl struct {
nodeRef *v1.ObjectReference
probeManager prober.Manager

shutdownGracePeriodRequested time.Duration
shutdownGracePeriodCriticalPods time.Duration
shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority

getPods eviction.ActivePodsFunc
killPodFunc eviction.KillPodFunc
@@ -84,28 +85,46 @@ type managerImpl struct {

// NewManager returns a new node shutdown manager.
func NewManager(conf *Config) (Manager, lifecycle.PodAdmitHandler) {
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) ||
(conf.ShutdownGracePeriodRequested == 0 && conf.ShutdownGracePeriodCriticalPods == 0) {
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdown) {
m := managerStub{}
return m, m
}

shutdownGracePeriodByPodPriority := conf.ShutdownGracePeriodByPodPriority
// Migration from the original configuration
if !utilfeature.DefaultFeatureGate.Enabled(features.GracefulNodeShutdownBasedOnPodPriority) ||
len(shutdownGracePeriodByPodPriority) == 0 {
shutdownGracePeriodByPodPriority = migrateConfig(conf.ShutdownGracePeriodRequested, conf.ShutdownGracePeriodCriticalPods)
}

// Disable if the configuration is empty
if len(shutdownGracePeriodByPodPriority) == 0 {
m := managerStub{}
return m, m
}

// Sort by priority from low to high
sort.Slice(shutdownGracePeriodByPodPriority, func(i, j int) bool {
return shutdownGracePeriodByPodPriority[i].Priority < shutdownGracePeriodByPodPriority[j].Priority
})

if conf.Clock == nil {
conf.Clock = clock.RealClock{}
}
manager := &managerImpl{
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
nodeRef: conf.NodeRef,
getPods: conf.GetPodsFunc,
killPodFunc: conf.KillPodFunc,
syncNodeStatus: conf.SyncNodeStatusFunc,
shutdownGracePeriodRequested: conf.ShutdownGracePeriodRequested,
shutdownGracePeriodCriticalPods: conf.ShutdownGracePeriodCriticalPods,
clock: conf.Clock,
probeManager: conf.ProbeManager,
recorder: conf.Recorder,
nodeRef: conf.NodeRef,
getPods: conf.GetPodsFunc,
killPodFunc: conf.KillPodFunc,
syncNodeStatus: conf.SyncNodeStatusFunc,
shutdownGracePeriodByPodPriority: shutdownGracePeriodByPodPriority,
clock: conf.Clock,
}
klog.InfoS("Creating node shutdown manager",
"shutdownGracePeriodRequested", conf.ShutdownGracePeriodRequested,
"shutdownGracePeriodCriticalPods", conf.ShutdownGracePeriodCriticalPods,
"shutdownGracePeriodByPodPriority", shutdownGracePeriodByPodPriority,
)
return manager, manager
}
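Read together, the construction logic above gives this precedence (a sketch mirroring the code, not an authoritative spec):

// 1. GracefulNodeShutdown gate off                        -> stub manager, feature inactive
// 2. GracefulNodeShutdownBasedOnPodPriority gate on and
//    ShutdownGracePeriodByPodPriority non-empty           -> use the per-priority list as given
// 3. Otherwise                                            -> migrateConfig(ShutdownGracePeriodRequested,
//                                                            ShutdownGracePeriodCriticalPods)
// 4. Resulting list still empty                           -> stub manager, feature inactive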
@@ -159,9 +178,9 @@ func (m *managerImpl) start() (chan struct{}, error) {
return nil, err
}

// If the logind's InhibitDelayMaxUSec as configured in (logind.conf) is less than shutdownGracePeriodRequested, attempt to update the value to shutdownGracePeriodRequested.
if m.shutdownGracePeriodRequested > currentInhibitDelay {
err := m.dbusCon.OverrideInhibitDelay(m.shutdownGracePeriodRequested)
// If logind's InhibitDelayMaxUSec (as configured in logind.conf) is less than periodRequested, attempt to update the value to periodRequested.
if periodRequested := m.periodRequested(); periodRequested > currentInhibitDelay {
err := m.dbusCon.OverrideInhibitDelay(periodRequested)
if err != nil {
return nil, fmt.Errorf("unable to override inhibit delay by shutdown manager: %v", err)
}
@@ -177,8 +196,8 @@ func (m *managerImpl) start() (chan struct{}, error) {
return nil, err
}

if m.shutdownGracePeriodRequested > updatedInhibitDelay {
return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", m.shutdownGracePeriodRequested, updatedInhibitDelay)
if periodRequested > updatedInhibitDelay {
return nil, fmt.Errorf("node shutdown manager was unable to update logind InhibitDelayMaxSec to %v (ShutdownGracePeriod), current value of InhibitDelayMaxSec (%v) is less than requested ShutdownGracePeriod", periodRequested, updatedInhibitDelay)
}
}
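Note that the inhibit delay requested from logind is now the sum of every per-priority grace period (see periodRequested further down), so a multi-tier configuration can require a larger InhibitDelayMaxSec than the single legacy ShutdownGracePeriod did.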

@@ -270,58 +289,133 @@ func (m *managerImpl) processShutdownEvent() error {
klog.V(1).InfoS("Shutdown manager processing shutdown event")
activePods := m.getPods()

nonCriticalPodGracePeriod := m.shutdownGracePeriodRequested - m.shutdownGracePeriodCriticalPods
groups := groupByPriority(m.shutdownGracePeriodByPodPriority, activePods)
for _, group := range groups {
// If there are no pods in a particular range,
// then do not wait for pods in that priority range.
if len(group.Pods) == 0 {
continue
}

var wg sync.WaitGroup
wg.Add(len(activePods))
for _, pod := range activePods {
go func(pod *v1.Pod) {
defer wg.Done()
var wg sync.WaitGroup
wg.Add(len(group.Pods))
for _, pod := range group.Pods {
go func(pod *v1.Pod, group podShutdownGroup) {
defer wg.Done()

var gracePeriodOverride int64
if kubelettypes.IsCriticalPod(pod) {
gracePeriodOverride = int64(m.shutdownGracePeriodCriticalPods.Seconds())
m.clock.Sleep(nonCriticalPodGracePeriod)
} else {
gracePeriodOverride = int64(nonCriticalPodGracePeriod.Seconds())
}
gracePeriodOverride := group.ShutdownGracePeriodSeconds

// Stop probes for the pod
m.probeManager.RemovePod(pod)
// Stop probes for the pod
m.probeManager.RemovePod(pod)

// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}
// If the pod's spec specifies a termination gracePeriod which is less than the gracePeriodOverride calculated, use the pod spec termination gracePeriod.
if pod.Spec.TerminationGracePeriodSeconds != nil && *pod.Spec.TerminationGracePeriodSeconds <= gracePeriodOverride {
gracePeriodOverride = *pod.Spec.TerminationGracePeriodSeconds
}

klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)
if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
}); err != nil {
klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod)
}
klog.V(1).InfoS("Shutdown manager killing pod with gracePeriod", "pod", klog.KObj(pod), "gracePeriod", gracePeriodOverride)

c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()
if err := m.killPodFunc(pod, false, &gracePeriodOverride, func(status *v1.PodStatus) {
status.Message = nodeShutdownMessage
status.Reason = nodeShutdownReason
}); err != nil {
klog.V(1).InfoS("Shutdown manager failed killing pod", "pod", klog.KObj(pod), "err", err)
} else {
klog.V(1).InfoS("Shutdown manager finished killing pod", "pod", klog.KObj(pod))
}
}(pod, group)
}

// We want to ensure that inhibitLock is released, so only wait up to the shutdownGracePeriodRequested timeout.
select {
case <-c:
break
case <-time.After(m.shutdownGracePeriodRequested):
klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", m.shutdownGracePeriodRequested)
c := make(chan struct{})
go func() {
defer close(c)
wg.Wait()
}()

select {
case <-c:
case <-time.After(time.Duration(group.ShutdownGracePeriodSeconds) * time.Second):
klog.V(1).InfoS("Shutdown manager pod killing time out", "gracePeriod", group.ShutdownGracePeriodSeconds, "priority", group.Priority)
}
}

m.dbusCon.ReleaseInhibitLock(m.inhibitLock)
klog.V(1).InfoS("Shutdown manager completed processing shutdown event, node will shutdown shortly")

return nil
}
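Taken together, the loop drains pods in ascending priority order, giving each non-empty group at most its configured grace period before moving on. A worked timeline under an assumed three-tier configuration (values illustrative, not from this commit):

// Assumed config, sorted low to high:
//   {Priority: 0,          ShutdownGracePeriodSeconds: 120}
//   {Priority: 100000,     ShutdownGracePeriodSeconds: 60}
//   {Priority: 2000000000, ShutdownGracePeriodSeconds: 20}
//
// t=0s     kill pods with priority < 100000, wait up to 120s
// t<=120s  kill pods with 100000 <= priority < 2000000000, wait up to 60s
// t<=180s  kill system-critical pods, wait up to 20s
// then     release the logind inhibit lock; total budget periodRequested() = 200s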

func (m *managerImpl) periodRequested() time.Duration {
var sum int64
for _, period := range m.shutdownGracePeriodByPodPriority {
sum += period.ShutdownGracePeriodSeconds
}
return time.Duration(sum) * time.Second
}
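For example, the two migrated entries produced from a 30s ShutdownGracePeriod and a 10s ShutdownGracePeriodCriticalPods (20s + 10s) sum back to the original 30s, so migrated configurations request the same inhibit delay as before.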

func migrateConfig(shutdownGracePeriodRequested, shutdownGracePeriodCriticalPods time.Duration) []kubeletconfig.ShutdownGracePeriodByPodPriority {
if shutdownGracePeriodRequested == 0 {
return nil
}
defaultPriority := shutdownGracePeriodRequested - shutdownGracePeriodCriticalPods
if defaultPriority < 0 {
return nil
}
criticalPriority := shutdownGracePeriodRequested - defaultPriority
if criticalPriority < 0 {
return nil
}
return []kubeletconfig.ShutdownGracePeriodByPodPriority{
{
Priority: scheduling.DefaultPriorityWhenNoDefaultClassExists,
ShutdownGracePeriodSeconds: int64(defaultPriority / time.Second),
},
{
Priority: scheduling.SystemCriticalPriority,
ShutdownGracePeriodSeconds: int64(criticalPriority / time.Second),
},
}
}
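A worked example, assuming the scheduling package constants keep their usual values (DefaultPriorityWhenNoDefaultClassExists = 0, SystemCriticalPriority = 2000000000):

// migrateConfig(30*time.Second, 10*time.Second) returns:
//   []kubeletconfig.ShutdownGracePeriodByPodPriority{
//       {Priority: 0,          ShutdownGracePeriodSeconds: 20}, // non-critical pods
//       {Priority: 2000000000, ShutdownGracePeriodSeconds: 10}, // critical pods
//   }
// reproducing the legacy split: critical pods get the final 10s of the 30s window.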

func groupByPriority(shutdownGracePeriodByPodPriority []kubeletconfig.ShutdownGracePeriodByPodPriority, pods []*v1.Pod) []podShutdownGroup {
groups := make([]podShutdownGroup, 0, len(shutdownGracePeriodByPodPriority))
for _, period := range shutdownGracePeriodByPodPriority {
groups = append(groups, podShutdownGroup{
ShutdownGracePeriodByPodPriority: period,
})
}

for _, pod := range pods {
var priority int32
if pod.Spec.Priority != nil {
priority = *pod.Spec.Priority
}

// Find the group index according to the priority.
index := sort.Search(len(groups), func(i int) bool {
return groups[i].Priority >= priority
})

// 1. Pods with priority above the highest group fall into the highest group.
// 2. Pods with priority below the lowest group fall into the lowest group.
// 3. Pods whose priority falls between two groups are assigned to the lower group:
// when groups[index-1].Priority <= pod priority < groups[index].Priority,
// pick the lower group (index-1).
if index == len(groups) {
index = len(groups) - 1
} else if index < 0 {
index = 0
} else if index > 0 && groups[index].Priority > priority {
index--
}

groups[index].Pods = append(groups[index].Pods, pod)
}
return groups
}
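To make the bucketing concrete, with groups sorted at priorities {0, 100000, 2000000000} (the same illustrative config as above):

// pod priority nil        -> treated as 0, exact match on group 0
// pod priority 50000      -> between 0 and 100000, assigned to the lower group (0)
// pod priority 100000     -> exact match on group 100000
// pod priority 2000001000 -> above the highest group, clamped into group 2000000000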

type podShutdownGroup struct {
kubeletconfig.ShutdownGracePeriodByPodPriority
Pods []*v1.Pod
}
