Commit 2ff3b86
Merge pull request kubernetes#1549 from MaciekPytel/lister_scale_up
Use lister in GetNodeInfosForGroups
k8s-ci-robot authored Jan 2, 2019
2 parents f960f95 + 3f0da89 commit 2ff3b86
Showing 8 changed files with 70 additions and 119 deletions.
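The change is mechanical across the diff below: GetNodeInfosForGroups (and the simulator helpers under it) now read pods through the autoscaler's shared ListerRegistry instead of issuing LIST calls through a kube client, and every test swaps its fake.Clientset pod reactor for a static test lister. A minimal sketch of the shared test wiring, using only constructors that appear in the diff (the helper name is illustrative, not part of the commit):

```go
package core

import (
    apiv1 "k8s.io/api/core/v1"
    "k8s.io/client-go/kubernetes/fake"

    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// newTestListers is a hypothetical helper capturing the setup the rewritten
// tests share: a fixed pod slice behind a test lister, placed in the registry
// slot that ScheduledPodLister() serves (the third argument, judging by how
// the diff populates it), with every other lister slot left nil.
func newTestListers(pods []*apiv1.Pod) (kube_util.ListerRegistry, *fake.Clientset) {
    podLister := kube_util.NewTestPodLister(pods)
    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)
    // A bare fake clientset still satisfies the context's client parameter,
    // but no reactors need to be registered on it any more.
    return listers, &fake.Clientset{}
}
```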
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/scale_up.go
@@ -263,7 +263,7 @@ func ScaleUp(context *context.AutoscalingContext, processors *ca_processors.Auto
         podsRemainUnschedulable[pod] = make(map[string]status.Reasons)
     }
     glogx.V(1).Over(loggingQuota).Infof("%v other pods are also unschedulable", -loggingQuota.Left())
-    nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet,
+    nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ListerRegistry,
         daemonSets, context.PredicateChecker)
     if err != nil {
         return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to build node infos for node groups: ")
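Since the diff view truncates the call site, here is a hedged sketch of the full call shape after this change (argument comments are inferred from the signature change in core/utils.go further down):

```go
nodeInfos, err := GetNodeInfosForGroups(
    nodes,                    // []*apiv1.Node
    context.CloudProvider,    // cloudprovider.CloudProvider
    context.ListerRegistry,   // kube_util.ListerRegistry, replacing the old kube_client.Interface
    daemonSets,               // []*appsv1.DaemonSet
    context.PredicateChecker, // *simulator.PredicateChecker
)
if err != nil {
    return &status.ScaleUpStatus{Result: status.ScaleUpError}, err.AddPrefix("failed to build node infos for node groups: ")
}
```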
106 changes: 23 additions & 83 deletions cluster-autoscaler/core/scale_up_test.go
@@ -29,15 +29,14 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/config"
     "k8s.io/autoscaler/cluster-autoscaler/estimator"
     ca_processors "k8s.io/autoscaler/cluster-autoscaler/processors"
+    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
     "k8s.io/autoscaler/cluster-autoscaler/utils/units"
     kube_record "k8s.io/client-go/tools/record"

     appsv1 "k8s.io/api/apps/v1"
     apiv1 "k8s.io/api/core/v1"
-    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/kubernetes/fake"
-    core "k8s.io/client-go/testing"
     schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

     "github.com/stretchr/testify/assert"
@@ -363,7 +362,6 @@ func expanderOptionToGroupSizeChange(option expander.Option) groupSizeChange {

 func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
     expandedGroups := make(chan groupSizeChange, 10)
-    fakeClient := &fake.Clientset{}

     groups := make(map[string][]*apiv1.Node)
     nodes := make([]*apiv1.Node, len(config.nodes))
@@ -379,22 +377,13 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
         }
     }

-    pods := make(map[string][]apiv1.Pod)
+    pods := make([]*apiv1.Pod, 0)
     for _, p := range config.pods {
         pod := buildTestPod(p)
-        pods[p.node] = append(pods[p.node], *pod)
+        pods = append(pods, pod)
     }
-
-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        list := action.(core.ListAction)
-        fieldstring := list.GetListRestrictions().Fields.String()
-        for _, node := range nodes {
-            if strings.Contains(fieldstring, node.Name) {
-                return true, &apiv1.PodList{Items: pods[node.Name]}, nil
-            }
-        }
-        return true, nil, fmt.Errorf("Failed to list: %v", list)
-    })
+    podLister := kube_util.NewTestPodLister(pods)
+    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
         expandedGroups <- groupSizeChange{groupName: nodeGroup, sizeChange: increase}
@@ -416,7 +405,7 @@ func simpleScaleUpTest(t *testing.T, config *scaleTestConfig) {
     assert.NotNil(t, provider)

     // Create context with non-random expander strategy.
-    context := NewScaleTestAutoscalingContext(config.options, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(config.options, &fake.Clientset{}, listers, provider)
     expander := assertingStrategy{
         initialNodeConfigs:     config.nodes,
         expectedScaleUpOptions: config.expectedScaleUpOptions,
@@ -491,18 +480,8 @@ func TestScaleUpNodeComingNoScale(t *testing.T) {
     p1.Spec.NodeName = "n1"
     p2.Spec.NodeName = "n2"

-    fakeClient := &fake.Clientset{}
-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        list := action.(core.ListAction)
-        fieldstring := list.GetListRestrictions().Fields.String()
-        if strings.Contains(fieldstring, "n1") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p1}}, nil
-        }
-        if strings.Contains(fieldstring, "n2") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p2}}, nil
-        }
-        return true, nil, fmt.Errorf("Failed to list: %v", list)
-    })
+    podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2})
+    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
         t.Fatalf("No expansion is expected, but increased %s by %d", nodeGroup, increase)
@@ -518,7 +497,7 @@
         MaxCoresTotal:  config.DefaultMaxClusterCores,
         MaxMemoryTotal: config.DefaultMaxClusterMemory,
     }
-    context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

     clusterState := clusterstate.NewClusterStateRegistry(
         provider,
@@ -549,18 +528,8 @@ func TestScaleUpNodeComingHasScale(t *testing.T) {
     p1.Spec.NodeName = "n1"
     p2.Spec.NodeName = "n2"

-    fakeClient := &fake.Clientset{}
-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        list := action.(core.ListAction)
-        fieldstring := list.GetListRestrictions().Fields.String()
-        if strings.Contains(fieldstring, "n1") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p1}}, nil
-        }
-        if strings.Contains(fieldstring, "n2") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p2}}, nil
-        }
-        return true, nil, fmt.Errorf("Failed to list: %v", list)
-    })
+    podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2})
+    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     expandedGroups := make(chan string, 10)
     provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
@@ -572,7 +541,7 @@
     provider.AddNode("ng1", n1)
     provider.AddNode("ng2", n2)

-    context := NewScaleTestAutoscalingContext(defaultOptions, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(defaultOptions, &fake.Clientset{}, listers, provider)

     clusterState := clusterstate.NewClusterStateRegistry(
         provider,
@@ -607,18 +576,8 @@ func TestScaleUpUnhealthy(t *testing.T) {
     p1.Spec.NodeName = "n1"
     p2.Spec.NodeName = "n2"

-    fakeClient := &fake.Clientset{}
-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        list := action.(core.ListAction)
-        fieldstring := list.GetListRestrictions().Fields.String()
-        if strings.Contains(fieldstring, "n1") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p1}}, nil
-        }
-        if strings.Contains(fieldstring, "n2") {
-            return true, &apiv1.PodList{Items: []apiv1.Pod{*p2}}, nil
-        }
-        return true, nil, fmt.Errorf("Failed to list: %v", list)
-    })
+    podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2})
+    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
         t.Fatalf("No expansion is expected, but increased %s by %d", nodeGroup, increase)
@@ -634,7 +593,7 @@
         MaxCoresTotal:  config.DefaultMaxClusterCores,
         MaxMemoryTotal: config.DefaultMaxClusterMemory,
     }
-    context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
     clusterState.UpdateNodes([]*apiv1.Node{n1, n2}, time.Now())
Expand All @@ -649,21 +608,14 @@ func TestScaleUpUnhealthy(t *testing.T) {
}

func TestScaleUpNoHelp(t *testing.T) {
fakeClient := &fake.Clientset{}
n1 := BuildTestNode("n1", 100, 1000)
SetNodeReadyState(n1, true, time.Now())

p1 := BuildTestPod("p1", 80, 0)
p1.Spec.NodeName = "n1"

fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
list := action.(core.ListAction)
fieldstring := list.GetListRestrictions().Fields.String()
if strings.Contains(fieldstring, "n1") {
return true, &apiv1.PodList{Items: []apiv1.Pod{*p1}}, nil
}
return true, nil, fmt.Errorf("Failed to list: %v", list)
})
podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1})
listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error {
t.Fatalf("No expansion is expected")
@@ -678,7 +630,7 @@
         MaxCoresTotal:  config.DefaultMaxClusterCores,
         MaxMemoryTotal: config.DefaultMaxClusterMemory,
     }
-    context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
     clusterState.UpdateNodes([]*apiv1.Node{n1}, time.Now())
Expand All @@ -700,7 +652,6 @@ func TestScaleUpNoHelp(t *testing.T) {
}

func TestScaleUpBalanceGroups(t *testing.T) {
fakeClient := &fake.Clientset{}
provider := testprovider.NewTestCloudProvider(func(string, int) error {
return nil
}, nil)
@@ -714,7 +665,7 @@
         "ng3": {min: 1, max: 5, size: 1},
         "ng4": {min: 1, max: 5, size: 3},
     }
-    podMap := make(map[string]*apiv1.Pod, len(testCfg))
+    podList := make([]*apiv1.Pod, 0, len(testCfg))
     nodes := make([]*apiv1.Node, 0)

     for gid, gconf := range testCfg {
@@ -727,33 +678,22 @@

             pod := BuildTestPod(fmt.Sprintf("%v-pod-%v", gid, i), 80, 0)
             pod.Spec.NodeName = nodeName
-            podMap[gid] = pod
+            podList = append(podList, pod)

             provider.AddNode(gid, node)
         }
     }

-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        list := action.(core.ListAction)
-        fieldstring := list.GetListRestrictions().Fields.String()
-        matcher, err := regexp.Compile("ng[0-9]")
-        if err != nil {
-            return false, &apiv1.PodList{Items: []apiv1.Pod{}}, err
-        }
-        matches := matcher.FindStringSubmatch(fieldstring)
-        if len(matches) != 1 {
-            return false, &apiv1.PodList{Items: []apiv1.Pod{}}, fmt.Errorf("parse error")
-        }
-        return true, &apiv1.PodList{Items: []apiv1.Pod{*(podMap[matches[0]])}}, nil
-    })
+    podLister := kube_util.NewTestPodLister(podList)
+    listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     options := config.AutoscalingOptions{
         EstimatorName:            estimator.BinpackingEstimatorName,
         BalanceSimilarNodeGroups: true,
         MaxCoresTotal:            config.DefaultMaxClusterCores,
         MaxMemoryTotal:           config.DefaultMaxClusterMemory,
     }
-    context := NewScaleTestAutoscalingContext(options, fakeClient, nil, provider)
+    context := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider)

     clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, newBackoff())
     clusterState.UpdateNodes(nodes, time.Now())
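Why a flat pod slice can replace all of the field-selector reactors removed above: the reactors were emulating apiserver-side `spec.nodeName` filtering, and that responsibility moves into GetRequiredPodsForNode (see simulator/nodes.go below), which filters in memory. A sketch of the lister semantics the tests now rely on — an assumed stand-in, not the real NewTestPodLister implementation:

```go
package kubernetes // the kube_util package, assumed here for illustration

import apiv1 "k8s.io/api/core/v1"

// testPodLister returns its fixed slice from List(); no per-node selection
// happens here, so consumers must filter on pod.Spec.NodeName themselves.
type testPodLister struct {
    pods []*apiv1.Pod
}

func (l testPodLister) List() ([]*apiv1.Pod, error) {
    return l.pods, nil
}
```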
6 changes: 3 additions & 3 deletions cluster-autoscaler/core/static_autoscaler_test.go
@@ -227,7 +227,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
     // Scale up.
     readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
     allNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
-    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once()
+    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(2) // 1 to get pods + 1 per nodegroup when building nodeInfo map
     unschedulablePodMock.On("List").Return([]*apiv1.Pod{p2}, nil).Once()
     daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
     onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()
@@ -524,7 +524,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) {
     // Scale up.
     readyNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Times(2) // due to initialized=false
     allNodeListerMock.On("List").Return([]*apiv1.Node{n1}, nil).Once()
-    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once()
+    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(2) // 1 to get pods + 1 per nodegroup when building nodeInfo map
     unschedulablePodMock.On("List").Return([]*apiv1.Pod{p2}, nil).Once()
     daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
     onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once()
@@ -654,7 +654,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) {
     // Scale up
     readyNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Times(2) // due to initialized=false
     allNodeListerMock.On("List").Return([]*apiv1.Node{n1, n2, n3}, nil).Once()
-    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Once()
+    scheduledPodMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Times(3) // 1 to get pods + 1 per nodegroup when building nodeInfo map
     unschedulablePodMock.On("List").Return([]*apiv1.Pod{p4, p5, p6}, nil).Once()
     daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once()
     onScaleUpMock.On("ScaleUp", "ng2", 1).Return(nil).Once()
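The `.Times(...)` bumps above all follow one rule, per the inline comments: the scheduled-pod lister is consulted once by the main autoscaler pass and once more per node group whose NodeInfo is built from a live node. A hedged sketch of that arithmetic (`numNodeGroups` is illustrative, not a variable in these tests):

```go
// TestStaticAutoscalerRunOnce has one node group (ng1), hence Times(2);
// TestStaticAutoscalerRunOncePodsWithPriorities allows Times(3), i.e. two groups.
numNodeGroups := 1
scheduledPodMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(1 + numNodeGroups)
```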
5 changes: 2 additions & 3 deletions cluster-autoscaler/core/utils.go
@@ -40,7 +40,6 @@ import (
     appsv1 "k8s.io/api/apps/v1"
     apiv1 "k8s.io/api/core/v1"
     apiequality "k8s.io/apimachinery/pkg/api/equality"
-    kube_client "k8s.io/client-go/kubernetes"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
     schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"

@@ -218,7 +217,7 @@ func CheckPodsSchedulableOnNode(context *context.AutoscalingContext, pods []*api
 // TODO(mwielgus): This returns map keyed by url, while most code (including scheduler) uses node.Name for a key.
 //
 // TODO(mwielgus): Review error policy - sometimes we may continue with partial errors.
-func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface,
+func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, listers kube_util.ListerRegistry,
     daemonsets []*appsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, errors.AutoscalerError) {
     result := make(map[string]*schedulercache.NodeInfo)

@@ -234,7 +233,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou
         id := nodeGroup.Id()
         if _, found := result[id]; !found {
             // Build nodeInfo.
-            nodeInfo, err := simulator.BuildNodeInfoForNode(node, kubeClient)
+            nodeInfo, err := simulator.BuildNodeInfoForNode(node, listers)
             if err != nil {
                 return false, err
             }
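One detail worth noting from the hunk above: the result map memoizes per node group id, which bounds the number of lister reads. A sketch of that loop body, under the stated assumption that the built NodeInfo is stored back into `result` (the storing line sits just outside the visible hunk):

```go
id := nodeGroup.Id()
if _, found := result[id]; !found {
    // Build nodeInfo once per node group; later nodes from the same group
    // hit the cache instead of triggering another pod List().
    nodeInfo, err := simulator.BuildNodeInfoForNode(node, listers)
    if err != nil {
        return false, err
    }
    result[id] = nodeInfo // assumed: not shown in the visible hunk
}
```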
15 changes: 6 additions & 9 deletions cluster-autoscaler/core/utils_test.go
@@ -28,15 +28,14 @@ import (
     "k8s.io/autoscaler/cluster-autoscaler/context"
     "k8s.io/autoscaler/cluster-autoscaler/simulator"
     "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
+    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     scheduler_util "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
     . "k8s.io/autoscaler/cluster-autoscaler/utils/test"

     appsv1 "k8s.io/api/apps/v1"
     apiv1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/runtime"
     "k8s.io/client-go/kubernetes/fake"
-    core "k8s.io/client-go/testing"
     kube_record "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/api/testapi"
     kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@@ -337,14 +336,12 @@ func TestGetNodeInfosForGroups(t *testing.T) {
         nil, nil)
     provider2.AddNodeGroup("n5", 1, 10, 1) // Nodegroup without nodes.

-    fakeClient := &fake.Clientset{}
-    fakeClient.Fake.AddReactor("list", "pods", func(action core.Action) (bool, runtime.Object, error) {
-        return true, &apiv1.PodList{Items: []apiv1.Pod{}}, nil
-    })
+    podLister := kube_util.NewTestPodLister([]*apiv1.Pod{})
+    registry := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil, nil)

     predicateChecker := simulator.NewTestPredicateChecker()

-    res, err := GetNodeInfosForGroups([]*apiv1.Node{n1, n2, n3, n4}, provider1, fakeClient,
+    res, err := GetNodeInfosForGroups([]*apiv1.Node{n1, n2, n3, n4}, provider1, registry,
         []*appsv1.DaemonSet{}, predicateChecker)
     assert.NoError(t, err)
     assert.Equal(t, 4, len(res))
@@ -357,8 +354,8 @@
     _, found = res["n4"]
     assert.True(t, found)

-    // Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud proivder
-    res, err = GetNodeInfosForGroups([]*apiv1.Node{}, provider2, fakeClient,
+    // Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud provider
+    res, err = GetNodeInfosForGroups([]*apiv1.Node{}, provider2, registry,
         []*appsv1.DaemonSet{}, predicateChecker)
     assert.NoError(t, err)
     assert.Equal(t, 0, len(res))
21 changes: 9 additions & 12 deletions cluster-autoscaler/simulator/nodes.go
@@ -21,28 +21,25 @@ import (

     apiv1 "k8s.io/api/core/v1"
     policyv1 "k8s.io/api/policy/v1beta1"
-    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/autoscaler/cluster-autoscaler/utils/drain"
     "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
-    kube_client "k8s.io/client-go/kubernetes"
+    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
     schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
 )

 // GetRequiredPodsForNode returns a list of pods that would appear on the node if the
 // node was just created (like daemonset and manifest-run pods). It reuses kubectl
 // drain command to get the list.
-func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*apiv1.Pod, errors.AutoscalerError) {
-
-    // TODO: we should change this to use informer
-    podListResult, err := client.CoreV1().Pods(apiv1.NamespaceAll).List(
-        metav1.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename}).String()})
+func GetRequiredPodsForNode(nodename string, listers kube_util.ListerRegistry) ([]*apiv1.Pod, errors.AutoscalerError) {
+    pods, err := listers.ScheduledPodLister().List()
     if err != nil {
         return []*apiv1.Pod{}, errors.ToAutoscalerError(errors.ApiCallError, err)
     }
     allPods := make([]*apiv1.Pod, 0)
-    for i := range podListResult.Items {
-        allPods = append(allPods, &podListResult.Items[i])
+    for _, p := range pods {
+        if p.Spec.NodeName == nodename {
+            allPods = append(allPods, p)
+        }
     }

     podsToRemoveList, err := drain.GetPodsForDeletionOnNodeDrain(
@@ -78,8 +75,8 @@ func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*a
 }

 // BuildNodeInfoForNode build a NodeInfo structure for the given node as if the node was just created.
-func BuildNodeInfoForNode(node *apiv1.Node, client kube_client.Interface) (*schedulercache.NodeInfo, errors.AutoscalerError) {
-    requiredPods, err := GetRequiredPodsForNode(node.Name, client)
+func BuildNodeInfoForNode(node *apiv1.Node, listers kube_util.ListerRegistry) (*schedulercache.NodeInfo, errors.AutoscalerError) {
+    requiredPods, err := GetRequiredPodsForNode(node.Name, listers)
     if err != nil {
         return nil, err
     }
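The behavioral consequence of this file's change, sketched as a standalone helper (hypothetical name; the logic is lifted from GetRequiredPodsForNode above): pod retrieval moves from a per-node API LIST with a spec.nodeName field selector to a single read of the informer-backed cache followed by in-memory filtering, trading an apiserver round-trip per node group for a scan of the scheduled-pod list.

```go
package simulator

import (
    apiv1 "k8s.io/api/core/v1"

    kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
)

// podsOnNode is a hypothetical extraction of the new lister-then-filter step:
// one List() against the shared cache, then a match on Spec.NodeName.
func podsOnNode(listers kube_util.ListerRegistry, nodename string) ([]*apiv1.Pod, error) {
    pods, err := listers.ScheduledPodLister().List()
    if err != nil {
        return nil, err
    }
    onNode := make([]*apiv1.Pod, 0)
    for _, p := range pods {
        if p.Spec.NodeName == nodename {
            onNode = append(onNode, p)
        }
    }
    return onNode, nil
}
```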