Skip to content

Commit

Permalink
PR feedback - Pawel
Browse files Browse the repository at this point in the history
  • Loading branch information
mhrabovcin committed Jan 30, 2018
1 parent 3491e2a commit b86dade
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 18 deletions.
14 changes: 11 additions & 3 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ type Conn struct {
setWatchLimit int
setWatchCallback func([]*setWatchesRequest)

// Debug (for recurring re-auth hang)
// Debug (for recurring re-auth hang) test
// These variables shouldn't be used or modified as part of normal
// operation.
// See `TestRecurringReAuthHang`
debugCloseRecvLoop int32
debugReauthDone chan struct{}

Expand Down Expand Up @@ -432,7 +435,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {

for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
c.logger.Printf("Cancel re-submitting credentials")
return
}
resChan, err := c.sendRequest(
Expand Down Expand Up @@ -522,10 +525,15 @@ func (c *Conn) loop() {
<-reauthChan
// This condition exists for signaling purposes, that the test
// `TestRecurringReAuthHang` was successful. The previous call
// `<-reauthChan` was not blocking. That means the
// `<-reauthChan` did not block. That means the
// `resendZkAuth` didn't block even on read loop error.
// See `TestRecurringReAuthHang`
if c.shouldDebugCloseRecvLoop() {
// It is possible that during the test the ZK conn will try
// to reconnect multiple times before cleanly closing the
// test. This select here is to prevent closing
// `c.debugReauthDone` channel twice during the test and
// panic.
select {
case <-c.debugReauthDone:
default:
Expand Down
38 changes: 23 additions & 15 deletions zk/conn_test.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,35 @@
package zk

import (
"context"
"io/ioutil"
"testing"
"time"
)

func TestRecurringReAuthHang(t *testing.T) {
sessionTimeout := 2 * time.Second

finish := make(chan struct{})
defer close(finish)
go func() {
select {
case <-finish:
return
case <-time.After(5 * sessionTimeout):
panic("expected not hang")
}
}()

zkC, err := StartTestCluster(3, ioutil.Discard, ioutil.Discard)
if err != nil {
panic(err)
t.Fatal(err)
}
defer zkC.Stop()

conn, evtC, err := zkC.ConnectAll()
if err != nil {
panic(err)
t.Fatal(err)
}

ctx, cancel := context.WithDeadline(
context.Background(), time.Now().Add(5*time.Second))
defer cancel()
for conn.State() != StateHasSession {
time.Sleep(50 * time.Millisecond)

select {
case <-ctx.Done():
t.Fatal("Failed to connect to ZK")
default:
}
}

go func() {
Expand All @@ -47,9 +45,19 @@ func TestRecurringReAuthHang(t *testing.T) {
currentServer := conn.Server()
conn.setDebugCloseRecvLoop(true)
zkC.StopServer(currentServer)

// wait connect to new zookeeper.
ctx, cancel = context.WithDeadline(
context.Background(), time.Now().Add(5*time.Second))
defer cancel()
for conn.Server() == currentServer && conn.State() != StateHasSession {
time.Sleep(100 * time.Millisecond)

select {
case <-ctx.Done():
t.Fatal("Failed to reconnect ZK next server")
default:
}
}

<-conn.debugReauthDone
Expand Down

0 comments on commit b86dade

Please sign in to comment.