diff --git a/cassandra_test.go b/cassandra_test.go
index 5b9f0d622..0de1d2651 100644
--- a/cassandra_test.go
+++ b/cassandra_test.go
@@ -67,7 +67,7 @@ func TestRingDiscovery(t *testing.T) {
 
     if *clusterSize != size {
         for p, pool := range session.pool.hostConnPools {
-            t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())
+            t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String())
         }
         t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
@@ -577,7 +577,7 @@ func TestReconnection(t *testing.T) {
     defer session.Close()
 
     h := session.ring.allHosts()[0]
-    session.handleNodeDown(h.ConnectAddress(), h.Port())
+    session.handleNodeDown(h.Peer(), h.Port())
 
     if h.State() != NodeDown {
         t.Fatal("Host should be NodeDown but not.")
@@ -2416,8 +2416,8 @@ func TestDiscoverViaProxy(t *testing.T) {
     session := createSessionFromCluster(cluster, t)
     defer session.Close()
 
-    if session.hostSource.localHost.BroadcastAddress() == nil {
-        t.Skip("Target cluster does not have broadcast_address in system.local.")
+    if !session.hostSource.localHasRpcAddr {
+        t.Skip("Target cluster does not have rpc_address in system.local.")
         goto close
     }
diff --git a/common_test.go b/common_test.go
index 27552b8f7..92cd15aba 100644
--- a/common_test.go
+++ b/common_test.go
@@ -4,11 +4,11 @@ import (
     "flag"
     "fmt"
     "log"
-    "net"
     "strings"
     "sync"
     "testing"
     "time"
+    "net"
 )
 
 var (
@@ -154,9 +154,9 @@ func createTestSession() *Session {
     config.IgnorePeerAddr = true
     config.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
     session := &Session{
-        cfg:     *config,
+        cfg: *config,
         connCfg: &ConnConfig{
-            Timeout:   10 * time.Millisecond,
+            Timeout:   10*time.Millisecond,
             Keepalive: 0,
         },
         policy: config.PoolConfig.HostSelectionPolicy,
diff --git a/conn.go b/conn.go
index 8d2be9532..942b08d4a 100644
--- a/conn.go
+++ b/conn.go
@@ -156,8 +156,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
     // TODO(zariel): remove these
     if host == nil {
         panic("host is nil")
-    } else if len(host.ConnectAddress()) == 0 {
-        panic("host missing connect ip address")
+    } else if len(host.Peer()) == 0 {
+        panic("host missing peer ip address")
     } else if host.Port() == 0 {
         panic("host missing port")
     }
@@ -172,7 +172,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
     }
 
     // TODO(zariel): handle ipv6 zone
-    translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port())
+    translatedPeer, translatedPort := session.cfg.translateAddressPort(host.Peer(), host.Port())
     addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String()
     //addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String()
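Context for the conn.go hunk above: every dial still goes through cfg.translateAddressPort, so callers can remap the raw peer address without any per-host connect-address field. A minimal sketch of wiring that up from the public API, assuming the AddressTranslator/AddressTranslatorFunc types exercised by session_connect_test.go further down (those names do not appear in this diff):

package main

import (
    "net"

    "github.com/gocql/gocql"
)

func main() {
    cluster := gocql.NewCluster("10.0.0.1")

    // Assumed API: rewrite every peer address before dialing, e.g. when
    // nodes advertise private IPs that must be reached through a proxy.
    cluster.AddressTranslator = gocql.AddressTranslatorFunc(func(addr net.IP, port int) (net.IP, int) {
        return net.ParseIP("192.0.2.10"), 9042 // hypothetical proxy endpoint
    })

    session, err := cluster.CreateSession()
    if err != nil {
        panic(err)
    }
    defer session.Close()
}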
diff --git a/connectionpool.go b/connectionpool.go
index 745698403..2a74184cd 100644
--- a/connectionpool.go
+++ b/connectionpool.go
@@ -128,7 +128,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
             // don't create a connection pool for a down host
             continue
         }
-        ip := host.ConnectAddress().String()
+        ip := host.Peer().String()
         if _, exists := p.hostConnPools[ip]; exists {
             // still have this host, so don't remove it
             delete(toRemove, ip)
@@ -154,7 +154,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
         createCount--
         if pool.Size() > 0 {
             // add pool only if there a connections available
-            p.hostConnPools[string(pool.host.ConnectAddress())] = pool
+            p.hostConnPools[string(pool.host.Peer())] = pool
         }
     }
 
@@ -177,7 +177,7 @@ func (p *policyConnPool) Size() int {
 }
 
 func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
     p.mu.RLock()
     pool, ok = p.hostConnPools[ip]
     p.mu.RUnlock()
@@ -196,7 +196,7 @@ func (p *policyConnPool) Close() {
 }
 
 func (p *policyConnPool) addHost(host *HostInfo) {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
     p.mu.Lock()
     pool, ok := p.hostConnPools[ip]
     if !ok {
@@ -274,7 +274,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
         session:  session,
         host:     host,
         port:     port,
-        addr:     (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
+        addr:     (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
         size:     size,
         keyspace: keyspace,
         conns:    make([]*Conn, 0, size),
@@ -398,7 +398,7 @@ func (pool *hostConnPool) fill() {
 
         // this is call with the connection pool mutex held, this call will
         // then recursively try to lock it again. FIXME
-        go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
+        go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
         return
     }
 
@@ -420,7 +420,7 @@ func (pool *hostConnPool) logConnectErr(err error) {
         // connection refused
         // these are typical during a node outage so avoid log spam.
         if gocqlDebug {
-            Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
+            Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
         }
     } else if err != nil {
         // unexpected error
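The pool map above keys hosts by host.Peer().String(), and the ring and policies below do the same, because net.IP is a byte slice and slices are not valid Go map keys. A self-contained illustration:

package main

import (
    "fmt"
    "net"
)

func main() {
    // net.IP is a []byte, so it cannot be a map key itself; the pools,
    // ring and policies key on its textual form instead.
    seen := make(map[string]bool)

    ip := net.ParseIP("127.0.0.1")
    seen[ip.String()] = true

    fmt.Println(seen["127.0.0.1"]) // true
}

Note that filters.go below keys by the raw bytes, string(host.peer); since net.ParseIP returns 16-byte slices even for IPv4 while addresses decoded off the wire may be 4-byte, mixing the two encodings would produce distinct keys for the same address.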
diff --git a/control.go b/control.go
index 7cd675c6d..7089d76b7 100644
--- a/control.go
+++ b/control.go
@@ -134,7 +134,7 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) {
     }
 
-    return &HostInfo{connectAddress: ip, port: port}, nil
+    return &HostInfo{peer: ip, port: port}, nil
 }
 
 func shuffleHosts(hosts []*HostInfo) []*HostInfo {
@@ -163,7 +163,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) {
             return conn, nil
         }
 
-        Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.ConnectAddress(), err)
+        Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.Peer(), err)
     }
 
     return nil, err
@@ -316,7 +316,7 @@ func (c *controlConn) reconnect(refreshring bool) {
     if err != nil {
         // host is dead
         // TODO: this is replicated in a few places
-        c.session.handleNodeDown(host.ConnectAddress(), host.Port())
+        c.session.handleNodeDown(host.Peer(), host.Port())
     } else {
         newConn = conn
     }
@@ -422,13 +422,52 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
     return
 }
 
+func (c *controlConn) fetchHostInfo(ip net.IP, port int) (*HostInfo, error) {
+    // TODO(zariel): we should probably move this into host_source or at least
+    // share code with it.
+    localHost := c.host()
+    if localHost == nil {
+        return nil, errors.New("unable to fetch host info, invalid conn host")
+    }
+
+    isLocal := localHost.Peer().Equal(ip)
+
+    var fn func(*HostInfo) error
+
+    // TODO(zariel): fetch preferred_ip address (is it >3.x only?)
+    if isLocal {
+        fn = func(host *HostInfo) error {
+            iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'")
+            iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
+            return iter.Close()
+        }
+    } else {
+        fn = func(host *HostInfo) error {
+            iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", ip)
+            iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
+            return iter.Close()
+        }
+    }
+
+    host := &HostInfo{
+        port: port,
+        peer: ip,
+    }
+
+    if err := fn(host); err != nil {
+        return nil, err
+    }
+
+    return host, nil
+}
+
 func (c *controlConn) awaitSchemaAgreement() error {
     return c.withConn(func(conn *Conn) *Iter {
         return &Iter{err: conn.awaitSchemaAgreement()}
     }).err
 }
 
-func (c *controlConn) GetHostInfo() *HostInfo {
+func (c *controlConn) host() *HostInfo {
     conn := c.conn.Load().(*Conn)
     if conn == nil {
         return nil
diff --git a/control_test.go b/control_test.go
index a63a8d2ee..c4e965587 100644
--- a/control_test.go
+++ b/control_test.go
@@ -24,8 +24,8 @@ func TestHostInfo_Lookup(t *testing.T) {
             continue
         }
 
-        if !host.ConnectAddress().Equal(test.ip) {
-            t.Errorf("expected ip %v got %v for addr %q", test.ip, host.ConnectAddress(), test.addr)
+        if !host.peer.Equal(test.ip) {
+            t.Errorf("expected ip %v got %v for addr %q", test.ip, host.peer, test.addr)
         }
     }
 }
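The new fetchHostInfo switches between system.local and system.peers depending on whether the requested IP is the control connection's own node, since a Cassandra node describes itself only in system.local. A hypothetical helper mirroring that branch (chooseHostInfoQuery is illustrative only, not part of gocql):

package main

import (
    "fmt"
    "net"
)

// chooseHostInfoQuery mirrors the branch in fetchHostInfo: the control
// connection's own node is read from system.local, any other node from
// its row in system.peers.
func chooseHostInfoQuery(controlAddr, target net.IP) (stmt string, args []interface{}) {
    if controlAddr.Equal(target) {
        return "SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'", nil
    }
    return "SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", []interface{}{target}
}

func main() {
    stmt, args := chooseHostInfoQuery(net.ParseIP("10.0.0.1"), net.ParseIP("10.0.0.2"))
    fmt.Println(stmt, args)
}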
diff --git a/events.go b/events.go
index 64f49e6e7..75a12b7bd 100644
--- a/events.go
+++ b/events.go
@@ -174,15 +174,23 @@ func (s *Session) handleNodeEvent(frames []frame) {
 }
 
 func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) {
-    // Get host info and apply any filters to the host
-    hostInfo, err := s.hostSource.GetHostInfo(ip, port)
-    if err != nil {
-        Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
-        return
+    var hostInfo *HostInfo
+    if s.control != nil && !s.cfg.IgnorePeerAddr {
+        var err error
+        hostInfo, err = s.control.fetchHostInfo(ip, port)
+        if err != nil {
+            Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
+            return
+        }
+    } else {
+        hostInfo = &HostInfo{peer: ip, port: port}
+    }
+
+    if s.cfg.IgnorePeerAddr && hostInfo.Peer().Equal(ip) {
+        hostInfo.setPeer(ip)
     }
 
-    // If hostInfo is nil, this host was filtered out by cfg.HostFilter
-    if hostInfo == nil {
+    if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(hostInfo) {
         return
     }
 
@@ -208,7 +216,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
     // we remove all nodes but only add ones which pass the filter
     host := s.ring.getHost(ip)
     if host == nil {
-        host = &HostInfo{connectAddress: ip, port: port}
+        host = &HostInfo{peer: ip, port: port}
     }
 
     if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -232,9 +240,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {
 
     host := s.ring.getHost(ip)
     if host != nil {
-        if s.cfg.IgnorePeerAddr && host.ConnectAddress().Equal(ip) {
+        if s.cfg.IgnorePeerAddr && host.Peer().Equal(ip) {
             // TODO: how can this ever be true?
-            host.SetConnectAddress(ip)
+            host.setPeer(ip)
         }
 
         if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
@@ -261,7 +269,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {
 
     host := s.ring.getHost(ip)
     if host == nil {
-        host = &HostInfo{connectAddress: ip, port: port}
+        host = &HostInfo{peer: ip, port: port}
     }
 
     if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
diff --git a/filters.go b/filters.go
index 234b94bd6..807a2cf47 100644
--- a/filters.go
+++ b/filters.go
@@ -48,10 +48,10 @@ func WhiteListHostFilter(hosts ...string) HostFilter {
 
     m := make(map[string]bool, len(hostInfos))
     for _, host := range hostInfos {
-        m[string(host.ConnectAddress())] = true
+        m[string(host.peer)] = true
     }
 
     return HostFilterFunc(func(host *HostInfo) bool {
-        return m[string(host.ConnectAddress())]
+        return m[string(host.Peer())]
     })
 }
diff --git a/filters_test.go b/filters_test.go
index 1ccf1a1ca..4ce0a6ccf 100644
--- a/filters_test.go
+++ b/filters_test.go
@@ -17,7 +17,7 @@ func TestFilter_WhiteList(t *testing.T) {
     }
 
     for i, test := range tests {
-        if f.Accept(&HostInfo{connectAddress: test.addr}) {
+        if f.Accept(&HostInfo{peer: test.addr}) {
             if !test.accept {
                 t.Errorf("%d: should not have been accepted but was", i)
             }
@@ -39,7 +39,7 @@ func TestFilter_AllowAll(t *testing.T) {
     }
 
     for i, test := range tests {
-        if f.Accept(&HostInfo{connectAddress: test.addr}) {
+        if f.Accept(&HostInfo{peer: test.addr}) {
             if !test.accept {
                 t.Errorf("%d: should not have been accepted but was", i)
             }
@@ -61,7 +61,7 @@ func TestFilter_DenyAll(t *testing.T) {
     }
 
     for i, test := range tests {
-        if f.Accept(&HostInfo{connectAddress: test.addr}) {
+        if f.Accept(&HostInfo{peer: test.addr}) {
             if !test.accept {
                 t.Errorf("%d: should not have been accepted but was", i)
             }
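With ConnectAddress gone, the filters compare raw peer addresses, but the public wiring is unchanged. A minimal usage sketch, matching the ClusterConfig.HostFilter pattern that host_source_test.go below exercises:

package main

import "github.com/gocql/gocql"

func main() {
    cluster := gocql.NewCluster("127.0.0.1", "127.0.0.2")

    // Only the listed peers are accepted; hosts discovered through
    // system.peers or topology events are dropped by the filter.
    cluster.HostFilter = gocql.WhiteListHostFilter("127.0.0.1", "127.0.0.2")

    session, err := cluster.CreateSession()
    if err != nil {
        panic(err)
    }
    defer session.Close()
}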
diff --git a/host_source.go b/host_source.go
index e08bcc17c..c3944f3cc 100644
--- a/host_source.go
+++ b/host_source.go
@@ -7,12 +7,8 @@ import (
     "strings"
     "sync"
     "time"
-
-    "github.com/pkg/errors"
 )
 
-const assertErrorMsg = "Assertion failed for %s"
-
 type nodeState int32
 
 func (n nodeState) String() string {
@@ -102,25 +98,15 @@ func (c cassVersion) nodeUpDelay() time.Duration {
 type HostInfo struct {
     // TODO(zariel): reduce locking maybe, not all values will change, but to ensure
     // that we are thread safe use a mutex to access all fields.
-    mu               sync.RWMutex
-    peer             net.IP
-    broadcastAddress net.IP
-    listenAddress    net.IP
-    rpcAddress       net.IP
-    preferredIP      net.IP
-    connectAddress   net.IP
-    port             int
-    dataCenter       string
-    rack             string
-    hostId           string
-    workload         string
-    graph            bool
-    dseVersion       string
-    partitioner      string
-    clusterName      string
-    version          cassVersion
-    state            nodeState
-    tokens           []string
+    mu         sync.RWMutex
+    peer       net.IP
+    port       int
+    dataCenter string
+    rack       string
+    hostId     string
+    version    cassVersion
+    state      nodeState
+    tokens     []string
 }
 
 func (h *HostInfo) Equal(host *HostInfo) bool {
@@ -129,7 +115,7 @@ func (h *HostInfo) Equal(host *HostInfo) bool {
     host.mu.RLock()
     defer host.mu.RUnlock()
 
-    return h.ConnectAddress().Equal(host.ConnectAddress())
+    return h.peer.Equal(host.peer)
 }
 
 func (h *HostInfo) Peer() net.IP {
@@ -145,57 +131,6 @@ func (h *HostInfo) setPeer(peer net.IP) *HostInfo {
     return h
 }
 
-// Returns the address that should be used to connect to the host
-// This defaults to 'broadcast_address', then falls back to 'peer'
-// This is to maintain existing functionality. If you wish to
-// override this, use an AddressTranslator or use a HostFilter
-// to SetConnectAddress()
-func (h *HostInfo) ConnectAddress() net.IP {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-
-    if h.connectAddress == nil {
-        if h.broadcastAddress != nil {
-            return h.broadcastAddress
-        }
-        if h.peer != nil {
-            return h.peer
-        }
-    }
-    return h.connectAddress
-}
-
-func (h *HostInfo) SetConnectAddress(address net.IP) *HostInfo {
-    h.mu.Lock()
-    defer h.mu.Unlock()
-    h.connectAddress = address
-    return h
-}
-
-func (h *HostInfo) BroadcastAddress() net.IP {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.broadcastAddress
-}
-
-func (h *HostInfo) ListenAddress() net.IP {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.listenAddress
-}
-
-func (h *HostInfo) RPCAddress() net.IP {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.rpcAddress
-}
-
-func (h *HostInfo) PreferredIP() net.IP {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.preferredIP
-}
-
 func (h *HostInfo) DataCenter() string {
     h.mu.RLock()
     defer h.mu.RUnlock()
@@ -235,36 +170,6 @@ func (h *HostInfo) setHostID(hostID string) *HostInfo {
     return h
 }
 
-func (h *HostInfo) WorkLoad() string {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.workload
-}
-
-func (h *HostInfo) Graph() bool {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.graph
-}
-
-func (h *HostInfo) DSEVersion() string {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.dseVersion
-}
-
-func (h *HostInfo) Partitioner() string {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.partitioner
-}
-
-func (h *HostInfo) ClusterName() string {
-    h.mu.RLock()
-    defer h.mu.RUnlock()
-    return h.clusterName
-}
-
 func (h *HostInfo) Version() cassVersion {
     h.mu.RLock()
     defer h.mu.RUnlock()
@@ -334,27 +239,28 @@ func (h *HostInfo) IsUp() bool {
 func (h *HostInfo) String() string {
     h.mu.RLock()
     defer h.mu.RUnlock()
-    return fmt.Sprintf("[HostInfo connectAddress=%q peer=%q rpc_address=%q broadcast_address=%q "+
-        "port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]",
-        h.connectAddress, h.peer, h.rpcAddress, h.broadcastAddress,
-        h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
+    return fmt.Sprintf("[hostinfo peer=%q port=%d data_centre=%q rack=%q host_id=%q version=%q state=%s num_tokens=%d]", h.peer, h.port, h.dataCenter, h.rack, h.hostId, h.version, h.state, len(h.tokens))
 }
 
 // Polls system.peers at a specific interval to find new hosts
 type ringDescriber struct {
-    session *Session
+    dcFilter   string
+    rackFilter string
+    session    *Session
+    closeChan  chan bool
+    // indicates that we can use system.local to get the connection's remote address
+    localHasRpcAddr bool
+
     mu              sync.Mutex
     prevHosts       []*HostInfo
-    localHost       *HostInfo
     prevPartitioner string
 }
 
-// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
-func checkSystemSchema(control *controlConn) (bool, error) {
-    iter := control.query("SELECT * FROM system_schema.keyspaces")
+func checkSystemLocal(control *controlConn) (bool, error) {
+    iter := control.query("SELECT broadcast_address FROM system.local")
     if err := iter.err; err != nil {
         if errf, ok := err.(*errorFrame); ok {
-            if errf.code == errReadFailure {
+            if errf.code == errSyntax {
                 return false, nil
             }
         }
@@ -365,277 +271,106 @@
-// Given a map that represents a row from either system.local or system.peers
-// return as much information as we can in *HostInfo
-func (r *ringDescriber) hostInfoFromMap(row map[string]interface{}) (error, *HostInfo) {
-    host := HostInfo{}
-    var ok bool
-
-    for key, value := range row {
-        switch key {
-        case "data_center":
-            host.dataCenter, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "data_center"), nil
-            }
-        case "rack":
-            host.rack, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "rack"), nil
-            }
-        case "host_id":
-            hostId, ok := value.(UUID)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "host_id"), nil
-            }
-            host.hostId = hostId.String()
-        case "release_version":
-            version, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "release_version"), nil
-            }
-            host.version.Set(version)
-        case "peer":
-            ip, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "peer"), nil
-            }
-            host.peer = net.ParseIP(ip)
-        case "cluster_name":
-            host.clusterName, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "cluster_name"), nil
-            }
-        case "partitioner":
-            host.partitioner, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "partitioner"), nil
-            }
-        case "broadcast_address":
-            ip, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "broadcast_address"), nil
-            }
-            host.broadcastAddress = net.ParseIP(ip)
-        case "preferred_ip":
-            ip, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "preferred_ip"), nil
-            }
-            host.preferredIP = net.ParseIP(ip)
-        case "rpc_address":
-            ip, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "rpc_address"), nil
-            }
-            host.rpcAddress = net.ParseIP(ip)
-        case "listen_address":
-            ip, ok := value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "listen_address"), nil
-            }
-            host.listenAddress = net.ParseIP(ip)
-        case "workload":
-            host.workload, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "workload"), nil
-            }
-        case "graph":
-            host.graph, ok = value.(bool)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "graph"), nil
-            }
-        case "tokens":
-            host.tokens, ok = value.([]string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "tokens"), nil
-            }
-        case "dse_version":
-            host.dseVersion, ok = value.(string)
-            if !ok {
-                return fmt.Errorf(assertErrorMsg, "dse_version"), nil
+// Returns true if we are using system_schema.keyspaces instead of system.schema_keyspaces
+func checkSystemSchema(control *controlConn) (bool, error) {
+    iter := control.query("SELECT * FROM system_schema.keyspaces")
+    if err := iter.err; err != nil {
+        if errf, ok := err.(*errorFrame); ok {
+            if errf.code == errReadFailure {
+                return false, nil
             }
         }
-        // TODO(thrawn01): Add 'port'? once CASSANDRA-7544 is complete
-        // Not sure what the port field will be called until the JIRA issue is complete
-    }
-
-    // Default to our connected port if the cluster doesn't have port information
-    if host.port == 0 {
-        host.port = r.session.cfg.Port
+        return false, err
     }
 
-    return nil, &host
+    return true, nil
 }
 
-// Ask the control node for it's local host information
-func (r *ringDescriber) GetLocalHostInfo() (*HostInfo, error) {
-    row := make(map[string]interface{})
-
-    // ask the connected node for local host info
-    it := r.session.control.query("SELECT * FROM system.local WHERE key='local'")
-    if it == nil {
-        return nil, errors.New("Attempted to query 'system.local' on a closed control connection")
-    }
-
-    // expect only 1 row
-    it.MapScan(row)
-    if err := it.Close(); err != nil {
-        return nil, err
-    }
-
-    // extract all available info about the host
-    err, host := r.hostInfoFromMap(row)
-    if err != nil {
-        return nil, err
-    }
+func (r *ringDescriber) GetHosts() (hosts []*HostInfo, partitioner string, err error) {
+    r.mu.Lock()
+    defer r.mu.Unlock()
+    // we need conn to be the same because we need to query system.peers and system.local
+    // on the same node to get the whole cluster
+
+    const (
+        legacyLocalQuery = "SELECT data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
+        // only supported in 2.2.0, 2.1.6, 2.0.16
+        localQuery = "SELECT broadcast_address, data_center, rack, host_id, tokens, partitioner, release_version FROM system.local"
+    )
+
+    localHost := &HostInfo{}
+    if r.localHasRpcAddr {
+        iter := r.session.control.query(localQuery)
+        if iter == nil {
+            return r.prevHosts, r.prevPartitioner, nil
+        }
 
-    return host, err
-}
+        iter.Scan(&localHost.peer, &localHost.dataCenter, &localHost.rack,
+            &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
-// Given an ip address and port, return a peer that matched the ip address
-func (r *ringDescriber) GetPeerHostInfo(ip net.IP, port int) (*HostInfo, error) {
-    row := make(map[string]interface{})
+        if err = iter.Close(); err != nil {
+            return nil, "", err
+        }
+    } else {
+        iter := r.session.control.withConn(func(c *Conn) *Iter {
+            localHost = c.host
+            return c.query(legacyLocalQuery)
+        })
 
-    it := r.session.control.query("SELECT * FROM system.peers WHERE peer=?", ip)
-    if it == nil {
-        return nil, errors.New("Attempted to query 'system.peers' on a closed control connection")
-    }
+        if iter == nil {
+            return r.prevHosts, r.prevPartitioner, nil
+        }
 
-    // expect only 1 row
-    it.MapScan(row)
-    if err := it.Close(); err != nil {
-        return nil, err
-    }
+        iter.Scan(&localHost.dataCenter, &localHost.rack, &localHost.hostId, &localHost.tokens, &partitioner, &localHost.version)
 
-    // extract all available info about the host
-    err, host := r.hostInfoFromMap(row)
-    if err != nil {
-        return nil, err
+        if err = iter.Close(); err != nil {
+            return nil, "", err
+        }
     }
 
-    return host, err
-}
+    localHost.port = r.session.cfg.Port
 
-// Ask the control node for host info on all it's known peers
-func (r *ringDescriber) GetClusterPeerInfo() ([]*HostInfo, error) {
-    var hosts []*HostInfo
+    hosts = []*HostInfo{localHost}
 
-    // Ask the node for a list of it's peers
-    it := r.session.control.query("SELECT * FROM system.peers")
-    if it == nil {
-        return nil, errors.New("Attempted to query 'system.peers' on a closed connection")
+    rows := r.session.control.query("SELECT rpc_address, data_center, rack, host_id, tokens, release_version FROM system.peers").Scanner()
+    if rows == nil {
+        return r.prevHosts, r.prevPartitioner, nil
     }
 
-    for {
-        row := make(map[string]interface{})
-        if !it.MapScan(row) {
-            break
-        }
-        // extract all available info about the peer
-        err, host := r.hostInfoFromMap(row)
+    for rows.Next() {
+        host := &HostInfo{port: r.session.cfg.Port}
+        err := rows.Scan(&host.peer, &host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
         if err != nil {
-            return nil, err
-        }
-
-        // If it's not a valid peer
-        if !r.IsValidPeer(host) {
-            Logger.Printf("Found invalid peer '%+v' "+
-                "Likely due to a gossip or snitch issue, this host will be ignored", host)
+            Logger.Println(err)
             continue
         }
 
-        hosts = append(hosts, host)
-    }
-    if it.err != nil {
-        return nil, errors.Wrap(it.err, "GetClusterPeerInfo()")
-    }
-    return hosts, nil
-}
-
-// Return true if the host is a valid peer
-func (r *ringDescriber) IsValidPeer(host *HostInfo) bool {
-    return !(len(host.RPCAddress()) == 0 ||
-        host.hostId == "" ||
-        host.dataCenter == "" ||
-        host.rack == "" ||
-        len(host.tokens) == 0)
-}
-
-// Return a list of hosts the cluster knows about
-func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
-    r.mu.Lock()
-    defer r.mu.Unlock()
-
-    // Update the localHost info with data from the connected host
-    localHost, err := r.GetLocalHostInfo()
-    if err != nil {
-        return r.prevHosts, r.prevPartitioner, err
-    }
-
-    // Update our list of hosts by querying the cluster
-    hosts, err := r.GetClusterPeerInfo()
-    if err != nil {
-        return r.prevHosts, r.prevPartitioner, err
-    }
-
-    hosts = append(hosts, localHost)
-
-    // Filter the hosts if filter is provided
-    var filteredHosts []*HostInfo
-    if r.session.cfg.HostFilter != nil {
-        for _, host := range hosts {
-            if r.session.cfg.HostFilter.Accept(host) {
-                filteredHosts = append(filteredHosts, host)
-            }
+        if r.matchFilter(host) {
+            hosts = append(hosts, host)
         }
-    } else {
-        filteredHosts = hosts
     }
 
-    r.prevHosts = filteredHosts
-    r.prevPartitioner = localHost.partitioner
-    r.localHost = localHost
-
-    return filteredHosts, localHost.partitioner, nil
-}
-
-// Given an ip/port return HostInfo for the specified ip/port
-func (r *ringDescriber) GetHostInfo(ip net.IP, port int) (*HostInfo, error) {
-    var host *HostInfo
-    var err error
-
-    // TODO(thrawn01): Is IgnorePeerAddr still useful now that we have DisableInitialHostLookup?
-    // TODO(thrawn01): should we also check for DisableInitialHostLookup and return if true?
-
-    // Ignore the port and connect address and use the address/port we already have
-    if r.session.control == nil || r.session.cfg.IgnorePeerAddr {
-        return &HostInfo{connectAddress: ip, port: port}, nil
+    if err = rows.Err(); err != nil {
+        return nil, "", err
    }
 
-    // Attempt to get the host info for our control connection
-    controlHost := r.session.control.GetHostInfo()
-    if controlHost == nil {
-        return nil, errors.New("invalid control connection")
-    }
+    r.prevHosts = hosts
+    r.prevPartitioner = partitioner
 
-    // If we are asking about the same node our control connection has a connection too
-    if controlHost.ConnectAddress().Equal(ip) {
-        host, err = r.GetLocalHostInfo()
-    } else {
-        host, err = r.GetPeerHostInfo(ip, port)
-    }
+    return hosts, partitioner, nil
+}
 
-    // No host was found matching this ip/port
-    if err != nil {
-        return nil, err
+func (r *ringDescriber) matchFilter(host *HostInfo) bool {
+    if r.dcFilter != "" && r.dcFilter != host.DataCenter() {
+        return false
     }
 
-    // Apply host filter to the result
-    if r.session.cfg.HostFilter != nil && r.session.cfg.HostFilter.Accept(host) != true {
-        return nil, err
+    if r.rackFilter != "" && r.rackFilter != host.Rack() {
+        return false
     }
 
-    return host, err
+    return true
 }
 
 func (r *ringDescriber) refreshRing() error {
@@ -651,10 +386,12 @@
     // TODO: move this to session
     // TODO: handle removing hosts here
     for _, h := range hosts {
-        if host, ok := r.session.ring.addHostIfMissing(h); !ok {
-            r.session.pool.addHost(h)
-        } else {
-            host.update(h)
+        if r.session.cfg.HostFilter == nil || r.session.cfg.HostFilter.Accept(h) {
+            if host, ok := r.session.ring.addHostIfMissing(h); !ok {
+                r.session.pool.addHost(h)
+            } else {
+                host.update(h)
+            }
         }
     }
 
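The rewritten GetHosts scans system.peers row by row through the Scanner API instead of MapScan'ing whole rows into maps. The same pattern from a regular session, assuming Iter.Scanner behaves as the control-connection code above expects:

package main

import (
    "fmt"
    "log"

    "github.com/gocql/gocql"
)

func main() {
    session, err := gocql.NewCluster("127.0.0.1").CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    // Row-at-a-time scanning; a decode error skips that row rather than
    // aborting the whole iteration, mirroring GetHosts above.
    scanner := session.Query("SELECT peer, data_center, rack FROM system.peers").Iter().Scanner()
    for scanner.Next() {
        var peer, dc, rack string
        if err := scanner.Scan(&peer, &dc, &rack); err != nil {
            log.Println(err)
            continue
        }
        fmt.Println(peer, dc, rack)
    }
    if err := scanner.Err(); err != nil {
        log.Fatal(err)
    }
}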
diff --git a/host_source_test.go b/host_source_test.go
index eddb67f32..4131122aa 100644
--- a/host_source_test.go
+++ b/host_source_test.go
@@ -1,12 +1,6 @@
-// +build all integration
-
 package gocql
 
-import (
-    "fmt"
-    "net"
-    "testing"
-)
+import "testing"
 
 func TestUnmarshalCassVersion(t *testing.T) {
     tests := [...]struct {
@@ -48,59 +42,3 @@ func TestCassVersionBefore(t *testing.T) {
         }
     }
 }
-
-func TestIsValidPeer(t *testing.T) {
-    ring := ringDescriber{}
-    host := &HostInfo{
-        rpcAddress: net.ParseIP("0.0.0.0"),
-        rack:       "myRack",
-        hostId:     "0",
-        dataCenter: "datacenter",
-        tokens:     []string{"0", "1"},
-    }
-
-    if !ring.IsValidPeer(host) {
-        t.Errorf("expected %+v to be a valid peer", host)
-    }
-
-    host.rack = ""
-    if ring.IsValidPeer(host) {
-        t.Errorf("expected %+v to NOT be a valid peer", host)
-    }
-}
-
-func TestGetHosts(t *testing.T) {
-    cluster := createCluster()
-    session := createSessionFromCluster(cluster, t)
-
-    hosts, partitioner, err := session.hostSource.GetHosts()
-
-    assertTrue(t, "err == nil", err == nil)
-    assertTrue(t, "len(hosts) == 3", len(hosts) == 3)
-    assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
-
-}
-
-func TestGetHostsWithFilter(t *testing.T) {
-    filterHostIP := net.ParseIP("127.0.0.3")
-    cluster := createCluster()
-
-    // Filter to remove one of the localhost nodes
-    cluster.HostFilter = HostFilterFunc(func(host *HostInfo) bool {
-        if host.ConnectAddress().Equal(filterHostIP) {
-            return false
-        }
-        return true
-    })
-    session := createSessionFromCluster(cluster, t)
-
-    hosts, partitioner, err := session.hostSource.GetHosts()
-    assertTrue(t, "err == nil", err == nil)
-    assertTrue(t, "len(hosts) == 2", len(hosts) == 2)
-    assertTrue(t, "len(partitioner) != 0", len(partitioner) != 0)
-    for _, host := range hosts {
-        if host.ConnectAddress().Equal(filterHostIP) {
-            t.Fatal(fmt.Sprintf("Did not expect to see '%q' in host list", filterHostIP))
-        }
-    }
-}
diff --git a/policies.go b/policies.go
index 6be8bb8a4..eb549a15a 100644
--- a/policies.go
+++ b/policies.go
@@ -102,7 +102,7 @@ func (c *cowHostList) remove(ip net.IP) bool {
     found := false
     newL := make([]*HostInfo, 0, size)
     for i := 0; i < len(l); i++ {
-        if !l[i].ConnectAddress().Equal(ip) {
+        if !l[i].Peer().Equal(ip) {
             newL = append(newL, l[i])
         } else {
             found = true
@@ -236,7 +236,7 @@ func (r *roundRobinHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *roundRobinHostPolicy) RemoveHost(host *HostInfo) {
-    r.hosts.remove(host.ConnectAddress())
+    r.hosts.remove(host.Peer())
 }
 
 func (r *roundRobinHostPolicy) HostUp(host *HostInfo) {
@@ -279,7 +279,7 @@ func (t *tokenAwareHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (t *tokenAwareHostPolicy) RemoveHost(host *HostInfo) {
-    t.hosts.remove(host.ConnectAddress())
+    t.hosts.remove(host.Peer())
     t.fallback.RemoveHost(host)
 
     t.resetTokenRing()
@@ -393,7 +393,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
     hostMap := make(map[string]*HostInfo, len(hosts))
 
     for i, host := range hosts {
-        ip := host.ConnectAddress().String()
+        ip := host.Peer().String()
         peers[i] = ip
         hostMap[ip] = host
     }
@@ -405,7 +405,7 @@ func (r *hostPoolHostPolicy) SetHosts(hosts []*HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
 
     r.mu.Lock()
     defer r.mu.Unlock()
@@ -426,7 +426,7 @@ func (r *hostPoolHostPolicy) AddHost(host *HostInfo) {
 }
 
 func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
 
     r.mu.Lock()
     defer r.mu.Unlock()
@@ -438,7 +438,7 @@ func (r *hostPoolHostPolicy) RemoveHost(host *HostInfo) {
     delete(r.hostMap, ip)
     hosts := make([]string, 0, len(r.hostMap))
     for _, host := range r.hostMap {
-        hosts = append(hosts, host.ConnectAddress().String())
+        hosts = append(hosts, host.Peer().String())
     }
 
     r.hp.SetHosts(hosts)
@@ -492,7 +492,7 @@ func (host selectedHostPoolHost) Info() *HostInfo {
 }
 
 func (host selectedHostPoolHost) Mark(err error) {
-    ip := host.info.ConnectAddress().String()
+    ip := host.info.Peer().String()
 
     host.policy.mu.RLock()
     defer host.policy.mu.RUnlock()
diff --git a/policies_test.go b/policies_test.go
index ff43e9927..957e31270 100644
--- a/policies_test.go
+++ b/policies_test.go
@@ -17,8 +17,8 @@ func TestRoundRobinHostPolicy(t *testing.T) {
     policy := RoundRobinHostPolicy()
 
     hosts := [...]*HostInfo{
-        {hostId: "0", connectAddress: net.IPv4(0, 0, 0, 1)},
-        {hostId: "1", connectAddress: net.IPv4(0, 0, 0, 2)},
+        {hostId: "0", peer: net.IPv4(0, 0, 0, 1)},
+        {hostId: "1", peer: net.IPv4(0, 0, 0, 2)},
     }
 
     for _, host := range hosts {
@@ -68,10 +68,10 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
     // set the hosts
     hosts := [...]*HostInfo{
-        {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
-        {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
-        {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
-        {connectAddress: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
+        {peer: net.IPv4(10, 0, 0, 1), tokens: []string{"00"}},
+        {peer: net.IPv4(10, 0, 0, 2), tokens: []string{"25"}},
+        {peer: net.IPv4(10, 0, 0, 3), tokens: []string{"50"}},
+        {peer: net.IPv4(10, 0, 0, 4), tokens: []string{"75"}},
     }
     for _, host := range hosts {
         policy.AddHost(host)
@@ -79,13 +79,13 @@ func TestTokenAwareHostPolicy(t *testing.T) {
 
     // the token ring is not setup without the partitioner, but the fallback
     // should work
-    if actual := policy.Pick(nil)(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-        t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
+    if actual := policy.Pick(nil)(); !actual.Info().Peer().Equal(hosts[0].peer) {
+        t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
     }
 
     query.RoutingKey([]byte("30"))
-    if actual := policy.Pick(query)(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-        t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
+    if actual := policy.Pick(query)(); !actual.Info().Peer().Equal(hosts[1].peer) {
+        t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
     }
 
     policy.SetPartitioner("OrderedPartitioner")
@@ -93,18 +93,18 @@ func TestTokenAwareHostPolicy(t *testing.T) {
     // now the token ring is configured
     query.RoutingKey([]byte("20"))
     iter = policy.Pick(query)
-    if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-        t.Errorf("Expected peer 1 but was %s", actual.Info().ConnectAddress())
+    if actual := iter(); !actual.Info().Peer().Equal(hosts[1].peer) {
+        t.Errorf("Expected peer 1 but was %s", actual.Info().Peer())
     }
     // rest are round robin
-    if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[2].ConnectAddress()) {
-        t.Errorf("Expected peer 2 but was %s", actual.Info().ConnectAddress())
+    if actual := iter(); !actual.Info().Peer().Equal(hosts[2].peer) {
+        t.Errorf("Expected peer 2 but was %s", actual.Info().Peer())
     }
-    if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[3].ConnectAddress()) {
-        t.Errorf("Expected peer 3 but was %s", actual.Info().ConnectAddress())
+    if actual := iter(); !actual.Info().Peer().Equal(hosts[3].peer) {
+        t.Errorf("Expected peer 3 but was %s", actual.Info().Peer())
     }
-    if actual := iter(); !actual.Info().ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-        t.Errorf("Expected peer 0 but was %s", actual.Info().ConnectAddress())
+    if actual := iter(); !actual.Info().Peer().Equal(hosts[0].peer) {
+        t.Errorf("Expected peer 0 but was %s", actual.Info().Peer())
     }
 }
 
@@ -113,8 +113,8 @@ func TestHostPoolHostPolicy(t *testing.T) {
     policy := HostPoolHostPolicy(hostpool.New(nil))
 
     hosts := []*HostInfo{
-        {hostId: "0", connectAddress: net.IPv4(10, 0, 0, 0)},
-        {hostId: "1", connectAddress: net.IPv4(10, 0, 0, 1)},
+        {hostId: "0", peer: net.IPv4(10, 0, 0, 0)},
+        {hostId: "1", peer: net.IPv4(10, 0, 0, 1)},
     }
 
     // Using set host to control the ordering of the hosts as calling "AddHost" iterates the map
@@ -178,10 +178,10 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
     policy := TokenAwareHostPolicy(RoundRobinHostPolicy())
 
     hosts := [...]*HostInfo{
-        {connectAddress: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
-        {connectAddress: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
-        {connectAddress: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
-        {connectAddress: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
+        {peer: net.IPv4(10, 0, 0, 0), tokens: []string{"00"}},
+        {peer: net.IPv4(10, 0, 0, 1), tokens: []string{"25"}},
+        {peer: net.IPv4(10, 0, 0, 2), tokens: []string{"50"}},
+        {peer: net.IPv4(10, 0, 0, 3), tokens: []string{"75"}},
     }
     for _, host := range hosts {
         policy.AddHost(host)
@@ -197,8 +197,8 @@ func TestTokenAwareNilHostInfo(t *testing.T) {
         t.Fatal("got nil host")
     } else if v := next.Info(); v == nil {
         t.Fatal("got nil HostInfo")
-    } else if !v.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-        t.Fatalf("expected peer 1 got %v", v.ConnectAddress())
+    } else if !v.Peer().Equal(hosts[1].peer) {
+        t.Fatalf("expected peer 1 got %v", v.Peer())
     }
 
     // Empty the hosts to trigger the panic when using the fallback.
@@ -221,7 +221,7 @@ func TestCOWList_Add(t *testing.T) {
     toAdd := [...]net.IP{net.IPv4(0, 0, 0, 0), net.IPv4(1, 0, 0, 0), net.IPv4(2, 0, 0, 0)}
 
     for _, addr := range toAdd {
-        if !cow.add(&HostInfo{connectAddress: addr}) {
+        if !cow.add(&HostInfo{peer: addr}) {
             t.Fatal("did not add peer which was not in the set")
         }
     }
@@ -233,7 +233,7 @@ func TestCOWList_Add(t *testing.T) {
 
     set := make(map[string]bool)
     for _, host := range hosts {
-        set[string(host.ConnectAddress())] = true
+        set[string(host.Peer())] = true
     }
 
     for _, addr := range toAdd {
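The policy constructors themselves are untouched by this revert; only the host identity they index on changed. A minimal configuration sketch matching what these tests exercise:

package main

import "github.com/gocql/gocql"

func main() {
    cluster := gocql.NewCluster("127.0.0.1")

    // Route each query to a replica owning its partition token, falling
    // back to round-robin across the remaining hosts.
    cluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())

    session, err := cluster.CreateSession()
    if err != nil {
        panic(err)
    }
    defer session.Close()
}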
diff --git a/ring.go b/ring.go
index 2789e9ebe..45ace957f 100644
--- a/ring.go
+++ b/ring.go
@@ -53,7 +53,7 @@ func (r *ring) allHosts() []*HostInfo {
 }
 
 func (r *ring) addHost(host *HostInfo) bool {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
 
     r.mu.Lock()
     if r.hosts == nil {
@@ -79,7 +79,7 @@ func (r *ring) addOrUpdate(host *HostInfo) *HostInfo {
 }
 
 func (r *ring) addHostIfMissing(host *HostInfo) (*HostInfo, bool) {
-    ip := host.ConnectAddress().String()
+    ip := host.Peer().String()
 
     r.mu.Lock()
     if r.hosts == nil {
@@ -106,7 +106,7 @@ func (r *ring) removeHost(ip net.IP) bool {
     _, ok := r.hosts[k]
     if ok {
         for i, host := range r.hostList {
-            if host.ConnectAddress().Equal(ip) {
+            if host.Peer().Equal(ip) {
                 r.hostList = append(r.hostList[:i], r.hostList[i+1:]...)
                 break
             }
diff --git a/ring_test.go b/ring_test.go
index 88e593a32..feea8d2ca 100644
--- a/ring_test.go
+++ b/ring_test.go
@@ -8,7 +8,7 @@ import (
 
 func TestRing_AddHostIfMissing_Missing(t *testing.T) {
     ring := &ring{}
 
-    host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+    host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
     h1, ok := ring.addHostIfMissing(host)
     if ok {
         t.Fatal("host was reported as already existing")
@@ -22,10 +22,10 @@ func TestRing_AddHostIfMissing_Missing(t *testing.T) {
 func TestRing_AddHostIfMissing_Existing(t *testing.T) {
     ring := &ring{}
 
-    host := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+    host := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
     ring.addHostIfMissing(host)
 
-    h2 := &HostInfo{connectAddress: net.IPv4(1, 1, 1, 1)}
+    h2 := &HostInfo{peer: net.IPv4(1, 1, 1, 1)}
     h1, ok := ring.addHostIfMissing(h2)
 
     if !ok {
diff --git a/session.go b/session.go
index e49353bf6..503d6c2e4 100644
--- a/session.go
+++ b/session.go
@@ -111,7 +111,8 @@ func NewSession(cfg ClusterConfig) (*Session, error) {
     s.routingKeyInfoCache.lru = lru.New(cfg.MaxRoutingKeyInfo)
 
     s.hostSource = &ringDescriber{
-        session: s,
+        session:   s,
+        closeChan: make(chan bool),
     }
 
     if cfg.PoolConfig.HostSelectionPolicy == nil {
@@ -173,19 +174,26 @@ func (s *Session) init() error {
             return err
         }
 
+        // need to set up host source to check for broadcast_address in system.local
+        localHasRPCAddr, _ := checkSystemLocal(s.control)
+        s.hostSource.localHasRpcAddr = localHasRPCAddr
+
         if !s.cfg.DisableInitialHostLookup {
-            var partitioner string
-            hosts, partitioner, err = s.hostSource.GetHosts()
+            // TODO(zariel): we need to get the partitioner from here
+            var p string
+            hosts, p, err = s.hostSource.GetHosts()
             if err != nil {
                 return err
             }
-            s.policy.SetPartitioner(partitioner)
+            s.policy.SetPartitioner(p)
         }
     }
 
     for _, host := range hosts {
-        host = s.ring.addOrUpdate(host)
-        s.handleNodeUp(host.ConnectAddress(), host.Port(), false)
+        if s.cfg.HostFilter == nil || s.cfg.HostFilter.Accept(host) {
+            host = s.ring.addOrUpdate(host)
+            s.handleNodeUp(host.Peer(), host.Port(), false)
+        }
     }
 
     // TODO(zariel): we probably dont need this any more as we verify that we
@@ -226,7 +234,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
             if gocqlDebug {
                 buf := bytes.NewBufferString("Session.ring:")
                 for _, h := range hosts {
                     buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
-                    buf.WriteString("[" + h.ConnectAddress().String() + ":" + h.State().String() + "]")
+                    buf.WriteString("[" + h.Peer().String() + ":" + h.State().String() + "]")
                 }
                 Logger.Println(buf.String())
             }
@@ -235,7 +243,7 @@ func (s *Session) reconnectDownedHosts(intv time.Duration) {
                 if h.IsUp() {
                     continue
                 }
-                s.handleNodeUp(h.ConnectAddress(), h.Port(), true)
+                s.handleNodeUp(h.Peer(), h.Port(), true)
             }
         case <-s.quit:
             return
@@ -336,6 +344,10 @@ func (s *Session) Close() {
         s.pool.Close()
     }
 
+    if s.hostSource != nil {
+        close(s.hostSource.closeChan)
+    }
+
     if s.control != nil {
         s.control.close()
     }
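session.go restores the ringDescriber closeChan and closes it in Session.Close. That is the conventional stop-channel shape for a background poller: consumers select on a ticker and the channel, and close() wakes every waiter at shutdown. A minimal standalone illustration (poll is hypothetical, not gocql code):

package main

import (
    "fmt"
    "time"
)

// poll refreshes on a ticker until the stop channel is closed.
func poll(stop chan bool) {
    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Println("refresh ring")
        case <-stop:
            return
        }
    }
}

func main() {
    stop := make(chan bool)
    go poll(stop)
    time.Sleep(350 * time.Millisecond)
    close(stop) // what Session.Close does with hostSource.closeChan
}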
diff --git a/session_connect_test.go b/session_connect_test.go
index eecbf803e..5b1629506 100644
--- a/session_connect_test.go
+++ b/session_connect_test.go
@@ -1,13 +1,12 @@
 package gocql
 
 import (
+    "golang.org/x/net/context"
     "net"
     "strconv"
     "sync"
     "testing"
     "time"
-
-    "golang.org/x/net/context"
 )
 
 type OneConnTestServer struct {
@@ -104,8 +103,8 @@ func TestSession_connect_WithNoTranslator(t *testing.T) {
     go srvr.Serve()
 
     Connect(&HostInfo{
-        connectAddress: srvr.Addr,
-        port:           srvr.Port,
+        peer: srvr.Addr,
+        port: srvr.Port,
     }, session.connCfg, testConnErrorHandler(t), session)
 
     assertConnectionEventually(t, 500*time.Millisecond, srvr)
@@ -124,8 +123,8 @@ func TestSession_connect_WithTranslator(t *testing.T) {
 
     // the provided address will be translated
     Connect(&HostInfo{
-        connectAddress: net.ParseIP("10.10.10.10"),
-        port:           5432,
+        peer: net.ParseIP("10.10.10.10"),
+        port: 5432,
     }, session.connCfg, testConnErrorHandler(t), session)
 
     assertConnectionEventually(t, 500*time.Millisecond, srvr)
diff --git a/token.go b/token.go
index edc37302b..1113b9276 100644
--- a/token.go
+++ b/token.go
@@ -184,7 +184,7 @@ func (t *tokenRing) String() string {
         buf.WriteString("]")
         buf.WriteString(t.tokens[i].String())
         buf.WriteString(":")
-        buf.WriteString(t.hosts[i].ConnectAddress().String())
+        buf.WriteString(t.hosts[i].Peer().String())
     }
     buf.WriteString("\n}")
     return string(buf.Bytes())
diff --git a/token_test.go b/token_test.go
index b71ff74cc..9d78daa85 100644
--- a/token_test.go
+++ b/token_test.go
@@ -232,8 +232,8 @@ func hostsForTests(n int) []*HostInfo {
     hosts := make([]*HostInfo, n)
     for i := 0; i < n; i++ {
         host := &HostInfo{
-            connectAddress: net.IPv4(1, 1, 1, byte(n)),
-            tokens:         []string{fmt.Sprintf("%d", n)},
+            peer:   net.IPv4(1, 1, 1, byte(n)),
+            tokens: []string{fmt.Sprintf("%d", n)},
         }
 
         hosts[i] = host
@@ -254,20 +254,19 @@ func TestMurmur3TokenRing(t *testing.T) {
 
     for _, host := range hosts {
         actual := ring.GetHostForToken(p.ParseString(host.tokens[0]))
-        if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-            t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-                host.tokens[0], actual.ConnectAddress())
+        if !actual.Peer().Equal(host.peer) {
+            t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
         }
     }
 
     actual := ring.GetHostForToken(p.ParseString("12"))
-    if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-        t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+    if !actual.Peer().Equal(hosts[1].peer) {
+        t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
     }
 
     actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-    if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-        t.Errorf("Expected address 0 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+    if !actual.Peer().Equal(hosts[0].peer) {
+        t.Errorf("Expected peer 0 for token \"24324545443332\", but was %s", actual.Peer())
     }
 }
 
@@ -286,20 +285,19 @@ func TestOrderedTokenRing(t *testing.T) {
     var actual *HostInfo
     for _, host := range hosts {
         actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-        if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-            t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-                host.tokens[0], actual.ConnectAddress())
+        if !actual.Peer().Equal(host.peer) {
+            t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
         }
     }
 
     actual = ring.GetHostForToken(p.ParseString("12"))
     if !actual.peer.Equal(hosts[1].peer) {
-        t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+        t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
     }
 
     actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-    if !actual.ConnectAddress().Equal(hosts[1].ConnectAddress()) {
-        t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+    if !actual.peer.Equal(hosts[1].peer) {
+        t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
     }
 }
 
@@ -317,19 +315,18 @@ func TestRandomTokenRing(t *testing.T) {
     var actual *HostInfo
     for _, host := range hosts {
         actual = ring.GetHostForToken(p.ParseString(host.tokens[0]))
-        if !actual.ConnectAddress().Equal(host.ConnectAddress()) {
-            t.Errorf("Expected address %v for token %q, but was %v", host.ConnectAddress(),
-                host.tokens[0], actual.ConnectAddress())
+        if !actual.Peer().Equal(host.peer) {
+            t.Errorf("Expected peer %v for token %q, but was %v", host.peer, host.tokens[0], actual.peer)
         }
     }
 
     actual = ring.GetHostForToken(p.ParseString("12"))
     if !actual.peer.Equal(hosts[1].peer) {
-        t.Errorf("Expected address 1 for token \"12\", but was %s", actual.ConnectAddress())
+        t.Errorf("Expected peer 1 for token \"12\", but was %s", actual.Peer())
     }
 
     actual = ring.GetHostForToken(p.ParseString("24324545443332"))
-    if !actual.ConnectAddress().Equal(hosts[0].ConnectAddress()) {
-        t.Errorf("Expected address 1 for token \"24324545443332\", but was %s", actual.ConnectAddress())
+    if !actual.peer.Equal(hosts[0].peer) {
+        t.Errorf("Expected peer 1 for token \"24324545443332\", but was %s", actual.Peer())
    }
 }