Restart zookeeper server for every test.
Also, make it easier to start test clusters. This will allow testing of more scenarios.
samuel committed Oct 29, 2013
1 parent 7dcc54b commit 0616a49
Showing 7 changed files with 289 additions and 16 deletions.
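The commit message's "make it easier to start test clusters" is visible in the lock_test.go changes below: tests no longer dial a fixed testAddr but bring up a private cluster per test. A minimal sketch of the new pattern, using only the helper names that appear in the diff (startTestCluster, ts.connect, ts.stop); the test name, body, and Close call are illustrative:

func TestSomething(t *testing.T) {
	ts, err := startTestCluster(1) // one-server ZooKeeper cluster just for this test
	if err != nil {
		t.Fatal(err)
	}
	defer ts.stop() // tear the cluster down when the test finishes
	zk, err := ts.connect(0) // client connection to server 0
	if err != nil {
		t.Fatalf("Connect returned error: %+v", err)
	}
	defer zk.Close()
	// ... exercise the client against a freshly started server ...
}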
1 change: 1 addition & 0 deletions .gitignore
@@ -0,0 +1 @@
+ .DS_Store
9 changes: 6 additions & 3 deletions zk/conn.go
@@ -3,6 +3,9 @@ package zk
/*
TODO:
* make sure a ping response comes back in a reasonable time
+ Possible watcher events:
+ * Event{Type: EventNotWatching, State: StateDisconnected, Path: path, Err: err}
*/

import (
@@ -105,7 +108,7 @@ func Connect(servers []string, recvTimeout time.Duration) (*Conn, <-chan Event,
func ConnectWithDialer(servers []string, recvTimeout time.Duration, dialer Dialer) (*Conn, <-chan Event, error) {
for i, addr := range servers {
if !strings.Contains(addr, ":") {
- servers[i] = addr + ":" + strconv.Itoa(defaultPort)
+ servers[i] = addr + ":" + strconv.Itoa(DefaultPort)
}
}
ec := make(chan Event, eventChanSize)
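For callers, the newly exported DefaultPort means an address without an explicit port is expanded to host:2181 before dialing. A hedged, self-contained sketch (host is hypothetical; the import path is assumed to be this repository's):

package main

import (
	"log"
	"time"

	"github.com/samuel/go-zookeeper/zk" // assumed import path for this repo
)

func main() {
	// "10.0.0.1" carries no port, so Connect expands it to
	// "10.0.0.1:2181" using the newly exported zk.DefaultPort.
	conn, events, err := zk.Connect([]string{"10.0.0.1"}, 15*time.Second)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
	ev := <-events // first session event, e.g. connecting/connected state changes
	log.Printf("session event: %+v", ev)
}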
@@ -608,6 +611,7 @@ func (c *Conn) Get(path string) ([]byte, *Stat, error) {
return res.Data, &res.Stat, err
}

+ // Get the contents of a znode and set a watch
func (c *Conn) GetW(path string) ([]byte, *Stat, <-chan Event, error) {
var ech <-chan Event
res := &getDataResponse{}
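The new doc comment describes GetW's contract; a hedged usage fragment (znode path hypothetical, conn an established *Conn): the returned channel delivers a single Event when the node's data changes, the node is deleted, or the watch is invalidated (compare the EventNotWatching note at the top of the file):

data, stat, ch, err := conn.GetW("/myapp/config")
if err != nil {
	log.Fatal(err)
}
log.Printf("read version %d: %q", stat.Version, data)
ev := <-ch // fires exactly once; re-arm the watch by calling GetW again
log.Printf("watch event: type=%v path=%q err=%v", ev.Type, ev.Path, ev.Err)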
@@ -652,14 +656,13 @@ func (c *Conn) CreateProtectedEphemeralSequential(path string, data []byte, acl
protectedPath := strings.Join(parts, "/")

var newPath string
- var children []string
for i := 0; i < 3; i++ {
newPath, err = c.Create(protectedPath, data, FlagEphemeral|FlagSequence, acl)
switch err {
case ErrSessionExpired:
// No need to search for the node since it can't exist. Just try again.
case ErrConnectionClosed:
- children, _, err = c.Children(rootPath)
+ children, _, err := c.Children(rootPath)
if err != nil {
return "", err
}
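For context, CreateProtectedEphemeralSequential embeds a unique prefix in the node name so that, after ErrConnectionClosed (where the create may or may not have gone through), the client can list rootPath and look for its own node. A sketch of that recovery scan; protectedPrefix is a name assumed here for illustration, since the diff shows only the Children call:

for _, child := range children {
	if strings.HasPrefix(child, protectedPrefix) {
		// The create did succeed before the connection dropped; reuse that node.
		return rootPath + "/" + child, nil
	}
}
// No match: the create never happened, so the surrounding loop retries.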
2 changes: 1 addition & 1 deletion zk/constants.go
@@ -7,7 +7,7 @@ import (
const (
protocolVersion = 0

- defaultPort = 2181
+ DefaultPort = 2181
)

const (
1 change: 1 addition & 0 deletions zk/lock.go
@@ -98,6 +98,7 @@ func (l *Lock) Lock() error {
break
}

+ // Wait on the node next in line for the lock
_, _, ch, err := l.c.GetW(l.path + "/" + prevSeqPath)
if err != nil && err != ErrNoNode {
return err
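The added comment marks the key property of the ZooKeeper lock recipe: each waiter watches only its immediate predecessor in the sequence, so a release wakes exactly one contender instead of the whole herd. A sketch of what plausibly follows the hunk above, with the event handling assumed for illustration:

ev := <-ch // block until the predecessor's znode is deleted or changes
if ev.Err != nil {
	return ev.Err
}
// Loop back and re-list the children: this waiter may now hold the
// lowest sequence number and therefore the lock.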
14 changes: 12 additions & 2 deletions zk/lock_test.go
@@ -6,7 +6,12 @@ import (
)

func TestLock(t *testing.T) {
- zk, _, err := Connect([]string{testAddr}, time.Second*15)
+ ts, err := startTestCluster(1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer ts.stop()
+ zk, err := ts.connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
@@ -59,7 +64,12 @@ func TestLock(t *testing.T) {
// This tests creating a lock with a path that's more than 1 node deep (e.g. "/test-multi-level/lock"),
// when a part of that path already exists (i.e. "/test-multi-level" node already exists).
func TestMultiLevelLock(t *testing.T) {
- zk, _, err := Connect([]string{testAddr}, time.Second*15)
+ ts, err := startTestCluster(1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer ts.stop()
+ zk, err := ts.connect(0)
if err != nil {
t.Fatalf("Connect returned error: %+v", err)
}
126 changes: 126 additions & 0 deletions zk/server_java.go
@@ -0,0 +1,126 @@
package zk

import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
)

type ErrMissingServerConfigField string

func (e ErrMissingServerConfigField) Error() string {
return fmt.Sprintf("zk: missing server config field '%s'", string(e))
}

const (
DefaultServerTickTime = 2000
DefaultServerInitLimit = 10
DefaultServerSyncLimit = 5
DefaultServerAutoPurgeSnapRetainCount = 3
DefaultPeerPort = 2888
DefaultLeaderElectionPort = 3888
)

type ServerConfigServer struct {
Id int
Host string
PeerPort int
LeaderElectionPort int
}

type ServerConfig struct {
TickTime int // Number of milliseconds of each tick
InitLimit int // Number of ticks that the initial synchronization phase can take
SyncLimit int // Number of ticks that can pass between sending a request and getting an acknowledgement
DataDir string // Directory where the snapshot is stored
ClientPort int // Port at which clients will connect
AutoPurgeSnapRetainCount int // Number of snapshots to retain in dataDir
AutoPurgePurgeInterval int // Purge task interval in hours (0 to disable auto purge)
Servers []ServerConfigServer
}

func (sc ServerConfig) Marshall(w io.Writer) error {
if sc.DataDir == "" {
return ErrMissingServerConfigField("dataDir")
}
fmt.Fprintf(w, "dataDir=%s\n", sc.DataDir)
if sc.TickTime <= 0 {
sc.TickTime = DefaultServerTickTime
}
fmt.Fprintf(w, "tickTime=%d\n", sc.TickTime)
if sc.InitLimit <= 0 {
sc.InitLimit = DefaultServerInitLimit
}
fmt.Fprintf(w, "initLimit=%d\n", sc.InitLimit)
if sc.SyncLimit <= 0 {
sc.SyncLimit = DefaultServerSyncLimit
}
fmt.Fprintf(w, "syncLimit=%d\n", sc.SyncLimit)
if sc.ClientPort <= 0 {
sc.ClientPort = DefaultPort
}
fmt.Fprintf(w, "clientPort=%d\n", sc.ClientPort)
if sc.AutoPurgePurgeInterval > 0 {
if sc.AutoPurgeSnapRetainCount <= 0 {
sc.AutoPurgeSnapRetainCount = DefaultServerAutoPurgeSnapRetainCount
}
fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount)
fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval)
}
if len(sc.Servers) > 0 {
for _, srv := range sc.Servers {
if srv.PeerPort <= 0 {
srv.PeerPort = DefaultPeerPort
}
if srv.LeaderElectionPort <= 0 {
srv.LeaderElectionPort = DefaultLeaderElectionPort
}
fmt.Fprintf(w, "server.%d=%s:%d:%d\n", srv.Id, srv.Host, srv.PeerPort, srv.LeaderElectionPort)
}
}
return nil
}
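A minimal usage sketch for Marshall (paths and ports hypothetical); zero-valued fields fall back to the defaults applied above:

cfg := ServerConfig{
	DataDir: "/tmp/zk1/data",
	Servers: []ServerConfigServer{
		{Id: 1, Host: "127.0.0.1", PeerPort: 2888, LeaderElectionPort: 3888},
	},
}
f, err := os.Create("/tmp/zk1/zoo.cfg")
if err != nil {
	log.Fatal(err)
}
defer f.Close()
if err := cfg.Marshall(f); err != nil {
	log.Fatal(err)
}
// Writes roughly:
//   dataDir=/tmp/zk1/data
//   tickTime=2000
//   initLimit=10
//   syncLimit=5
//   clientPort=2181
//   server.1=127.0.0.1:2888:3888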

var jarSearchPaths = []string{
"zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/local/zookeeper-*/contrib/fatjar/zookeeper-*-fatjar.jar",
"/usr/local/Cellar/zookeeper/*/libexec/contrib/fatjar/zookeeper-*-fatjar.jar",
}

func findZookeeperFatJar() string {
for _, path := range jarSearchPaths {
matches, _ := filepath.Glob(path)
// TODO: could sort by version and pick latest
if len(matches) > 0 {
return matches[0]
}
}
return ""
}

type Server struct {
JarPath string
ConfigPath string

cmd *exec.Cmd
}

func (srv *Server) Start() error {
if srv.JarPath == "" {
srv.JarPath = findZookeeperFatJar()
if srv.JarPath == "" {
return fmt.Errorf("zk: unable to find server jar")
}
}
srv.cmd = exec.Command("java", "-jar", srv.JarPath, "server", srv.ConfigPath)
// srv.cmd.Stdout = os.Stdout
// srv.cmd.Stderr = os.Stderr
return srv.cmd.Start()
}

func (srv *Server) Stop() error {
srv.cmd.Process.Signal(os.Kill)
return srv.cmd.Wait()
}
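Putting it together, a hedged sketch of running a standalone server (config path hypothetical): with JarPath left empty, Start falls back to findZookeeperFatJar, and Stop sends SIGKILL and reaps the process:

srv := &Server{ConfigPath: "/tmp/zk1/zoo.cfg"}
if err := srv.Start(); err != nil {
	log.Fatal(err) // e.g. "zk: unable to find server jar"
}
defer srv.Stop()
// ... run client tests against localhost:2181 ...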