Skip to content

Commit

Permalink
Merge pull request redis#173 from go-redis/fix/pool-panic-on-retry
Browse files Browse the repository at this point in the history
Fix pool panic on slow connection with MaxRetries > 0.
  • Loading branch information
vmihailenco committed Oct 13, 2015
2 parents 8ca66a5 + 2516433 commit 0880b0b
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 39 deletions.
7 changes: 2 additions & 5 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,8 @@ func (cn *conn) init(opt *Options) error {
return nil
}

// Use connection to connect to Redis.
pool := newSingleConnPoolConn(cn)

// Client is not closed because we want to reuse underlying connection.
client := newClient(opt, pool)
// Temp client for Auth and Select.
client := newClient(opt, newSingleConnPool(cn))

if opt.Password != "" {
if err := client.Auth(opt.Password).Err(); err != nil {
Expand Down
25 changes: 25 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package redis_test

import (
"net"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

"gopkg.in/redis.v3"
)

var _ = Describe("newConnDialer with bad connection", func() {
It("should return an error", func() {
dialer := redis.NewConnDialer(&redis.Options{
Dialer: func() (net.Conn, error) {
return &badConn{}, nil
},
MaxRetries: 3,
Password: "password",
DB: 1,
})
_, err := dialer()
Expect(err).To(MatchError("bad connection"))
})
})
2 changes: 2 additions & 0 deletions export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ func (c *baseClient) Pool() pool {
return c.connPool
}

var NewConnDialer = newConnDialer

func (cn *conn) SetNetConn(netcn net.Conn) {
cn.netcn = netcn
}
Expand Down
14 changes: 11 additions & 3 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,15 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {

//------------------------------------------------------------------------------

var errTimeout = syscall.ETIMEDOUT
var (
errTimeout = syscall.ETIMEDOUT
)

type badConnError string

func (e badConnError) Error() string { return string(e) }
func (e badConnError) Timeout() bool { return false }
func (e badConnError) Temporary() bool { return false }

type badConn struct {
net.TCPConn
Expand All @@ -250,7 +258,7 @@ func (cn *badConn) Read([]byte) (int, error) {
if cn.readErr != nil {
return 0, cn.readErr
}
return 0, net.UnknownNetworkError("badConn")
return 0, badConnError("bad connection")
}

func (cn *badConn) Write([]byte) (int, error) {
Expand All @@ -260,5 +268,5 @@ func (cn *badConn) Write([]byte) (int, error) {
if cn.writeErr != nil {
return 0, cn.writeErr
}
return 0, net.UnknownNetworkError("badConn")
return 0, badConnError("bad connection")
}
2 changes: 1 addition & 1 deletion multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (c *Client) Multi() *Multi {
multi := &Multi{
base: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, true),
connPool: newStickyConnPool(c.connPool, true),
},
}
multi.commandable.process = multi.process
Expand Down
92 changes: 66 additions & 26 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,52 @@ func (p *connPool) reaper() {
//------------------------------------------------------------------------------

type singleConnPool struct {
cn *conn
}

func newSingleConnPool(cn *conn) *singleConnPool {
return &singleConnPool{
cn: cn,
}
}

func (p *singleConnPool) First() *conn {
return p.cn
}

func (p *singleConnPool) Get() (*conn, error) {
return p.cn, nil
}

func (p *singleConnPool) Put(cn *conn) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}

func (p *singleConnPool) Remove(cn *conn) error {
if p.cn != cn {
panic("p.cn != cn")
}
return nil
}

func (p *singleConnPool) Len() int {
return 1
}

func (p *singleConnPool) FreeLen() int {
return 0
}

func (p *singleConnPool) Close() error {
return nil
}

//------------------------------------------------------------------------------

type stickyConnPool struct {
pool pool
reusable bool

Expand All @@ -322,27 +368,21 @@ type singleConnPool struct {
mx sync.Mutex
}

func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
return &singleConnPool{
func newStickyConnPool(pool pool, reusable bool) *stickyConnPool {
return &stickyConnPool{
pool: pool,
reusable: reusable,
}
}

func newSingleConnPoolConn(cn *conn) *singleConnPool {
return &singleConnPool{
cn: cn,
}
}

func (p *singleConnPool) First() *conn {
func (p *stickyConnPool) First() *conn {
p.mx.Lock()
cn := p.cn
p.mx.Unlock()
return cn
}

func (p *singleConnPool) Get() (*conn, error) {
func (p *stickyConnPool) Get() (*conn, error) {
defer p.mx.Unlock()
p.mx.Lock()

Expand All @@ -362,15 +402,13 @@ func (p *singleConnPool) Get() (*conn, error) {
return p.cn, nil
}

func (p *singleConnPool) put() (err error) {
if p.pool != nil {
err = p.pool.Put(p.cn)
}
func (p *stickyConnPool) put() (err error) {
err = p.pool.Put(p.cn)
p.cn = nil
return err
}

func (p *singleConnPool) Put(cn *conn) error {
func (p *stickyConnPool) Put(cn *conn) error {
defer p.mx.Unlock()
p.mx.Lock()
if p.cn != cn {
Expand All @@ -382,30 +420,32 @@ func (p *singleConnPool) Put(cn *conn) error {
return nil
}

func (p *singleConnPool) remove() (err error) {
if p.pool != nil {
err = p.pool.Remove(p.cn)
}
func (p *stickyConnPool) remove() (err error) {
err = p.pool.Remove(p.cn)
p.cn = nil
return err
}

func (p *singleConnPool) Remove(cn *conn) error {
func (p *stickyConnPool) Remove(cn *conn) error {
defer p.mx.Unlock()
p.mx.Lock()
if p.cn == nil {
panic("p.cn == nil")
}
if cn != nil && cn != p.cn {
panic("cn != p.cn")
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
if p.closed {
return errClosed
}
return p.remove()
if cn == nil {
return p.remove()
} else {
return nil
}
}

func (p *singleConnPool) Len() int {
func (p *stickyConnPool) Len() int {
defer p.mx.Unlock()
p.mx.Lock()
if p.cn == nil {
Expand All @@ -414,7 +454,7 @@ func (p *singleConnPool) Len() int {
return 1
}

func (p *singleConnPool) FreeLen() int {
func (p *stickyConnPool) FreeLen() int {
defer p.mx.Unlock()
p.mx.Lock()
if p.cn == nil {
Expand All @@ -423,7 +463,7 @@ func (p *singleConnPool) FreeLen() int {
return 0
}

func (p *singleConnPool) Close() error {
func (p *stickyConnPool) Close() error {
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
Expand Down
2 changes: 1 addition & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"gopkg.in/redis.v3"
)

var _ = Describe("Pool", func() {
var _ = Describe("pool", func() {
var client *redis.Client

var perform = func(n int, cb func()) {
Expand Down
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (c *Client) PubSub() *PubSub {
return &PubSub{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, false),
connPool: newStickyConnPool(c.connPool, false),
},
}
}
Expand Down
3 changes: 2 additions & 1 deletion redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ var _ = Describe("Client", func() {
Expect(err).NotTo(HaveOccurred())

cn.SetNetConn(&badConn{})
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred())
err = client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())

err = client.Ping().Err()
Expect(err).NotTo(HaveOccurred())
Expand Down
2 changes: 1 addition & 1 deletion sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c *sentinelClient) PubSub() *PubSub {
return &PubSub{
baseClient: &baseClient{
opt: c.opt,
connPool: newSingleConnPool(c.connPool, false),
connPool: newStickyConnPool(c.connPool, false),
},
}
}
Expand Down

0 comments on commit 0880b0b

Please sign in to comment.