From b879e54f187e829902650747ea98bf8f2ea739cd Mon Sep 17 00:00:00 2001 From: Jeff Bean Date: Thu, 27 Dec 2018 14:25:18 -0800 Subject: [PATCH] ZK 3.5 part 2 - Adding reconfig and incremental reconfig apis --- cluster_test.go | 6 +- conn.go | 62 +++++++++--- constants.go | 25 +++-- server_help.go | 37 ++++--- server_java.go | 35 ++++++- structs.go | 14 +++ structs_test.go | 1 + zk_test.go | 257 +++++++++++++++++++++++++++++++++--------------- 8 files changed, 314 insertions(+), 123 deletions(-) diff --git a/cluster_test.go b/cluster_test.go index 181f8158..3091c9b9 100644 --- a/cluster_test.go +++ b/cluster_test.go @@ -22,12 +22,12 @@ func TestBasicCluster(t *testing.T) { t.Fatal(err) } defer ts.Stop() - zk1, err := ts.Connect(0) + zk1, _, err := ts.Connect(0) if err != nil { t.Fatalf("Connect returned error: %+v", err) } defer zk1.Close() - zk2, err := ts.Connect(1) + zk2, _, err := ts.Connect(1) if err != nil { t.Fatalf("Connect returned error: %+v", err) } @@ -191,7 +191,7 @@ func TestWaitForClose(t *testing.T) { t.Fatal(err) } defer ts.Stop() - zk, err := ts.Connect(0) + zk, _, err := ts.Connect(0) if err != nil { t.Fatalf("Connect returned error: %+v", err) } diff --git a/conn.go b/conn.go index 2099798f..1094d1b7 100644 --- a/conn.go +++ b/conn.go @@ -412,13 +412,11 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { defer close(reauthReadyChan) if c.logInfo { - c.logger.Printf("Re-submitting `%d` credentials after reconnect", - len(c.creds)) + c.logger.Printf("re-submitting `%d` credentials after reconnect", len(c.creds)) } for _, cred := range c.creds { if shouldCancel() { - c.logger.Printf("Cancel rer-submitting credentials") return } resChan, err := c.sendRequest( @@ -431,7 +429,7 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { nil) if err != nil { - c.logger.Printf("Call to sendRequest failed during credential resubmit: %s", err) + c.logger.Printf("call to sendRequest failed during credential resubmit: %s", err) // FIXME(prozlach): lets ignore errors for now continue } @@ -440,14 +438,14 @@ func (c *Conn) resendZkAuth(reauthReadyChan chan struct{}) { select { case res = <-resChan: case <-c.closeChan: - c.logger.Printf("Recv closed, cancel re-submitting credentials") + c.logger.Printf("recv closed, cancel re-submitting credentials") return case <-c.shouldQuit: - c.logger.Printf("Should quit, cancel re-submitting credentials") + c.logger.Printf("should quit, cancel re-submitting credentials") return } if res.err != nil { - c.logger.Printf("Credential re-submit failed: %s", res.err) + c.logger.Printf("credential re-submit failed: %s", res.err) // FIXME(prozlach): lets ignore errors for now continue } @@ -489,14 +487,14 @@ func (c *Conn) loop() { err := c.authenticate() switch { case err == ErrSessionExpired: - c.logger.Printf("Authentication failed: %s", err) + c.logger.Printf("authentication failed: %s", err) c.invalidateWatches(err) case err != nil && c.conn != nil: - c.logger.Printf("Authentication failed: %s", err) + c.logger.Printf("authentication failed: %s", err) c.conn.Close() case err == nil: if c.logInfo { - c.logger.Printf("Authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) + c.logger.Printf("authenticated: id=%d, timeout=%d", c.SessionID(), c.sessionTimeoutMs) } c.hostProvider.Connected() // mark success c.closeChan = make(chan struct{}) // channel to tell send loop stop @@ -511,7 +509,7 @@ func (c *Conn) loop() { } err := c.sendLoop() if err != nil || c.logInfo { - c.logger.Printf("Send loop terminated: err=%v", err) + 
c.logger.Printf("send loop terminated: err=%v", err) } c.conn.Close() // causes recv loop to EOF/exit wg.Done() @@ -526,7 +524,7 @@ func (c *Conn) loop() { err = c.recvLoop(c.conn) } if err != io.EOF || c.logInfo { - c.logger.Printf("Recv loop terminated: err=%v", err) + c.logger.Printf("recv loop terminated: err=%v", err) } if err == nil { panic("zk: recvLoop should never return nil error") @@ -826,10 +824,12 @@ func (c *Conn) recvLoop(conn net.Conn) error { buf := make([]byte, sz) for { // package length - conn.SetReadDeadline(time.Now().Add(c.recvTimeout)) + if err := conn.SetReadDeadline(time.Now().Add(c.recvTimeout)); err != nil { + c.logger.Printf("failed to set connection deadline: %v", err) + } _, err := io.ReadFull(conn, buf[:4]) if err != nil { - return err + return fmt.Errorf("failed to read from connection: %v", err) } blen := int(binary.BigEndian.Uint32(buf[:4])) @@ -1280,6 +1280,40 @@ func (c *Conn) Multi(ops ...interface{}) ([]MultiResponse, error) { return mr, err } +// IncrementalReconfig is the zookeeper reconfiguration api that allows adding and removing servers +// by lists of members. +// Return the new configuration stats. +// TODO: expose and return the config znode itself like the Java client does. +func (c *Conn) IncrementalReconfig(joining, leaving []string, version int64) (*Stat, error) { + // TODO: validate the shape of the member string to give early feedback. + request := &reconfigRequest{ + JoiningServers: []byte(strings.Join(joining, ",")), + LeavingServers: []byte(strings.Join(leaving, ",")), + CurConfigId: version, + } + + return c.internalReconfig(request) +} + +// Reconfig is the non-incremental update functionality for Zookeeper where the list preovided +// is the entire new member list. +// the optional version allows for conditional reconfigurations, -1 ignores the condition. +// TODO: expose and return the config znode itself like the Java client does. +func (c *Conn) Reconfig(members []string, version int64) (*Stat, error) { + request := &reconfigRequest{ + NewMembers: []byte(strings.Join(members, ",")), + CurConfigId: version, + } + + return c.internalReconfig(request) +} + +func (c *Conn) internalReconfig(request *reconfigRequest) (*Stat, error) { + response := &reconfigReponse{} + _, err := c.request(opReconfig, request, response, nil) + return &response.Stat, err +} + // Server returns the current or last-connected server name. 
func (c *Conn) Server() string { c.serverMu.Lock() diff --git a/constants.go b/constants.go index 33b5563b..07370bb9 100644 --- a/constants.go +++ b/constants.go @@ -2,6 +2,7 @@ package zk import ( "errors" + "fmt" ) const ( @@ -25,6 +26,7 @@ const ( opGetChildren2 = 12 opCheck = 13 opMulti = 14 + opReconfig = 16 opClose = -11 opSetAuth = 100 opSetWatches = 101 @@ -92,7 +94,7 @@ func (s State) String() string { if name := stateNames[s]; name != "" { return name } - return "Unknown" + return "unknown state" } type ErrCode int32 @@ -113,8 +115,10 @@ var ( ErrClosing = errors.New("zk: zookeeper is closing") ErrNothing = errors.New("zk: no server responsees to process") ErrSessionMoved = errors.New("zk: session moved to another server, so operation is ignored") - + ErrReconfigDisabled = errors.New("attempts to perform a reconfiguration operation when reconfiguration feature is disabled") + ErrBadArguments = errors.New("invalid arguments") // ErrInvalidCallback = errors.New("zk: invalid callback specified") + errCodeToError = map[ErrCode]error{ 0: nil, errAPIError: ErrAPIError, @@ -126,11 +130,13 @@ var ( errNotEmpty: ErrNotEmpty, errSessionExpired: ErrSessionExpired, // errInvalidCallback: ErrInvalidCallback, - errInvalidAcl: ErrInvalidACL, - errAuthFailed: ErrAuthFailed, - errClosing: ErrClosing, - errNothing: ErrNothing, - errSessionMoved: ErrSessionMoved, + errInvalidAcl: ErrInvalidACL, + errAuthFailed: ErrAuthFailed, + errClosing: ErrClosing, + errNothing: ErrNothing, + errSessionMoved: ErrSessionMoved, + errZReconfigDisabled: ErrReconfigDisabled, + errBadArguments: ErrBadArguments, } ) @@ -138,7 +144,7 @@ func (e ErrCode) toError() error { if err, ok := errCodeToError[e]; ok { return err } - return ErrUnknown + return errors.New(fmt.Sprintf("unknown error: %v", e)) } const ( @@ -168,6 +174,8 @@ const ( errClosing ErrCode = -116 errNothing ErrCode = -117 errSessionMoved ErrCode = -118 + // Attempts to perform a reconfiguration operation when reconfiguration feature is disabled + errZReconfigDisabled ErrCode = -123 ) // Constants for ACL permissions @@ -197,6 +205,7 @@ var ( opGetChildren2: "getChildren2", opCheck: "check", opMulti: "multi", + opReconfig: "reconfig", opClose: "close", opSetAuth: "setAuth", opSetWatches: "setWatches", diff --git a/server_help.go b/server_help.go index 6ba3749d..7b26dbda 100644 --- a/server_help.go +++ b/server_help.go @@ -22,13 +22,15 @@ func init() { } type TestServer struct { - Port int - Path string - Srv *server + Port int + Path string + Srv *server + Config ServerConfigServer } type TestCluster struct { Path string + Config ServerConfig Servers []TestServer } @@ -50,7 +52,7 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl }() for serverN := 0; serverN < size; serverN++ { - srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN)) + srvPath := filepath.Join(tmpPath, fmt.Sprintf("srv%d", serverN+1)) if err := os.Mkdir(srvPath, 0700); err != nil { requireNoError(t, err, "failed to make server path") } @@ -60,13 +62,16 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl ClientPort: port, DataDir: srvPath, } + for i := 0; i < size; i++ { - cfg.Servers = append(cfg.Servers, ServerConfigServer{ + serverNConfig := ServerConfigServer{ ID: i + 1, Host: "127.0.0.1", PeerPort: startPort + i*3 + 1, LeaderElectionPort: startPort + i*3 + 2, - }) + } + + cfg.Servers = append(cfg.Servers, serverNConfig) } cfgPath := filepath.Join(srvPath, _testConfigName) @@ -91,13 +96,15 @@ func 
StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl } cluster.Servers = append(cluster.Servers, TestServer{ - Path: srvPath, - Port: cfg.ClientPort, - Srv: srv, + Path: srvPath, + Port: cfg.ClientPort, + Srv: srv, + Config: cfg.Servers[serverN], }) + cluster.Config = cfg } - if err := cluster.waitForStart(20, time.Second); err != nil { + if err := cluster.waitForStart(30, time.Second); err != nil { return nil, err } @@ -106,9 +113,8 @@ func StartTestCluster(t *testing.T, size int, stdout, stderr io.Writer) (*TestCl return cluster, nil } -func (tc *TestCluster) Connect(idx int) (*Conn, error) { - zk, _, err := Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15) - return zk, err +func (tc *TestCluster) Connect(idx int) (*Conn, <-chan Event, error) { + return Connect([]string{fmt.Sprintf("127.0.0.1:%d", tc.Servers[idx].Port)}, time.Second*15) } func (tc *TestCluster) ConnectAll() (*Conn, <-chan Event, error) { @@ -208,7 +214,7 @@ func (tc *TestCluster) StartAllServers() error { } } - if err := tc.waitForStart(5, time.Second); err != nil { + if err := tc.waitForStart(10, time.Second*2); err != nil { return fmt.Errorf("failed to wait to startup zk servers: %v", err) } @@ -235,6 +241,7 @@ func (tc *TestCluster) StopAllServers() error { func requireNoError(t *testing.T, err error, msgAndArgs ...interface{}) { if err != nil { - t.Fatalf(fmt.Sprintf("received unexpected error: %+v", err), msgAndArgs...) + t.Logf("received unexpected error: %v", err) + t.Fatal(msgAndArgs...) } } diff --git a/server_java.go b/server_java.go index b93b45c3..a706e7bf 100644 --- a/server_java.go +++ b/server_java.go @@ -17,7 +17,7 @@ func (e ErrMissingServerConfigField) Error() string { } const ( - DefaultServerTickTime = 2000 + DefaultServerTickTime = 500 DefaultServerInitLimit = 10 DefaultServerSyncLimit = 5 DefaultServerAutoPurgeSnapRetainCount = 3 @@ -29,8 +29,8 @@ type server struct { stdout, stderr io.Writer cmdString string cmdArgs []string - - cmd *exec.Cmd + cmdEnv []string + cmd *exec.Cmd // this cancel will kill the command being run in this case the server itself. cancelFunc context.CancelFunc } @@ -47,10 +47,13 @@ func NewIntegrationTestServer(t *testing.T, configPath string, stdout, stderr io return nil, fmt.Errorf("zk: could not find testing zookeeper bin path at %q: %v ", zkPath, err) } } + // password is 'test' + superString := `SERVER_JVMFLAGS=-Dzookeeper.DigestAuthenticationProvider.superDigest=super:D/InIHSb7yEEbrWz8b9l71RjZJU=` return &server{ cmdString: filepath.Join(zkPath, "zkServer.sh"), cmdArgs: []string{"start-foreground", configPath}, + cmdEnv: []string{superString}, stdout: stdout, stderr: stderr, }, nil } @@ -62,6 +65,8 @@ func (srv *server) Start() error { srv.cmd = exec.CommandContext(ctx, srv.cmdString, srv.cmdArgs...) srv.cmd.Stdout = srv.stdout srv.cmd.Stderr = srv.stderr + + srv.cmd.Env = srv.cmdEnv return srv.cmd.Start() } @@ -119,6 +124,11 @@ func (sc ServerConfig) Marshall(w io.Writer) error { fmt.Fprintf(w, "autopurge.snapRetainCount=%d\n", sc.AutoPurgeSnapRetainCount) fmt.Fprintf(w, "autopurge.purgeInterval=%d\n", sc.AutoPurgePurgeInterval) } + // enable reconfig. 
+ // TODO: allow setting this + fmt.Fprintln(w, "reconfigEnabled=true") + fmt.Fprintln(w, "4lw.commands.whitelist=*") + fmt.Fprintln(w, "standaloneEnabled=false") if len(sc.Servers) < 2 { // if we dont have more than 2 servers we just dont specify server list to start in standalone mode @@ -137,3 +147,22 @@ func (sc ServerConfig) Marshall(w io.Writer) error { } return nil } + +// this is a helper to wait for the zk connection to at least get to the HasSession state +func waitForSession(ctx context.Context, eventChan <-chan Event) error { + select { + case event, ok := <-eventChan: + // The eventChan is used solely to determine when the ZK conn has + // stopped. + if !ok { + return fmt.Errorf("connection closed before state reached") + } + if event.State == StateHasSession { + return nil + } + case <-ctx.Done(): + return ctx.Err() + } + + return nil +} diff --git a/structs.go b/structs.go index d4af27de..24db556e 100644 --- a/structs.go +++ b/structs.go @@ -277,6 +277,18 @@ type multiResponse struct { DoneHeader multiHeader } +// zk version 3.5 reconfig API +type reconfigRequest struct { + JoiningServers []byte + LeavingServers []byte + NewMembers []byte + // curConfigId version of the current configuration + // optional - causes reconfiguration to return an error if configuration is no longer current + CurConfigId int64 +} + +type reconfigReponse getDataResponse + func (r *multiRequest) Encode(buf []byte) (int, error) { total := 0 for _, op := range r.Ops { @@ -604,6 +616,8 @@ func requestStructForOp(op int32) interface{} { return &CheckVersionRequest{} case opMulti: return &multiRequest{} + case opReconfig: + return &reconfigRequest{} } return nil } diff --git a/structs_test.go b/structs_test.go index a3f27974..3a38ab45 100644 --- a/structs_test.go +++ b/structs_test.go @@ -15,6 +15,7 @@ func TestEncodeDecodePacket(t *testing.T) { encodeDecodeTest(t, &pathWatchRequest{"path", true}) encodeDecodeTest(t, &pathWatchRequest{"path", false}) encodeDecodeTest(t, &CheckVersionRequest{"/", -1}) + encodeDecodeTest(t, &reconfigRequest{nil, nil, nil, -1}) encodeDecodeTest(t, &multiRequest{Ops: []multiRequestOp{{multiHeader{opCheck, false, -1}, &CheckVersionRequest{"/", -1}}}}) } diff --git a/zk_test.go b/zk_test.go index 586d8cfa..8e79045b 100644 --- a/zk_test.go +++ b/zk_test.go @@ -1,11 +1,15 @@ package zk import ( - "crypto/rand" + "context" "encoding/hex" "fmt" "io" + "io/ioutil" + "math/rand" "net" + "os" + "path/filepath" "reflect" "regexp" "sort" @@ -93,35 +97,128 @@ func TestCreate(t *testing.T) { } } -func TestOpsAfterCloseDontDeadlock(t *testing.T) { - ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "}) +func TestIncrementalReconfig(t *testing.T) { + ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "}) + requireNoError(t, err, "failed to setup test cluster") + defer ts.Stop() + + // start and add a new server. 
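For reference, the entry that the test will eventually hand to IncrementalReconfig for this new server (built further down) follows ZooKeeper's dynamic-configuration member syntax, server.<id>=<host>:<peerPort>:<electionPort>;<clientPort>. A small sketch of how such an entry is assembled from a ServerConfigServer; the ports are illustrative rather than the randomly chosen ones the test uses:

    srv := ServerConfigServer{ID: 4, Host: "127.0.0.1", PeerPort: 10001, LeaderElectionPort: 10002}
    clientPort := 10000
    entry := fmt.Sprintf("server.%d=%s:%d:%d;%d",
        srv.ID, srv.Host, srv.PeerPort, srv.LeaderElectionPort, clientPort)
    // entry == "server.4=127.0.0.1:10001:10002;10000"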
+	tmpPath, err := ioutil.TempDir("", "gozk")
+	requireNoError(t, err, "failed to create tmp dir for test server setup")
+	defer os.RemoveAll(tmpPath)
+
+	startPort := int(rand.Int31n(6000) + 10000)
+
+	srvPath := filepath.Join(tmpPath, "srv4")
+	if err := os.Mkdir(srvPath, 0700); err != nil {
+		requireNoError(t, err, "failed to make server path")
+	}
+	testSrvConfig := ServerConfigServer{
+		ID:                 4,
+		Host:               "127.0.0.1",
+		PeerPort:           startPort + 1,
+		LeaderElectionPort: startPort + 2,
+	}
+	cfg := ServerConfig{
+		ClientPort: startPort,
+		DataDir:    srvPath,
+		Servers:    []ServerConfigServer{testSrvConfig},
+	}
+
+	// TODO: clean all this server creating up to a better helper method
+	cfgPath := filepath.Join(srvPath, _testConfigName)
+	fi, err := os.Create(cfgPath)
+	requireNoError(t, err)
+
+	requireNoError(t, cfg.Marshall(fi))
+	fi.Close()
+
+	fi, err = os.Create(filepath.Join(srvPath, _testMyIDFileName))
+	requireNoError(t, err)
+
+	_, err = fmt.Fprintln(fi, "4")
+	fi.Close()
+	requireNoError(t, err)
+
+	testServer, err := NewIntegrationTestServer(t, cfgPath, nil, nil)
+	requireNoError(t, err)
+	requireNoError(t, testServer.Start())
+	defer testServer.Stop()
+
+	zk, events, err := ts.ConnectAll()
+	requireNoError(t, err, "failed to connect to cluster")
+	defer zk.Close()
+
+	err = zk.AddAuth("digest", []byte("super:test"))
+	requireNoError(t, err, "failed to auth to cluster")
+
+	waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	err = waitForSession(waitCtx, events)
+	requireNoError(t, err, "failed to wait for session")
+
+	_, _, err = zk.Get("/zookeeper/config")
 	if err != nil {
-		t.Fatal(err)
+		t.Fatalf("get config returned error: %+v", err)
 	}
+
+	// initially should be 1<<32, which is 0x100000000. This is the zxid
+	// of the first NEWLEADER message, used as the initial version
+	// reflect.DeepEqual(bytes.Split(data, []byte("\n")), []byte("version=100000000"))
+
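The commented-out assertion above hints at the intended check. The data stored at /zookeeper/config is the dynamic configuration itself, the server.N=... lines followed by a version=<hex zxid> line, so a version check could look roughly like the sketch below (an illustration, not part of the patch; strings is already imported by zk_test.go):

    data, _, err := zk.Get("/zookeeper/config")
    requireNoError(t, err, "failed to read config znode")
    for _, line := range strings.Split(string(data), "\n") {
        if strings.HasPrefix(line, "version=") {
            t.Logf("current config version: %s", strings.TrimPrefix(line, "version="))
        }
    }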
+	// remove node 3.
+	_, err = zk.IncrementalReconfig(nil, []string{"3"}, -1)
+	requireNoError(t, err, "failed to remove node from cluster")
+
+	// add a new 4th node
+	server := fmt.Sprintf("server.%d=%s:%d:%d;%d", testSrvConfig.ID, testSrvConfig.Host, testSrvConfig.PeerPort, testSrvConfig.LeaderElectionPort, cfg.ClientPort)
+	_, err = zk.IncrementalReconfig([]string{server}, nil, -1)
+	requireNoError(t, err, "failed to add new server to cluster")
+}
+
+func TestReconfig(t *testing.T) {
+	// This test ensures we can do a non-incremental reconfig.
+	ts, err := StartTestCluster(t, 3, nil, logWriter{t: t, p: "[ZKERR] "})
+	requireNoError(t, err, "failed to setup test cluster")
 	defer ts.Stop()
-	zk, _, err := ts.ConnectAll()
+
+	zk, events, err := ts.ConnectAll()
+	requireNoError(t, err, "failed to connect to cluster")
+	defer zk.Close()
+
+	err = zk.AddAuth("digest", []byte("super:test"))
+	requireNoError(t, err, "failed to auth to cluster")
+
+	waitCtx, cancel := context.WithTimeout(context.Background(), time.Second)
+	defer cancel()
+
+	err = waitForSession(waitCtx, events)
+	requireNoError(t, err, "failed to wait for session")
+
+	_, _, err = zk.Get("/zookeeper/config")
 	if err != nil {
-		t.Fatalf("Connect returned error: %+v", err)
+		t.Fatalf("get config returned error: %+v", err)
 	}
-	zk.Close()
-	path := "/gozk-test"
+	// essentially remove the first node
+	var s []string
+	for _, host := range ts.Config.Servers[1:] {
+		s = append(s, fmt.Sprintf("server.%d=%s:%d:%d;%d\n", host.ID, host.Host, host.PeerPort, host.LeaderElectionPort, ts.Config.ClientPort))
+	}
-	ch := make(chan struct{})
-	go func() {
-		defer close(ch)
-		for range make([]struct{}, 30) {
-			if _, err := zk.Create(path, []byte{1, 2, 3, 4}, 0, WorldACL(PermAll)); err == nil {
-				t.Fatal("Create did not return error")
-			}
-		}
-	}()
-	select {
-	case <-ch:
-		// expected
-	case <-time.After(10 * time.Second):
-		t.Fatal("ZK connection deadlocked when executing ops after a Close operation")
+	_, err = zk.Reconfig(s, -1)
+	requireNoError(t, err, "failed to reconfig cluster")
+
+	// reconfig to all the hosts again
+	s = []string{}
+	for _, host := range ts.Config.Servers {
+		s = append(s, fmt.Sprintf("server.%d=%s:%d:%d;%d\n", host.ID, host.Host, host.PeerPort, host.LeaderElectionPort, ts.Config.ClientPort))
 	}
+
+	_, err = zk.Reconfig(s, -1)
+	requireNoError(t, err, "failed to reconfig cluster")
+
 }
 
 func TestMulti(t *testing.T) {
@@ -767,64 +864,6 @@ func TestSlowServer(t *testing.T) {
 	}
 }
 
-func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) {
-	ln, err := net.Listen("tcp", "127.0.0.1:0")
-	if err != nil {
-		return "", nil, err
-	}
-	tln := &Listener{
-		Listener: ln,
-		Up:       up,
-		Down:     down,
-	}
-	stopCh := make(chan bool)
-	go func() {
-		<-stopCh
-		tln.Close()
-	}()
-	go func() {
-		for {
-			cn, err := tln.Accept()
-			if err != nil {
-				if !strings.Contains(err.Error(), "use of closed network connection") {
-					t.Fatalf("Accept failed: %s", err.Error())
-				}
-				return
-			}
-			if adj != nil {
-				adj(tln)
-			}
-			go func(cn net.Conn) {
-				defer cn.Close()
-				upcn, err := net.Dial("tcp", upstream)
-				if err != nil {
-					t.Log(err)
-					return
-				}
-				// This will leave hanging goroutines util stopCh is closed
-				// but it doesn't matter in the context of running tests.
- go func() { - <-stopCh - upcn.Close() - }() - go func() { - if _, err := io.Copy(upcn, cn); err != nil { - if !strings.Contains(err.Error(), "use of closed network connection") { - // log.Printf("Upstream write failed: %s", err.Error()) - } - } - }() - if _, err := io.Copy(cn, upcn); err != nil { - if !strings.Contains(err.Error(), "use of closed network connection") { - // log.Printf("Upstream read failed: %s", err.Error()) - } - } - }(cn) - } - }() - return ln.Addr().String(), stopCh, nil -} - func TestMaxBufferSize(t *testing.T) { ts, err := StartTestCluster(t, 1, nil, logWriter{t: t, p: "[ZKERR] "}) if err != nil { @@ -931,6 +970,64 @@ func TestMaxBufferSize(t *testing.T) { } } +func startSlowProxy(t *testing.T, up, down Rate, upstream string, adj func(ln *Listener)) (string, chan bool, error) { + ln, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return "", nil, err + } + tln := &Listener{ + Listener: ln, + Up: up, + Down: down, + } + stopCh := make(chan bool) + go func() { + <-stopCh + tln.Close() + }() + go func() { + for { + cn, err := tln.Accept() + if err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + t.Fatalf("Accept failed: %s", err.Error()) + } + return + } + if adj != nil { + adj(tln) + } + go func(cn net.Conn) { + defer cn.Close() + upcn, err := net.Dial("tcp", upstream) + if err != nil { + t.Log(err) + return + } + // This will leave hanging goroutines util stopCh is closed + // but it doesn't matter in the context of running tests. + go func() { + <-stopCh + upcn.Close() + }() + go func() { + if _, err := io.Copy(upcn, cn); err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + // log.Printf("Upstream write failed: %s", err.Error()) + } + } + }() + if _, err := io.Copy(cn, upcn); err != nil { + if !strings.Contains(err.Error(), "use of closed network connection") { + // log.Printf("Upstream read failed: %s", err.Error()) + } + } + }(cn) + } + }() + return ln.Addr().String(), stopCh, nil +} + func expectErr(t *testing.T, err error, expected error) { if err == nil { t.Fatalf("Get for node that is too large should have returned error!")