Skip to content

Commit

Permalink
endpoints controller: don't consider terminal endpoints
Browse files Browse the repository at this point in the history
Terminal pods, whose phase its Failed or Succeeded, are guaranteed
to never regress and to be stopped, so their IPs never should
be published on the Endpoints.
  • Loading branch information
aojea authored and robscott committed May 27, 2022
1 parent 767e6a0 commit dba5200
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 115 deletions.
32 changes: 9 additions & 23 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
return err
}

// If the user specified the older (deprecated) annotation, we have to respect it.
tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses

// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
Expand All @@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
var totalNotReadyEps int

for _, pod := range pods {
if len(pod.Status.PodIP) == 0 {
klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
if !endpointutil.ShouldPodBeInEndpointSlice(pod, service.Spec.PublishNotReadyAddresses) {
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}

Expand All @@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
Expand All @@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
epp := endpointPortFromServicePort(servicePort, portNum)

var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
Expand Down Expand Up @@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() {
}
}

// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset.
// The addresses are added to the corresponding field, ready or not ready, depending
// on the pod status and the Service PublishNotReadyAddresses field value.
// The pod passed to this function must have already been filtered through ShouldPodBeInEndpointSlice.
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
var readyEps int
Expand All @@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
Ports: ports,
})
readyEps++
} else if shouldPodBeInEndpoints(pod) {
} else { // if it is not a ready address it has to be not ready
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Expand All @@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
return subsets, readyEps, notReadyEps
}

func shouldPodBeInEndpoints(pod *v1.Pod) bool {
switch pod.Spec.RestartPolicy {
case v1.RestartPolicyNever:
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
case v1.RestartPolicyOnFailure:
return pod.Status.Phase != v1.PodSucceeded
default:
return true
}
}

func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
epp := &v1.EndpointPort{
Name: servicePort.Name,
Expand Down
Loading

0 comments on commit dba5200

Please sign in to comment.