Skip to content

Commit

Permalink
Merge pull request kubernetes#64235 from liggitt/gc-resync-reattempt
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 62266, 64351, 64366, 64235, 64560). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Avoid deadlock in gc resync if available resources change during sync

retry GC sync if waiting for cache sync times out, without unpausing workers

Viewing the diff with whitespace changes ignored reveals the actual change:
https://github.com/kubernetes/kubernetes/pull/64235/files?w=1

xref kubernetes#61057 kubernetes#56446 (comment)

```release-note
fixes a potential deadlock in the garbage collection controller
```
  • Loading branch information
Kubernetes Submit Queue authored Jun 5, 2018
2 parents 9237670 + 7da3d65 commit 9fceab1
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 37 deletions.
113 changes: 82 additions & 31 deletions pkg/controller/garbagecollector/garbagecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
Expand Down Expand Up @@ -170,10 +171,8 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
newResources := GetDeletableResources(discoveryClient)

// This can occur if there is an internal error in GetDeletableResources.
// If the gc attempts to sync with 0 resources it will block forever.
// TODO: Implement a more complete solution for the garbage collector hanging.
if len(newResources) == 0 {
glog.V(5).Infof("no resources reported by discovery, skipping garbage collector sync")
glog.V(2).Infof("no resources reported by discovery, skipping garbage collector sync")
return
}

Expand All @@ -183,39 +182,61 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
return
}

// Something has changed, time to sync.
glog.V(2).Infof("syncing garbage collector with updated resources from discovery: %v", newResources)

// Ensure workers are paused to avoid processing events before informers
// have resynced.
gc.workerLock.Lock()
defer gc.workerLock.Unlock()

// Resetting the REST mapper will also invalidate the underlying discovery
// client. This is a leaky abstraction and assumes behavior about the REST
// mapper, but we'll deal with it for now.
gc.restMapper.Reset()

// Perform the monitor resync and wait for controllers to report cache sync.
//
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared in-between the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// sync period.
if err := gc.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
return
}
// TODO: WaitForCacheSync can block forever during normal operation. Could
// pass a timeout channel, but we have to consider the implications of
// un-pausing the GC with a partially synced graph builder.
if !controller.WaitForCacheSync("garbage collector", stopCh, gc.dependencyGraphBuilder.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync"))
return
}
// Once we get here, we should not unpause workers until we've successfully synced
attempt := 0
wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
attempt++

// On a reattempt, check if available resources have changed
if attempt > 1 {
newResources = GetDeletableResources(discoveryClient)
if len(newResources) == 0 {
glog.V(2).Infof("no resources reported by discovery (attempt %d)", attempt)
return false, nil
}
}

glog.V(2).Infof("syncing garbage collector with updated resources from discovery (attempt %d): %s", attempt, printDiff(oldResources, newResources))

// Resetting the REST mapper will also invalidate the underlying discovery
// client. This is a leaky abstraction and assumes behavior about the REST
// mapper, but we'll deal with it for now.
gc.restMapper.Reset()
glog.V(4).Infof("reset restmapper")

// Perform the monitor resync and wait for controllers to report cache sync.
//
// NOTE: It's possible that newResources will diverge from the resources
// discovered by restMapper during the call to Reset, since they are
// distinct discovery clients invalidated at different times. For example,
// newResources may contain resources not returned in the restMapper's
// discovery call if the resources appeared in-between the calls. In that
// case, the restMapper will fail to map some of newResources until the next
// attempt.
if err := gc.resyncMonitors(newResources); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors (attempt %d): %v", attempt, err))
return false, nil
}
glog.V(4).Infof("resynced monitors")

// wait for caches to fill for a while (our sync period) before attempting to rediscover resources and retry syncing.
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
// note that workers stay paused until we successfully resync.
if !controller.WaitForCacheSync("garbage collector", waitForStopOrTimeout(stopCh, period), gc.dependencyGraphBuilder.IsSynced) {
utilruntime.HandleError(fmt.Errorf("timed out waiting for dependency graph builder sync during GC sync (attempt %d)", attempt))
return false, nil
}

// success, break out of the loop
return true, nil
}, stopCh)

// Finally, keep track of our new state. Do this after all preceding steps
// have succeeded to ensure we'll retry on subsequent syncs if an error
Expand All @@ -225,6 +246,36 @@ func (gc *GarbageCollector) Sync(discoveryClient discovery.ServerResourcesInterf
}, period, stopCh)
}

// printDiff summarizes, in human-readable form, which resources appear only in
// newResources ("added") and which appear only in oldResources ("removed").
func printDiff(oldResources, newResources map[schema.GroupVersionResource]struct{}) string {
	// onlyIn collects the formatted resources that are keys of `have` but not
	// keys of `other`.
	onlyIn := func(have, other map[schema.GroupVersionResource]struct{}) []string {
		names := sets.NewString()
		for gvr := range have {
			if _, present := other[gvr]; present {
				continue
			}
			names.Insert(fmt.Sprintf("%+v", gvr))
		}
		return names.List()
	}

	removed := onlyIn(oldResources, newResources)
	added := onlyIn(newResources, oldResources)
	return fmt.Sprintf("added: %v, removed: %v", added, removed)
}

// waitForStopOrTimeout returns a channel that is closed when the provided stop
// channel closes or when the specified timeout elapses, whichever happens
// first.
//
// A helper goroutine performs the wait; it exits right after closing the
// returned channel, so its lifetime is bounded by the timeout.
func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan struct{} {
	stopChWithTimeout := make(chan struct{})
	go func() {
		defer close(stopChWithTimeout)

		// Use an explicit timer instead of time.After so that it can be
		// stopped as soon as stopCh closes, rather than staying pending
		// until the full timeout has elapsed.
		timer := time.NewTimer(timeout)
		defer timer.Stop()

		select {
		case <-stopCh:
		case <-timer.C:
		}
	}()
	return stopChWithTimeout
}

// IsSynced reports whether the garbage collector's dependency graph builder
// reports itself as synced.
func (gc *GarbageCollector) IsSynced() bool {
	synced := gc.dependencyGraphBuilder.IsSynced()
	return synced
}
Expand Down
49 changes: 45 additions & 4 deletions pkg/controller/garbagecollector/garbagecollector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -800,6 +800,15 @@ func TestGarbageCollectorSync(t *testing.T) {
},
},
}
unsyncableServerResources := []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
{Name: "pods", Namespaced: true, Kind: "Pod", Verbs: metav1.Verbs{"delete", "list", "watch"}},
{Name: "secrets", Namespaced: true, Kind: "Secret", Verbs: metav1.Verbs{"delete", "list", "watch"}},
},
},
}
fakeDiscoveryClient := &fakeServerResources{
PreferredResources: serverResources,
Error: nil,
Expand All @@ -813,6 +822,10 @@ func TestGarbageCollectorSync(t *testing.T) {
200,
[]byte("{}"),
},
"GET" + "/api/v1/secrets": {
404,
[]byte("{}"),
},
},
}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
Expand Down Expand Up @@ -849,7 +862,7 @@ func TestGarbageCollectorSync(t *testing.T) {
fmt.Printf("Test output")
time.Sleep(1 * time.Second)

err = expectSyncNotBlocked(fakeDiscoveryClient)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to be running but it is blocked: %v", err)
}
Expand All @@ -865,21 +878,49 @@ func TestGarbageCollectorSync(t *testing.T) {
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)

err = expectSyncNotBlocked(fakeDiscoveryClient)
err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}

// Simulate the discovery client returning a resource the restmapper can resolve, but will not sync caches
fakeDiscoveryClient.setPreferredResources(unsyncableServerResources)
fakeDiscoveryClient.setError(nil)

// Wait until sync discovers the change
time.Sleep(1 * time.Second)

// Put the resources back to normal and ensure garbage collector sync recovers
fakeDiscoveryClient.setPreferredResources(serverResources)
fakeDiscoveryClient.setError(nil)

err = expectSyncNotBlocked(fakeDiscoveryClient, &gc.workerLock)
if err != nil {
t.Fatalf("Expected garbagecollector.Sync to still be running but it is blocked: %v", err)
}
}

func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources) error {
func expectSyncNotBlocked(fakeDiscoveryClient *fakeServerResources, workerLock *sync.RWMutex) error {
before := fakeDiscoveryClient.getInterfaceUsedCount()
t := 1 * time.Second
time.Sleep(t)
after := fakeDiscoveryClient.getInterfaceUsedCount()
if before == after {
return fmt.Errorf("discoveryClient.ServerPreferredResources() called %d times over %v", after-before, t)
}
return nil

workerLockAcquired := make(chan struct{})
go func() {
workerLock.Lock()
workerLock.Unlock()
close(workerLockAcquired)
}()
select {
case <-workerLockAcquired:
return nil
case <-time.After(t):
return fmt.Errorf("workerLock blocked for at least %v", t)
}
}

type fakeServerResources struct {
Expand Down
4 changes: 3 additions & 1 deletion pkg/controller/garbagecollector/graph_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,11 +288,13 @@ func (gb *GraphBuilder) IsSynced() bool {
defer gb.monitorLock.Unlock()

if len(gb.monitors) == 0 {
glog.V(4).Info("garbage controller monitor not synced: no monitors")
return false
}

for _, monitor := range gb.monitors {
for resource, monitor := range gb.monitors {
if !monitor.controller.HasSynced() {
glog.V(4).Infof("garbage controller monitor not yet synced: %+v", resource)
return false
}
}
Expand Down
22 changes: 21 additions & 1 deletion staging/src/k8s.io/apimachinery/pkg/util/wait/wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,32 @@ func PollImmediateInfinite(interval time.Duration, condition ConditionFunc) erro
// PollUntil tries a condition func until it returns true, an error, or stopCh
// is closed.
//
// PollUntil always waits interval before the first run of 'condition'.
// 'condition' will always be invoked at least once.
func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
	ticks := poller(interval, 0)
	return WaitFor(ticks, condition, stopCh)
}

// PollImmediateUntil tries a condition func until it returns true, an error,
// or stopCh is closed.
//
// Unlike PollUntil, the first run of 'condition' happens immediately, before
// any interval has elapsed. 'condition' will always be invoked at least once.
func PollImmediateUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
	// First attempt: runs right away, without waiting for the interval.
	switch ok, err := condition(); {
	case err != nil:
		return err
	case ok:
		return nil
	}

	// Non-blocking check: if stopCh is already closed, give up without
	// scheduling further attempts.
	select {
	case <-stopCh:
		return ErrWaitTimeout
	default:
	}

	// Otherwise fall back to interval-driven polling.
	return PollUntil(interval, condition, stopCh)
}

// WaitFunc creates a channel that receives an item every time a test
// should be executed and is closed when the last test should be invoked.
type WaitFunc func(done <-chan struct{}) <-chan struct{}
Expand Down

0 comments on commit 9fceab1

Please sign in to comment.