Skip to content

Commit

Permalink
Downward API defaults resource limits to node capacity/allocatable
Browse files Browse the repository at this point in the history
  • Loading branch information
derekwaynecarr committed Jun 17, 2016
1 parent 6209b1b commit 18a206a
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 2 deletions.
119 changes: 119 additions & 0 deletions pkg/fieldpath/fieldpath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"strings"
"testing"

"github.com/stretchr/testify/assert"

"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)

func TestExtractFieldPathAsString(t *testing.T) {
Expand Down Expand Up @@ -115,3 +118,119 @@ func TestExtractFieldPathAsString(t *testing.T) {
}
}
}

func getPod(cname, cpuRequest, cpuLimit, memoryRequest, memoryLimit string) *api.Pod {
resources := api.ResourceRequirements{
Limits: make(api.ResourceList),
Requests: make(api.ResourceList),
}
if cpuLimit != "" {
resources.Limits[api.ResourceCPU] = resource.MustParse(cpuLimit)
}
if memoryLimit != "" {
resources.Limits[api.ResourceMemory] = resource.MustParse(memoryLimit)
}
if cpuRequest != "" {
resources.Requests[api.ResourceCPU] = resource.MustParse(cpuRequest)
}
if memoryRequest != "" {
resources.Requests[api.ResourceMemory] = resource.MustParse(memoryRequest)
}
return &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: cname,
Resources: resources,
},
},
},
}
}

func TestExtractResourceValue(t *testing.T) {
cases := []struct {
fs *api.ResourceFieldSelector
pod *api.Pod
cName string
expectedValue string
expectedError error
}{
{
fs: &api.ResourceFieldSelector{
Resource: "limits.cpu",
},
cName: "foo",
pod: getPod("foo", "", "9", "", ""),
expectedValue: "9",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "", "", "", ""),
expectedValue: "0",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "8", "", "", ""),
expectedValue: "8",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "100m", "", "", ""),
expectedValue: "1",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
Divisor: resource.MustParse("100m"),
},
cName: "foo",
pod: getPod("foo", "1200m", "", "", ""),
expectedValue: "12",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.memory",
},
cName: "foo",
pod: getPod("foo", "", "", "100Mi", ""),
expectedValue: "104857600",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.memory",
Divisor: resource.MustParse("1Mi"),
},
cName: "foo",
pod: getPod("foo", "", "", "100Mi", "1Gi"),
expectedValue: "100",
},
{
fs: &api.ResourceFieldSelector{
Resource: "limits.memory",
},
cName: "foo",
pod: getPod("foo", "", "", "10Mi", "100Mi"),
expectedValue: "104857600",
},
}
as := assert.New(t)
for idx, tc := range cases {
actual, err := ExtractResourceValueByContainerName(tc.fs, tc.pod, tc.cName)
if tc.expectedError != nil {
as.Equal(tc.expectedError, err, "expected test case [%d] to fail with error %v; got %v", idx, tc.expectedError, err)
} else {
as.Nil(err, "expected test case [%d] to not return an error; got %v", idx, err)
as.Equal(tc.expectedValue, actual, "expected test case [%d] to return %q; got %q instead", idx, tc.expectedValue, actual)
}
}
}
14 changes: 12 additions & 2 deletions pkg/kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,7 @@ func (kl *Kubelet) initialNodeStatus() (*api.Node, error) {
if err := kl.setNodeStatus(node); err != nil {
return nil, err
}

return node, nil
}

Expand Down Expand Up @@ -1152,6 +1153,7 @@ func (kl *Kubelet) registerWithApiserver() {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue
}

glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) {
Expand Down Expand Up @@ -1560,7 +1562,11 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Contain
return result, err
}
case envVar.ValueFrom.ResourceFieldRef != nil:
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, pod, container)
defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
if err != nil {
return result, err
}
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
if err != nil {
return result, err
}
Expand Down Expand Up @@ -1900,7 +1906,11 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
}

// Wait for volumes to attach/mount
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
defaultedPod, _, err := kl.defaultPodLimitsForDownwardApi(pod, nil)
if err != nil {
return err
}
if err := kl.volumeManager.WaitForAttachAndMount(defaultedPod); err != nil {
ref, errGetRef := api.GetReference(pod)
if errGetRef == nil && ref != nil {
kl.recorder.Eventf(ref, api.EventTypeWarning, kubecontainer.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
Expand Down
84 changes: 84 additions & 0 deletions pkg/kubelet/kubelet_resources.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"fmt"

"k8s.io/kubernetes/pkg/api"
)

// defaultPodLimitsForDownwardApi copies the input pod, and optional container,
// and applies default resource limits. it returns a copy of the input pod,
// and a copy of the input container (if specified) with default limits
// applied. if a container has no limit specified, it will default the limit to
// the node capacity.
// TODO: if/when we have pod level resources, we need to update this function
// to use those limits instead of node capacity.
func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.Container) (*api.Pod, *api.Container, error) {
if pod == nil {
return nil, nil, fmt.Errorf("invalid input, pod cannot be nil")
}

node, err := kl.getNodeAnyWay()
if err != nil {
return nil, nil, fmt.Errorf("failed to find node object, expected a node")
}
capacity := node.Status.Capacity

podCopy, err := api.Scheme.Copy(pod)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of pod object: %v", err)
}
outputPod, ok := podCopy.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of pod object")
}
for idx := range outputPod.Spec.Containers {
mergeContainerResourceLimitsWithCapacity(&outputPod.Spec.Containers[idx], capacity)
}

var outputContainer *api.Container
if container != nil {
containerCopy, err := api.Scheme.DeepCopy(container)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of container object: %v", err)
}
outputContainer, ok = containerCopy.(*api.Container)
if !ok {
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of container object")
}
mergeContainerResourceLimitsWithCapacity(outputContainer, capacity)
}
return outputPod, outputContainer, nil
}

// mergeContainerResourceLimitsWithCapacity checks if a limit is applied for
// the container, and if not, it sets the limit based on the capacity.
func mergeContainerResourceLimitsWithCapacity(container *api.Container,
capacity api.ResourceList) {
if container.Resources.Limits == nil {
container.Resources.Limits = make(api.ResourceList)
}
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := container.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := capacity[resource]; exists {
container.Resources.Limits[resource] = *cap.Copy()
}
}
}
}
93 changes: 93 additions & 0 deletions pkg/kubelet/kubelet_resources_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubelet

import (
"testing"

"github.com/stretchr/testify/assert"

cadvisorapi "github.com/google/cadvisor/info/v1"
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)

func TestPodResourceLimitsDefaulting(t *testing.T) {
cpuCores := resource.MustParse("10")
memoryCapacity := resource.MustParse("10Gi")
tk := newTestKubelet(t, true)
tk.fakeCadvisor.On("VersionInfo").Return(&cadvisorapi.VersionInfo{}, nil)
tk.fakeCadvisor.On("MachineInfo").Return(&cadvisorapi.MachineInfo{
NumCores: int(cpuCores.Value()),
MemoryCapacity: uint64(memoryCapacity.Value()),
}, nil)
tk.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
tk.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
cases := []struct {
pod *api.Pod
expected *api.Pod
}{
{
pod: getPod("0", "0"),
expected: getPod("10", "10Gi"),
},
{
pod: getPod("1", "0"),
expected: getPod("1", "10Gi"),
},
{
pod: getPod("", ""),
expected: getPod("10", "10Gi"),
},
{
pod: getPod("0", "1Mi"),
expected: getPod("10", "1Mi"),
},
}
as := assert.New(t)
for idx, tc := range cases {
actual, _, err := tk.kubelet.defaultPodLimitsForDownwardApi(tc.pod, nil)
as.Nil(err, "failed to default pod limits: %v", err)
if !api.Semantic.DeepEqual(tc.expected, actual) {
as.Fail("test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual)
}
}
}

func getPod(cpuLimit, memoryLimit string) *api.Pod {
resources := api.ResourceRequirements{}
if cpuLimit != "" || memoryLimit != "" {
resources.Limits = make(api.ResourceList)
}
if cpuLimit != "" {
resources.Limits[api.ResourceCPU] = resource.MustParse(cpuLimit)
}
if memoryLimit != "" {
resources.Limits[api.ResourceMemory] = resource.MustParse(memoryLimit)
}
return &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
Resources: resources,
},
},
},
}
}
Loading

0 comments on commit 18a206a

Please sign in to comment.