Skip to content

Commit

Permalink
[Federation] Make the hpa scale time window configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
irfanurrehman committed Aug 5, 2017
1 parent 0bea0ca commit 2be69a5
Show file tree
Hide file tree
Showing 17 changed files with 72 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,11 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
go serviceController.Run(s.ConcurrentServiceSyncs, wait.NeverStop)
}

adapterSpecificArgs := make(map[string]interface{})
adapterSpecificArgs[federatedtypes.HpaKind] = &s.HpaScaleForbiddenWindow
for kind, federatedType := range federatedtypes.FederatedTypes() {
if controllerEnabled(s.Controllers, serverResources, federatedType.ControllerName, federatedType.RequiredResources, true) {
synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency)
synccontroller.StartFederationSyncController(kind, federatedType.AdapterFactory, restClientCfg, stopChan, minimizeLatency, adapterSpecificArgs)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ type ControllerManagerConfiguration struct {
ContentType string `json:"contentType"`
// ConfigurationMap determining which controllers should be enabled or disabled
Controllers utilflag.ConfigurationMap `json:"controllers"`
// HpaScaleForbiddenWindow is the duration used by federation hpa controller to
// determine if it can move max and/or min replicas around (or not), of a cluster local
// hpa object, by comparing current time with the last scaled time of that cluster local hpa.
// Lower value will result in faster response to scalibility conditions achieved
// by cluster local hpas on local replicas, but too low a value can result in thrashing.
// Higher values will result in slower response to scalibility conditions on local replicas.
HpaScaleForbiddenWindow metav1.Duration `json:"HpaScaleForbiddenWindow"`
}

// CMServer is the main context object for the controller manager.
Expand Down Expand Up @@ -100,6 +107,7 @@ func NewCMServer() *CMServer {
APIServerBurst: 30,
LeaderElection: leaderelectionconfig.DefaultLeaderElectionConfiguration(),
Controllers: make(utilflag.ConfigurationMap),
HpaScaleForbiddenWindow: metav1.Duration{Duration: 2 * time.Minute},
},
}
return &s
Expand All @@ -125,6 +133,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.APIServerBurst, "federated-api-burst", s.APIServerBurst, "Burst to use while talking with federation apiserver")
fs.StringVar(&s.DnsProvider, "dns-provider", s.DnsProvider, "DNS provider. Valid values are: "+fmt.Sprintf("%q", dnsprovider.RegisteredDnsProviders()))
fs.StringVar(&s.DnsConfigFile, "dns-provider-config", s.DnsConfigFile, "Path to config file for configuring DNS provider.")
fs.DurationVar(&s.HpaScaleForbiddenWindow.Duration, "hpa-scale-forbidden-window", s.HpaScaleForbiddenWindow.Duration, "The time window wrt cluster local hpa lastscale time, during which federated hpa would not move the hpa max/min replicas around")
fs.Var(&s.Controllers, "controllers", ""+
"A set of key=value pairs that describe controller configuration "+
"to enable/disable specific controllers. Key should be the resource name (like services) and value should be true or false. "+
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type FederatedTypeAdapter interface {
// that create instances of FederatedTypeAdapter. Such methods should
// be registered with RegisterAdapterFactory to ensure the type
// adapter is discoverable.
type AdapterFactory func(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter
type AdapterFactory func(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter

// SetAnnotation sets the given key and value in the given object's ObjectMeta.Annotations map
func SetAnnotation(adapter FederatedTypeAdapter, obj pkgruntime.Object, key, value string) {
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type ConfigMapAdapter struct {
client federationclientset.Interface
}

func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewConfigMapAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &ConfigMapAdapter{client: client}
}

Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type DaemonSetAdapter struct {
client federationclientset.Interface
}

func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewDaemonSetAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &DaemonSetAdapter{client: client}
}

Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type DeploymentAdapter struct {
client federationclientset.Interface
}

func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewDeploymentAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
schedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedDeploymentPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
Expand Down
69 changes: 40 additions & 29 deletions federation/pkg/federatedtypes/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,26 +36,37 @@ import (
const (
HpaKind = "horizontalpodautoscaler"
HpaControllerName = "horizontalpodautoscalers"
// This is a tunable which does not change replica nums
// on an existing local hpa, before this timeout, if it
// did scale already (avoids thrashing of replicas around).
scaleForbiddenWindow = 5 * time.Minute
// This is used as the default min for hpa object submitted
// to federation, in a situation where the default is for
// some reason not present (Spec.MinReplicas == nil)
hpaMinReplicaDefault = int32(1)
// This is a tunable which does not change replica nums
// on an existing local hpa, before this timeout, if it
// did scale already (avoids thrashing of replicas around).
ScaleForbiddenWindow = 2 * time.Minute
)

func init() {
RegisterFederatedType(HpaKind, HpaControllerName, []schema.GroupVersionResource{autoscalingv1.SchemeGroupVersion.WithResource(HpaControllerName)}, NewHpaAdapter)
}

type HpaAdapter struct {
client federationclientset.Interface
client federationclientset.Interface
scaleForbiddenWindow time.Duration
}

func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
return &HpaAdapter{client: client}
func NewHpaAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
var scaleForbiddenWindow time.Duration
if adapterSpecificArgs != nil && adapterSpecificArgs[HpaKind] != nil {
scaleForbiddenWindow = adapterSpecificArgs[HpaKind].(*metav1.Duration).Duration
} else {
scaleForbiddenWindow = ScaleForbiddenWindow
}

return &HpaAdapter{
client: client,
scaleForbiddenWindow: scaleForbiddenWindow,
}
}

func (a *HpaAdapter) Kind() string {
Expand Down Expand Up @@ -237,7 +248,7 @@ func (a *HpaAdapter) GetSchedule(obj pkgruntime.Object, key string, clusters []*
}

return &hpaSchedulingInfo{
scheduleState: getHpaScheduleState(obj, currentClusterObjs),
scheduleState: a.getHpaScheduleState(obj, currentClusterObjs),
fedStatus: fedStatus,
}, nil
}
Expand Down Expand Up @@ -288,7 +299,7 @@ func getCurrentClusterObjs(informer fedutil.FederatedInformer, key string, clust
//
// The above algorithm is run to first distribute max and then distribute min to those clusters
// which get max.
func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
func (a *HpaAdapter) getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgruntime.Object) map[string]*replicaNums {
fedHpa := fedObj.(*autoscalingv1.HorizontalPodAutoscaler)
requestedMin := hpaMinReplicaDefault
if fedHpa.Spec.MinReplicas != nil {
Expand Down Expand Up @@ -322,7 +333,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// beyond min/max limits.
// schedStatus currently have status of existing hpas.
// It will eventually have desired status for this reconcile.
clusterLists, currentReplicas, scheduleState := prepareForScheduling(currentObjs)
clusterLists, currentReplicas, scheduleState := a.prepareForScheduling(currentObjs)

remainingReplicas := replicaNums{
min: requestedReplicas.min - currentReplicas.min,
Expand Down Expand Up @@ -362,7 +373,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// We then go ahead to give the replicas to those which do not
// have any hpa. In this pass however we try to ensure that all
// our Max are consumed in this reconcile.
distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)
a.distributeMaxReplicas(toDistribute.max, clusterLists, rdc, currentObjs, scheduleState)

// We distribute min to those clusters which:
// 1 - can adjust min (our increase step would be only 1)
Expand All @@ -371,7 +382,7 @@ func getHpaScheduleState(fedObj pkgruntime.Object, currentObjs map[string]pkgrun
// some clusters still needing them. We adjust this in finalise by
// assigning min replicas to 1 into those clusters which got max
// but min remains 0.
distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)
a.distributeMinReplicas(toDistribute.min, clusterLists, rdc, currentObjs, scheduleState)

return finaliseScheduleState(scheduleState)
}
Expand Down Expand Up @@ -474,7 +485,7 @@ func updateStatus(fedHpa *autoscalingv1.HorizontalPodAutoscaler, newStatus hpaFe
// existing objs.
// currentObjs has the list of all clusters, with obj as nil
// for those clusters which do not have hpa yet.
func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
func (a *HpaAdapter) prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, replicaNums, map[string]*replicaNums) {
lists := hpaLists{
availableMax: sets.NewString(),
availableMin: sets.NewString(),
Expand All @@ -493,10 +504,10 @@ func prepareForScheduling(currentObjs map[string]pkgruntime.Object) (hpaLists, r
continue
}

if maxReplicasReducible(obj) {
if a.maxReplicasReducible(obj) {
lists.availableMax.Insert(cluster)
}
if minReplicasReducible(obj) {
if a.minReplicasReducible(obj) {
lists.availableMin.Insert(cluster)
}

Expand Down Expand Up @@ -609,7 +620,7 @@ func reduceMaxReplicas(excessMax int32, availableMaxList sets.String, scheduled
// rdc: replicadistributioncount for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
func (a *HpaAdapter) distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMax == 0 {
Expand All @@ -618,7 +629,7 @@ func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNum
if replicas == nil {
continue
}
if maxReplicasNeeded(currentObjs[cluster]) {
if a.maxReplicasNeeded(currentObjs[cluster]) {
replicas.max++
if lists.availableMax.Len() > 0 {
popped, notEmpty := lists.availableMax.PopAny()
Expand Down Expand Up @@ -708,7 +719,7 @@ func distributeMaxReplicas(toDistributeMax int32, lists hpaLists, rdc replicaNum
// rdc: replicadistributioncount for max and min.
// currentObjs: list of current cluster hpas.
// scheduled: schedule state which will be updated in place.
func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
func (a *HpaAdapter) distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNums,
currentObjs map[string]pkgruntime.Object, scheduled map[string]*replicaNums) int32 {
for cluster, replicas := range scheduled {
if toDistributeMin == 0 {
Expand All @@ -719,7 +730,7 @@ func distributeMinReplicas(toDistributeMin int32, lists hpaLists, rdc replicaNum
if replicas == nil || currentObjs[cluster] == nil {
continue
}
if minReplicasIncreasable(currentObjs[cluster]) {
if a.minReplicasIncreasable(currentObjs[cluster]) {
if lists.availableMin.Len() > 0 {
popped, notEmpty := lists.availableMin.PopAny()
if notEmpty {
Expand Down Expand Up @@ -842,18 +853,18 @@ func isPristine(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {

// isScaleable tells if it already has been a reasonable amount of
// time since this hpa scaled. Its used to avoid fast thrashing.
func isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
func (a *HpaAdapter) isScaleable(hpa *autoscalingv1.HorizontalPodAutoscaler) bool {
if hpa.Status.LastScaleTime == nil {
return false
}
t := hpa.Status.LastScaleTime.Add(scaleForbiddenWindow)
t := hpa.Status.LastScaleTime.Add(a.scaleForbiddenWindow)
if t.After(time.Now()) {
return false
}
return true
}

func maxReplicasReducible(obj pkgruntime.Object) bool {
func (a *HpaAdapter) maxReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if (hpa.Spec.MinReplicas != nil) &&
(((hpa.Spec.MaxReplicas - 1) - *hpa.Spec.MinReplicas) < 0) {
Expand All @@ -862,7 +873,7 @@ func maxReplicasReducible(obj pkgruntime.Object) bool {
if isPristine(hpa) {
return true
}
if !isScaleable(hpa) {
if !a.isScaleable(hpa) {
return false
}
if (hpa.Status.DesiredReplicas < hpa.Status.CurrentReplicas) ||
Expand All @@ -879,14 +890,14 @@ func maxReplicasReducible(obj pkgruntime.Object) bool {
// are not being used here, the max adjustment will lead it to become equal to min,
// but will not be able to scale down further and offer max to some other cluster
// which needs replicas.
func minReplicasReducible(obj pkgruntime.Object) bool {
func (a *HpaAdapter) minReplicasReducible(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if isPristine(hpa) && (hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas > 1) &&
(*hpa.Spec.MinReplicas <= hpa.Spec.MaxReplicas) {
return true
}
if !isScaleable(hpa) {
if !a.isScaleable(hpa) {
return false
}
if (hpa.Spec.MinReplicas != nil) &&
Expand All @@ -898,9 +909,9 @@ func minReplicasReducible(obj pkgruntime.Object) bool {
return false
}

func maxReplicasNeeded(obj pkgruntime.Object) bool {
func (a *HpaAdapter) maxReplicasNeeded(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if !isScaleable(hpa) {
if !a.isScaleable(hpa) {
return false
}

Expand All @@ -911,9 +922,9 @@ func maxReplicasNeeded(obj pkgruntime.Object) bool {
return false
}

func minReplicasIncreasable(obj pkgruntime.Object) bool {
func (a *HpaAdapter) minReplicasIncreasable(obj pkgruntime.Object) bool {
hpa := obj.(*autoscalingv1.HorizontalPodAutoscaler)
if !isScaleable(hpa) ||
if !a.isScaleable(hpa) ||
((hpa.Spec.MinReplicas != nil) &&
(*hpa.Spec.MinReplicas) >= hpa.Spec.MaxReplicas) {
return false
Expand Down
10 changes: 6 additions & 4 deletions federation/pkg/federatedtypes/hpa_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package federatedtypes

import (
"testing"
"time"

autoscalingv1 "k8s.io/api/autoscaling/v1"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -191,12 +190,15 @@ func TestGetHpaScheduleState(t *testing.T) {
},
}

adapter := &HpaAdapter{
scaleForbiddenWindow: ScaleForbiddenWindow,
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
if testCase.fedHpa == nil {
testCase.fedHpa = defaultFedHpa
}
scheduledState := getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
scheduledState := adapter.getHpaScheduleState(testCase.fedHpa, testCase.localHpas)
checkClusterConditions(t, testCase.fedHpa, scheduledState)
if testCase.expectedReplicas != nil {
for cluster, replicas := range testCase.expectedReplicas {
Expand All @@ -216,8 +218,8 @@ func updateHpaStatus(hpa *autoscalingv1.HorizontalPodAutoscaler, currentUtilisat
now := metav1.Now()
scaledTime := now
if scaleable {
// definitely more then 5 minutes ago
scaledTime = metav1.NewTime(now.Time.Add(-6 * time.Minute))
// definitely more then ScaleForbiddenWindow time ago
scaledTime = metav1.NewTime(now.Time.Add(-2 * ScaleForbiddenWindow))
}
hpa.Status.LastScaleTime = &scaledTime
return hpa
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type NamespaceAdapter struct {
deleter deletion.NamespacedResourcesDeleterInterface
}

func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewNamespaceAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
dynamicClientPool := dynamic.NewDynamicClientPool(config)
discoverResourcesFunc := client.Discovery().ServerPreferredNamespacedResources
deleter := deletion.NewNamespacedResourcesDeleter(
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/replicaset.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type ReplicaSetAdapter struct {
client federationclientset.Interface
}

func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewReplicaSetAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
replicaSchedulingAdapter := replicaSchedulingAdapter{
preferencesAnnotationName: FedReplicaSetPreferencesAnnotation,
updateStatusFunc: func(obj pkgruntime.Object, schedulingInfo interface{}) error {
Expand Down
2 changes: 1 addition & 1 deletion federation/pkg/federatedtypes/secret.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type SecretAdapter struct {
client federationclientset.Interface
}

func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config) FederatedTypeAdapter {
func NewSecretAdapter(client federationclientset.Interface, config *restclient.Config, adapterSpecificArgs map[string]interface{}) FederatedTypeAdapter {
return &SecretAdapter{client: client}
}

Expand Down
4 changes: 2 additions & 2 deletions federation/pkg/federation-controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,10 @@ type FederationSyncController struct {
}

// StartFederationSyncController starts a new sync controller for a type adapter
func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool) {
func StartFederationSyncController(kind string, adapterFactory federatedtypes.AdapterFactory, config *restclient.Config, stopChan <-chan struct{}, minimizeLatency bool, adapterSpecificArgs map[string]interface{}) {
restclient.AddUserAgent(config, fmt.Sprintf("federation-%s-controller", kind))
client := federationclientset.NewForConfigOrDie(config)
adapter := adapterFactory(client, config)
adapter := adapterFactory(client, config, adapterSpecificArgs)
controller := newFederationSyncController(client, adapter)
if minimizeLatency {
controller.minimizeLatency()
Expand Down
1 change: 1 addition & 0 deletions hack/verify-flags/known-flags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,7 @@ host-network-sources
host-pid-sources
host-port-endpoints
host-system-namespace
hpa-scale-forbidden-window
http-check-frequency
http-port
ignore-daemonsets
Expand Down
2 changes: 1 addition & 1 deletion test/e2e_federation/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var _ = framework.KubeDescribe("Federated types [Feature:Federation][Experimenta
if clusterClients == nil {
clusterClients = f.GetClusterClients()
}
adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig)
adapter := fedType.AdapterFactory(f.FederationClientset, f.FederationConfig, nil)
crudTester := fedframework.NewFederatedTypeCRUDTester(adapter, clusterClients)
obj := adapter.NewTestObject(f.FederationNamespace.Name)
crudTester.CheckLifecycle(obj)
Expand Down
Loading

0 comments on commit 2be69a5

Please sign in to comment.