Skip to content

Commit

Permalink
Merge branch 'master' into v9
Browse files Browse the repository at this point in the history
Signed-off-by: monkey <[email protected]>
  • Loading branch information
monkey92t committed Jul 24, 2021
2 parents 916da5b + 704212e commit 8345485
Show file tree
Hide file tree
Showing 21 changed files with 414 additions and 71 deletions.
10 changes: 10 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
version: 2
updates:
- package-ecosystem: gomod
directory: /
schedule:
interval: weekly
- package-ecosystem: github-actions
directory: /
schedule:
interval: weekly
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,11 @@ Lastly, run:
```
go test
```

## Contributors

Thanks to all the people who already contributed!

<a href="https://github.com/go-redis/redis/graphs/contributors">
<img src="https://contributors-img.web.app/image?repo=go-redis/redis" />
</a>
16 changes: 14 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type ClusterOptions struct {
ReadTimeout time.Duration
WriteTimeout time.Duration

// PoolFIFO uses FIFO mode for each node connection pool GET/PUT (default LIFO).
PoolFIFO bool

// PoolSize applies per cluster node and not for the whole cluster.
PoolSize int
MinIdleConns int
Expand Down Expand Up @@ -146,6 +149,7 @@ func (opt *ClusterOptions) clientOptions() *Options {
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,

PoolFIFO: opt.PoolFIFO,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
Expand Down Expand Up @@ -591,8 +595,16 @@ func (c *clusterState) slotRandomNode(slot int) (*clusterNode, error) {
if len(nodes) == 0 {
return c.nodes.Random()
}
n := rand.Intn(len(nodes))
return nodes[n], nil
if len(nodes) == 1 {
return nodes[0], nil
}
randomNodes := rand.Perm(len(nodes))
for _, idx := range randomNodes {
if node := nodes[idx]; !node.Failing() {
return node, nil
}
}
return nodes[randomNodes[0]], nil
}

func (c *clusterState) slotNodes(slot int) []*clusterNode {
Expand Down
102 changes: 56 additions & 46 deletions cluster_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,55 +8,63 @@ import (

func (c *ClusterClient) DBSize(ctx context.Context) *IntCmd {
cmd := NewIntCmd(ctx, "dbsize")
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
var size int64
err := c.ForEachMaster(ctx, func(ctx context.Context, master *Client) error {
n, err := master.DBSize(ctx).Result()
if err != nil {
return err
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
return err
cmd.SetErr(err)
} else {
cmd.val = size
}
atomic.AddInt64(&size, n)
return nil
})
if err != nil {
cmd.SetErr(err)
return cmd
}
cmd.val = size
return cmd
}

func (c *ClusterClient) ScriptLoad(ctx context.Context, script string) *StringCmd {
cmd := NewStringCmd(ctx, "script", "load", script)
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptLoad(ctx, script).Result()
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
if cmd.Val() == "" {
cmd.val = val
cmd.SetErr(err)
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

return cmd
}

func (c *ClusterClient) ScriptFlush(ctx context.Context) *StatusCmd {
cmd := NewStatusCmd(ctx, "script", "flush")
_ = c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
shard.ScriptFlush(ctx)

_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
return shard.ScriptFlush(ctx).Err()
})
if err != nil {
cmd.SetErr(err)
}
return nil
})

return cmd
}

Expand All @@ -74,26 +82,28 @@ func (c *ClusterClient) ScriptExists(ctx context.Context, hashes ...string) *Boo
result[i] = true
}

mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
_ = c.hooks.process(ctx, cmd, func(ctx context.Context, _ Cmder) error {
mu := &sync.Mutex{}
err := c.ForEachShard(ctx, func(ctx context.Context, shard *Client) error {
val, err := shard.ScriptExists(ctx, hashes...).Result()
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
}
mu.Unlock()

return nil
})
if err != nil {
return err
}

mu.Lock()
for i, v := range val {
result[i] = result[i] && v
cmd.SetErr(err)
} else {
cmd.val = result
}
mu.Unlock()

return nil
})
if err != nil {
cmd.SetErr(err)
}

cmd.val = result

return cmd
}
7 changes: 7 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ type Cmdable interface {
LSet(ctx context.Context, key string, index int64, value interface{}) *StatusCmd
LTrim(ctx context.Context, key string, start, stop int64) *StatusCmd
RPop(ctx context.Context, key string) *StringCmd
RPopCount(ctx context.Context, key string, count int) *StringSliceCmd
RPopLPush(ctx context.Context, source, destination string) *StringCmd
RPush(ctx context.Context, key string, values ...interface{}) *IntCmd
RPushX(ctx context.Context, key string, values ...interface{}) *IntCmd
Expand Down Expand Up @@ -1452,6 +1453,12 @@ func (c cmdable) RPop(ctx context.Context, key string) *StringCmd {
return cmd
}

func (c cmdable) RPopCount(ctx context.Context, key string, count int) *StringSliceCmd {
cmd := NewStringSliceCmd(ctx, "rpop", key, count)
_ = c(ctx, cmd)
return cmd
}

func (c cmdable) RPopLPush(ctx context.Context, source, destination string) *StringCmd {
cmd := NewStringCmd(ctx, "rpoplpush", source, destination)
_ = c(ctx, cmd)
Expand Down
80 changes: 80 additions & 0 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2282,6 +2282,20 @@ var _ = Describe("Commands", func() {
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))
})

It("should RPopCount", func() {
rPush := client.RPush(ctx, "list", "one", "two", "three", "four")
Expect(rPush.Err()).NotTo(HaveOccurred())
Expect(rPush.Val()).To(Equal(int64(4)))

rPopCount := client.RPopCount(ctx, "list", 2)
Expect(rPopCount.Err()).NotTo(HaveOccurred())
Expect(rPopCount.Val()).To(Equal([]string{"four", "three"}))

lRange := client.LRange(ctx, "list", 0, -1)
Expect(lRange.Err()).NotTo(HaveOccurred())
Expect(lRange.Val()).To(Equal([]string{"one", "two"}))
})

It("should RPopLPush", func() {
rPush := client.RPush(ctx, "list", "one")
Expect(rPush.Err()).NotTo(HaveOccurred())
Expand Down Expand Up @@ -4113,6 +4127,45 @@ var _ = Describe("Commands", func() {
}))
})

It("should ZUnion", func() {
err := client.ZAddArgs(ctx, "zset1", redis.ZAddArgs{
Members: []redis.Z{
{Score: 1, Member: "one"},
{Score: 2, Member: "two"},
},
}).Err()
Expect(err).NotTo(HaveOccurred())

err = client.ZAddArgs(ctx, "zset2", redis.ZAddArgs{
Members: []redis.Z{
{Score: 1, Member: "one"},
{Score: 2, Member: "two"},
{Score: 3, Member: "three"},
},
}).Err()
Expect(err).NotTo(HaveOccurred())

union, err := client.ZUnion(ctx, redis.ZStore{
Keys: []string{"zset1", "zset2"},
Weights: []float64{2, 3},
Aggregate: "sum",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(union).To(Equal([]string{"one", "three", "two"}))

unionScores, err := client.ZUnionWithScores(ctx, redis.ZStore{
Keys: []string{"zset1", "zset2"},
Weights: []float64{2, 3},
Aggregate: "sum",
}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(unionScores).To(Equal([]redis.Z{
{Score: 5, Member: "one"},
{Score: 9, Member: "three"},
{Score: 10, Member: "two"},
}))
})

It("should ZUnionStore", func() {
err := client.ZAdd(ctx, "zset1", redis.Z{Score: 1, Member: "one"}).Err()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -4339,6 +4392,33 @@ var _ = Describe("Commands", func() {
Expect(n).To(Equal(int64(3)))
})

// TODO XTrimMaxLenApprox/XTrimMinIDApprox There is a bug in the limit parameter.
// TODO Don't test it for now.
// TODO link: https://github.com/redis/redis/issues/9046
It("should XTrimMaxLen", func() {
n, err := client.XTrimMaxLen(ctx, "stream", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMaxLenApprox", func() {
n, err := client.XTrimMaxLenApprox(ctx, "stream", 0, 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMinID", func() {
n, err := client.XTrimMinID(ctx, "stream", "4-0").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XTrimMinIDApprox", func() {
n, err := client.XTrimMinIDApprox(ctx, "stream", "4-0", 0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(3)))
})

It("should XAdd", func() {
id, err := client.XAdd(ctx, &redis.XAddArgs{
Stream: "stream",
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ func ExampleClient_Watch() {
// Actual opperation (local in optimistic lock).
n++

// Operation is commited only if the watched keys remain unchanged.
// Operation is committed only if the watched keys remain unchanged.
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, n, 0)
return nil
Expand Down
45 changes: 45 additions & 0 deletions internal/pool/conncheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// +build linux darwin dragonfly freebsd netbsd openbsd solaris illumos

package pool

import (
"errors"
"io"
"net"
"syscall"
)

var errUnexpectedRead = errors.New("unexpected read from socket")

func connCheck(conn net.Conn) error {
sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return err
}

var sysErr error
err = rawConn.Read(func(fd uintptr) bool {
var buf [1]byte
n, err := syscall.Read(int(fd), buf[:])
switch {
case n == 0 && err == nil:
sysErr = io.EOF
case n > 0:
sysErr = errUnexpectedRead
case err == syscall.EAGAIN || err == syscall.EWOULDBLOCK:
sysErr = nil
default:
sysErr = err
}
return true
})
if err != nil {
return err
}

return sysErr
}
9 changes: 9 additions & 0 deletions internal/pool/conncheck_dummy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// +build !linux,!darwin,!dragonfly,!freebsd,!netbsd,!openbsd,!solaris,!illumos

package pool

import "net"

func connCheck(conn net.Conn) error {
return nil
}
Loading

0 comments on commit 8345485

Please sign in to comment.