Skip to content

Commit

Permalink
Merge pull request kubernetes#33359 from shashidharatd/federation
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue

Fix goroutine leak in federation service controller

<!--  Thanks for sending a pull request!  Here are some tips for you:
1. If this is your first time, read our contributor guidelines https://github.com/kubernetes/kubernetes/blob/master/CONTRIBUTING.md and developer guide https://github.com/kubernetes/kubernetes/blob/master/docs/devel/development.md
2. If you want *faster* PR reviews, read how: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/faster_reviews.md
3. Follow the instructions for writing a release note: https://github.com/kubernetes/kubernetes/blob/master/docs/devel/pull-requests.md#release-notes
-->

**What this PR does / why we need it**: Fixes a memory leak

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes kubernetes#33186

**Special notes for your reviewer**: Every second new goroutines are created and are getting blocked waiting for the lock in the event queue. only one worker will get a lock when there are some events to process, so all the goroutines which are created every second waits for the lock forever and causes the memory/goroutine leak.

As a fix the new worker will be created only when there is no worker exist. and only one worker per cluster either waits for the event or processes all the events and goes out of existence.

```release-note
Fixes memory/goroutine leak in Federation Service controller.
```
  • Loading branch information
Kubernetes Submit Queue authored Sep 24, 2016
2 parents 9455b1d + 690a06b commit 46c36fc
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 3 deletions.
23 changes: 22 additions & 1 deletion federation/pkg/federation-controller/service/endpoint_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,34 @@ import (
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceController) clusterEndpointWorker() {
fedClient := sc.federationClient
// process all pending events in endpointWorkerDoneChan
ForLoop:
for {
select {
case clusterName := <-sc.endpointWorkerDoneChan:
sc.endpointWorkerMap[clusterName] = false
default:
// non-blocking, comes here if all existing events are processed
break ForLoop
}
}

for clusterName, cache := range sc.clusterCache.clientMap {
workerExist, found := sc.endpointWorkerMap[clusterName]
if found && workerExist {
continue
}

// create a worker only if the previous worker has finished and gone out of scope
go func(cache *clusterCache, clusterName string) {
fedClient := sc.federationClient
for {
func() {
key, quit := cache.endpointQueue.Get()
// update endpoint cache
if quit {
// send signal that current worker has finished tasks and is going out of scope
sc.endpointWorkerDoneChan <- clusterName
return
}
defer cache.endpointQueue.Done(key)
Expand All @@ -49,6 +69,7 @@ func (sc *ServiceController) clusterEndpointWorker() {
}()
}
}(cache, clusterName)
sc.endpointWorkerMap[clusterName] = true
}
}

Expand Down
25 changes: 23 additions & 2 deletions federation/pkg/federation-controller/service/service_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,44 @@ import (
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (sc *ServiceController) clusterServiceWorker() {
fedClient := sc.federationClient
// process all pending events in serviceWorkerDoneChan
ForLoop:
for {
select {
case clusterName := <-sc.serviceWorkerDoneChan:
sc.serviceWorkerMap[clusterName] = false
default:
// non-blocking, comes here if all existing events are processed
break ForLoop
}
}

for clusterName, cache := range sc.clusterCache.clientMap {
workerExist, found := sc.serviceWorkerMap[clusterName]
if found && workerExist {
continue
}

// create a worker only if the previous worker has finished and gone out of scope
go func(cache *clusterCache, clusterName string) {
fedClient := sc.federationClient
for {
func() {
key, quit := cache.serviceQueue.Get()
defer cache.serviceQueue.Done(key)
if quit {
// send signal that current worker has finished tasks and is going out of scope
sc.serviceWorkerDoneChan <- clusterName
return
}
defer cache.serviceQueue.Done(key)
err := sc.clusterCache.syncService(key.(string), clusterName, cache, sc.serviceCache, fedClient, sc)
if err != nil {
glog.Errorf("Failed to sync service: %+v", err)
}
}()
}
}(cache, clusterName)
sc.serviceWorkerMap[clusterName] = true
}
}

Expand Down
17 changes: 17 additions & 0 deletions federation/pkg/federation-controller/service/servicecontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ const (
UserAgentName = "federation-service-controller"
KubeAPIQPS = 20.0
KubeAPIBurst = 30

maxNoOfClusters = 100
)

type cachedService struct {
Expand Down Expand Up @@ -118,6 +120,16 @@ type ServiceController struct {
// services that need to be synced
queue *workqueue.Type
knownClusterSet sets.String
// endpoint worker map contains all the clusters registered with an indication that worker exist
// key clusterName
endpointWorkerMap map[string]bool
// channel for worker to signal that it is going out of existence
endpointWorkerDoneChan chan string
// service worker map contains all the clusters registered with an indication that worker exist
// key clusterName
serviceWorkerMap map[string]bool
// channel for worker to signal that it is going out of existence
serviceWorkerDoneChan chan string
}

// New returns a new service controller to keep DNS provider service resources
Expand Down Expand Up @@ -205,6 +217,11 @@ func New(federationClient federation_release_1_4.Interface, dns dnsprovider.Inte
},
},
)

s.endpointWorkerMap = make(map[string]bool)
s.serviceWorkerMap = make(map[string]bool)
s.endpointWorkerDoneChan = make(chan string, maxNoOfClusters)
s.serviceWorkerDoneChan = make(chan string, maxNoOfClusters)
return s
}

Expand Down

0 comments on commit 46c36fc

Please sign in to comment.