From 58dbbb622b1979a687b5f1b9b0a34cb799bd99a2 Mon Sep 17 00:00:00 2001 From: "Derrick J. Wippler" Date: Mon, 13 Feb 2017 12:02:22 -0600 Subject: [PATCH] GetHosts() is now more C* version tolerant --- cassandra_test.go | 8 +- common_test.go | 6 +- conn.go | 6 +- connectionpool.go | 14 +- control.go | 47 +---- control_test.go | 4 +- events.go | 30 +-- filters.go | 4 +- filters_test.go | 6 +- host_source.go | 451 +++++++++++++++++++++++++++++++--------- host_source_test.go | 64 +++++- policies.go | 16 +- policies_test.go | 56 ++--- ring.go | 6 +- ring_test.go | 6 +- session.go | 28 +-- session_connect_test.go | 11 +- token.go | 2 +- token_test.go | 39 ++-- 19 files changed, 537 insertions(+), 267 deletions(-) diff --git a/cassandra_test.go b/cassandra_test.go index 6909dfa09..01c83616a 100644 --- a/cassandra_test.go +++ b/cassandra_test.go @@ -67,7 +67,7 @@ func TestRingDiscovery(t *testing.T) { if *clusterSize != size { for p, pool := range session.pool.hostConnPools { - t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String()) + t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String()) } t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size) @@ -577,7 +577,7 @@ func TestReconnection(t *testing.T) { defer session.Close() h := session.ring.allHosts()[0] - session.handleNodeDown(h.Peer(), h.Port()) + session.handleNodeDown(h.ConnectAddress(), h.Port()) if h.State() != NodeDown { t.Fatal("Host should be NodeDown but not.") @@ -2369,8 +2369,8 @@ func TestDiscoverViaProxy(t *testing.T) { session := createSessionFromCluster(cluster, t) defer session.Close() - if !session.hostSource.localHasRpcAddr { - t.Skip("Target cluster does not have rpc_address in system.local.") + if session.hostSource.localHost.BroadcastAddress() == nil { + t.Skip("Target cluster does not have broadcast_address in system.local.") goto close } diff --git a/common_test.go b/common_test.go index 92cd15aba..27552b8f7 100644 --- a/common_test.go +++ b/common_test.go @@ -4,11 +4,11 @@ import ( "flag" "fmt" "log" + "net" "strings" "sync" "testing" "time" - "net" ) var ( @@ -154,9 +154,9 @@ func createTestSession() *Session { config.IgnorePeerAddr = true config.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy() session := &Session{ - cfg: *config, + cfg: *config, connCfg: &ConnConfig{ - Timeout: 10*time.Millisecond, + Timeout: 10 * time.Millisecond, Keepalive: 0, }, policy: config.PoolConfig.HostSelectionPolicy, diff --git a/conn.go b/conn.go index 6c26005ae..44ab5eae0 100644 --- a/conn.go +++ b/conn.go @@ -156,8 +156,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses // TODO(zariel): remove these if host == nil { panic("host is nil") - } else if len(host.Peer()) == 0 { - panic("host missing peer ip address") + } else if len(host.ConnectAddress()) == 0 { + panic("host missing connect ip address") } else if host.Port() == 0 { panic("host missing port") } @@ -172,7 +172,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses } // TODO(zariel): handle ipv6 zone - translatedPeer, translatedPort := session.cfg.translateAddressPort(host.Peer(), host.Port()) + translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port()) addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String() //addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String() diff --git a/connectionpool.go b/connectionpool.go index af2a78b93..3c85b4b42 100644 --- 
a/connectionpool.go +++ b/connectionpool.go @@ -128,7 +128,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) { // don't create a connection pool for a down host continue } - ip := host.Peer().String() + ip := host.ConnectAddress().String() if _, exists := p.hostConnPools[ip]; exists { // still have this host, so don't remove it delete(toRemove, ip) @@ -154,7 +154,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) { createCount-- if pool.Size() > 0 { // add pool onyl if there a connections available - p.hostConnPools[string(pool.host.Peer())] = pool + p.hostConnPools[string(pool.host.ConnectAddress())] = pool } } @@ -177,7 +177,7 @@ func (p *policyConnPool) Size() int { } func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) { - ip := host.Peer().String() + ip := host.ConnectAddress().String() p.mu.RLock() pool, ok = p.hostConnPools[ip] p.mu.RUnlock() @@ -196,7 +196,7 @@ func (p *policyConnPool) Close() { } func (p *policyConnPool) addHost(host *HostInfo) { - ip := host.Peer().String() + ip := host.ConnectAddress().String() p.mu.Lock() pool, ok := p.hostConnPools[ip] if !ok { @@ -274,7 +274,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int, session: session, host: host, port: port, - addr: (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(), + addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(), size: size, keyspace: keyspace, conns: make([]*Conn, 0, size), @@ -398,7 +398,7 @@ func (pool *hostConnPool) fill() { // this is call with the connection pool mutex held, this call will // then recursively try to lock it again. FIXME - go pool.session.handleNodeDown(pool.host.Peer(), pool.port) + go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port) return } @@ -420,7 +420,7 @@ func (pool *hostConnPool) logConnectErr(err error) { // connection refused // these are typical during a node outage so avoid log spam. if gocqlDebug { - Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err) + Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err) } } else if err != nil { // unexpected error diff --git a/control.go b/control.go index 7089d76b7..7cd675c6d 100644 --- a/control.go +++ b/control.go @@ -134,7 +134,7 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) { } - return &HostInfo{peer: ip, port: port}, nil + return &HostInfo{connectAddress: ip, port: port}, nil } func shuffleHosts(hosts []*HostInfo) []*HostInfo { @@ -163,7 +163,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) { return conn, nil } - Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.Peer(), err) + Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.ConnectAddress(), err) } return nil, err @@ -316,7 +316,7 @@ func (c *controlConn) reconnect(refreshring bool) { if err != nil { // host is dead // TODO: this is replicated in a few places - c.session.handleNodeDown(host.Peer(), host.Port()) + c.session.handleNodeDown(host.ConnectAddress(), host.Port()) } else { newConn = conn } @@ -422,52 +422,13 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter return } -func (c *controlConn) fetchHostInfo(ip net.IP, port int) (*HostInfo, error) { - // TODO(zariel): we should probably move this into host_source or atleast - // share code with it. 
- localHost := c.host() - if localHost == nil { - return nil, errors.New("unable to fetch host info, invalid conn host") - } - - isLocal := localHost.Peer().Equal(ip) - - var fn func(*HostInfo) error - - // TODO(zariel): fetch preferred_ip address (is it >3.x only?) - if isLocal { - fn = func(host *HostInfo) error { - iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'") - iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) - return iter.Close() - } - } else { - fn = func(host *HostInfo) error { - iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", ip) - iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) - return iter.Close() - } - } - - host := &HostInfo{ - port: port, - peer: ip, - } - - if err := fn(host); err != nil { - return nil, err - } - - return host, nil -} - func (c *controlConn) awaitSchemaAgreement() error { return c.withConn(func(conn *Conn) *Iter { return &Iter{err: conn.awaitSchemaAgreement()} }).err } -func (c *controlConn) host() *HostInfo { +func (c *controlConn) GetHostInfo() *HostInfo { conn := c.conn.Load().(*Conn) if conn == nil { return nil diff --git a/control_test.go b/control_test.go index c4e965587..a63a8d2ee 100644 --- a/control_test.go +++ b/control_test.go @@ -24,8 +24,8 @@ func TestHostInfo_Lookup(t *testing.T) { continue } - if !host.peer.Equal(test.ip) { - t.Errorf("expected ip %v got %v for addr %q", test.ip, host.peer, test.addr) + if !host.ConnectAddress().Equal(test.ip) { + t.Errorf("expected ip %v got %v for addr %q", test.ip, host.ConnectAddress(), test.addr) } } } diff --git a/events.go b/events.go index 06b1b4acb..d66eb6c63 100644 --- a/events.go +++ b/events.go @@ -171,23 +171,15 @@ func (s *Session) handleNodeEvent(frames []frame) { } func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) { - var hostInfo *HostInfo - if s.control != nil && !s.cfg.IgnorePeerAddr { - var err error - hostInfo, err = s.control.fetchHostInfo(ip, port) - if err != nil { - Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err) - return - } - } else { - hostInfo = &HostInfo{peer: ip, port: port} - } - - if s.cfg.IgnorePeerAddr && hostInfo.Peer().Equal(ip) { - hostInfo.setPeer(ip) + // Get host info and apply any filters to the host + hostInfo, err := s.hostSource.GetHostInfo(ip, port) + if err != nil { + Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err) + return } - if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(hostInfo) { + // If hostInfo is nil, this host was filtered out by cfg.HostFilter + if hostInfo == nil { return } @@ -213,7 +205,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) { // we remove all nodes but only add ones which pass the filter host := s.ring.getHost(ip) if host == nil { - host = &HostInfo{peer: ip, port: port} + host = &HostInfo{connectAddress: ip, port: port} } if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) { @@ -237,9 +229,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) { host := s.ring.getHost(ip) if host != nil { - if s.cfg.IgnorePeerAddr && host.Peer().Equal(ip) { + if s.cfg.IgnorePeerAddr && host.ConnectAddress().Equal(ip) { // TODO: how can this ever be true? 
- host.setPeer(ip) + host.SetConnectAddress(ip) } if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) { @@ -266,7 +258,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) { host := s.ring.getHost(ip) if host == nil { - host = &HostInfo{peer: ip, port: port} + host = &HostInfo{connectAddress: ip, port: port} } if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) { diff --git a/filters.go b/filters.go index 807a2cf47..234b94bd6 100644 --- a/filters.go +++ b/filters.go @@ -48,10 +48,10 @@ func WhiteListHostFilter(hosts ...string) HostFilter { m := make(map[string]bool, len(hostInfos)) for _, host := range hostInfos { - m[string(host.peer)] = true + m[string(host.ConnectAddress())] = true } return HostFilterFunc(func(host *HostInfo) bool { - return m[string(host.Peer())] + return m[string(host.ConnectAddress())] }) } diff --git a/filters_test.go b/filters_test.go index 4ce0a6ccf..1ccf1a1ca 100644 --- a/filters_test.go +++ b/filters_test.go @@ -17,7 +17,7 @@ func TestFilter_WhiteList(t *testing.T) { } for i, test := range tests { - if f.Accept(&HostInfo{peer: test.addr}) { + if f.Accept(&HostInfo{connectAddress: test.addr}) { if !test.accept { t.Errorf("%d: should not have been accepted but was", i) } @@ -39,7 +39,7 @@ func TestFilter_AllowAll(t *testing.T) { } for i, test := range tests { - if f.Accept(&HostInfo{peer: test.addr}) { + if f.Accept(&HostInfo{connectAddress: test.addr}) { if !test.accept { t.Errorf("%d: should not have been accepted but was", i) } @@ -61,7 +61,7 @@ func TestFilter_DenyAll(t *testing.T) { } for i, test := range tests { - if f.Accept(&HostInfo{peer: test.addr}) { + if f.Accept(&HostInfo{connectAddress: test.addr}) { if !test.accept { t.Errorf("%d: should not have been accepted but was", i) } diff --git a/host_source.go b/host_source.go index c3944f3cc..e08bcc17c 100644 --- a/host_source.go +++ b/host_source.go @@ -7,8 +7,12 @@ import ( "strings" "sync" "time" + + "github.com/pkg/errors" ) +const assertErrorMsg = "Assertion failed for %s" + type nodeState int32 func (n nodeState) String() string { @@ -98,15 +102,25 @@ func (c cassVersion) nodeUpDelay() time.Duration { type HostInfo struct { // TODO(zariel): reduce locking maybe, not all values will change, but to ensure // that we are thread safe use a mutex to access all fields. - mu sync.RWMutex - peer net.IP - port int - dataCenter string - rack string - hostId string - version cassVersion - state nodeState - tokens []string + mu sync.RWMutex + peer net.IP + broadcastAddress net.IP + listenAddress net.IP + rpcAddress net.IP + preferredIP net.IP + connectAddress net.IP + port int + dataCenter string + rack string + hostId string + workload string + graph bool + dseVersion string + partitioner string + clusterName string + version cassVersion + state nodeState + tokens []string } func (h *HostInfo) Equal(host *HostInfo) bool { @@ -115,7 +129,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool { host.mu.RLock() defer host.mu.RUnlock() - return h.peer.Equal(host.peer) + return h.ConnectAddress().Equal(host.ConnectAddress()) } func (h *HostInfo) Peer() net.IP { @@ -131,6 +145,57 @@ func (h *HostInfo) setPeer(peer net.IP) *HostInfo { return h } +// Returns the address that should be used to connect to the host +// This defaults to 'broadcast_address', then falls back to 'peer' +// This is to maintain existing functionality. 
If you wish to +// override this, use an AddressTranslator or use a HostFilter +// to SetConnectAddress() +func (h *HostInfo) ConnectAddress() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + + if h.connectAddress == nil { + if h.broadcastAddress != nil { + return h.broadcastAddress + } + if h.peer != nil { + return h.peer + } + } + return h.connectAddress +} + +func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo { + h.mu.Lock() + defer h.mu.Unlock() + h.connectAddress = address + return h +} + +func (h *HostInfo) BroadcastAddress() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + return h.broadcastAddress +} + +func (h *HostInfo) ListenAddress() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + return h.listenAddress +} + +func (h *HostInfo) RPCAddress() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + return h.rpcAddress +} + +func (h *HostInfo) PreferredIP() net.IP { + h.mu.RLock() + defer h.mu.RUnlock() + return h.preferredIP +} + func (h *HostInfo) DataCenter() string { h.mu.RLock() defer h.mu.RUnlock() @@ -170,6 +235,36 @@ func (h *HostInfo) setHostID(hostID string) *HostInfo { return h } +func (h *HostInfo) WorkLoad() string { + h.mu.RLock() + defer h.mu.RUnlock() + return h.workload +} + +func (h *HostInfo) Graph() bool { + h.mu.RLock() + defer h.mu.RUnlock() + return h.graph +} + +func (h *HostInfo) DSEVersion() string { + h.mu.RLock() + defer h.mu.RUnlock() + return h.dseVersion +} + +func (h *HostInfo) Partitioner() string { + h.mu.RLock() + defer h.mu.RUnlock() + return h.partitioner +} + +func (h *HostInfo) ClusterName() string { + h.mu.RLock() + defer h.mu.RUnlock() + return h.clusterName +} + func (h *HostInfo) Version() cassVersion { h.mu.RLock() defer h.mu.RUnlock() @@ -239,28 +334,27 @@ func (h *HostInfo) IsUp() bool { func (h *HostInfo) String() string { h.mu.RLock() defer h.mu.RUnlock() - return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens)) + return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+ + "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", + h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress, + h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens)) } // Polls system.peers at a specific interval to find new hosts type ringDescriber struct { - dcFilter string - rackFilter string - session *Session - closeChan chan bool - // indicates that we can use system.local to get the connections remote address - localHasRpcAddr bool - + session *Session mu sync.Mutex prevHosts []*HostInfo + localHost *HostInfo prevPartitioner string } -func checkSystemLocal(control *controlConn) (bool, error) { - iter := control.query("SELECT broadcast_address FROM system.local") +// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces +func checkSystemSchema(control *controlConn) (bool, error) { + iter := control.query("SELECT * FROM system_schema.keyspaces") if err := iter.err; err != nil { if errf, ok := err.(*errorFrame); ok { - if errf.code == errSyntax { + if errf.code == errReadFailure { return false, nil } } @@ -271,106 +365,277 @@ func checkSystemLocal(control *controlConn) (bool, error) { return true, nil } -// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces -func checkSystemSchema(control *controlConn) (bool, error) { - iter := control.query("SELECT * FROM 
system_schema.keyspaces") - if err := iter.err; err != nil { - if errf, ok := err.(*errorFrame); ok { - if errf.code == errReadFailure { - return false, nil +// Given a map that represents a row from either system.local or system.peers +// return as much information as we can in *HostInfo +func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (error, *HostInfo) { + host := HostInfo{} + var ok bool + + for key, value := range row { + switch key { + case "data_center": + host.dataCenter, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "data_center"), nil + } + case "rack": + host.rack, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "rack"), nil + } + case "host_id": + hostId, ok := value.(UUID) + if !ok { + return fmt.Errorf(assertErrorMsg, "host_id"), nil + } + host.hostId = hostId.String() + case "release_version": + version, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "release_version"), nil + } + host.version.Set(version) + case "peer": + ip, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "peer"), nil + } + host.peer = net.ParseIP(ip) + case "cluster_name": + host.clusterName, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "cluster_name"), nil + } + case "partitioner": + host.partitioner, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "partitioner"), nil + } + case "broadcast_address": + ip, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "broadcast_address"), nil + } + host.broadcastAddress = net.ParseIP(ip) + case "preferred_ip": + ip, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "preferred_ip"), nil + } + host.preferredIP = net.ParseIP(ip) + case "rpc_address": + ip, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "rpc_address"), nil + } + host.rpcAddress = net.ParseIP(ip) + case "listen_address": + ip, ok := value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "listen_address"), nil + } + host.listenAddress = net.ParseIP(ip) + case "workload": + host.workload, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "workload"), nil + } + case "graph": + host.graph, ok = value.(bool) + if !ok { + return fmt.Errorf(assertErrorMsg, "graph"), nil + } + case "tokens": + host.tokens, ok = value.([]string) + if !ok { + return fmt.Errorf(assertErrorMsg, "tokens"), nil + } + case "dse_version": + host.dseVersion, ok = value.(string) + if !ok { + return fmt.Errorf(assertErrorMsg, "dse_version"), nil } } + // TODO(thrawn01): Add 'port'? 
once CASSANDRA-7544 is complete
+		// Not sure what the port field will be called until the JIRA issue is complete
 	}
 
-	return false, err
+	// Default to our connected port if the cluster doesn't have port information
+	if host.port == 0 {
+		host.port = r.session.cfg.Port
 	}
 
-	return true, nil
+	return nil, &host
 }
 
-func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
-	r.mu.Lock()
-	defer r.mu.Unlock()
-	// we need conn to be the same because we need to query system.peers and system.local
-	// on the same node to get the whole cluster
-
-	const (
-		legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
-		// only supported in 2.2.0, 2.1.6, 2.0.16
-		localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
-	)
-
-	localHost := &HostInfo{}
-	if r.localHasRpcAddr {
-		iter := r.session.control.query(localQuery)
-		if iter == nil {
-			return r.prevHosts, r.prevPartitioner, nil
-		}
+// Ask the control node for its local host information
+func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
+	row := make(map[string]interface{})
 
-		iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
-			&localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
+	// ask the connected node for local host info
+	it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
+	}
 
-		if err = iter.Close(); err != nil {
-			return nil, "", err
-		}
-	} else {
-		iter := r.session.control.withConn(func(c *Conn) *Iter {
-			localHost = c.host
-			return c.query(legacyLocalQuery)
-		})
+	// expect only 1 row
+	it.MapScan(row)
+	if err := it.Close(); err != nil {
+		return nil, err
+	}
 
-		if iter == nil {
-			return r.prevHosts, r.prevPartitioner, nil
-		}
+	// extract all available info about the host
+	err, host := r.hostInfoFromMap(row)
+	if err != nil {
+		return nil, err
+	}
 
-		iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
+	return host, err
+}
 
-		if err = iter.Close(); err != nil {
-			return nil, "", err
-		}
+	}
+// Given an ip address and port, return a peer that matched the ip address
+func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
+	row := make(map[string]interface{})
+
+	it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
+	}
+
+	// expect only 1 row
+	it.MapScan(row)
+	if err := it.Close(); err != nil {
+		return nil, err
+	}
+
+	// extract all available info about the host
+	err, host := r.hostInfoFromMap(row)
+	if err != nil {
+		return nil, err
 	}
 
-	localHost.port = r.session.cfg.Port
+	return host, err
+}
 
-	hosts = []*HostInfo{localHost}
+// Ask the control node for host info on all its known peers
+func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
+	var hosts []*HostInfo
 
-	rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
-	if rows == nil {
-		return r.prevHosts, r.prevPartitioner, nil
+	// Ask the node for a list of its peers
+	it := r.session.control.query("SELECT * FROM system.peers")
+	if it == nil {
+		return nil, errors.New("Attempted to query 'system.peers' on a closed
connection") } - for rows.Next() { - host := &HostInfo{port: r.session.cfg.Port} - err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version) + for { + row := make(map[string]interface{}) + if !it.MapScan(row) { + break + } + // extract all available info about the peer + err, host := r.hostInfoFromMap(row) if err != nil { - Logger.Println(err) - continue + return nil, err } - if r.matchFilter(host) { - hosts = append(hosts, host) + // If it's not a valid peer + if !r.IsValidPeer(host) { + Logger.Printf("Found invalid peer '%+v' "+ + "Likely due to a gossip or snitch issue, this host will be ignored", host) + continue } + hosts = append(hosts, host) + } + if it.err != nil { + return nil, errors.Wrap(it.err, "GetClusterPeerInfo()") } + return hosts, nil +} + +// Return true if the host is a valid peer +func (r *ringDescriber) IsValidPeer(host *HostInfo) bool { + return !(len(host.RPCAddress()) == 0 || + host.hostId == "" || + host.dataCenter == "" || + host.rack == "" || + len(host.tokens) == 0) +} - if err = rows.Err(); err != nil { - return nil, "", err +// Return a list of hosts the cluster knows about +func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) { + r.mu.Lock() + defer r.mu.Unlock() + + // Update the localHost info with data from the connected host + localHost, err := r.GetLocalHostInfo() + if err != nil { + return r.prevHosts, r.prevPartitioner, err + } + + // Update our list of hosts by querying the cluster + hosts, err := r.GetClusterPeerInfo() + if err != nil { + return r.prevHosts, r.prevPartitioner, err } - r.prevHosts = hosts - r.prevPartitioner = partitioner + hosts = append(hosts, localHost) - return hosts, partitioner, nil + // Filter the hosts if filter is provided + var filteredHosts []*HostInfo + if r.session.cfg.HostFilter != nil { + for _, host := range hosts { + if r.session.cfg.HostFilter.Accept(host) { + filteredHosts = append(filteredHosts, host) + } + } + } else { + filteredHosts = hosts + } + + r.prevHosts = filteredHosts + r.prevPartitioner = localHost.partitioner + r.localHost = localHost + + return filteredHosts, localHost.partitioner, nil } -func (r *ringDescriber) matchFilter(host *HostInfo) bool { - if r.dcFilter != "" && r.dcFilter != host.DataCenter() { - return false +// Given an ip/port return HostInfo for the specified ip/port +func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) { + var host *HostInfo + var err error + + // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup? + // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true? 
+
+	// Ignore the port and connect address and use the address/port we already have
+	if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
+		return &HostInfo{connectAddress: ip, port: port}, nil
+	}
+
+	// Attempt to get the host info for our control connection
+	controlHost := r.session.control.GetHostInfo()
+	if controlHost == nil {
+		return nil, errors.New("invalid control connection")
+	}
+
+	// If we are asking about the same node our control connection has a connection to
+	if controlHost.ConnectAddress().Equal(ip) {
+		host, err = r.GetLocalHostInfo()
+	} else {
+		host, err = r.GetPeerHostInfo(ip, port)
+	}
+
+	// No host was found matching this ip/port
+	if err != nil {
+		return nil, err
 	}
-	if r.rackFilter != "" && r.rackFilter != host.Rack() {
-		return false
+	// Apply host filter to the result; a nil host with a nil error means the
+	// host was rejected by the filter
+	if r.session.cfg.HostFilter != nil && !r.session.cfg.HostFilter.Accept(host) {
+		return nil, nil
 	}
-	return true
+	return host, err
 }
 
 func (r *ringDescriber) refreshRing() error {
@@ -386,12 +651,10 @@
 	// TODO: move this to session
 	// TODO: handle removing hosts here
 	for _, h := range hosts {
-		if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
-			if host, ok := r.session.ring.addHostIfMissing(h); !ok {
-				r.session.pool.addHost(h)
-			} else {
-				host.update(h)
-			}
+		if host, ok := r.session.ring.addHostIfMissing(h); !ok {
+			r.session.pool.addHost(h)
+		} else {
+			host.update(h)
 		}
 	}
diff --git a/host_source_test.go b/host_source_test.go
index 4131122aa..eddb67f32 100644
--- a/host_source_test.go
+++ b/host_source_test.go
@@ -1,6 +1,12 @@
+// +build all integration
+
 package gocql
 
-import "testing"
+import (
+	"fmt"
+	"net"
+	"testing"
+)
 
 func TestUnmarshalCassVersion(t *testing.T) {
 	tests := [...]struct {
@@ -42,3 +48,59 @@ func TestCassVersionBefore(t *testing.T) {
 	}
 
 }
+
+func TestIsValidPeer(t *testing.T) {
+	ring := ringDescriber{}
+	host := &HostInfo{
+		rpcAddress: net.ParseIP("0.0.0.0"),
+		rack:       "myRack",
+		hostId:     "0",
+		dataCenter: "datacenter",
+		tokens:     []string{"0", "1"},
+	}
+
+	if !ring.IsValidPeer(host) {
+		t.Errorf("expected %+v to be a valid peer", host)
+	}
+
+	host.rack = ""
+	if ring.IsValidPeer(host) {
+		t.Errorf("expected %+v to NOT be a valid peer", host)
+	}
+}
+
+func TestGetHosts(t *testing.T) {
+	cluster := createCluster()
+	session := createSessionFromCluster(cluster, t)
+
+	hosts, partitioner, err := session.hostSource.GetHosts()
+
+	assertTrue(t, "err == nil", err == nil)
+	assertTrue(t, "len(hosts) == 3", len(hosts) == 3)
+	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
+
+}
+
+func TestGetHostsWithFilter(t *testing.T) {
+	filterHostIP := net.ParseIP("127.0.0.3")
+	cluster := createCluster()
+
+	// Filter to remove one of the localhost nodes
+	cluster.HostFilter = HostFilterFunc(func(host *HostInfo) bool {
+		if host.ConnectAddress().Equal(filterHostIP) {
+			return false
+		}
+		return true
+	})
+	session := createSessionFromCluster(cluster, t)
+
+	hosts, partitioner, err := session.hostSource.GetHosts()
+	assertTrue(t, "err == nil", err == nil)
+	assertTrue(t, "len(hosts) == 2", len(hosts) == 2)
+	assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
+	for _, host := range hosts {
+		if host.ConnectAddress().Equal(filterHostIP) {
+			t.Fatal(fmt.Sprintf("Did not expect to see '%q' in host list", filterHostIP))
+		}
+	}
+}
diff --git a/policies.go b/policies.go
index eb549a15a..6be8bb8a4 100644
--- a/policies.go
+++ b/policies.go
@@ -102,7 +102,7 @@ func (c
*cowHostList) remove(ip net.IP) bool { found := false newL := make([]*HostInfo, 0, size) for i := 0; i < len(l); i++ { - if !l[i].Peer().Equal(ip) { + if !l[i].ConnectAddress().Equal(ip) { newL = append(newL, l[i]) } else { found = true @@ -236,7 +236,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) { } func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) { - r.hosts.remove(host.Peer()) + r.hosts.remove(host.ConnectAddress()) } func (r *roundRobinHostPolicy) HostUp(host *HostInfo) { @@ -279,7 +279,7 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) { } func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) { - t.hosts.remove(host.Peer()) + t.hosts.remove(host.ConnectAddress()) t.fallback.RemoveHost(host) t.resetTokenRing() @@ -393,7 +393,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { hostMap := make(map[string]*HostInfo, len(hosts)) for i, host := range hosts { - ip := host.Peer().String() + ip := host.ConnectAddress().String() peers[i] = ip hostMap[ip] = host } @@ -405,7 +405,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) { } func (r *hostPoolHostPolicy) AddHost(host *HostInfo) { - ip := host.Peer().String() + ip := host.ConnectAddress().String() r.mu.Lock() defer r.mu.Unlock() @@ -426,7 +426,7 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) { } func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) { - ip := host.Peer().String() + ip := host.ConnectAddress().String() r.mu.Lock() defer r.mu.Unlock() @@ -438,7 +438,7 @@ func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) { delete(r.hostMap, ip) hosts := make([]string, 0, len(r.hostMap)) for _, host := range r.hostMap { - hosts = append(hosts, host.Peer().String()) + hosts = append(hosts, host.ConnectAddress().String()) } r.hp.SetHosts(hosts) @@ -492,7 +492,7 @@ func (host selectedHostPoolHost) Info() *HostInfo { } func (host selectedHostPoolHost) Mark(err error) { - ip := host.info.Peer().String() + ip := host.info.ConnectAddress().String() host.policy.mu.RLock() defer host.policy.mu.RUnlock() diff --git a/policies_test.go b/policies_test.go index 957e31270..ff43e9927 100644 --- a/policies_test.go +++ b/policies_test.go @@ -17,8 +17,8 @@ func TestRoundRobinHostPolicy(t *testing.T) { policy := RoundRobinHostPolicy() hosts := [...]*HostInfo{ - {hostId: "0", peer: net.IPv4(0, 0, 0, 1)}, - {hostId: "1", peer: net.IPv4(0, 0, 0, 2)}, + {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)}, + {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)}, } for _, host := range hosts { @@ -68,10 +68,10 @@ func TestTokenAwareHostPolicy(t *testing.T) { // set the hosts hosts := [...]*HostInfo{ - {peer: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}}, - {peer: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}}, - {peer: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}}, - {peer: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}}, + {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}}, + {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}}, + {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}}, + {connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}}, } for _, host := range hosts { policy.AddHost(host) @@ -79,13 +79,13 @@ func TestTokenAwareHostPolicy(t *testing.T) { // the token ring is not setup without the partitioner, but the fallback // should work - if actual := policy.Pick(nil)(); !actual.Info().Peer().Equal(hosts[0].peer) { - t.Errorf("Expected peer 0 but was %s", actual.Info().Peer()) + if actual := policy.Pick(nil)(); 
!actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) { + t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress()) } query.RoutingKey([]byte("30")) - if actual := policy.Pick(query)(); !actual.Info().Peer().Equal(hosts[1].peer) { - t.Errorf("Expected peer 1 but was %s", actual.Info().Peer()) + if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) { + t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress()) } policy.SetPartitioner("OrderedPartitioner") @@ -93,18 +93,18 @@ func TestTokenAwareHostPolicy(t *testing.T) { // now the token ring is configured query.RoutingKey([]byte("20")) iter = policy.Pick(query) - if actual := iter(); !actual.Info().Peer().Equal(hosts[1].peer) { - t.Errorf("Expected peer 1 but was %s", actual.Info().Peer()) + if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) { + t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress()) } // rest are round robin - if actual := iter(); !actual.Info().Peer().Equal(hosts[2].peer) { - t.Errorf("Expected peer 2 but was %s", actual.Info().Peer()) + if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) { + t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress()) } - if actual := iter(); !actual.Info().Peer().Equal(hosts[3].peer) { - t.Errorf("Expected peer 3 but was %s", actual.Info().Peer()) + if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) { + t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress()) } - if actual := iter(); !actual.Info().Peer().Equal(hosts[0].peer) { - t.Errorf("Expected peer 0 but was %s", actual.Info().Peer()) + if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) { + t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress()) } } @@ -113,8 +113,8 @@ func TestHostPoolHostPolicy(t *testing.T) { policy := HostPoolHostPolicy(hostpool.New(nil)) hosts := []*HostInfo{ - {hostId: "0", peer: net.IPv4(10, 0, 0, 0)}, - {hostId: "1", peer: net.IPv4(10, 0, 0, 1)}, + {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)}, + {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)}, } // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map @@ -178,10 +178,10 @@ func TestTokenAwareNilHostInfo(t *testing.T) { policy := TokenAwareHostPolicy(RoundRobinHostPolicy()) hosts := [...]*HostInfo{ - {peer: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}}, - {peer: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}}, - {peer: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}}, - {peer: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}}, + {connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}}, + {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}}, + {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}}, + {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}}, } for _, host := range hosts { policy.AddHost(host) @@ -197,8 +197,8 @@ func TestTokenAwareNilHostInfo(t *testing.T) { t.Fatal("got nil host") } else if v := next.Info(); v == nil { t.Fatal("got nil HostInfo") - } else if !v.Peer().Equal(hosts[1].peer) { - t.Fatalf("expected peer 1 got %v", v.Peer()) + } else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) { + t.Fatalf("expected peer 1 got %v", v.ConnectAddress()) } // Empty the hosts to trigger the panic when using the fallback. 
@@ -221,7 +221,7 @@ func TestCOWList_Add(t *testing.T) { toAdd := [...]net.IP{net.IPv4(0, 0, 0, 0), net.IPv4(1, 0, 0, 0), net.IPv4(2, 0, 0, 0)} for _, addr := range toAdd { - if !cow.add(&HostInfo{peer: addr}) { + if !cow.add(&HostInfo{connectAddress: addr}) { t.Fatal("did not add peer which was not in the set") } } @@ -233,7 +233,7 @@ func TestCOWList_Add(t *testing.T) { set := make(map[string]bool) for _, host := range hosts { - set[string(host.Peer())] = true + set[string(host.ConnectAddress())] = true } for _, addr := range toAdd { diff --git a/ring.go b/ring.go index 45ace957f..2789e9ebe 100644 --- a/ring.go +++ b/ring.go @@ -53,7 +53,7 @@ func (r *ring) allHosts() []*HostInfo { } func (r *ring) addHost(host *HostInfo) bool { - ip := host.Peer().String() + ip := host.ConnectAddress().String() r.mu.Lock() if r.hosts == nil { @@ -79,7 +79,7 @@ func (r *ring) addOrUpdate(host *HostInfo) *HostInfo { } func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) { - ip := host.Peer().String() + ip := host.ConnectAddress().String() r.mu.Lock() if r.hosts == nil { @@ -106,7 +106,7 @@ func (r *ring) removeHost(ip net.IP) bool { _, ok := r.hosts[k] if ok { for i, host := range r.hostList { - if host.Peer().Equal(ip) { + if host.ConnectAddress().Equal(ip) { r.hostList = append(r.hostList[:i], r.hostList[i+1:]...) break } diff --git a/ring_test.go b/ring_test.go index feea8d2ca..88e593a32 100644 --- a/ring_test.go +++ b/ring_test.go @@ -8,7 +8,7 @@ import ( func TestRing_AddHostIfMissing_Missing(t *testing.T) { ring := &ring{} - host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)} + host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)} h1, ok := ring.addHostIfMissing(host) if ok { t.Fatal("host was reported as already existing") @@ -22,10 +22,10 @@ func TestRing_AddHostIfMissing_Missing(t *testing.T) { func TestRing_AddHostIfMissing_Existing(t *testing.T) { ring := &ring{} - host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)} + host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)} ring.addHostIfMissing(host) - h2 := &HostInfo{peer: net.IPv4(1, 1, 1, 1)} + h2 := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)} h1, ok := ring.addHostIfMissing(h2) if !ok { diff --git a/session.go b/session.go index 8cb1310eb..5b6b6dca7 100644 --- a/session.go +++ b/session.go @@ -111,8 +111,7 @@ func NewSession(cfg ClusterConfig) (*Session, error) { s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo) s.hostSource = &ringDescriber{ - session: s, - closeChan: make(chan bool), + session: s, } if cfg.PoolConfig.HostSelectionPolicy == nil { @@ -174,26 +173,19 @@ func (s *Session) init() error { return err } - // need to setup host source to check for broadcast_address in system.local - localHasRPCAddr, _ := checkSystemLocal(s.control) - s.hostSource.localHasRpcAddr = localHasRPCAddr - if !s.cfg.DisableInitialHostLookup { - // TODO(zariel): we need to get the partitioner from here - var p string - hosts, p, err = s.hostSource.GetHosts() + var partitioner string + hosts, partitioner, err = s.hostSource.GetHosts() if err != nil { return err } - s.policy.SetPartitioner(p) + s.policy.SetPartitioner(partitioner) } } for _, host := range hosts { - if s.cfg.HostFilter == nil || s.cfg.HostFilter.Accept(host) { - host = s.ring.addOrUpdate(host) - s.handleNodeUp(host.Peer(), host.Port(), false) - } + host = s.ring.addOrUpdate(host) + s.handleNodeUp(host.ConnectAddress(), host.Port(), false) } // TODO(zariel): we probably dont need this any more as we verify that we @@ -234,7 +226,7 @@ func (s *Session) reconnectDownedHosts(intv 
time.Duration) { if gocqlDebug { buf := bytes.NewBufferString("Session.ring:") for _, h := range hosts { - buf.WriteString("[" + h.Peer().String() + ":" + h.State().String() + "]") + buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]") } Logger.Println(buf.String()) } @@ -243,7 +235,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) { if h.IsUp() { continue } - s.handleNodeUp(h.Peer(), h.Port(), true) + s.handleNodeUp(h.ConnectAddress(), h.Port(), true) } case <-s.quit: return @@ -344,10 +336,6 @@ func (s *Session) Close() { s.pool.Close() } - if s.hostSource != nil { - close(s.hostSource.closeChan) - } - if s.control != nil { s.control.close() } diff --git a/session_connect_test.go b/session_connect_test.go index 5b1629506..eecbf803e 100644 --- a/session_connect_test.go +++ b/session_connect_test.go @@ -1,12 +1,13 @@ package gocql import ( - "golang.org/x/net/context" "net" "strconv" "sync" "testing" "time" + + "golang.org/x/net/context" ) type OneConnTestServer struct { @@ -103,8 +104,8 @@ func TestSession_connect_WithNoTranslator(t *testing.T) { go srvr.Serve() Connect(&HostInfo{ - peer: srvr.Addr, - port: srvr.Port, + connectAddress: srvr.Addr, + port: srvr.Port, }, session.connCfg, testConnErrorHandler(t), session) assertConnectionEventually(t, 500*time.Millisecond, srvr) @@ -123,8 +124,8 @@ func TestSession_connect_WithTranslator(t *testing.T) { // the provided address will be translated Connect(&HostInfo{ - peer: net.ParseIP("10.10.10.10"), - port: 5432, + connectAddress: net.ParseIP("10.10.10.10"), + port: 5432, }, session.connCfg, testConnErrorHandler(t), session) assertConnectionEventually(t, 500*time.Millisecond, srvr) diff --git a/token.go b/token.go index 1113b9276..edc37302b 100644 --- a/token.go +++ b/token.go @@ -184,7 +184,7 @@ func (t *tokenRing) String() string { buf.WriteString("]") buf.WriteString(t.tokens[i].String()) buf.WriteString(":") - buf.WriteString(t.hosts[i].Peer().String()) + buf.WriteString(t.hosts[i].ConnectAddress().String()) } buf.WriteString("\n}") return string(buf.Bytes()) diff --git a/token_test.go b/token_test.go index 9d78daa85..b71ff74cc 100644 --- a/token_test.go +++ b/token_test.go @@ -232,8 +232,8 @@ func hostsForTests(n int) []*HostInfo { hosts := make([]*HostInfo, n) for i := 0; i < n; i++ { host := &HostInfo{ - peer: net.IPv4(1, 1, 1, byte(n)), - tokens: []string{fmt.Sprintf("%d", n)}, + connectAddress: net.IPv4(1, 1, 1, byte(n)), + tokens: []string{fmt.Sprintf("%d", n)}, } hosts[i] = host @@ -254,19 +254,20 @@ func TestMurmur3TokenRing(t *testing.T) { for _, host := range hosts { actual := ring.GetHostForToken(p.ParseString(host.tokens[0])) - if !actual.Peer().Equal(host.peer) { - t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer) + if !actual.ConnectAddress().Equal(host.ConnectAddress()) { + t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(), + host.tokens[0], actual.ConnectAddress()) } } actual := ring.GetHostForToken(p.ParseString("12")) - if !actual.Peer().Equal(hosts[1].peer) { - t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer()) + if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) { + t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress()) } actual = ring.GetHostForToken(p.ParseString("24324545443332")) - if !actual.Peer().Equal(hosts[0].peer) { - t.Errorf("Expected peer 0 for token \"24324545443332\", but was %s", actual.Peer()) + if 
!actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected address 0 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }
 
@@ -285,19 +286,20 @@ func TestOrderedTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.Peer().Equal(host.peer) {
-			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
+		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
+			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
+				host.tokens[0], actual.ConnectAddress())
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
+		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
+		t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }
 
@@ -315,18 +317,19 @@ func TestRandomTokenRing(t *testing.T) {
 	var actual *HostInfo
 	for _, host := range hosts {
 		actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-		if !actual.Peer().Equal(host.peer) {
-			t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
+		if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
+			t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
+				host.tokens[0], actual.ConnectAddress())
 		}
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("12"))
 	if !actual.peer.Equal(hosts[1].peer) {
-		t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
+		t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
 	}
 
 	actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-	if !actual.peer.Equal(hosts[0].peer) {
-		t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
+	if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
+		t.Errorf("Expected address 0 for token \"24324545443332\", but was %s", actual.ConnectAddress())
 	}
 }
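
Usage note: after this patch the driver dials HostInfo.ConnectAddress(), which
prefers an explicitly set connect address, then broadcast_address, then peer.
As the comment in host_source.go suggests, a HostFilter can override that
choice per host. A minimal sketch of the idea (the seed address below and the
filter itself are illustrative, not part of this patch):

	cluster := gocql.NewCluster("192.168.1.1")
	cluster.HostFilter = gocql.HostFilterFunc(func(host *gocql.HostInfo) bool {
		// Route connections through the broadcast address whenever the
		// cluster reports one; otherwise keep the default fallback.
		if addr := host.BroadcastAddress(); addr != nil {
			host.SetConnectAddress(addr)
		}
		return true // accept every host into the ring
	})
	session, err := cluster.CreateSession()
	if err != nil {
		log.Fatal(err)
	}
	defer session.Close()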