Skip to content

Commit

Permalink
Merge pull request kubernetes#110258 from robscott/automated-cherry-p…
Browse files Browse the repository at this point in the history
…ick-of-#110255-upstream-release-1.24

Automated cherry pick of kubernetes#110255: Endpoints and EndpointSlices should not publish IPs for terminal pods
  • Loading branch information
k8s-ci-robot authored Jun 3, 2022
2 parents 0a52038 + 68e8b66 commit a6b031e
Show file tree
Hide file tree
Showing 10 changed files with 441 additions and 139 deletions.
10 changes: 10 additions & 0 deletions pkg/api/v1/pod/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ func IsPodReady(pod *v1.Pod) bool {
return IsPodReadyConditionTrue(pod.Status)
}

// IsPodTerminal returns true if a pod is terminal, all containers are stopped and cannot ever regress.
func IsPodTerminal(pod *v1.Pod) bool {
return IsPodPhaseTerminal(pod.Status.Phase)
}

// IsPhaseTerminal returns true if the pod's phase is terminal.
func IsPodPhaseTerminal(phase v1.PodPhase) bool {
return phase == v1.PodFailed || phase == v1.PodSucceeded
}

// IsPodReadyConditionTrue returns true if a pod is ready; false otherwise.
func IsPodReadyConditionTrue(status v1.PodStatus) bool {
condition := GetPodReadyCondition(status)
Expand Down
42 changes: 42 additions & 0 deletions pkg/api/v1/pod/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,6 +749,48 @@ func TestIsPodAvailable(t *testing.T) {
}
}

func TestIsPodTerminal(t *testing.T) {
now := metav1.Now()

tests := []struct {
podPhase v1.PodPhase
expected bool
}{
{
podPhase: v1.PodFailed,
expected: true,
},
{
podPhase: v1.PodSucceeded,
expected: true,
},
{
podPhase: v1.PodUnknown,
expected: false,
},
{
podPhase: v1.PodPending,
expected: false,
},
{
podPhase: v1.PodRunning,
expected: false,
},
{
expected: false,
},
}

for i, test := range tests {
pod := newPod(now, true, 0)
pod.Status.Phase = test.podPhase
isTerminal := IsPodTerminal(pod)
if isTerminal != test.expected {
t.Errorf("[tc #%d] expected terminal pod: %t, got: %t", i, test.expected, isTerminal)
}
}
}

func TestGetContainerStatus(t *testing.T) {
type ExpectedStruct struct {
status v1.ContainerStatus
Expand Down
32 changes: 9 additions & 23 deletions pkg/controller/endpoint/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,6 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
return err
}

// If the user specified the older (deprecated) annotation, we have to respect it.
tolerateUnreadyEndpoints := service.Spec.PublishNotReadyAddresses

// We call ComputeEndpointLastChangeTriggerTime here to make sure that the
// state of the trigger time tracker gets updated even if the sync turns out
// to be no-op and we don't update the endpoints object.
Expand All @@ -416,12 +413,8 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
var totalNotReadyEps int

for _, pod := range pods {
if len(pod.Status.PodIP) == 0 {
klog.V(5).Infof("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
if !tolerateUnreadyEndpoints && pod.DeletionTimestamp != nil {
klog.V(5).Infof("Pod is being deleted %s/%s", pod.Namespace, pod.Name)
if !endpointutil.ShouldPodBeInEndpoints(pod, service.Spec.PublishNotReadyAddresses) {
klog.V(5).Infof("Pod %s/%s is not included on endpoints for Service %s/%s", pod.Namespace, pod.Name, service.Namespace, service.Name)
continue
}

Expand All @@ -441,7 +434,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
// Allow headless service not to have ports.
if len(service.Spec.Ports) == 0 {
if service.Spec.ClusterIP == api.ClusterIPNone {
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
subsets, totalReadyEps, totalNotReadyEps = addEndpointSubset(subsets, pod, epa, nil, service.Spec.PublishNotReadyAddresses)
// No need to repack subsets for headless service without ports.
}
} else {
Expand All @@ -455,7 +448,7 @@ func (e *Controller) syncService(ctx context.Context, key string) error {
epp := endpointPortFromServicePort(servicePort, portNum)

var readyEps, notReadyEps int
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
subsets, readyEps, notReadyEps = addEndpointSubset(subsets, pod, epa, epp, service.Spec.PublishNotReadyAddresses)
totalReadyEps = totalReadyEps + readyEps
totalNotReadyEps = totalNotReadyEps + notReadyEps
}
Expand Down Expand Up @@ -591,6 +584,10 @@ func (e *Controller) checkLeftoverEndpoints() {
}
}

// addEndpointSubset add the endpoints addresses and ports to the EndpointSubset.
// The addresses are added to the corresponding field, ready or not ready, depending
// on the pod status and the Service PublishNotReadyAddresses field value.
// The pod passed to this function must have already been filtered through ShouldPodBeInEndpoints.
func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress,
epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) {
var readyEps int
Expand All @@ -605,7 +602,7 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
Ports: ports,
})
readyEps++
} else if shouldPodBeInEndpoints(pod) {
} else { // if it is not a ready address it has to be not ready
klog.V(5).Infof("Pod is out of service: %s/%s", pod.Namespace, pod.Name)
subsets = append(subsets, v1.EndpointSubset{
NotReadyAddresses: []v1.EndpointAddress{epa},
Expand All @@ -616,17 +613,6 @@ func addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.Endpoint
return subsets, readyEps, notReadyEps
}

func shouldPodBeInEndpoints(pod *v1.Pod) bool {
switch pod.Spec.RestartPolicy {
case v1.RestartPolicyNever:
return pod.Status.Phase != v1.PodFailed && pod.Status.Phase != v1.PodSucceeded
case v1.RestartPolicyOnFailure:
return pod.Status.Phase != v1.PodSucceeded
default:
return true
}
}

func endpointPortFromServicePort(servicePort *v1.ServicePort, portNum int) *v1.EndpointPort {
epp := &v1.EndpointPort{
Name: servicePort.Name,
Expand Down
Loading

0 comments on commit a6b031e

Please sign in to comment.