Skip to content

Commit

Permalink
GetHosts() is now more C* version tolerant
Browse files Browse the repository at this point in the history
  • Loading branch information
thrawn01 committed Feb 14, 2017
1 parent 9d95e30 commit 58dbbb6
Show file tree
Hide file tree
Showing 19 changed files with 537 additions and 267 deletions.
8 changes: 4 additions & 4 deletions cassandra_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func TestRingDiscovery(t *testing.T) {

if *clusterSize != size {
for p, pool := range session.pool.hostConnPools {
t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.Peer().String())
t.Logf("p=%q host=%v ips=%s", p, pool.host, pool.host.ConnectAddress().String())

}
t.Errorf("Expected a cluster size of %d, but actual size was %d", *clusterSize, size)
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestReconnection(t *testing.T) {
defer session.Close()

h := session.ring.allHosts()[0]
session.handleNodeDown(h.Peer(), h.Port())
session.handleNodeDown(h.ConnectAddress(), h.Port())

if h.State() != NodeDown {
t.Fatal("Host should be NodeDown but not.")
Expand Down Expand Up @@ -2369,8 +2369,8 @@ func TestDiscoverViaProxy(t *testing.T) {
session := createSessionFromCluster(cluster, t)
defer session.Close()

if !session.hostSource.localHasRpcAddr {
t.Skip("Target cluster does not have rpc_address in system.local.")
if session.hostSource.localHost.BroadcastAddress() == nil {
t.Skip("Target cluster does not have broadcast_address in system.local.")
goto close
}

Expand Down
6 changes: 3 additions & 3 deletions common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import (
"flag"
"fmt"
"log"
"net"
"strings"
"sync"
"testing"
"time"
"net"
)

var (
Expand Down Expand Up @@ -154,9 +154,9 @@ func createTestSession() *Session {
config.IgnorePeerAddr = true
config.PoolConfig.HostSelectionPolicy = RoundRobinHostPolicy()
session := &Session{
cfg: *config,
cfg: *config,
connCfg: &ConnConfig{
Timeout: 10*time.Millisecond,
Timeout: 10 * time.Millisecond,
Keepalive: 0,
},
policy: config.PoolConfig.HostSelectionPolicy,
Expand Down
6 changes: 3 additions & 3 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
// TODO(zariel): remove these
if host == nil {
panic("host is nil")
} else if len(host.Peer()) == 0 {
panic("host missing peer ip address")
} else if len(host.ConnectAddress()) == 0 {
panic("host missing connect ip address")
} else if host.Port() == 0 {
panic("host missing port")
}
Expand All @@ -172,7 +172,7 @@ func Connect(host *HostInfo, cfg *ConnConfig, errorHandler ConnErrorHandler, ses
}

// TODO(zariel): handle ipv6 zone
translatedPeer, translatedPort := session.cfg.translateAddressPort(host.Peer(), host.Port())
translatedPeer, translatedPort := session.cfg.translateAddressPort(host.ConnectAddress(), host.Port())
addr := (&net.TCPAddr{IP: translatedPeer, Port: translatedPort}).String()
//addr := (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String()

Expand Down
14 changes: 7 additions & 7 deletions connectionpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
// don't create a connection pool for a down host
continue
}
ip := host.Peer().String()
ip := host.ConnectAddress().String()
if _, exists := p.hostConnPools[ip]; exists {
// still have this host, so don't remove it
delete(toRemove, ip)
Expand All @@ -154,7 +154,7 @@ func (p *policyConnPool) SetHosts(hosts []*HostInfo) {
createCount--
if pool.Size() > 0 {
// add pool onyl if there a connections available
p.hostConnPools[string(pool.host.Peer())] = pool
p.hostConnPools[string(pool.host.ConnectAddress())] = pool
}
}

Expand All @@ -177,7 +177,7 @@ func (p *policyConnPool) Size() int {
}

func (p *policyConnPool) getPool(host *HostInfo) (pool *hostConnPool, ok bool) {
ip := host.Peer().String()
ip := host.ConnectAddress().String()
p.mu.RLock()
pool, ok = p.hostConnPools[ip]
p.mu.RUnlock()
Expand All @@ -196,7 +196,7 @@ func (p *policyConnPool) Close() {
}

func (p *policyConnPool) addHost(host *HostInfo) {
ip := host.Peer().String()
ip := host.ConnectAddress().String()
p.mu.Lock()
pool, ok := p.hostConnPools[ip]
if !ok {
Expand Down Expand Up @@ -274,7 +274,7 @@ func newHostConnPool(session *Session, host *HostInfo, port, size int,
session: session,
host: host,
port: port,
addr: (&net.TCPAddr{IP: host.Peer(), Port: host.Port()}).String(),
addr: (&net.TCPAddr{IP: host.ConnectAddress(), Port: host.Port()}).String(),
size: size,
keyspace: keyspace,
conns: make([]*Conn, 0, size),
Expand Down Expand Up @@ -398,7 +398,7 @@ func (pool *hostConnPool) fill() {

// this is call with the connection pool mutex held, this call will
// then recursively try to lock it again. FIXME
go pool.session.handleNodeDown(pool.host.Peer(), pool.port)
go pool.session.handleNodeDown(pool.host.ConnectAddress(), pool.port)
return
}

Expand All @@ -420,7 +420,7 @@ func (pool *hostConnPool) logConnectErr(err error) {
// connection refused
// these are typical during a node outage so avoid log spam.
if gocqlDebug {
Logger.Printf("unable to dial %q: %v\n", pool.host.Peer(), err)
Logger.Printf("unable to dial %q: %v\n", pool.host.ConnectAddress(), err)
}
} else if err != nil {
// unexpected error
Expand Down
47 changes: 4 additions & 43 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func hostInfo(addr string, defaultPort int) (*HostInfo, error) {

}

return &HostInfo{peer: ip, port: port}, nil
return &HostInfo{connectAddress: ip, port: port}, nil
}

func shuffleHosts(hosts []*HostInfo) []*HostInfo {
Expand Down Expand Up @@ -163,7 +163,7 @@ func (c *controlConn) shuffleDial(endpoints []*HostInfo) (*Conn, error) {
return conn, nil
}

Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.Peer(), err)
Logger.Printf("gocql: unable to dial control conn %v: %v\n", host.ConnectAddress(), err)
}

return nil, err
Expand Down Expand Up @@ -316,7 +316,7 @@ func (c *controlConn) reconnect(refreshring bool) {
if err != nil {
// host is dead
// TODO: this is replicated in a few places
c.session.handleNodeDown(host.Peer(), host.Port())
c.session.handleNodeDown(host.ConnectAddress(), host.Port())
} else {
newConn = conn
}
Expand Down Expand Up @@ -422,52 +422,13 @@ func (c *controlConn) query(statement string, values ...interface{}) (iter *Iter
return
}

func (c *controlConn) fetchHostInfo(ip net.IP, port int) (*HostInfo, error) {
// TODO(zariel): we should probably move this into host_source or atleast
// share code with it.
localHost := c.host()
if localHost == nil {
return nil, errors.New("unable to fetch host info, invalid conn host")
}

isLocal := localHost.Peer().Equal(ip)

var fn func(*HostInfo) error

// TODO(zariel): fetch preferred_ip address (is it >3.x only?)
if isLocal {
fn = func(host *HostInfo) error {
iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.local WHERE key='local'")
iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
return iter.Close()
}
} else {
fn = func(host *HostInfo) error {
iter := c.query("SELECT data_center, rack, host_id, tokens, release_version FROM system.peers WHERE peer=?", ip)
iter.Scan(&host.dataCenter, &host.rack, &host.hostId, &host.tokens, &host.version)
return iter.Close()
}
}

host := &HostInfo{
port: port,
peer: ip,
}

if err := fn(host); err != nil {
return nil, err
}

return host, nil
}

func (c *controlConn) awaitSchemaAgreement() error {
return c.withConn(func(conn *Conn) *Iter {
return &Iter{err: conn.awaitSchemaAgreement()}
}).err
}

func (c *controlConn) host() *HostInfo {
func (c *controlConn) GetHostInfo() *HostInfo {
conn := c.conn.Load().(*Conn)
if conn == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func TestHostInfo_Lookup(t *testing.T) {
continue
}

if !host.peer.Equal(test.ip) {
t.Errorf("expected ip %v got %v for addr %q", test.ip, host.peer, test.addr)
if !host.ConnectAddress().Equal(test.ip) {
t.Errorf("expected ip %v got %v for addr %q", test.ip, host.ConnectAddress(), test.addr)
}
}
}
Expand Down
30 changes: 11 additions & 19 deletions events.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,23 +171,15 @@ func (s *Session) handleNodeEvent(frames []frame) {
}

func (s *Session) handleNewNode(ip net.IP, port int, waitForBinary bool) {
var hostInfo *HostInfo
if s.control != nil && !s.cfg.IgnorePeerAddr {
var err error
hostInfo, err = s.control.fetchHostInfo(ip, port)
if err != nil {
Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
return
}
} else {
hostInfo = &HostInfo{peer: ip, port: port}
}

if s.cfg.IgnorePeerAddr && hostInfo.Peer().Equal(ip) {
hostInfo.setPeer(ip)
// Get host info and apply any filters to the host
hostInfo, err := s.hostSource.GetHostInfo(ip, port)
if err != nil {
Logger.Printf("gocql: events: unable to fetch host info for (%s:%d): %v\n", ip, port, err)
return
}

if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(hostInfo) {
// If hostInfo is nil, this host was filtered out by cfg.HostFilter
if hostInfo == nil {
return
}

Expand All @@ -213,7 +205,7 @@ func (s *Session) handleRemovedNode(ip net.IP, port int) {
// we remove all nodes but only add ones which pass the filter
host := s.ring.getHost(ip)
if host == nil {
host = &HostInfo{peer: ip, port: port}
host = &HostInfo{connectAddress: ip, port: port}
}

if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
Expand All @@ -237,9 +229,9 @@ func (s *Session) handleNodeUp(ip net.IP, port int, waitForBinary bool) {

host := s.ring.getHost(ip)
if host != nil {
if s.cfg.IgnorePeerAddr && host.Peer().Equal(ip) {
if s.cfg.IgnorePeerAddr && host.ConnectAddress().Equal(ip) {
// TODO: how can this ever be true?
host.setPeer(ip)
host.SetConnectAddress(ip)
}

if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
Expand All @@ -266,7 +258,7 @@ func (s *Session) handleNodeDown(ip net.IP, port int) {

host := s.ring.getHost(ip)
if host == nil {
host = &HostInfo{peer: ip, port: port}
host = &HostInfo{connectAddress: ip, port: port}
}

if s.cfg.HostFilter != nil && !s.cfg.HostFilter.Accept(host) {
Expand Down
4 changes: 2 additions & 2 deletions filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func WhiteListHostFilter(hosts ...string) HostFilter {

m := make(map[string]bool, len(hostInfos))
for _, host := range hostInfos {
m[string(host.peer)] = true
m[string(host.ConnectAddress())] = true
}

return HostFilterFunc(func(host *HostInfo) bool {
return m[string(host.Peer())]
return m[string(host.ConnectAddress())]
})
}
6 changes: 3 additions & 3 deletions filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestFilter_WhiteList(t *testing.T) {
}

for i, test := range tests {
if f.Accept(&HostInfo{peer: test.addr}) {
if f.Accept(&HostInfo{connectAddress: test.addr}) {
if !test.accept {
t.Errorf("%d: should not have been accepted but was", i)
}
Expand All @@ -39,7 +39,7 @@ func TestFilter_AllowAll(t *testing.T) {
}

for i, test := range tests {
if f.Accept(&HostInfo{peer: test.addr}) {
if f.Accept(&HostInfo{connectAddress: test.addr}) {
if !test.accept {
t.Errorf("%d: should not have been accepted but was", i)
}
Expand All @@ -61,7 +61,7 @@ func TestFilter_DenyAll(t *testing.T) {
}

for i, test := range tests {
if f.Accept(&HostInfo{peer: test.addr}) {
if f.Accept(&HostInfo{connectAddress: test.addr}) {
if !test.accept {
t.Errorf("%d: should not have been accepted but was", i)
}
Expand Down
Loading

0 comments on commit 58dbbb6

Please sign in to comment.