Skip to content

Commit

Permalink
Merge pull request samuel#30 from samuel/error-on-connect-fail
Browse files Browse the repository at this point in the history
Return an error for pending request on failed connect
  • Loading branch information
samuel committed May 26, 2014
2 parents 0718845 + 39be937 commit 0417003
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 0 deletions.
15 changes: 15 additions & 0 deletions zk/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Possible watcher events:
import (
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
"io"
"log"
Expand All @@ -22,6 +23,8 @@ import (
"time"
)

var ErrNoServer = errors.New("zk: could not connect to a server")

const (
bufferSize = 1536 * 1024
eventChanSize = 6
Expand Down Expand Up @@ -181,6 +184,7 @@ func (c *Conn) connect() {

c.serverIndex = (c.serverIndex + 1) % len(c.servers)
if c.serverIndex == startIndex {
c.flushUnsentRequests(ErrNoServer)
time.Sleep(time.Second)
}
}
Expand Down Expand Up @@ -248,6 +252,17 @@ func (c *Conn) loop() {
}
}

func (c *Conn) flushUnsentRequests(err error) {
for {
select {
default:
return
case req := <-c.sendChan:
req.recvChan <- response{-1, err}
}
}
}

// Send error to all pending requests and clear request map
func (c *Conn) flushRequests(err error) {
c.requestsLock.Lock()
Expand Down
25 changes: 25 additions & 0 deletions zk/zk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,3 +371,28 @@ func TestExpiringWatch(t *testing.T) {
t.Fatal("Child watcher timed out")
}
}

func TestRequestFail(t *testing.T) {
// If connecting fails to all servers in the list then pending requests
// should be errored out so they don't hang forever.

zk, _, err := Connect([]string{"127.0.0.1:32444"}, time.Second*15)
if err != nil {
t.Fatal(err)
}
defer zk.Close()

ch := make(chan error)
go func() {
_, _, err := zk.Get("/blah")
ch <- err
}()
select {
case err := <-ch:
if err == nil {
t.Fatal("Expected non-nil error on failed request due to connection failure")
}
case <-time.After(time.Second * 2):
t.Fatal("Get hung when connection could not be made")
}
}

0 comments on commit 0417003

Please sign in to comment.