Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Freeaqingme/go-redis into HEAD
Browse files Browse the repository at this point in the history
  • Loading branch information
Freeaqingme committed Dec 30, 2015
2 parents 6eec22a + edae11b commit 84cd769
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 39 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ services:
- redis-server

go:
- 1.3
- 1.4
- 1.5
- tip
Expand Down
49 changes: 49 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,55 @@ var _ = Describe("Cluster", func() {
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})

It("should CLUSTER KEYSLOT", func() {
hashSlot, err := cluster.primary().ClusterKeySlot("somekey").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hashSlot).To(Equal(int64(11058)))
})

It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
n, err := cluster.primary().ClusterCountFailureReports(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER COUNTKEYSINSLOT", func() {
n, err := cluster.primary().ClusterCountKeysInSlot(10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})

It("should CLUSTER DELSLOTS", func() {
res, err := cluster.primary().ClusterDelSlotsRange(16000, 16384-1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
cluster.primary().ClusterAddSlotsRange(16000, 16384-1)
})

It("should CLUSTER SAVECONFIG", func() {
res, err := cluster.primary().ClusterSaveConfig().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})

It("should CLUSTER SLAVES", func() {
nodesList, err := cluster.primary().ClusterSlaves(cluster.nodeIds[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
Expect(nodesList).Should(HaveLen(1))
})

It("should CLUSTER READONLY", func() {
res, err := cluster.primary().Readonly().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})

It("should CLUSTER READWRITE", func() {
res, err := cluster.primary().ReadWrite().Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})
})

Describe("Client", func() {
Expand Down
78 changes: 76 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,13 @@ func (c *commandable) PFAdd(key string, fields ...string) *IntCmd {
return cmd
}

func (c *commandable) PFCount(key string) *IntCmd {
cmd := NewIntCmd("PFCOUNT", key)
func (c *commandable) PFCount(keys ...string) *IntCmd {
args := make([]interface{}, 1+len(keys))
args[0] = "PFCOUNT"
for i, key := range keys {
args[1+i] = key
}
cmd := NewIntCmd(args...)
c.Process(cmd)
return cmd
}
Expand Down Expand Up @@ -1693,6 +1698,75 @@ func (c *commandable) ClusterInfo() *StringCmd {
return cmd
}

func (c *commandable) ClusterKeySlot(key string) *IntCmd {
cmd := NewIntCmd("CLUSTER", "keyslot", key)
cmd._clusterKeyPos = 2
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterCountFailureReports(nodeID string) *IntCmd {
cmd := NewIntCmd("CLUSTER", "count-failure-reports", nodeID)
cmd._clusterKeyPos = 2
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterCountKeysInSlot(slot int) *IntCmd {
cmd := NewIntCmd("CLUSTER", "countkeysinslot", slot)
cmd._clusterKeyPos = 2
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterDelSlots(slots ...int) *StatusCmd {
args := make([]interface{}, 2+len(slots))
args[0] = "CLUSTER"
args[1] = "DELSLOTS"
for i, slot := range slots {
args[2+i] = slot
}
cmd := newKeylessStatusCmd(args...)
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterDelSlotsRange(min, max int) *StatusCmd {
size := max - min + 1
slots := make([]int, size)
for i := 0; i < size; i++ {
slots[i] = min + i
}
return c.ClusterDelSlots(slots...)
}

func (c *commandable) ClusterSaveConfig() *StatusCmd {
cmd := newKeylessStatusCmd("CLUSTER", "saveconfig")
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterSlaves(nodeID string) *StringSliceCmd {
cmd := NewStringSliceCmd("CLUSTER", "SLAVES", nodeID)
cmd._clusterKeyPos = 2
c.Process(cmd)
return cmd
}

func (c *commandable) Readonly() *StatusCmd {
cmd := newKeylessStatusCmd("READONLY")
cmd._clusterKeyPos = 0
c.Process(cmd)
return cmd
}

func (c *commandable) ReadWrite() *StatusCmd {
cmd := newKeylessStatusCmd("READWRITE")
cmd._clusterKeyPos = 0
c.Process(cmd)
return cmd
}

func (c *commandable) ClusterFailover() *StatusCmd {
cmd := newKeylessStatusCmd("CLUSTER", "failover")
c.Process(cmd)
Expand Down
4 changes: 4 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1203,6 +1203,10 @@ var _ = Describe("Commands", func() {
pfCount = client.PFCount("hllMerged")
Expect(pfCount.Err()).NotTo(HaveOccurred())
Expect(pfCount.Val()).To(Equal(int64(10)))

pfCount = client.PFCount("hll1", "hll2")
Expect(pfCount.Err()).NotTo(HaveOccurred())
Expect(pfCount.Val()).To(Equal(int64(10)))
})
})

Expand Down
15 changes: 7 additions & 8 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,15 @@ func (c *Client) Multi() *Multi {
return multi
}

func (c *Multi) putConn(cn *conn, ei error) {
var err error
if isBadConn(cn, ei) {
func (c *Multi) putConn(cn *conn, err error) {
if isBadConn(cn, err) {
// Close current connection.
c.base.connPool.(*stickyConnPool).Reset()
c.base.connPool.(*stickyConnPool).Reset(err)
} else {
err = c.base.connPool.Put(cn)
}
if err != nil {
log.Printf("redis: putConn failed: %s", err)
err := c.base.connPool.Put(cn)
if err != nil {
log.Printf("redis: putConn failed: %s", err)
}
}
}

Expand Down
45 changes: 30 additions & 15 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type pool interface {
First() *conn
Get() (*conn, bool, error)
Put(*conn) error
Remove(*conn) error
Remove(*conn, error) error
Len() int
FreeLen() int
Close() error
Expand Down Expand Up @@ -130,7 +130,7 @@ type connPool struct {

_closed int32

lastDialErr error
lastErr atomic.Value
}

func newConnPool(opt *Options) *connPool {
Expand Down Expand Up @@ -204,15 +204,15 @@ func (p *connPool) wait() *conn {
func (p *connPool) new() (*conn, error) {
if p.rl.Limit() {
err := fmt.Errorf(
"redis: you open connections too fast (last error: %v)",
p.lastDialErr,
"redis: you open connections too fast (last_error=%q)",
p.loadLastErr(),
)
return nil, err
}

cn, err := p.dialer()
if err != nil {
p.lastDialErr = err
p.storeLastErr(err.Error())
return nil, err
}

Expand Down Expand Up @@ -255,8 +255,9 @@ func (p *connPool) Get() (cn *conn, isNew bool, err error) {
func (p *connPool) Put(cn *conn) error {
if cn.rd.Buffered() != 0 {
b, _ := cn.rd.Peek(cn.rd.Buffered())
log.Printf("redis: connection has unread data: %q", b)
return p.Remove(cn)
err := fmt.Errorf("redis: connection has unread data: %q", b)
log.Print(err)
return p.Remove(cn, err)
}
if p.opt.getIdleTimeout() > 0 {
cn.usedAt = time.Now()
Expand All @@ -275,7 +276,9 @@ func (p *connPool) replace(cn *conn) (*conn, error) {
return newcn, nil
}

func (p *connPool) Remove(cn *conn) error {
func (p *connPool) Remove(cn *conn, reason error) error {
p.storeLastErr(reason.Error())

// Replace existing connection with new one and unblock waiter.
newcn, err := p.replace(cn)
if err != nil {
Expand Down Expand Up @@ -330,6 +333,17 @@ func (p *connPool) reaper() {
}
}

func (p *connPool) storeLastErr(err string) {
p.lastErr.Store(err)
}

func (p *connPool) loadLastErr() string {
if v := p.lastErr.Load(); v != nil {
return v.(string)
}
return ""
}

//------------------------------------------------------------------------------

type singleConnPool struct {
Expand Down Expand Up @@ -357,7 +371,7 @@ func (p *singleConnPool) Put(cn *conn) error {
return nil
}

func (p *singleConnPool) Remove(cn *conn) error {
func (p *singleConnPool) Remove(cn *conn, _ error) error {
if p.cn != cn {
panic("p.cn != cn")
}
Expand Down Expand Up @@ -440,13 +454,13 @@ func (p *stickyConnPool) Put(cn *conn) error {
return nil
}

func (p *stickyConnPool) remove() (err error) {
err = p.pool.Remove(p.cn)
func (p *stickyConnPool) remove(reason error) (err error) {
err = p.pool.Remove(p.cn, reason)
p.cn = nil
return err
}

func (p *stickyConnPool) Remove(cn *conn) error {
func (p *stickyConnPool) Remove(cn *conn, _ error) error {
defer p.mx.Unlock()
p.mx.Lock()
if p.closed {
Expand Down Expand Up @@ -479,10 +493,10 @@ func (p *stickyConnPool) FreeLen() int {
return 0
}

func (p *stickyConnPool) Reset() (err error) {
func (p *stickyConnPool) Reset(reason error) (err error) {
p.mx.Lock()
if p.cn != nil {
err = p.remove()
err = p.remove(reason)
}
p.mx.Unlock()
return err
Expand All @@ -500,7 +514,8 @@ func (p *stickyConnPool) Close() error {
if p.reusable {
err = p.put()
} else {
err = p.remove()
reason := errors.New("redis: sticky not reusable connection")
err = p.remove(reason)
}
}
return err
Expand Down
26 changes: 22 additions & 4 deletions pool_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis_test

import (
"errors"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -36,7 +37,6 @@ var _ = Describe("pool", func() {
})

AfterEach(func() {
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
Expect(client.Close()).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -141,12 +141,12 @@ var _ = Describe("pool", func() {
pool := client.Pool()

// Reserve one connection.
cn, _, err := client.Pool().Get()
cn, _, err := pool.Get()
Expect(err).NotTo(HaveOccurred())

// Reserve the rest of connections.
for i := 0; i < 9; i++ {
_, _, err := client.Pool().Get()
_, _, err := pool.Get()
Expect(err).NotTo(HaveOccurred())
}

Expand All @@ -168,7 +168,8 @@ var _ = Describe("pool", func() {
// ok
}

Expect(pool.Remove(cn)).NotTo(HaveOccurred())
err = pool.Remove(cn, errors.New("test"))
Expect(err).NotTo(HaveOccurred())

// Check that Ping is unblocked.
select {
Expand All @@ -179,6 +180,23 @@ var _ = Describe("pool", func() {
}
Expect(ping.Err()).NotTo(HaveOccurred())
})

It("should rate limit dial", func() {
pool := client.Pool()

var rateErr error
for i := 0; i < 1000; i++ {
cn, _, err := pool.Get()
if err != nil {
rateErr = err
break
}

_ = pool.Remove(cn, errors.New("test"))
}

Expect(rateErr).To(MatchError(`redis: you open connections too fast (last_error="test")`))
})
})

func BenchmarkPool(b *testing.B) {
Expand Down
Loading

0 comments on commit 84cd769

Please sign in to comment.