Skip to content

Commit

Permalink
support cpu/memory scaler (kedacore#1215)
Browse files Browse the repository at this point in the history
Signed-off-by: silenceper <[email protected]>
  • Loading branch information
silenceper authored Oct 13, 2020
1 parent 3be0c56 commit a17ac2a
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 97 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
- Define KEDA readiness and liveness probes ([#788](https://github.com/kedacore/keda/issues/788))
- KEDA Support for configurable scaling behavior in HPA v2beta2 ([#802](https://github.com/kedacore/keda/issues/802))
- Add External Push scaler ([#820](https://github.com/kedacore/keda/issues/820) | [docs](https://keda.sh/docs/2.0/scalers/external-push/))
- Add Standard Resource metrics to KEDA ([#874](https://github.com/kedacore/keda/pull/874))
- Managed Identity support for Azure Monitor scaler ([#936](https://github.com/kedacore/keda/issues/936))
- Add support for multiple triggers on ScaledObject ([#476](https://github.com/kedacore/keda/issues/476))
- Add consumer offset reset policy option to Kafka scaler ([#925](https://github.com/kedacore/keda/pull/925))
Expand All @@ -31,7 +30,7 @@
- Add support for multiple redis list types in redis list scaler ([#1006](https://github.com/kedacore/keda/pull/1006)) | [docs](https://keda.sh/docs/2.0/scalers/redis-lists/))
- Introduce Azure Log Analytics scaler ([#1061](https://github.com/kedacore/keda/issues/1061)) | [docs](https://keda.sh/docs/2.0/scalers/azure-log-analytics/))
- Add Metrics API Scaler ([#1026](https://github.com/kedacore/keda/pull/1026))

- Add cpu/memory Scaler ([#1215](https://github.com/kedacore/keda/pull/1215))

### Improvements

Expand Down
3 changes: 2 additions & 1 deletion api/v1alpha1/scaledobject_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type AdvancedConfig struct {

// HorizontalPodAutoscalerConfig specifies horizontal scale config
type HorizontalPodAutoscalerConfig struct {
ResourceMetrics []*autoscalingv2beta2.ResourceMetricSource `json:"resourceMetrics,omitempty"`
// +optional
Behavior *autoscalingv2beta2.HorizontalPodAutoscalerBehavior `json:"behavior,omitempty"`
}
Expand Down Expand Up @@ -94,6 +93,8 @@ type ScaledObjectStatus struct {
// +optional
ExternalMetricNames []string `json:"externalMetricNames,omitempty"`
// +optional
ResourceMetricNames []string `json:"resourceMetricNames,omitempty"`
// +optional
Conditions Conditions `json:"conditions,omitempty"`
}

Expand Down
16 changes: 5 additions & 11 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

61 changes: 4 additions & 57 deletions config/crd/bases/keda.sh_scaledobjects.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -187,63 +187,6 @@ spec:
type: integer
type: object
type: object
resourceMetrics:
items:
description: ResourceMetricSource indicates how to scale
on a resource metric known to Kubernetes, as specified
in requests and limits, describing each pod in the current
scale target (e.g. CPU or memory). The values will be
averaged together before being compared to the target. Such
metrics are built in to Kubernetes, and have special scaling
options on top of those available to normal per-pod metrics
using the "pods" source. Only one "target" type should
be set.
properties:
name:
description: name is the name of the resource in question.
type: string
target:
description: target specifies the target value for the
given metric
properties:
averageUtilization:
description: averageUtilization is the target value
of the average of the resource metric across all
relevant pods, represented as a percentage of
the requested value of the resource for the pods.
Currently only valid for Resource metric source
type
format: int32
type: integer
averageValue:
anyOf:
- type: integer
- type: string
description: averageValue is the target value of
the average of the metric across all relevant
pods (as a quantity)
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
type:
description: type represents whether the metric
type is Utilization, Value, or AverageValue
type: string
value:
anyOf:
- type: integer
- type: string
description: value is the target value of the metric
(as a quantity).
pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$
x-kubernetes-int-or-string: true
required:
- type
type: object
required:
- name
- target
type: object
type: array
type: object
restoreToOriginalReplicaCount:
type: boolean
Expand Down Expand Up @@ -342,6 +285,10 @@ spec:
originalReplicaCount:
format: int32
type: integer
resourceMetricNames:
items:
type: string
type: array
scaleTargetGVKR:
description: GroupVersionKindResource provides unified structure for
schema.GroupVersionKind and Resource
Expand Down
35 changes: 12 additions & 23 deletions controllers/hpa.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,35 +135,36 @@ func (r *ScaledObjectReconciler) updateHPAIfNeeded(logger logr.Logger, scaledObj
func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) ([]autoscalingv2beta2.MetricSpec, error) {
var scaledObjectMetricSpecs []autoscalingv2beta2.MetricSpec
var externalMetricNames []string
var resourceMetricNames []string

scalers, err := r.scaleHandler.GetScalers(scaledObject)
if err != nil {
logger.Error(err, "Error getting scalers")
return nil, err
}

// Handling the Resource metrics through KEDA
if scaledObject.Spec.Advanced != nil && scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig != nil {
metrics := getResourceMetrics(scaledObject.Spec.Advanced.HorizontalPodAutoscalerConfig.ResourceMetrics)
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metrics...)
}

for _, scaler := range scalers {
metricSpecs := scaler.GetMetricSpecForScaling()

// add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
for _, metricSpec := range metricSpecs {
metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name)
if metricSpec.Resource != nil {
resourceMetricNames = append(resourceMetricNames, string(metricSpec.Resource.Name))
}
if metricSpec.External != nil {
// add the scaledObjectName label. This is how the MetricsAdapter will know which scaledobject a metric is for when the HPA queries it.
metricSpec.External.Metric.Selector = &metav1.LabelSelector{MatchLabels: make(map[string]string)}
metricSpec.External.Metric.Selector.MatchLabels["scaledObjectName"] = scaledObject.Name
externalMetricNames = append(externalMetricNames, metricSpec.External.Metric.Name)
}
}
scaledObjectMetricSpecs = append(scaledObjectMetricSpecs, metricSpecs...)
scaler.Close()
}

// store External.MetricNames used by scalers defined in the ScaledObject
// store External.MetricNames,Resource.MetricsNames used by scalers defined in the ScaledObject
status := scaledObject.Status.DeepCopy()
status.ExternalMetricNames = externalMetricNames
status.ResourceMetricNames = resourceMetricNames
err = kedacontrollerutil.UpdateScaledObjectStatus(r.Client, logger, scaledObject, status)
if err != nil {
logger.Error(err, "Error updating scaledObject status with used externalMetricNames")
Expand All @@ -173,18 +174,6 @@ func (r *ScaledObjectReconciler) getScaledObjectMetricSpecs(logger logr.Logger,
return scaledObjectMetricSpecs, nil
}

func getResourceMetrics(resourceMetrics []*autoscalingv2beta2.ResourceMetricSource) []autoscalingv2beta2.MetricSpec {
metrics := make([]autoscalingv2beta2.MetricSpec, 0, len(resourceMetrics))
for _, resourceMetric := range resourceMetrics {
metrics = append(metrics, autoscalingv2beta2.MetricSpec{
Type: "Resource",
Resource: resourceMetric,
})
}

return metrics
}

// checkMinK8sVersionforHPABehavior min version (k8s v1.18) for HPA Behavior
func (r *ScaledObjectReconciler) checkMinK8sVersionforHPABehavior(logger logr.Logger, scaledObject *kedav1alpha1.ScaledObject) {
if r.kubeVersion.MinorVersion < 18 {
Expand Down
101 changes: 101 additions & 0 deletions pkg/scalers/cpu_memory_scaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package scalers

import (
"context"
"fmt"
"strconv"

"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/metrics/pkg/apis/external_metrics"
)

type cpuMemoryScaler struct {
metadata *cpuMemoryMetadata
resourceName v1.ResourceName
}

type cpuMemoryMetadata struct {
Type v2beta2.MetricTargetType
Value *resource.Quantity
AverageValue *resource.Quantity
AverageUtilization *int32
}

// NewCPUMemoryScaler creates a new cpuMemoryScaler
func NewCPUMemoryScaler(resourceName v1.ResourceName, config *ScalerConfig) (Scaler, error) {
meta, parseErr := parseResourceMetadata(config)
if parseErr != nil {
return nil, fmt.Errorf("error parsing %s metadata: %s", resourceName, parseErr)
}

return &cpuMemoryScaler{
metadata: meta,
resourceName: resourceName,
}, nil
}

func parseResourceMetadata(config *ScalerConfig) (*cpuMemoryMetadata, error) {
meta := &cpuMemoryMetadata{}
if val, ok := config.TriggerMetadata["type"]; ok && val != "" {
meta.Type = v2beta2.MetricTargetType(val)
} else {
return nil, fmt.Errorf("no type given")
}

var value string
var ok bool
if value, ok = config.TriggerMetadata["value"]; !ok || value == "" {
return nil, fmt.Errorf("no value given")
}
switch meta.Type {
case v2beta2.ValueMetricType:
valueQuantity := resource.MustParse(value)
meta.Value = &valueQuantity
case v2beta2.AverageValueMetricType:
averageValueQuantity := resource.MustParse(value)
meta.AverageValue = &averageValueQuantity
case v2beta2.UtilizationMetricType:
valueNum, err := strconv.Atoi(value)
if err != nil {
return nil, err
}
utilizationNum := int32(valueNum)
meta.AverageUtilization = &utilizationNum
default:
return nil, fmt.Errorf("unsupport type")
}
return meta, nil
}

// IsActive always return true for cpu/memory scaler
func (s *cpuMemoryScaler) IsActive(ctx context.Context) (bool, error) {
return true, nil
}

//Close no need for cpuMemory scaler
func (s *cpuMemoryScaler) Close() error {
return nil
}

// GetMetricSpecForScaling returns the metric spec for the HPA
func (s *cpuMemoryScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec {
cpuMemoryMetric := &v2beta2.ResourceMetricSource{
Name: s.resourceName,
Target: v2beta2.MetricTarget{
Type: s.metadata.Type,
Value: s.metadata.Value,
AverageUtilization: s.metadata.AverageUtilization,
AverageValue: s.metadata.AverageValue,
},
}
metricSpec := v2beta2.MetricSpec{Resource: cpuMemoryMetric, Type: v2beta2.ResourceMetricSourceType}
return []v2beta2.MetricSpec{metricSpec}
}

// GetMetrics no need for cpu/memory scaler
func (s *cpuMemoryScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) {
return nil, nil
}
57 changes: 57 additions & 0 deletions pkg/scalers/cpu_memory_scaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package scalers

import (
"testing"

"github.com/stretchr/testify/assert"
"k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
)

type parseCPUMemoryMetadataTestData struct {
metadata map[string]string
isError bool
}

// A complete valid metadata example for reference
var validCPUMemoryMetadata = map[string]string{
"type": "Utilization",
"value": "50",
}

var testCPUMemoryMetadata = []parseCPUMemoryMetadataTestData{
{map[string]string{}, true},
{validCPUMemoryMetadata, false},
{map[string]string{"type": "Utilization", "value": "50"}, false},
{map[string]string{"type": "Value", "value": "50"}, false},
{map[string]string{"type": "AverageValue", "value": "50"}, false},
{map[string]string{"type": "AverageValue"}, true},
{map[string]string{"type": "xxx", "value": "50"}, true},
}

func TestCPUMemoryParseMetadata(t *testing.T) {
for _, testData := range testCPUMemoryMetadata {
config := &ScalerConfig{
TriggerMetadata: testData.metadata,
}
_, err := parseResourceMetadata(config)
if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
}
if testData.isError && err == nil {
t.Error("Expected error but got success")
}
}
}

func TestGetMetricSpecForScaling(t *testing.T) {
config := &ScalerConfig{
TriggerMetadata: validCPUMemoryMetadata,
}
scaler, _ := NewCPUMemoryScaler(v1.ResourceCPU, config)
metricSpec := scaler.GetMetricSpecForScaling()

assert.Equal(t, metricSpec[0].Type, v2beta2.ResourceMetricSourceType)
assert.Equal(t, metricSpec[0].Resource.Name, v1.ResourceCPU)
assert.Equal(t, metricSpec[0].Resource.Target.Type, v2beta2.UtilizationMetricType)
}
Loading

0 comments on commit a17ac2a

Please sign in to comment.