Skip to content

Commit

Permalink
Optimize pool.Remove.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Mar 12, 2016
1 parent ad0739b commit fdd0fdf
Show file tree
Hide file tree
Showing 13 changed files with 126 additions and 128 deletions.
10 changes: 5 additions & 5 deletions bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,8 +278,8 @@ func BenchmarkZAdd(b *testing.B) {
}

func benchmarkPoolGetPut(b *testing.B, poolSize int) {
dial := func() (*pool.Conn, error) {
return pool.NewConn(&net.TCPConn{}), nil
dial := func() (net.Conn, error) {
return &net.TCPConn{}, nil
}
pool := pool.NewConnPool(dial, poolSize, time.Second, 0)

Expand Down Expand Up @@ -311,8 +311,8 @@ func BenchmarkPoolGetPut1000Conns(b *testing.B) {
}

func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
dial := func() (*pool.Conn, error) {
return pool.NewConn(&net.TCPConn{}), nil
dial := func() (net.Conn, error) {
return &net.TCPConn{}, nil
}
pool := pool.NewConnPool(dial, poolSize, time.Second, 0)
removeReason := errors.New("benchmark")
Expand All @@ -325,7 +325,7 @@ func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
if err != nil {
b.Fatalf("no error expected on pool.Get but received: %s", err.Error())
}
if err = pool.Remove(conn, removeReason); err != nil {
if err = pool.Replace(conn, removeReason); err != nil {
b.Fatalf("no error expected on pool.Remove but received: %s", err.Error())
}
}
Expand Down
31 changes: 20 additions & 11 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ import (

const defaultBufSize = 4096

var noTimeout = time.Time{}
var noDeadline = time.Time{}

type Conn struct {
NetConn net.Conn
idx int

netConn net.Conn
Rd *bufio.Reader
Buf []byte

Expand All @@ -22,7 +24,9 @@ type Conn struct {

func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
NetConn: netConn,
idx: -1,

netConn: netConn,
Buf: make([]byte, defaultBufSize),

UsedAt: time.Now(),
Expand All @@ -31,30 +35,35 @@ func NewConn(netConn net.Conn) *Conn {
return cn
}

func (cn *Conn) SetNetConn(netConn net.Conn) {
cn.netConn = netConn
cn.UsedAt = time.Now()
}

func (cn *Conn) Read(b []byte) (int, error) {
cn.UsedAt = time.Now()
if cn.ReadTimeout != 0 {
cn.NetConn.SetReadDeadline(cn.UsedAt.Add(cn.ReadTimeout))
cn.netConn.SetReadDeadline(cn.UsedAt.Add(cn.ReadTimeout))
} else {
cn.NetConn.SetReadDeadline(noTimeout)
cn.netConn.SetReadDeadline(noDeadline)
}
return cn.NetConn.Read(b)
return cn.netConn.Read(b)
}

func (cn *Conn) Write(b []byte) (int, error) {
cn.UsedAt = time.Now()
if cn.WriteTimeout != 0 {
cn.NetConn.SetWriteDeadline(cn.UsedAt.Add(cn.WriteTimeout))
cn.netConn.SetWriteDeadline(cn.UsedAt.Add(cn.WriteTimeout))
} else {
cn.NetConn.SetWriteDeadline(noTimeout)
cn.netConn.SetWriteDeadline(noDeadline)
}
return cn.NetConn.Write(b)
return cn.netConn.Write(b)
}

func (cn *Conn) RemoteAddr() net.Addr {
return cn.NetConn.RemoteAddr()
return cn.netConn.RemoteAddr()
}

func (cn *Conn) Close() error {
return cn.NetConn.Close()
return cn.netConn.Close()
}
76 changes: 30 additions & 46 deletions internal/pool/conn_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ import (

type connList struct {
cns []*Conn
mx sync.Mutex
mu sync.Mutex
len int32 // atomic
size int32
}

func newConnList(size int) *connList {
return &connList{
cns: make([]*Conn, 0, size),
cns: make([]*Conn, size),
size: int32(size),
}
}
Expand All @@ -23,8 +23,8 @@ func (l *connList) Len() int {
return int(atomic.LoadInt32(&l.len))
}

// Reserve reserves place in the list and returns true on success. The
// caller must add or remove connection if place was reserved.
// Reserve reserves place in the list and returns true on success.
// The caller must add or remove connection if place was reserved.
func (l *connList) Reserve() bool {
len := atomic.AddInt32(&l.len, 1)
reserved := len <= l.size
Expand All @@ -36,65 +36,49 @@ func (l *connList) Reserve() bool {

// Add adds connection to the list. The caller must reserve place first.
func (l *connList) Add(cn *Conn) {
l.mx.Lock()
l.cns = append(l.cns, cn)
l.mx.Unlock()
l.mu.Lock()
for i, c := range l.cns {
if c == nil {
cn.idx = i
l.cns[i] = cn
l.mu.Unlock()
return
}
}
panic("not reached")
}

// Remove closes connection and removes it from the list.
func (l *connList) Remove(cn *Conn) error {
defer l.mx.Unlock()
l.mx.Lock()
atomic.AddInt32(&l.len, -1)

if cn == nil {
atomic.AddInt32(&l.len, -1)
if cn == nil { // free reserved place
return nil
}

for i, c := range l.cns {
if c == cn {
l.cns = append(l.cns[:i], l.cns[i+1:]...)
atomic.AddInt32(&l.len, -1)
return cn.Close()
}
l.mu.Lock()
if l.cns != nil {
l.cns[cn.idx] = nil
cn.idx = -1
}
l.mu.Unlock()

if l.closed() {
return nil
}
panic("conn not found in the list")
return nil
}

func (l *connList) Replace(cn, newcn *Conn) error {
defer l.mx.Unlock()
l.mx.Lock()

for i, c := range l.cns {
if c == cn {
l.cns[i] = newcn
return cn.Close()
}
}

if l.closed() {
return newcn.Close()
}
panic("conn not found in the list")
}

func (l *connList) Close() (retErr error) {
l.mx.Lock()
func (l *connList) Close() error {
var retErr error
l.mu.Lock()
for _, c := range l.cns {
if err := c.Close(); err != nil {
if c == nil {
continue
}
if err := c.Close(); err != nil && retErr == nil {
retErr = err
}
}
l.cns = nil
atomic.StoreInt32(&l.len, 0)
l.mx.Unlock()
l.mu.Unlock()
return retErr
}

func (l *connList) closed() bool {
return l.cns == nil
}
38 changes: 24 additions & 14 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"log"
"net"
"sync/atomic"
"time"

Expand Down Expand Up @@ -32,17 +33,17 @@ type Pooler interface {
First() *Conn
Get() (*Conn, bool, error)
Put(*Conn) error
Remove(*Conn, error) error
Replace(*Conn, error) error
Len() int
FreeLen() int
Close() error
Stats() *PoolStats
}

type dialer func() (*Conn, error)
type dialer func() (net.Conn, error)

type ConnPool struct {
dial dialer
_dial dialer

poolTimeout time.Duration
idleTimeout time.Duration
Expand All @@ -59,7 +60,7 @@ type ConnPool struct {

func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Duration) *ConnPool {
p := &ConnPool{
dial: dial,
_dial: dial,

poolTimeout: poolTimeout,
idleTimeout: idleTimeout,
Expand Down Expand Up @@ -126,8 +127,7 @@ func (p *ConnPool) wait() *Conn {
panic("not reached")
}

// Establish a new connection
func (p *ConnPool) new() (*Conn, error) {
func (p *ConnPool) dial() (net.Conn, error) {
if p.rl.Limit() {
err := fmt.Errorf(
"redis: you open connections too fast (last_error=%q)",
Expand All @@ -136,15 +136,22 @@ func (p *ConnPool) new() (*Conn, error) {
return nil, err
}

cn, err := p.dial()
cn, err := p._dial()
if err != nil {
p.storeLastErr(err.Error())
return nil, err
}

return cn, nil
}

func (p *ConnPool) newConn() (*Conn, error) {
netConn, err := p.dial()
if err != nil {
return nil, err
}
return NewConn(netConn), nil
}

// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (cn *Conn, isNew bool, err error) {
if p.closed() {
Expand All @@ -164,7 +171,7 @@ func (p *ConnPool) Get() (cn *Conn, isNew bool, err error) {
if p.conns.Reserve() {
isNew = true

cn, err = p.new()
cn, err = p.newConn()
if err != nil {
p.conns.Remove(nil)
return
Expand All @@ -189,23 +196,26 @@ func (p *ConnPool) Put(cn *Conn) error {
b, _ := cn.Rd.Peek(cn.Rd.Buffered())
err := fmt.Errorf("connection has unread data: %q", b)
Logger.Print(err)
return p.Remove(cn, err)
return p.Replace(cn, err)
}
p.freeConns <- cn
return nil
}

func (p *ConnPool) replace(cn *Conn) (*Conn, error) {
newcn, err := p.new()
_ = cn.Close()

netConn, err := p.dial()
if err != nil {
_ = p.conns.Remove(cn)
return nil, err
}
_ = p.conns.Replace(cn, newcn)
return newcn, nil
cn.SetNetConn(netConn)

return cn, nil
}

func (p *ConnPool) Remove(cn *Conn, reason error) error {
func (p *ConnPool) Replace(cn *Conn, reason error) error {
p.storeLastErr(reason.Error())

// Replace existing connection with new one and unblock waiter.
Expand Down
2 changes: 1 addition & 1 deletion internal/pool/pool_single.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (p *SingleConnPool) Put(cn *Conn) error {
return nil
}

func (p *SingleConnPool) Remove(cn *Conn, _ error) error {
func (p *SingleConnPool) Replace(cn *Conn, _ error) error {
if p.cn != cn {
panic("p.cn != cn")
}
Expand Down
10 changes: 5 additions & 5 deletions internal/pool/pool_sticky.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ func (p *StickyConnPool) Put(cn *Conn) error {
return nil
}

func (p *StickyConnPool) remove(reason error) error {
err := p.pool.Remove(p.cn, reason)
func (p *StickyConnPool) replace(reason error) error {
err := p.pool.Replace(p.cn, reason)
p.cn = nil
return err
}

func (p *StickyConnPool) Remove(cn *Conn, reason error) error {
func (p *StickyConnPool) Replace(cn *Conn, reason error) error {
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
Expand All @@ -85,7 +85,7 @@ func (p *StickyConnPool) Remove(cn *Conn, reason error) error {
if cn != nil && p.cn != cn {
panic("p.cn != cn")
}
return p.remove(reason)
return p.replace(reason)
}

func (p *StickyConnPool) Len() int {
Expand Down Expand Up @@ -121,7 +121,7 @@ func (p *StickyConnPool) Close() error {
err = p.put()
} else {
reason := errors.New("redis: sticky not reusable connection")
err = p.remove(reason)
err = p.replace(reason)
}
}
return err
Expand Down
4 changes: 2 additions & 2 deletions multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ var _ = Describe("Multi", func() {
cn, _, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred())

cn.NetConn = &badConn{}
cn.SetNetConn(&badConn{})
err = client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())

Expand All @@ -172,7 +172,7 @@ var _ = Describe("Multi", func() {
cn, _, err := client.Pool().Get()
Expect(err).NotTo(HaveOccurred())

cn.NetConn = &badConn{}
cn.SetNetConn(&badConn{})
err = client.Pool().Put(cn)
Expect(err).NotTo(HaveOccurred())

Expand Down
Loading

0 comments on commit fdd0fdf

Please sign in to comment.