Skip to content

Commit

Permalink
Merge pull request #87 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Connection logic improvements
  • Loading branch information
samuel committed Oct 1, 2015
2 parents 177002e + e37691e commit 5f85ef9
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 73 deletions.
200 changes: 164 additions & 36 deletions zk/cluster_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
package zk

import (
"fmt"
"strings"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -46,65 +45,125 @@ func TestBasicCluster(t *testing.T) {
}
}

// If the current leader dies, then the session is reestablished with the new one.
func TestClientClusterFailover(t *testing.T) {
ts, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer ts.Stop()
zk, evCh, err := ts.ConnectAll()
defer tc.Stop()
zk, evCh, err := tc.ConnectAll()
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()

hasSession := make(chan string, 1)
go func() {
for ev := range evCh {
if ev.Type == EventSession && ev.State == StateHasSession {
select {
case hasSession <- ev.Server:
default:
}
}
}
}()
sl := NewStateLogger(evCh)

waitSession := func() string {
select {
case srv := <-hasSession:
return srv
case <-time.After(time.Second * 8):
t.Fatal("Failed to connect and get a session")
}
return ""
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}

srv := waitSession()
if _, err := zk.Create("/gozk-test", []byte("foo-cluster"), 0, WorldACL(PermAll)); err != nil {
t.Fatalf("Create failed on node 1: %+v", err)
}

stopped := false
for _, s := range ts.Servers {
if strings.HasSuffix(srv, fmt.Sprintf(":%d", s.Port)) {
s.Srv.Stop()
stopped = true
break
}
}
if !stopped {
t.Fatal("Failed to stop server")
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))

// Kill the current leader
tc.StopServer(hasSessionEvent1.Server)

// Wait for the session to be reconnected with the new leader.
hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionWatcher2 == nil {
t.Fatalf("Failover failed")
}

waitSession()
if by, _, err := zk.Get("/gozk-test"); err != nil {
t.Fatalf("Get failed on node 2: %+v", err)
} else if string(by) != "foo-cluster" {
t.Fatal("Wrong data for node 2")
}
}

// If a ZooKeeper cluster looses quorum then a session is reconnected as soon
// as the quorum is restored.
func TestNoQuorum(t *testing.T) {
tc, err := StartTestCluster(3, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
t.Fatal(err)
}
defer tc.Stop()
zk, evCh, err := tc.ConnectAllTimeout(4 * time.Second)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk.Close()
sl := NewStateLogger(evCh)

// Wait for initial session to be established
hasSessionEvent1 := sl.NewWatcher(sessionStateMatcher(StateHasSession)).Wait(8 * time.Second)
if hasSessionEvent1 == nil {
t.Fatalf("Failed to connect and get session")
}
initialSessionID := zk.sessionID
DefaultLogger.Printf(" Session established: id=%d, timeout=%d", zk.sessionID, zk.sessionTimeoutMs)

// Kill the ZooKeeper leader and wait for the session to reconnect.
DefaultLogger.Printf(" Kill the leader")
hasSessionWatcher2 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StopServer(hasSessionEvent1.Server)
hasSessionEvent2 := hasSessionWatcher2.Wait(8 * time.Second)
if hasSessionEvent2 == nil {
t.Fatalf("Failover failed")
}

// Kill the ZooKeeper leader leaving the cluster without quorum.
DefaultLogger.Printf(" Kill the leader")
tc.StopServer(hasSessionEvent2.Server)

// Make sure that we keep retrying connecting to the only remaining
// ZooKeeper server, but the attempts are being dropped because there is
// no quorum.
DefaultLogger.Printf(" Retrying no luck...")
var firstDisconnect *Event
begin := time.Now()
for time.Now().Sub(begin) < 6*time.Second {
disconnectedEvent := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(4 * time.Second)
if disconnectedEvent == nil {
t.Fatalf("Disconnected event expected")
}
if firstDisconnect == nil {
firstDisconnect = disconnectedEvent
continue
}
if disconnectedEvent.Server != firstDisconnect.Server {
t.Fatalf("Disconnect from wrong server: expected=%s, actual=%s",
firstDisconnect.Server, disconnectedEvent.Server)
}
}

// Start a ZooKeeper node to restore quorum.
hasSessionWatcher3 := sl.NewWatcher(sessionStateMatcher(StateHasSession))
tc.StartServer(hasSessionEvent1.Server)

// Make sure that session is reconnected with the same ID.
hasSessionEvent3 := hasSessionWatcher3.Wait(8 * time.Second)
if hasSessionEvent3 == nil {
t.Fatalf("Session has not been reconnected")
}
if zk.sessionID != initialSessionID {
t.Fatalf("Wrong session ID: expected=%d, actual=%d", initialSessionID, zk.sessionID)
}

// Make sure that the session is not dropped soon after reconnect
e := sl.NewWatcher(sessionStateMatcher(StateDisconnected)).Wait(6 * time.Second)
if e != nil {
t.Fatalf("Unexpected disconnect")
}
}

func TestWaitForClose(t *testing.T) {
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
if err != nil {
Expand Down Expand Up @@ -164,3 +223,72 @@ func TestBadSession(t *testing.T) {
t.Fatalf("Delete returned error: %+v", err)
}
}

type EventLogger struct {
events []Event
watchers []*EventWatcher
lock sync.Mutex
wg sync.WaitGroup
}

func NewStateLogger(eventCh <-chan Event) *EventLogger {
el := &EventLogger{}
el.wg.Add(1)
go func() {
defer el.wg.Done()
for event := range eventCh {
el.lock.Lock()
for _, sw := range el.watchers {
if !sw.triggered && sw.matcher(event) {
sw.triggered = true
sw.matchCh <- event
}
}
DefaultLogger.Printf(" event received: %v\n", event)
el.events = append(el.events, event)
el.lock.Unlock()
}
}()
return el
}

func (el *EventLogger) NewWatcher(matcher func(Event) bool) *EventWatcher {
ew := &EventWatcher{matcher: matcher, matchCh: make(chan Event, 1)}
el.lock.Lock()
el.watchers = append(el.watchers, ew)
el.lock.Unlock()
return ew
}

func (el *EventLogger) Events() []Event {
el.lock.Lock()
transitions := make([]Event, len(el.events))
copy(transitions, el.events)
el.lock.Unlock()
return transitions
}

func (el *EventLogger) Wait4Stop() {
el.wg.Wait()
}

type EventWatcher struct {
matcher func(Event) bool
matchCh chan Event
triggered bool
}

func (ew *EventWatcher) Wait(timeout time.Duration) *Event {
select {
case event := <-ew.matchCh:
return &event
case <-time.After(timeout):
return nil
}
}

func sessionStateMatcher(s State) func(Event) bool {
return func(e Event) bool {
return e.Type == EventSession && e.State == s
}
}
Loading

0 comments on commit 5f85ef9

Please sign in to comment.