Skip to content

Commit

Permalink
Set read/write timeouts more consistently.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Dec 3, 2016
1 parent e7f23a3 commit b4efc45
Show file tree
Hide file tree
Showing 18 changed files with 340 additions and 195 deletions.
44 changes: 27 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,13 @@ func (c *ClusterClient) cmdSlotAndNode(state *clusterState, cmd Cmder) (int, *cl
}

func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
node, err := c.state().slotMasterNode(hashtag.Slot(keys[0]))
var node *clusterNode
var err error
if len(keys) > 0 {
node, err = c.state().slotMasterNode(hashtag.Slot(keys[0]))
} else {
node, err = c.nodes.Random()
}
if err != nil {
return err
}
Expand Down Expand Up @@ -612,10 +618,10 @@ func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
}

func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
var retErr error
setRetErr := func(err error) {
if retErr == nil {
retErr = err
var firstErr error
setFirstErr := func(err error) {
if firstErr == nil {
firstErr = err
}
}

Expand All @@ -625,7 +631,7 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
setRetErr(err)
setFirstErr(err)
continue
}
cmdsMap[node] = append(cmdsMap[node], cmd)
Expand All @@ -638,38 +644,42 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
setRetErr(err)
setFirstErr(err)
continue
}

failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds)
if err != nil {
setRetErr(err)
setFirstErr(err)
}
node.Client.putConn(cn, err, false)
}

cmdsMap = failedCmds
}

return retErr
return firstErr
}

func (c *ClusterClient) execClusterCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) (map[*clusterNode][]Cmder, error) {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
return failedCmds, err
}

var retErr error
setRetErr := func(err error) {
if retErr == nil {
retErr = err
var firstErr error
setFirstErr := func(err error) {
if firstErr == nil {
firstErr = err
}
}

// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)

for i, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
Expand All @@ -688,7 +698,7 @@ func (c *ClusterClient) execClusterCmds(

node, err := c.nodes.Get(addr)
if err != nil {
setRetErr(err)
setFirstErr(err)
continue
}

Expand All @@ -697,16 +707,16 @@ func (c *ClusterClient) execClusterCmds(
} else if ask {
node, err := c.nodes.Get(addr)
if err != nil {
setRetErr(err)
setFirstErr(err)
continue
}

cmd.reset()
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
} else {
setRetErr(err)
setFirstErr(err)
}
}

return failedCmds, retErr
return failedCmds, firstErr
}
121 changes: 100 additions & 21 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,45 +483,124 @@ var _ = Describe("ClusterClient", func() {

describeClusterClient()
})
})

Describe("ClusterClient without nodes", func() {
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{})
var _ = Describe("ClusterClient without nodes", func() {
var client *redis.ClusterClient

BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{})
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
})

var _ = Describe("ClusterClient without valid nodes", func() {
var client *redis.ClusterClient

BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{redisAddr},
})
})

AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
})

var _ = Describe("ClusterClient timeout", func() {
var client *redis.ClusterClient

It("returns an error", func() {
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

testTimeout := func() {
It("Ping timeouts", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})

It("pipeline returns an error", func() {
It("Pipeline timeouts", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("redis: cluster has no nodes"))
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
})

Describe("ClusterClient without valid nodes", func() {
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{redisAddr},
It("Tx timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
return tx.Ping().Err()
})
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})

It("returns an error", func() {
err := client.Ping().Err()
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Ping()
return nil
})
return err
})
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Ping()
return nil
})
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
Context("read timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})

testTimeout()
})

Context("write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = -1
client = cluster.clusterClient(opt)
})

testTimeout()
})
})

Expand Down
2 changes: 1 addition & 1 deletion command.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
}
}

_, err := cn.Write(cn.Wb.Bytes())
_, err := cn.NetConn.Write(cn.Wb.Bytes())
return err
}

Expand Down
30 changes: 10 additions & 20 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ type Conn struct {

Inited bool
UsedAt time.Time

ReadTimeout time.Duration
WriteTimeout time.Duration
}

func NewConn(netConn net.Conn) *Conn {
Expand All @@ -30,36 +27,29 @@ func NewConn(netConn net.Conn) *Conn {

UsedAt: time.Now(),
}
cn.Rd = proto.NewReader(cn)
cn.Rd = proto.NewReader(cn.NetConn)
return cn
}

func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt) > timeout
}

func (cn *Conn) Read(b []byte) (int, error) {
func (cn *Conn) SetReadTimeout(timeout time.Duration) error {
cn.UsedAt = time.Now()
if cn.ReadTimeout != 0 {
cn.NetConn.SetReadDeadline(cn.UsedAt.Add(cn.ReadTimeout))
} else {
cn.NetConn.SetReadDeadline(noDeadline)
if timeout > 0 {
return cn.NetConn.SetReadDeadline(cn.UsedAt.Add(timeout))
}
return cn.NetConn.Read(b)
return cn.NetConn.SetReadDeadline(noDeadline)

}

func (cn *Conn) Write(b []byte) (int, error) {
func (cn *Conn) SetWriteTimeout(timeout time.Duration) error {
cn.UsedAt = time.Now()
if cn.WriteTimeout != 0 {
cn.NetConn.SetWriteDeadline(cn.UsedAt.Add(cn.WriteTimeout))
} else {
cn.NetConn.SetWriteDeadline(noDeadline)
if timeout > 0 {
return cn.NetConn.SetWriteDeadline(cn.UsedAt.Add(timeout))
}
return cn.NetConn.Write(b)
}

func (cn *Conn) RemoteAddr() net.Addr {
return cn.NetConn.RemoteAddr()
return cn.NetConn.SetWriteDeadline(noDeadline)
}

func (cn *Conn) Close() error {
Expand Down
10 changes: 5 additions & 5 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,19 +266,19 @@ func (p *ConnPool) Closed() bool {
return atomic.LoadInt32(&p._closed) == 1
}

func (p *ConnPool) Close() (retErr error) {
func (p *ConnPool) Close() error {
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
return ErrClosed
}

p.connsMu.Lock()
// Close all connections.
var firstErr error
for _, cn := range p.conns {
if cn == nil {
continue
}
if err := p.closeConn(cn, ErrClosed); err != nil && retErr == nil {
retErr = err
if err := p.closeConn(cn, ErrClosed); err != nil && firstErr == nil {
firstErr = err
}
}
p.conns = nil
Expand All @@ -288,7 +288,7 @@ func (p *ConnPool) Close() (retErr error) {
p.freeConns = nil
p.freeConnsMu.Unlock()

return retErr
return firstErr
}

func (p *ConnPool) closeConn(cn *Conn, reason error) error {
Expand Down
Loading

0 comments on commit b4efc45

Please sign in to comment.