Skip to content

Commit

Permalink
ZK 3.5 part 2 - Adding reconfig and incremental reconfig apis
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffbean authored and nemith committed Jul 27, 2019
1 parent 6a19f63 commit b879e54
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 123 deletions.
6 changes: 3 additions & 3 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ func TestBasicCluster(t *testing.T) {
t.Fatal(err)
}
defer ts.Stop()
zk1, err := ts.Connect(0)
zk1, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
defer zk1.Close()
zk2, err := ts.Connect(1)
zk2, _, err := ts.Connect(1)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func TestWaitForClose(t *testing.T) {
t.Fatal(err)
}
defer ts.Stop()
zk, err := ts.Connect(0)
zk, _, err := ts.Connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
Expand Down
62 changes: 48 additions & 14 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
defer close(reauthReadyChan)

if c.logInfo {
c.logger.Printf("Re-submitting `%d` credentials after reconnect",
len(c.creds))
c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds))
}

for _, cred := range c.creds {
if shouldCancel() {
c.logger.Printf("Cancel rer-submitting credentials")
return
}
resChan, err := c.sendRequest(
Expand All @@ -431,7 +429,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
nil)

if err != nil {
c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err)
c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err)
// FIXME(prozlach): lets ignore errors for now
continue
}
Expand All @@ -440,14 +438,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) {
select {
case res = <-resChan:
case <-c.closeChan:
c.logger.Printf("Recv closed, cancel re-submitting credentials")
c.logger.Printf("recv closed, cancel re-submitting credentials")
return
case <-c.shouldQuit:
c.logger.Printf("Should quit, cancel re-submitting credentials")
c.logger.Printf("should quit, cancel re-submitting credentials")
return
}
if res.err != nil {
c.logger.Printf("Credential re-submit failed: %s", res.err)
c.logger.Printf("credential re-submit failed: %s", res.err)
// FIXME(prozlach): lets ignore errors for now
continue
}
Expand Down Expand Up @@ -489,14 +487,14 @@ func (c *Conn) loop() {
err := c.authenticate()
switch {
case err == ErrSessionExpired:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.invalidateWatches(err)
case err != nil && c.conn != nil:
c.logger.Printf("Authentication failed: %s", err)
c.logger.Printf("authentication failed: %s", err)
c.conn.Close()
case err == nil:
if c.logInfo {
c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs)
}
c.hostProvider.Connected() // mark success
c.closeChan = make(chan struct{}) // channel to tell send loop stop
Expand All @@ -511,7 +509,7 @@ func (c *Conn) loop() {
}
err := c.sendLoop()
if err != nil || c.logInfo {
c.logger.Printf("Send loop terminated: err=%v", err)
c.logger.Printf("send loop terminated: err=%v", err)
}
c.conn.Close() // causes recv loop to EOF/exit
wg.Done()
Expand All @@ -526,7 +524,7 @@ func (c *Conn) loop() {
err = c.recvLoop(c.conn)
}
if err != io.EOF || c.logInfo {
c.logger.Printf("Recv loop terminated: err=%v", err)
c.logger.Printf("recv loop terminated: err=%v", err)
}
if err == nil {
panic("zk: recvLoop should never return nil error")
Expand Down Expand Up @@ -826,10 +824,12 @@ func (c *Conn) recvLoop(conn net.Conn) error {
buf := make([]byte, sz)
for {
// package length
conn.SetReadDeadline(time.Now().Add(c.recvTimeout))
if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil {
c.logger.Printf("failed to set connection deadline: %v", err)
}
_, err := io.ReadFull(conn, buf[:4])
if err != nil {
return err
return fmt.Errorf("failed to read from connection: %v", err)
}

blen := int(binary.BigEndian.Uint32(buf[:4]))
Expand Down Expand Up @@ -1280,6 +1280,40 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) {
return mr, err
}

// IncrementalReconfig is the incremental form of the zookeeper
// reconfiguration api: the servers listed in joining are added and the
// servers listed in leaving are removed. Each list is packed into the request
// as one comma-separated string, and version is passed as the current config
// id.
// It returns the new configuration stats.
// TODO: expose and return the config znode itself like the Java client does.
func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) {
	// TODO: validate the shape of the member string to give early feedback.
	joinList := strings.Join(joining, ",")
	leaveList := strings.Join(leaving, ",")

	return c.internalReconfig(&reconfigRequest{
		JoiningServers: []byte(joinList),
		LeavingServers: []byte(leaveList),
		CurConfigId:    version,
	})
}

// Reconfig is the non-incremental update functionality for Zookeeper, where
// the list provided is the entire new member list. The members are packed
// into the request as one comma-separated string.
// The optional version allows for conditional reconfigurations; -1 ignores
// the condition.
// TODO: expose and return the config znode itself like the Java client does.
func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) {
	memberList := strings.Join(members, ",")

	return c.internalReconfig(&reconfigRequest{
		NewMembers:  []byte(memberList),
		CurConfigId: version,
	})
}

// internalReconfig issues an opReconfig request carrying the given payload
// and hands back the Stat decoded from the response together with any error
// from the request. The returned *Stat is never nil, even when err is
// non-nil; callers should check err before using it.
func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) {
	var response reconfigReponse
	_, err := c.request(opReconfig, request, &response, nil)
	return &response.Stat, err
}

// Server returns the current or last-connected server name.
func (c *Conn) Server() string {
c.serverMu.Lock()
Expand Down
25 changes: 17 additions & 8 deletions constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package zk

import (
"errors"
"fmt"
)

const (
Expand All @@ -25,6 +26,7 @@ const (
opGetChildren2 = 12
opCheck = 13
opMulti = 14
opReconfig = 16
opClose = -11
opSetAuth = 100
opSetWatches = 101
Expand Down Expand Up @@ -92,7 +94,7 @@ func (s State) String() string {
if name := stateNames[s]; name != "" {
return name
}
return "Unknown"
return "unknown state"
}

type ErrCode int32
Expand All @@ -113,8 +115,10 @@ var (
ErrClosing = errors.New("zk: zookeeper is closing")
ErrNothing = errors.New("zk: no server responsees to process")
ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored")

ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled")
ErrBadArguments = errors.New("invalid arguments")
// ErrInvalidCallback = errors.New("zk: invalid callback specified")

errCodeToError = map[ErrCode]error{
0: nil,
errAPIError: ErrAPIError,
Expand All @@ -126,19 +130,21 @@ var (
errNotEmpty: ErrNotEmpty,
errSessionExpired: ErrSessionExpired,
// errInvalidCallback: ErrInvalidCallback,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errInvalidAcl: ErrInvalidACL,
errAuthFailed: ErrAuthFailed,
errClosing: ErrClosing,
errNothing: ErrNothing,
errSessionMoved: ErrSessionMoved,
errZReconfigDisabled: ErrReconfigDisabled,
errBadArguments: ErrBadArguments,
}
)

func (e ErrCode) toError() error {
if err, ok := errCodeToError[e]; ok {
return err
}
return ErrUnknown
return errors.New(fmt.Sprintf("unknown error: %v", e))
}

const (
Expand Down Expand Up @@ -168,6 +174,8 @@ const (
errClosing ErrCode = -116
errNothing ErrCode = -117
errSessionMoved ErrCode = -118
// Attempts to perform a reconfiguration operation when reconfiguration feature is disabled
errZReconfigDisabled ErrCode = -123
)

// Constants for ACL permissions
Expand Down Expand Up @@ -197,6 +205,7 @@ var (
opGetChildren2: "getChildren2",
opCheck: "check",
opMulti: "multi",
opReconfig: "reconfig",
opClose: "close",
opSetAuth: "setAuth",
opSetWatches: "setWatches",
Expand Down
37 changes: 22 additions & 15 deletions server_help.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ func init() {
}

type TestServer struct {
Port int
Path string
Srv *server
Port int
Path string
Srv *server
Config ServerConfigServer
}

type TestCluster struct {
Path string
Config ServerConfig
Servers []TestServer
}

Expand All @@ -50,7 +52,7 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl
}()

for serverN := 0; serverN < size; serverN++ {
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN))
srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN+1))
if err := os.Mkdir(srvPath, 0700); err != nil {
requireNoError(t, err, "failed to make server path")
}
Expand All @@ -60,13 +62,16 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl
ClientPort: port,
DataDir: srvPath,
}

for i := 0; i < size; i++ {
cfg.Servers = append(cfg.Servers, ServerConfigServer{
serverNConfig := ServerConfigServer{
ID: i + 1,
Host: "127.0.0.1",
PeerPort: startPort + i*3 + 1,
LeaderElectionPort: startPort + i*3 + 2,
})
}

cfg.Servers = append(cfg.Servers, serverNConfig)
}

cfgPath := filepath.Join(srvPath, _testConfigName)
Expand All @@ -91,13 +96,15 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl
}

cluster.Servers = append(cluster.Servers, TestServer{
Path: srvPath,
Port: cfg.ClientPort,
Srv: srv,
Path: srvPath,
Port: cfg.ClientPort,
Srv: srv,
Config: cfg.Servers[serverN],
})
cluster.Config = cfg
}

if err := cluster.waitForStart(20, time.Second); err != nil {
if err := cluster.waitForStart(30, time.Second); err != nil {
return nil, err
}

Expand All @@ -106,9 +113,8 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl
return cluster, nil
}

func (tc *TestCluster) Connect(idx int) (*Conn, error) {
zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
return zk, err
func (tc *TestCluster) Connect(idx int) (*Conn, <-chan Event, error) {
return Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15)
}

func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) {
Expand Down Expand Up @@ -208,7 +214,7 @@ func (tc *TestCluster) StartAllServers() error {
}
}

if err := tc.waitForStart(5, time.Second); err != nil {
if err := tc.waitForStart(10, time.Second*2); err != nil {
return fmt.Errorf("failed to wait to startup zk servers: %v", err)
}

Expand All @@ -235,6 +241,7 @@ func (tc *TestCluster) StopAllServers() error {

func requireNoError(t *testing.T, err error, msgAndArgs ...interface{}) {
if err != nil {
t.Fatalf(fmt.Sprintf("received unexpected error: %+v", err), msgAndArgs...)
t.Logf("received unexpected error: %v", err)
t.Fatal(msgAndArgs...)
}
}
Loading

0 comments on commit b879e54

Please sign in to comment.