Skip to content

Commit

Permalink
Cancellable leader election with context
Browse files Browse the repository at this point in the history
  • Loading branch information
ash2k committed Jun 7, 2018
1 parent 1d99fff commit dc32a34
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 61 deletions.
12 changes: 8 additions & 4 deletions cmd/cloud-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package app

import (
"context"
"fmt"
"math/rand"
"net"
Expand Down Expand Up @@ -135,7 +136,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
}
}

run := func(stop <-chan struct{}) {
run := func(ctx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
Expand All @@ -151,13 +152,16 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
clientBuilder = rootClientBuilder
}

if err := startControllers(c, rootClientBuilder, clientBuilder, stop, cloud); err != nil {
if err := startControllers(c, rootClientBuilder, clientBuilder, ctx.Done(), cloud); err != nil {
glog.Fatalf("error running controllers: %v", err)
}
}

runCtx, cancel := context.WithCancel(context.Background())
defer cancel()

if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
run(wait.NeverStop)
run(runCtx)
panic("unreachable")
}

Expand All @@ -183,7 +187,7 @@ func Run(c *cloudcontrollerconfig.CompletedConfig) error {
}

// Try and become the leader and start cloud controller manager loops
leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,
Expand Down
12 changes: 8 additions & 4 deletions cmd/kube-controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ limitations under the License.
package app

import (
"context"
"fmt"
"io/ioutil"
"math/rand"
Expand Down Expand Up @@ -144,7 +145,7 @@ func Run(c *config.CompletedConfig) error {
}
}

run := func(stop <-chan struct{}) {
run := func(runCtx context.Context) {
rootClientBuilder := controller.SimpleControllerClientBuilder{
ClientConfig: c.Kubeconfig,
}
Expand All @@ -164,7 +165,7 @@ func Run(c *config.CompletedConfig) error {
} else {
clientBuilder = rootClientBuilder
}
ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, stop)
ctx, err := CreateControllerContext(c, rootClientBuilder, clientBuilder, runCtx.Done())
if err != nil {
glog.Fatalf("error building controller context: %v", err)
}
Expand All @@ -180,8 +181,11 @@ func Run(c *config.CompletedConfig) error {
select {}
}

runCtx, cancel := context.WithCancel(context.Background())
defer cancel()

if !c.ComponentConfig.GenericComponent.LeaderElection.LeaderElect {
run(wait.NeverStop)
run(runCtx)
panic("unreachable")
}

Expand All @@ -204,7 +208,7 @@ func Run(c *config.CompletedConfig) error {
glog.Fatalf("error creating lock: %v", err)
}

leaderelection.RunOrDie(wait.NeverStop, leaderelection.LeaderElectionConfig{
leaderelection.RunOrDie(runCtx, leaderelection.LeaderElectionConfig{
Lock: rl,
LeaseDuration: c.ComponentConfig.GenericComponent.LeaderElection.LeaseDuration.Duration,
RenewDeadline: c.ComponentConfig.GenericComponent.LeaderElection.RenewDeadline.Duration,
Expand Down
7 changes: 5 additions & 2 deletions cmd/kube-scheduler/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ limitations under the License.
package app

import (
"context"
"fmt"
"io"
"io/ioutil"
Expand Down Expand Up @@ -189,7 +190,9 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
// If leader election is enabled, run via LeaderElector until done and exit.
if c.LeaderElection != nil {
c.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
OnStartedLeading: run,
OnStartedLeading: func(ctx context.Context) {
run(ctx.Done())
},
OnStoppedLeading: func() {
utilruntime.HandleError(fmt.Errorf("lost master"))
},
Expand All @@ -199,7 +202,7 @@ func Run(c schedulerserverconfig.CompletedConfig, stopCh <-chan struct{}) error
return fmt.Errorf("couldn't create leader elector: %v", err)
}

leaderElector.Run(stopCh)
leaderElector.Run(context.TODO())

return fmt.Errorf("lost lease")
}
Expand Down
76 changes: 25 additions & 51 deletions staging/src/k8s.io/client-go/tools/leaderelection/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ limitations under the License.
package leaderelection

import (
"context"
"fmt"
"reflect"
"sync"
"time"

"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -120,7 +120,7 @@ type LeaderElectionConfig struct {
// * OnChallenge()
type LeaderCallbacks struct {
// OnStartedLeading is called when a LeaderElector client starts leading
OnStartedLeading func(stop <-chan struct{})
OnStartedLeading func(context.Context)
// OnStoppedLeading is called when a LeaderElector client stops leading
OnStoppedLeading func()
// OnNewLeader is called when the client observes a leader that is
Expand All @@ -146,28 +146,28 @@ type LeaderElector struct {
}

// Run starts the leader election loop
func (le *LeaderElector) Run(stop <-chan struct{}) {
func (le *LeaderElector) Run(ctx context.Context) {
defer func() {
runtime.HandleCrash()
le.config.Callbacks.OnStoppedLeading()
}()
if !le.acquire(stop) {
return // stop signalled done
if !le.acquire(ctx) {
return // ctx signalled done
}
internalStop := make(chan struct{})
defer close(internalStop)
go le.config.Callbacks.OnStartedLeading(internalStop)
le.renew(stop)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go le.config.Callbacks.OnStartedLeading(ctx)
le.renew(ctx)
}

// RunOrDie starts a client with the provided config or panics if the config
// fails to validate.
func RunOrDie(stop <-chan struct{}, lec LeaderElectionConfig) {
func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
le, err := NewLeaderElector(lec)
if err != nil {
panic(err)
}
le.Run(stop)
le.Run(ctx)
}

// GetLeader returns the identity of the last observed leader or returns the empty string if
Expand All @@ -182,17 +182,10 @@ func (le *LeaderElector) IsLeader() bool {
}

// acquire loops calling tryAcquireOrRenew and returns true immediately when tryAcquireOrRenew succeeds.
// Returns false if stop signals done.
func (le *LeaderElector) acquire(stop <-chan struct{}) bool {
tmpStop := make(chan struct{})
once := sync.Once{}
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
// Returns false if ctx signals done.
func (le *LeaderElector) acquire(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
succeeded := false
desc := le.config.Lock.Describe()
glog.Infof("attempting to acquire leader lease %v...", desc)
Expand All @@ -205,41 +198,22 @@ func (le *LeaderElector) acquire(stop <-chan struct{}) bool {
}
le.config.Lock.RecordEvent("became leader")
glog.Infof("successfully acquired lease %v", desc)
once.Do(func() { close(tmpStop) })
}, le.config.RetryPeriod, JitterFactor, true, tmpStop)
cancel()
}, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
return succeeded
}

// renew loops calling tryAcquireOrRenew and returns immediately when tryAcquireOrRenew fails or ctx signals done.
func (le *LeaderElector) renew(stop <-chan struct{}) {
tmpStop := make(chan struct{})
once := sync.Once{}
go func() {
select {
case <-stop:
once.Do(func() { close(tmpStop) })
case <-tmpStop:
}
}()
func (le *LeaderElector) renew(ctx context.Context) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
wait.Until(func() {
// PollUntil() sleeps for "interval" duration before calling the function so we need to increase the timeout by le.config.RetryPeriod
t := time.NewTimer(le.config.RetryPeriod + le.config.RenewDeadline)
defer t.Stop()
internalStop := make(chan struct{})
internalOnce := sync.Once{}
defer internalOnce.Do(func() { close(internalStop) })
go func() {
select {
case <-tmpStop:
internalOnce.Do(func() { close(internalStop) })
case <-t.C:
internalOnce.Do(func() { close(internalStop) })
case <-internalStop:
}
}()
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RetryPeriod+le.config.RenewDeadline)
defer timeoutCancel()
err := wait.PollUntil(le.config.RetryPeriod, func() (bool, error) {
return le.tryAcquireOrRenew(), nil
}, internalStop)
}, timeoutCtx.Done())
le.maybeReportTransition()
desc := le.config.Lock.Describe()
if err == nil {
Expand All @@ -248,8 +222,8 @@ func (le *LeaderElector) renew(stop <-chan struct{}) {
}
le.config.Lock.RecordEvent("stopped leading")
glog.Infof("failed to renew lease %v: %v", desc, err)
once.Do(func() { close(tmpStop) })
}, 0, tmpStop)
cancel()
}, 0, ctx.Done())
}

// tryAcquireOrRenew tries to acquire a leader lease if it is not already acquired,
Expand Down

0 comments on commit dc32a34

Please sign in to comment.