Skip to content

Commit

Permalink
Merge pull request redis#285 from go-redis/feature/new-pool
Browse files Browse the repository at this point in the history
Faster and simpler pool.
  • Loading branch information
vmihailenco committed Mar 19, 2016
2 parents 0303cfb + 6e1aef3 commit fb6ea09
Show file tree
Hide file tree
Showing 22 changed files with 416 additions and 490 deletions.
108 changes: 62 additions & 46 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@ import (
type ClusterClient struct {
commandable

opt *ClusterOptions

slotsMx sync.RWMutex // protects slots and addrs
addrs []string
slots [][]string
slotsMx sync.RWMutex // Protects slots and addrs.

clientsMx sync.RWMutex // protects clients and closed
clients map[string]*Client
closed bool
clientsMx sync.RWMutex // Protects clients and closed.

opt *ClusterOptions
_closed int32 // atomic

// Reports where slots reloading is in progress.
reloading uint32
Expand All @@ -34,17 +35,29 @@ type ClusterClient struct {
// http://redis.io/topics/cluster-spec.
func NewClusterClient(opt *ClusterOptions) *ClusterClient {
client := &ClusterClient{
opt: opt,
addrs: opt.Addrs,
slots: make([][]string, hashtag.SlotNumber),
clients: make(map[string]*Client),
opt: opt,
}
client.commandable.process = client.process
client.reloadSlots()
go client.reaper()
return client
}

// getClients returns a snapshot of clients for cluster nodes
// this ClusterClient has been working with recently.
// Note that snapshot can contain closed clients.
func (c *ClusterClient) getClients() map[string]*Client {
c.clientsMx.RLock()
clients := make(map[string]*Client, len(c.clients))
for addr, client := range c.clients {
clients[addr] = client
}
c.clientsMx.RUnlock()
return clients
}

// Watch creates new transaction and marks the keys to be watched
// for conditional execution of a transaction.
func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
Expand All @@ -59,56 +72,56 @@ func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
acc := PoolStats{}
c.clientsMx.RLock()
for _, client := range c.clients {
m := client.PoolStats()
acc.Requests += m.Requests
acc.Waits += m.Waits
acc.Timeouts += m.Timeouts
acc.TotalConns += m.TotalConns
acc.FreeConns += m.FreeConns
for _, client := range c.getClients() {
s := client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Waits += s.Waits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
c.clientsMx.RUnlock()
return &acc
}

func (c *ClusterClient) closed() bool {
return atomic.LoadInt32(&c._closed) == 1
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
defer c.clientsMx.Unlock()
c.clientsMx.Lock()

if c.closed {
if !atomic.CompareAndSwapInt32(&c._closed, 0, 1) {
return pool.ErrClosed
}
c.closed = true

c.clientsMx.Lock()
c.resetClients()
c.clientsMx.Unlock()
c.setSlots(nil)
return nil
}

// getClient returns a Client for a given address.
func (c *ClusterClient) getClient(addr string) (*Client, error) {
if c.closed() {
return nil, pool.ErrClosed
}

if addr == "" {
return c.randomClient()
}

c.clientsMx.RLock()
client, ok := c.clients[addr]
c.clientsMx.RUnlock()
if ok {
c.clientsMx.RUnlock()
return client, nil
}
c.clientsMx.RUnlock()

c.clientsMx.Lock()
if c.closed {
c.clientsMx.Unlock()
return nil, pool.ErrClosed
}

client, ok = c.clients[addr]
if !ok {
opt := c.opt.clientOptions()
Expand Down Expand Up @@ -276,28 +289,30 @@ func (c *ClusterClient) lazyReloadSlots() {
}

// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper() {
ticker := time.NewTicker(time.Minute)
func (c *ClusterClient) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for _ = range ticker.C {
c.clientsMx.RLock()

if c.closed {
c.clientsMx.RUnlock()
for _ = range ticker.C {
if c.closed() {
break
}

for _, client := range c.clients {
pool := client.connPool
// pool.First removes idle connections from the pool and
// returns first non-idle connection. So just put returned
// connection back.
if cn := pool.First(); cn != nil {
pool.Put(cn)
var n int
for _, client := range c.getClients() {
nn, err := client.connPool.(*pool.ConnPool).ReapStaleConns()
if err != nil {
Logger.Printf("ReapStaleConns failed: %s", err)
} else {
n += nn
}
}

c.clientsMx.RUnlock()
s := c.PoolStats()
Logger.Printf(
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
)
}
}

Expand All @@ -309,8 +324,7 @@ type ClusterOptions struct {
// A seed list of host:port addresses of cluster nodes.
Addrs []string

// The maximum number of MOVED/ASK redirects to follow before
// giving up.
// The maximum number of MOVED/ASK redirects to follow before giving up.
// Default is 16
MaxRedirects int

Expand All @@ -323,9 +337,10 @@ type ClusterOptions struct {
WriteTimeout time.Duration

// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
PoolSize int
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
}

func (opt *ClusterOptions) getMaxRedirects() int {
Expand All @@ -349,5 +364,6 @@ func (opt *ClusterOptions) clientOptions() *Options {
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
// IdleCheckFrequency is not copied to disable reaper
}
}
10 changes: 9 additions & 1 deletion cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,15 @@ func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.Cluste
addrs[i] = net.JoinHostPort("127.0.0.1", port)
}
if opt == nil {
opt = &redis.ClusterOptions{}
opt = &redis.ClusterOptions{
DialTimeout: 10 * time.Second,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
PoolSize: 10,
PoolTimeout: 30 * time.Second,
IdleTimeout: time.Second,
IdleCheckFrequency: time.Second,
}
}
opt.Addrs = addrs
return redis.NewClusterClient(opt)
Expand Down
14 changes: 9 additions & 5 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1301,12 +1301,16 @@ var _ = Describe("Commands", func() {
})

It("should BLPop timeout", func() {
bLPop := client.BLPop(time.Second, "list1")
Expect(bLPop.Val()).To(BeNil())
Expect(bLPop.Err()).To(Equal(redis.Nil))
val, err := client.BLPop(time.Second, "list1").Result()
Expect(err).To(Equal(redis.Nil))
Expect(val).To(BeNil())

Expect(client.Ping().Err()).NotTo(HaveOccurred())

stats := client.Pool().Stats()
Expect(stats.Requests - stats.Hits - stats.Waits).To(Equal(uint32(1)))
stats := client.PoolStats()
Expect(stats.Requests).To(Equal(uint32(3)))
Expect(stats.Hits).To(Equal(uint32(2)))
Expect(stats.Timeouts).To(Equal(uint32(0)))
})

It("should BRPop", func() {
Expand Down
10 changes: 9 additions & 1 deletion export_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package redis

import "gopkg.in/redis.v3/internal/pool"
import (
"time"

"gopkg.in/redis.v3/internal/pool"
)

func (c *baseClient) Pool() pool.Pooler {
return c.connPool
Expand All @@ -9,3 +13,7 @@ func (c *baseClient) Pool() pool.Pooler {
func (c *PubSub) Pool() pool.Pooler {
return c.base.connPool
}

func SetReceiveMessageTimeout(d time.Duration) {
receiveMessageTimeout = d
}
46 changes: 19 additions & 27 deletions internal/pool/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,26 @@ package pool_test

import (
"errors"
"net"
"testing"
"time"

"gopkg.in/redis.v3/internal/pool"
)

func benchmarkPoolGetPut(b *testing.B, poolSize int) {
dial := func() (net.Conn, error) {
return &net.TCPConn{}, nil
}
pool := pool.NewConnPool(dial, poolSize, time.Second, 0)
pool.DialLimiter = nil
connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour)
connPool.DialLimiter = nil

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
conn, err := pool.Get()
cn, err := connPool.Get()
if err != nil {
b.Fatalf("no error expected on pool.Get but received: %s", err.Error())
b.Fatal(err)
}
if err = pool.Put(conn); err != nil {
b.Fatalf("no error expected on pool.Put but received: %s", err.Error())
if err = connPool.Put(cn); err != nil {
b.Fatal(err)
}
}
})
Expand All @@ -43,38 +39,34 @@ func BenchmarkPoolGetPut1000Conns(b *testing.B) {
benchmarkPoolGetPut(b, 1000)
}

func benchmarkPoolGetReplace(b *testing.B, poolSize int) {
dial := func() (net.Conn, error) {
return &net.TCPConn{}, nil
}
pool := pool.NewConnPool(dial, poolSize, time.Second, 0)
pool.DialLimiter = nil

func benchmarkPoolGetRemove(b *testing.B, poolSize int) {
connPool := pool.NewConnPool(dummyDialer, poolSize, time.Second, time.Hour, time.Hour)
connPool.DialLimiter = nil
removeReason := errors.New("benchmark")

b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
conn, err := pool.Get()
cn, err := connPool.Get()
if err != nil {
b.Fatalf("no error expected on pool.Get but received: %s", err.Error())
b.Fatal(err)
}
if err = pool.Replace(conn, removeReason); err != nil {
b.Fatalf("no error expected on pool.Remove but received: %s", err.Error())
if err := connPool.Remove(cn, removeReason); err != nil {
b.Fatal(err)
}
}
})
}

func BenchmarkPoolGetReplace10Conns(b *testing.B) {
benchmarkPoolGetReplace(b, 10)
func BenchmarkPoolGetRemove10Conns(b *testing.B) {
benchmarkPoolGetRemove(b, 10)
}

func BenchmarkPoolGetReplace100Conns(b *testing.B) {
benchmarkPoolGetReplace(b, 100)
func BenchmarkPoolGetRemove100Conns(b *testing.B) {
benchmarkPoolGetRemove(b, 100)
}

func BenchmarkPoolGetReplace1000Conns(b *testing.B) {
benchmarkPoolGetReplace(b, 1000)
func BenchmarkPoolGetRemove1000Conns(b *testing.B) {
benchmarkPoolGetRemove(b, 1000)
}
17 changes: 0 additions & 17 deletions internal/pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"bufio"
"io"
"net"
"sync/atomic"
"time"
)

Expand All @@ -13,8 +12,6 @@ const defaultBufSize = 4096
var noDeadline = time.Time{}

type Conn struct {
idx int32

NetConn net.Conn
Rd *bufio.Reader
Buf []byte
Expand All @@ -28,8 +25,6 @@ type Conn struct {

func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
idx: -1,

NetConn: netConn,
Buf: make([]byte, defaultBufSize),

Expand All @@ -39,18 +34,6 @@ func NewConn(netConn net.Conn) *Conn {
return cn
}

func (cn *Conn) Index() int {
return int(atomic.LoadInt32(&cn.idx))
}

func (cn *Conn) SetIndex(newIdx int) int {
oldIdx := cn.Index()
if !atomic.CompareAndSwapInt32(&cn.idx, int32(oldIdx), int32(newIdx)) {
return -1
}
return oldIdx
}

func (cn *Conn) IsStale(timeout time.Duration) bool {
return timeout > 0 && time.Since(cn.UsedAt) > timeout
}
Expand Down
Loading

0 comments on commit fb6ea09

Please sign in to comment.