Skip to content

Commit

Permalink
Merge pull request kubernetes#80467 from jfbai/automated-cherry-pick-…
Browse files Browse the repository at this point in the history
…of-#77699-upstream-release-1.14

Automated cherry pick of kubernetes#77699: Reset extended resources only when node is recreated.
  • Loading branch information
k8s-ci-robot authored Nov 9, 2019
2 parents a9c2cd3 + ae2bde5 commit 500f5ab
Show file tree
Hide file tree
Showing 11 changed files with 149 additions and 19 deletions.
4 changes: 4 additions & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ type ContainerManager interface {

// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

// ShouldResetExtendedResourceCapacity returns whether or not the extended resources should be zeroed,
// due to node recreation.
ShouldResetExtendedResourceCapacity() bool
}

type NodeConfig struct {
Expand Down
6 changes: 5 additions & 1 deletion pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import (
"github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -897,3 +897,7 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return cm.deviceManager.GetDevices(podUID, containerName)
}

func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return cm.deviceManager.ShouldResetExtendedResourceCapacity()
}
16 changes: 13 additions & 3 deletions pkg/kubelet/cm/container_manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package cm

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"

"k8s.io/apimachinery/pkg/api/resource"
Expand All @@ -32,7 +32,9 @@ import (
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

type containerManagerStub struct{}
type containerManagerStub struct {
shouldResetExtendedResourceCapacity bool
}

var _ ContainerManager = &containerManagerStub{}

Expand Down Expand Up @@ -110,6 +112,14 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta
return nil
}

func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
return cm.shouldResetExtendedResourceCapacity
}

func NewStubContainerManager() ContainerManager {
return &containerManagerStub{}
return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}

func NewStubContainerManagerWithExtendedResource(shouldResetExtendedResourceCapacity bool) ContainerManager {
return &containerManagerStub{shouldResetExtendedResourceCapacity: shouldResetExtendedResourceCapacity}
}
6 changes: 5 additions & 1 deletion pkg/kubelet/cm/container_manager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package cm
import (
"fmt"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
Expand Down Expand Up @@ -171,3 +171,7 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string {
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}

func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
return false
}
2 changes: 2 additions & 0 deletions pkg/kubelet/cm/devicemanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
Expand All @@ -29,6 +30,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
Expand Down
18 changes: 17 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ import (
"google.golang.org/grpc"
"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
Expand Down Expand Up @@ -832,3 +834,17 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesap
defer m.mutex.Unlock()
return m.podDevices.getContainerDevices(podUID, containerName)
}

// ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed or not,
// depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates the node
// has been recreated.
func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
checkpoints, err := m.checkpointManager.ListCheckpoints()
if err != nil {
return false
}
return len(checkpoints) == 0
}
return false
}
7 changes: 6 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package devicemanager

import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
Expand Down Expand Up @@ -67,3 +67,8 @@ func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}

// ShouldResetExtendedResourceCapacity returns false
func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
return false
}
41 changes: 40 additions & 1 deletion pkg/kubelet/cm/devicemanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -946,6 +946,45 @@ func TestDevicePreStartContainer(t *testing.T) {
as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
}

func TestResetExtendedResource(t *testing.T) {
as := assert.New(t)
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{
endpoints: make(map[string]endpointInfo),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
checkpointManager: ckm,
}

extendedResourceName := "domain.com/resource"
testManager.podDevices.insert("pod", "con", extendedResourceName,
constructDevices([]string{"dev1"}),
constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"},
map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{}))

testManager.healthyDevices[extendedResourceName] = sets.NewString()
testManager.healthyDevices[extendedResourceName].Insert("dev1")
// checkpoint is present, indicating node hasn't been recreated
err = testManager.writeCheckpoint()
as.Nil(err)

as.False(testManager.ShouldResetExtendedResourceCapacity())

// checkpoint is absent, representing node recreation
ckpts, err := ckm.ListCheckpoints()
as.Nil(err)
for _, ckpt := range ckpts {
err = ckm.RemoveCheckpoint(ckpt)
as.Nil(err)
}
as.True(testManager.ShouldResetExtendedResourceCapacity())
}

func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
return func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.ContainerAllocateResponse)
Expand Down
7 changes: 6 additions & 1 deletion pkg/kubelet/cm/devicemanager/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package devicemanager
import (
"time"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
Expand Down Expand Up @@ -58,6 +58,11 @@ type Manager interface {

// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

// ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset or not,
// depending on the checkpoint file availability. Absence of the checkpoint file strongly indicates
// the node has been recreated.
ShouldResetExtendedResourceCapacity() bool
}

// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
Expand Down
17 changes: 10 additions & 7 deletions pkg/kubelet/kubelet_node_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (

"k8s.io/klog"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -132,12 +132,15 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
requiresUpdate := false
for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
klog.Infof("Zero out resource %s capacity in existing node.", k)
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
requiresUpdate = true
// Check with the device manager to see if node has been recreated, in which case extended resources should be zeroed until they are available
if kl.containerManager.ShouldResetExtendedResourceCapacity() {
for k := range node.Status.Capacity {
if v1helper.IsExtendedResourceName(k) {
klog.Infof("Zero out resource %s capacity in existing node.", k)
node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
requiresUpdate = true
}
}
}
return requiresUpdate
Expand Down
44 changes: 41 additions & 3 deletions pkg/kubelet/kubelet_node_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/stretchr/testify/require"

cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
Expand Down Expand Up @@ -1740,17 +1740,21 @@ func TestUpdateDefaultLabels(t *testing.T) {
func TestReconcileExtendedResource(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
testKubelet.kubelet.containerManager = cm.NewStubContainerManagerWithExtendedResource(true /* shouldResetExtendedResourceCapacity*/)
testKubeletNoReset := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
extendedResourceName1 := v1.ResourceName("test.com/resource1")
extendedResourceName2 := v1.ResourceName("test.com/resource2")

cases := []struct {
name string
testKubelet *TestKubelet
existingNode *v1.Node
expectedNode *v1.Node
needsUpdate bool
}{
{
name: "no update needed without extended resource",
name: "no update needed without extended resource",
testKubelet: testKubelet,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
Expand Down Expand Up @@ -1782,7 +1786,41 @@ func TestReconcileExtendedResource(t *testing.T) {
needsUpdate: false,
},
{
name: "extended resource capacity is zeroed",
name: "extended resource capacity is not zeroed due to presence of checkpoint file",
testKubelet: testKubelet,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
expectedNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
needsUpdate: false,
},
{
name: "extended resource capacity is zeroed",
testKubelet: testKubeletNoReset,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
Expand Down

0 comments on commit 500f5ab

Please sign in to comment.