Skip to content

Commit

Permalink
Merge pull request kubernetes#128929 from orange30/master
Browse files Browse the repository at this point in the history
fix: kube-proxy EndpointSliceCache memory is leaked
  • Loading branch information
k8s-ci-robot authored Dec 12, 2024
2 parents 477d015 + 3195883 commit aa931aa
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/proxy/endpointslicecache.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@ func (cache *EndpointSliceCache) checkoutChanges() map[types.NamespacedName]*end
}

delete(esTracker.pending, name)
if len(esTracker.applied) == 0 && len(esTracker.pending) == 0 {
delete(cache.trackerByServiceMap, serviceNN)
}
}

change.current = cache.getEndpointsMap(serviceNN, esTracker.applied)
Expand Down
105 changes: 105 additions & 0 deletions pkg/proxy/endpointslicecache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"reflect"
"testing"
"time"

v1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
Expand Down Expand Up @@ -559,3 +560,107 @@ func (cmc *cacheMutationCheck) Check(t *testing.T) {
}
}
}

func TestEndpointSliceCacheClearedCorrectly(t *testing.T) {
initEndpointsPorts := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
NodeName: ptr.To(testHostname),
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("n1"),
Port: ptr.To[int32](11),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}

testCases := []struct {
name string
currEndpointSlices []*discovery.EndpointSlice
}{
{
name: "one endpoint slice",
currEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts),
},
},
{
name: "two endpoint slices, same namespace",
currEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts),
makeTestEndpointSlice("ns1", "ep2", 2, initEndpointsPorts),
},
},
{
name: "two endpoint slices, same service",
currEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts),
makeTestEndpointSlice("ns1", "ep1", 2, initEndpointsPorts),
},
},
{
name: "two endpoint slices, different namespace",
currEndpointSlices: []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "ep1", 1, initEndpointsPorts),
makeTestEndpointSlice("ns2", "ep1", 2, initEndpointsPorts),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = testHostname

for _, epSlice := range tc.currEndpointSlices {
fp.addEndpointSlice(epSlice)
}
fp.endpointsMap.Update(fp.endpointsChanges)

for _, epSlice := range tc.currEndpointSlices {
fp.deleteEndpointSlice(epSlice)
}
fp.endpointsMap.Update(fp.endpointsChanges)

if len(fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) != 0 {
t.Errorf("expected: endpointSliceCache does not have any entries, got: %v", fp.endpointsChanges.endpointSliceCache.trackerByServiceMap)
}
})
}
}

func TestSameServiceEndpointSliceCacheClearedCorrectly(t *testing.T) {
initEndpointsPorts := func(eps *discovery.EndpointSlice) {
eps.Endpoints = []discovery.Endpoint{{
Addresses: []string{"1.1.1.1"},
NodeName: ptr.To(testHostname),
}}
eps.Ports = []discovery.EndpointPort{{
Name: ptr.To("n1"),
Port: ptr.To[int32](11),
Protocol: ptr.To(v1.ProtocolUDP),
}}
}

currEndpointSlices := []*discovery.EndpointSlice{
makeTestEndpointSlice("ns1", "svc1", 1, initEndpointsPorts),
makeTestEndpointSlice("ns1", "svc1", 2, initEndpointsPorts),
}

fp := newFakeProxier(v1.IPv4Protocol, time.Time{})
fp.hostname = testHostname

for _, epSlice := range currEndpointSlices {
fp.addEndpointSlice(epSlice)
}
fp.endpointsMap.Update(fp.endpointsChanges)

// only delete the first endpoint slice
fp.deleteEndpointSlice(currEndpointSlices[0])

fp.endpointsMap.Update(fp.endpointsChanges)

if len(fp.endpointsChanges.endpointSliceCache.trackerByServiceMap) != 1 {
t.Errorf("expected: endpointSliceCache to have one entries, got: %v", fp.endpointsChanges.endpointSliceCache.trackerByServiceMap)
}
}
20 changes: 20 additions & 0 deletions pkg/proxy/servicechangetracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,3 +900,23 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) {
t.Errorf("expected healthcheck ports length 0, got %v", healthCheckNodePorts)
}
}

func TestServiceCacheLeaks(t *testing.T) {
fp := newFakeProxier(v1.IPv4Protocol, time.Time{})

service := makeTestService("ns1", "svc1", func(svc *v1.Service) {
svc.Spec.Type = v1.ServiceTypeClusterIP
svc.Spec.ClusterIP = "172.16.55.4"
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p1", "UDP", 1234, 4321, 0)
svc.Spec.Ports = addTestPort(svc.Spec.Ports, "p2", "TCP", 1235, 5321, 0)
})
fp.addService(service)
if len(fp.serviceChanges.items) != 1 {
t.Errorf("Found %d items on the cache, 1 expected", len(fp.serviceChanges.items))
}

fp.deleteService(service)
if len(fp.serviceChanges.items) > 0 {
t.Errorf("Found %d items on the cache, 0 expected", len(fp.serviceChanges.items))
}
}

0 comments on commit aa931aa

Please sign in to comment.