Skip to content

Commit

Permalink
[occm] add a node selector support for loadbalancer services (kuberne…
Browse files Browse the repository at this point in the history
…tes#2601)

* POC of TargetNodeLabels selector on OpenStack LB

* Fix type errors

* Update implementation of getKeyValuePropertiesFromServiceAnnotation

* gofmt -w -s ./pkg

* Polish the code and add documentation

---------

Co-authored-by: Ririko Nakamura <[email protected]>
  • Loading branch information
kayrus and ririko-nakamura authored May 27, 2024
1 parent 309db6d commit 59efe1c
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,12 @@ Request Body:
This annotation is automatically added and it contains the floating ip address of the load balancer service.
When using `loadbalancer.openstack.org/hostname` annotation it is the only place to see the real address of the load balancer.

- `loadbalancer.openstack.org/node-selector`

A set of key=value annotations used to filter nodes for targeting by the load balancer. When defined, only nodes that match all the specified key=value annotations will be targeted. If an annotation includes only a key without a value, the filter will check only for the existence of the key on the node. If the value is not set, the `node-selector` value defined in the OCCM configuration is applied.

Example: To filter nodes with the labels `env=production` and `region=default`, set the `loadbalancer.openstack.org/node-selector` annotation to `env=production, region=default`

### Switching between Floating Subnets by using preconfigured Classes

If you have multiple `FloatingIPPools` and/or `FloatingIPSubnets` it might be desirable to offer the user logical meanings for `LoadBalancers` like `internetFacing` or `DMZ` instead of requiring the user to select a dedicated network or subnet ID at the service object level as an annotation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ Although the openstack-cloud-controller-manager was initially implemented with N
* `ROUND_ROBIN` (default)
* `LEAST_CONNECTIONS`
* `SOURCE_IP`
If `lb-provider` is set to "ovn" the value must be set to `SOURCE_IP_PORT`.
* `lb-provider`
Expand Down Expand Up @@ -248,6 +248,23 @@ Although the openstack-cloud-controller-manager was initially implemented with N
* `internal-lb`
Determines whether or not to create an internal load balancer (no floating IP) by default. Default: false.
* `node-selector`
A comma separated list of key=value annotations used to filter nodes for targeting by the load balancer. When defined, only nodes that match all the specified key=value annotations will be targeted. If an annotation includes only a key without a value, the filter will check only for the existence of the key on the node. When node-selector is not set (default value), all nodes will be added as members to a load balancer pool.
Note: This configuration option can be overridden with the `loadbalancer.openstack.org/node-selector` service annotation. Refer to [Exposing applications using services of LoadBalancer type](./expose-applications-using-loadbalancer-type-service.md)
Example: To filter nodes with the labels `env=production` and `region=default`, set the `node-selector` as follows:
```
node-selector="env=production, region=default"
```
Example: To filter nodes that have the key `env` with any value and the key `region` specifically set to `default`, set the `node-selector` as follows:
```
node-selector="env, region=default"
```
* `cascade-delete`
Determines whether or not to perform cascade deletion of load balancers. Default: true.
Expand Down
80 changes: 74 additions & 6 deletions pkg/openstack/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ const (
annotationXForwardedFor = "X-Forwarded-For"

ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
ServiceAnnotationLoadBalancerNodeSelector = "loadbalancer.openstack.org/node-selector"
ServiceAnnotationLoadBalancerConnLimit = "loadbalancer.openstack.org/connection-limit"
ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
ServiceAnnotationLoadBalancerFloatingSubnet = "loadbalancer.openstack.org/floating-subnet"
Expand Down Expand Up @@ -119,6 +120,7 @@ type serviceConfig struct {
lbMemberSubnetID string
lbPublicNetworkID string
lbPublicSubnetSpec *floatingSubnetSpec
nodeSelectors map[string]string
keepClientIP bool
enableProxyProtocol bool
timeoutClientData int
Expand Down Expand Up @@ -405,6 +407,14 @@ func nodeAddressForLB(node *corev1.Node, preferredIPFamily corev1.IPFamily) (str
return "", cpoerrors.ErrNoAddressFound
}

// getKeyValueFromServiceAnnotation converts a comma-separated list of key-value
// pairs from the specified annotation into a map or returns the specified
// defaultSetting if the annotation is empty
func getKeyValueFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting string) map[string]string {
annotationValue := getStringFromServiceAnnotation(service, annotationKey, defaultSetting)
return cpoutil.StringToMap(annotationValue)
}

// getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
func getStringFromServiceAnnotation(service *corev1.Service, annotationKey string, defaultSetting string) string {
klog.V(4).Infof("getStringFromServiceAnnotation(%s/%s, %v, %v)", service.Namespace, service.Name, annotationKey, defaultSetting)
Expand Down Expand Up @@ -1229,6 +1239,16 @@ func (lbaas *LbaasV2) checkServiceUpdate(service *corev1.Service, nodes []*corev
svcConf.lbID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerID, "")
svcConf.supportLBTags = openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureTags, lbaas.opts.LBProvider)

// Get service node-selector annotations
svcConf.nodeSelectors = getKeyValueFromServiceAnnotation(service, ServiceAnnotationLoadBalancerNodeSelector, lbaas.opts.NodeSelector)
for key, value := range svcConf.nodeSelectors {
if value == "" {
klog.V(3).InfoS("Target node label %s key is set to LoadBalancer service %s", key, serviceName)
} else {
klog.V(3).InfoS("Target node label %s=%s is set to LoadBalancer service %s", key, value, serviceName)
}
}

// Find subnet ID for creating members
memberSubnetID, err := lbaas.getMemberSubnetID(service)
if err != nil {
Expand Down Expand Up @@ -1314,6 +1334,16 @@ func (lbaas *LbaasV2) checkService(service *corev1.Service, nodes []*corev1.Node
svcConf.lbID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerID, "")
svcConf.supportLBTags = openstackutil.IsOctaviaFeatureSupported(lbaas.lb, openstackutil.OctaviaFeatureTags, lbaas.opts.LBProvider)

// Get service node-selector annotations
svcConf.nodeSelectors = getKeyValueFromServiceAnnotation(service, ServiceAnnotationLoadBalancerNodeSelector, lbaas.opts.NodeSelector)
for key, value := range svcConf.nodeSelectors {
if value == "" {
klog.V(3).InfoS("Target node label %s key is set to LoadBalancer service %s", key, serviceName)
} else {
klog.V(3).InfoS("Target node label %s=%s is set to LoadBalancer service %s", key, value, serviceName)
}
}

// If in the config file internal-lb=true, user is not allowed to create external service.
if lbaas.opts.InternalLB {
if !getBoolFromServiceAnnotation(service, ServiceAnnotationLoadBalancerInternal, false) {
Expand Down Expand Up @@ -1602,6 +1632,9 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
return nil, err
}

// apply node-selector to a list of nodes
filteredNodes := filterNodes(nodes, svcConf.nodeSelectors)

// Use more meaningful name for the load balancer but still need to check the legacy name for backward compatibility.
lbName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
svcConf.lbName = lbName
Expand Down Expand Up @@ -1666,7 +1699,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
}
klog.InfoS("Creating loadbalancer", "lbName", lbName, "service", klog.KObj(service))
loadbalancer, err = lbaas.createOctaviaLoadBalancer(lbName, clusterName, service, nodes, svcConf)
loadbalancer, err = lbaas.createOctaviaLoadBalancer(lbName, clusterName, service, filteredNodes, svcConf)
if err != nil {
return nil, fmt.Errorf("error creating loadbalancer %s: %v", lbName, err)
}
Expand Down Expand Up @@ -1712,7 +1745,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
return nil, err
}

pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, lbName), listener, service, port, nodes, svcConf)
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, lbName), listener, service, port, filteredNodes, svcConf)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1765,7 +1798,7 @@ func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName
status := lbaas.createLoadBalancerStatus(service, svcConf, addr)

if lbaas.opts.ManageSecurityGroups {
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, nodes, svcConf)
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, filteredNodes, svcConf)
if err != nil {
return status, fmt.Errorf("failed when reconciling security groups for LB service %v/%v: %v", service.Namespace, service.Name, err)
}
Expand Down Expand Up @@ -1818,8 +1851,11 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
return err
}

// apply node-selector to a list of nodes
filteredNodes := filterNodes(nodes, svcConf.nodeSelectors)

serviceName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
klog.V(2).Infof("Updating %d nodes for Service %s in cluster %s", len(nodes), serviceName, clusterName)
klog.V(2).Infof("Updating %d nodes for Service %s in cluster %s", len(filteredNodes), serviceName, clusterName)

// Get load balancer
var loadbalancer *loadbalancers.LoadBalancer
Expand Down Expand Up @@ -1866,7 +1902,7 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadbalancer.ID, port.Port, port.Protocol)
}

pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, loadbalancer.Name), &listener, service, port, nodes, svcConf)
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cpoutil.Sprintf255(poolFormat, portIndex, loadbalancer.Name), &listener, service, port, filteredNodes, svcConf)
if err != nil {
return err
}
Expand All @@ -1878,7 +1914,7 @@ func (lbaas *LbaasV2) updateOctaviaLoadBalancer(ctx context.Context, clusterName
}

if lbaas.opts.ManageSecurityGroups {
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, nodes, svcConf)
err := lbaas.ensureAndUpdateOctaviaSecurityGroup(clusterName, service, filteredNodes, svcConf)
if err != nil {
return fmt.Errorf("failed to update Security Group for loadbalancer service %s: %v", serviceName, err)
}
Expand Down Expand Up @@ -2184,3 +2220,35 @@ func PreserveGopherError(rawError error) error {
}
return rawError
}

// filterNodes uses node labels to filter the nodes that should be targeted by the LB,
// ensuring that all the labels provided in an annotation are present on the nodes
func filterNodes(nodes []*corev1.Node, filterLabels map[string]string) []*corev1.Node {
if len(filterLabels) == 0 {
return nodes
}

filteredNodes := make([]*corev1.Node, 0, len(nodes))
for _, node := range nodes {
if matchNodeLabels(node, filterLabels) {
filteredNodes = append(filteredNodes, node)
}
}

return filteredNodes
}

// matchNodeLabels checks if a node has all the labels in filterLabels with matching values
func matchNodeLabels(node *corev1.Node, filterLabels map[string]string) bool {
if node == nil || len(node.Labels) == 0 {
return false
}

for k, v := range filterLabels {
if nodeLabelValue, ok := node.Labels[k]; !ok || (v != "" && nodeLabelValue != v) {
return false
}
}

return true
}
101 changes: 101 additions & 0 deletions pkg/openstack/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2452,3 +2452,104 @@ func TestBuildListenerCreateOpt(t *testing.T) {
})
}
}

func TestFilterNodes(t *testing.T) {
tests := []struct {
name string
nodeLabels map[string]string
service *corev1.Service
annotationKey string
defaultSetting map[string]string
nodeFiltered bool
}{
{
name: "when no filter is provided, node should be filtered",
nodeLabels: map[string]string{"k1": "v1"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: true,
},
{
name: "when all key-value filters match, node should be filtered",
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1=v1,k2=v2"},
},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: true,
},
{
name: "when all key-value filters match and a key value contains equals sign, node should be filtered",
nodeLabels: map[string]string{"k1": "v1", "k2": "v2=true"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1=v1,k2=v2=true"},
},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: true,
},
{
name: "when all just-key filter match, node should be filtered",
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k1,k2"},
},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: true,
},
{
name: "when some filters do not match, node should not be filtered",
nodeLabels: map[string]string{"k1": "v1"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: " k1=v1, k2 "},
},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: false,
},
{
name: "when no filter matches, node should not be filtered",
nodeLabels: map[string]string{"k1": "v1", "k2": "v2"},
service: &corev1.Service{
ObjectMeta: v1.ObjectMeta{
Annotations: map[string]string{ServiceAnnotationLoadBalancerNodeSelector: "k3=v3"},
},
},
annotationKey: ServiceAnnotationLoadBalancerNodeSelector,
defaultSetting: make(map[string]string),
nodeFiltered: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
node := &corev1.Node{}
node.Labels = test.nodeLabels

// TODO: add testArgs
targetNodeLabels := getKeyValueFromServiceAnnotation(test.service, ServiceAnnotationLoadBalancerNodeSelector, "")

nodes := []*corev1.Node{node}
filteredNodes := filterNodes(nodes, targetNodeLabels)

if test.nodeFiltered {
assert.Equal(t, nodes, filteredNodes)
} else {
assert.Empty(t, filteredNodes)
}
})
}
}
4 changes: 3 additions & 1 deletion pkg/openstack/openstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ type LoadBalancerOpts struct {
MonitorMaxRetries uint `gcfg:"monitor-max-retries"`
MonitorMaxRetriesDown uint `gcfg:"monitor-max-retries-down"`
ManageSecurityGroups bool `gcfg:"manage-security-groups"`
InternalLB bool `gcfg:"internal-lb"` // default false
InternalLB bool `gcfg:"internal-lb"` // default false
NodeSelector string `gcfg:"node-selector"` // If specified, the loadbalancer members will be assined only from nodes list filtered by node-selector labels
CascadeDelete bool `gcfg:"cascade-delete"`
FlavorID string `gcfg:"flavor-id"`
AvailabilityZone string `gcfg:"availability-zone"`
Expand Down Expand Up @@ -222,6 +223,7 @@ func ReadConfig(config io.Reader) (Config, error) {
// Set default values explicitly
cfg.LoadBalancer.Enabled = true
cfg.LoadBalancer.InternalLB = false
cfg.LoadBalancer.NodeSelector = ""
cfg.LoadBalancer.LBProvider = "amphora"
cfg.LoadBalancer.LBMethod = "ROUND_ROBIN"
cfg.LoadBalancer.CreateMonitor = false
Expand Down
26 changes: 26 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
Expand Down Expand Up @@ -77,6 +78,31 @@ func Contains(list []string, strToSearch string) bool {
return false
}

// StringToMap converts a string of comma-separated key-values into a map
func StringToMap(str string) map[string]string {
// break up a "key1=val,key2=val2,key3=,key4" string into a list
values := strings.Split(strings.TrimSpace(str), ",")
keyValues := make(map[string]string, len(values))

for _, kv := range values {
kv := strings.SplitN(strings.TrimSpace(kv), "=", 2)

k := kv[0]
if len(kv) == 1 {
if k != "" {
// process "key=" or "key"
keyValues[k] = ""
}
continue
}

// process "key=val" or "key=val=foo"
keyValues[k] = kv[1]
}

return keyValues
}

// RoundUpSize calculates how many allocation units are needed to accommodate
// a volume of given size. E.g. when user wants 1500MiB volume, while AWS EBS
// allocates volumes in gibibyte-sized chunks,
Expand Down
Loading

0 comments on commit 59efe1c

Please sign in to comment.