Skip to content

Commit

Permalink
Don't panic if cluster state is nil.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Mar 4, 2017
1 parent e90826e commit 15998ef
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 34 deletions.
93 changes: 60 additions & 33 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
)

var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
var errNilClusterState = internal.RedisError("redis: cannot load cluster slots")

// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
Expand Down Expand Up @@ -355,7 +356,14 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
_, _ = c.nodes.Get(addr)
}

c.reloadSlots()
// Preload cluster slots.
for i := 0; i < 10; i++ {
state, err := c.reloadSlots()
if err == nil {
c._state.Store(state)
break
}
}

if opt.IdleCheckFrequency > 0 {
go c.reaper(opt.IdleCheckFrequency)
Expand All @@ -366,10 +374,11 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {

func (c *ClusterClient) state() *clusterState {
v := c._state.Load()
if v == nil {
return nil
if v != nil {
return v.(*clusterState)
}
return v.(*clusterState)
c.lazyReloadSlots()
return nil
}

func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *clusterNode, error) {
Expand Down Expand Up @@ -397,10 +406,12 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
state := c.state()

var node *clusterNode
var err error
if len(keys) > 0 {
node, err = c.state().slotMasterNode(hashtag.Slot(keys[0]))
if state != nil && len(keys) > 0 {
node, err = state.slotMasterNode(hashtag.Slot(keys[0]))
} else {
node, err = c.nodes.Random()
}
Expand Down Expand Up @@ -463,8 +474,9 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var addr string
moved, ask, addr = internal.IsMovedError(err)
if moved || ask {
if slot >= 0 {
master, _ := c.state().slotMasterNode(slot)
state := c.state()
if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadSlots()
}
Expand Down Expand Up @@ -523,7 +535,7 @@ func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return nil
return errNilClusterState
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -564,12 +576,13 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {

// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats

nodes, err := c.nodes.All()
if err != nil {
return nil
return &acc
}

var acc PoolStats
for _, node := range nodes {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
Expand All @@ -585,37 +598,46 @@ func (c *ClusterClient) lazyReloadSlots() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}

go func() {
c.reloadSlots()
for i := 0; i < 1000; i++ {
state, err := c.reloadSlots()
if err == pool.ErrClosed {
break
}
if err == nil {
c._state.Store(state)
break
}
time.Sleep(time.Millisecond)
}

time.Sleep(3 * time.Second)
atomic.StoreUint32(&c.reloading, 0)
}()
}

func (c *ClusterClient) reloadSlots() {
for i := 0; i < 10; i++ {
node, err := c.nodes.Random()
if err != nil {
return
}

if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err == nil {
c.cmds = cmds
}
}
func (c *ClusterClient) reloadSlots() (*clusterState, error) {
node, err := c.nodes.Random()
if err != nil {
return nil, err
}

slots, err := node.Client.ClusterSlots().Result()
// TODO: fix race
if c.cmds == nil {
cmds, err := node.Client.Command().Result()
if err != nil {
continue
return nil, err
}
c.cmds = cmds
}

state, err := newClusterState(c.nodes, slots)
if err != nil {
return
}
c._state.Store(state)
slots, err := node.Client.ClusterSlots().Result()
if err != nil {
return nil, err
}

return newClusterState(c.nodes, slots)
}

// reaper closes idle connections to the cluster.
Expand Down Expand Up @@ -789,8 +811,13 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
return err
}

state := c.state()
if state == nil {
return errNilClusterState
}

for slot, cmds := range cmdsMap {
node, err := c.state().slotMasterNode(slot)
node, err := state.slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
Expand Down
2 changes: 1 addition & 1 deletion cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ var _ = Describe("ClusterClient timeout", func() {
var client *redis.ClusterClient

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
_ = client.Close()
})

testTimeout := func() {
Expand Down

0 comments on commit 15998ef

Please sign in to comment.