Skip to content

Commit

Permalink
Merge pull request docker-archive#2812 from dperny/fix-no-elected-man…
Browse files Browse the repository at this point in the history
…ager

Fix No Elected Primary Cluster Manager
  • Loading branch information
nishanttotla authored Dec 26, 2017
2 parents e1af606 + b496a39 commit 3ae6a3c
Show file tree
Hide file tree
Showing 3 changed files with 195 additions and 10 deletions.
102 changes: 96 additions & 6 deletions api/replica.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
package api

import (
// we have to import this as `gocontext` because there is already a type
// called `context` declared in primary.go
gocontext "context"

"crypto/tls"
"fmt"
"net/http"
"strings"
"sync"
)

var localRoutes = []string{"/_ping", "/info", "/debug"}
Expand All @@ -14,20 +19,98 @@ type Replica struct {
handler http.Handler
tlsConfig *tls.Config
primary string
// the address of this replica
addr string
// mu is the mutex used for locking the primary.
mu sync.RWMutex
// primaryWait is used to block execution of ServeHTTP until the primary is
// ready.
primaryWait *sync.Cond

// capture the hijack function for easier unit testing
hijack func(*tls.Config, string, http.ResponseWriter, *http.Request) error
}

// NewReplica creates a new API replica.
func NewReplica(handler http.Handler, tlsConfig *tls.Config) *Replica {
return &Replica{
func NewReplica(handler http.Handler, tlsConfig *tls.Config, addr string) *Replica {
r := &Replica{
handler: handler,
tlsConfig: tlsConfig,
addr: addr,
hijack: hijack,
}
// This seems to be a big confusing, so here's the explanation:
// A Cond is used to wait for a certain value to happen. You use it by:
//
// 1. Acquiring a lock (cond.L.Lock())
// 2. Checking the synchronized value
// 3. Calling cond.Wait() if the value isn't what you want
// a. wait releases the lock (cond.L.Unlock()) when it is entered
// b. wait reacquires the lock (cond.L.Lock()) when it is left.
// 4. Checking the value again (it may have changed!)
// 5. Doing your business
// 6. Releasing the lock (cond.L.Unlock())
//
// The issue here is that for optimal performance, because the ServeHTTP
// function only reads the primary value, we want to use a RWMutex, so that
// getPrimary can acquire a RLock and not block other concurrent handlers,
// but SetPrimary can acquire a write lock `Lock` to modify that value.
//
// NewCond takes a `Locker` interface, which implements `Lock` and
// `Unlock`, and doesn't have any conception of RLock/RUnlock. But we need
// those methods so that `r.primaryWait.Wait()` releases and reacquires the
// correct lock, the Read lock.
//
// The way we do this is by passing the return of the `RLocker` method.
// This returned object has `Lock` and `Unlock` methods which coorespond to
// the `RLock` and `RUnlock` methods of the RWMutex. This means that calls
// to `Wait` will acquire and release the read lock, not the write lock.
//
// This way, `SetPrimary` acquires and releases the WRITE lock on mu
// (mu.Lock) and `getPrimary` acquires and releases a READ lock on mu
// (r.primaryWait.L.Lock).
r.primaryWait = sync.NewCond(r.mu.RLocker())
return r
}

// SetPrimary sets the address of the primary Swarm manager
func (p *Replica) SetPrimary(primary string) {
// FIXME: We have to kill current connections before doing this.
// this lock on the mutex is a write lock
p.mu.Lock()
defer p.mu.Unlock()
p.primary = primary
if p.primary != "" {
// broadcast to wake everyone waiting on a nonempty primary
p.primaryWait.Broadcast()
}
}

// getPrimary returns the primary if it exists, blocks if it doesn't, and
// returns an error if the context is canceled. It's packed nicely in a method
// to cleanly encapsulate synchronized part of ServeHTTP.
func (p *Replica) getPrimary(ctx gocontext.Context) (string, error) {
// lock a read on primary.
//
// use primaryWait.L just to ensure we're locking and unlocking the correct
// lock, so if we change the lock type later, we can't accidentally forget
// to update which lock is acquired and released by the Wait.
//
// see the description in NewReplica for why this is a read lock
p.primaryWait.L.Lock()
defer p.primaryWait.L.Unlock()
// check if the primary is empty, and launch into a wait if it is
for p.primary == "" {
// give up waiting if the context is canceled, to avoid blocking forever
select {
case <-ctx.Done():
return "", fmt.Errorf("No elected primary cluster manager: %v", ctx.Err())
default:
}
// wait for the primary to become available
p.primaryWait.Wait()
}
return p.primary, nil
}

// ServeHTTP is the http.Handler.
Expand All @@ -41,12 +124,19 @@ func (p *Replica) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// Otherwise, forward.
if p.primary == "" {
httpError(w, "No elected primary cluster manager", http.StatusInternalServerError)
return
primary, err := p.getPrimary(r.Context())
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
}

if err := hijack(p.tlsConfig, p.primary, w, r); err != nil {
// if we've become the primary, go ahead and serve the request ourself.
// this happens if we're waiting for a primary and then we become the
// primary
if primary == p.addr {
p.handler.ServeHTTP(w, r)
return
}
if err := p.hijack(p.tlsConfig, primary, w, r); err != nil {
httpError(w, fmt.Sprintf("Unable to reach primary cluster manager (%s): %v", err, p.primary), http.StatusInternalServerError)
}
}
98 changes: 98 additions & 0 deletions api/replica_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package api

import (
// weird imports for testing
"net/http/httptest"
"sync/atomic"
"testing"

// we need to import context under a different name
gocontext "context"
"crypto/tls"
"net/http"
"time"
)

// ContextCheckedEnsurer is a wrapper around context that allows the user to
// ensure that the Done method has been called
type ContextCheckedEnsurer struct {
gocontext.Context
// checked is the number of times Done has been called
checked int64
}

// Done increments the "checked" value and then passes on the underlying
// context's Done method
func (c *ContextCheckedEnsurer) Done() <-chan struct{} {
// atomic addition so we don't race on done. this isn't actually important
// right now but it's better to do things the right way the first time
atomic.AddInt64(&(c.checked), 1)
return c.Context.Done()
}

// Checked returns the number of times Done has been called
func (c *ContextCheckedEnsurer) Checked() int64 {
return atomic.LoadInt64(&(c.checked))
}

// TestReplicaBlockedPrimary checks that an empty primary causes ServeHTTP to
// block until the primary is no longer empty
func TestReplicaBlockedPrimary(t *testing.T) {
// create a new replica
r := NewReplica(http.NotFoundHandler(), &tls.Config{}, "ouraddr")
hijackedprimary := ""
// set a fake hijack method. add a sentinal so we can know it's been called
r.hijack = func(_ *tls.Config, primary string, _ http.ResponseWriter, _ *http.Request) error {
hijackedprimary = primary
return nil
}

// create a dummy request. we can use a nil body because we never actually
// read the body in ServeHTTP
req, err := http.NewRequest("GET", "/whatever", nil)
if err != nil {
t.Fatal("error creating an HTTP request")
}
rctx, cancel := gocontext.WithCancel(req.Context())
// defer cancel so that we don't leave a dangling goroutine if the tests
// run for a long time
defer cancel()
ctx := &ContextCheckedEnsurer{rctx, 0}
// switch the request context for a ContextCheckedEnsurer so we can
req = req.WithContext(ctx)

rec := httptest.NewRecorder()

// SetPrimary has never been called, so ServeHTTP should block.
// we can pass a nil ResponseWriter because we don't ever write to it
// finished
called := make(chan struct{})
go func() {
r.ServeHTTP(rec, req)
close(called)
}()
// wait until we've checked the context at least once, which tells us we've
// entered the wait on ServeHTTP
for ctx.Checked() == 0 {
time.Sleep(100 * time.Millisecond)
// yeild the proc so that ServeHTTP has a chance to run
}
// make sure hijack hasn't been called
select {
case <-called:
t.Fatal("ServeHTTP did not block")
default:
}

// update the primary and make sure that it has unblocked ServeHTTP
primary := "someprimary"
r.SetPrimary(primary)
select {
case <-called:
if hijackedprimary != primary {
t.Fatalf("expected primary %q to be used, but got %q", primary, hijackedprimary)
}
case <-time.After(10 * time.Second):
t.Fatal("ServeHTTP was still blocked after 10 seconds")
}
}
5 changes: 1 addition & 4 deletions cli/manage.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func getCandidateAndFollower(discovery discovery.Backend, addr string, leaderTTL

func setupReplication(c *cli.Context, cluster cluster.Cluster, server *api.Server, candidate *leadership.Candidate, follower *leadership.Follower, addr string, tlsConfig *tls.Config) {
primary := api.NewPrimary(cluster, tlsConfig, &statusHandler{cluster, candidate, follower}, c.GlobalBool("debug"), c.Bool("cors"))
replica := api.NewReplica(primary, tlsConfig)
replica := api.NewReplica(primary, tlsConfig, addr)

go func() {
for {
Expand Down Expand Up @@ -198,9 +198,6 @@ func follow(follower *leadership.Follower, replica *api.Replica, addr string) {
case leader := <-leaderCh:
if leader == "" {
continue
}
if leader == addr {
replica.SetPrimary("")
} else {
log.Infof("New leader elected: %s", leader)
replica.SetPrimary(leader)
Expand Down

0 comments on commit 3ae6a3c

Please sign in to comment.