Skip to content

Commit

Permalink
Merge pull request redis#219 from go-redis/feature/cluster-watch
Browse files Browse the repository at this point in the history
cluster: add Watch support.
  • Loading branch information
vmihailenco committed Dec 17, 2015
2 parents 745d733 + 9079a66 commit ba44d4d
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 24 deletions.
11 changes: 11 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return client
}

// Watch creates new transaction and marks the keys to be watched
// for conditional execution of a transaction.
func (c *ClusterClient) Watch(keys ...string) (*Multi, error) {
addr := c.slotMasterAddr(hashSlot(keys[0]))
client, err := c.getClient(addr)
if err != nil {
return nil, err
}
return client.Watch(keys...)
}

// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
Expand Down
5 changes: 3 additions & 2 deletions cluster_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ package redis
type ClusterPipeline struct {
commandable

cmds []Cmder
cluster *ClusterClient
closed bool

cmds []Cmder
closed bool
}

// Pipeline creates a new pipeline which is able to execute commands
Expand Down
45 changes: 45 additions & 0 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"math/rand"
"net"
"reflect"
"strconv"
"strings"
"sync"

"testing"
"time"
Expand Down Expand Up @@ -317,6 +319,49 @@ var _ = Describe("Cluster", func() {
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
})

It("should Watch", func() {
var incr func(string) error

// Transactionally increments key using GET and SET commands.
incr = func(key string) error {
tx, err := client.Watch(key)
if err != nil {
return err
}
defer tx.Close()

n, err := tx.Get(key).Int64()
if err != nil && err != redis.Nil {
return err
}

_, err = tx.Exec(func() error {
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
if err == redis.TxFailedErr {
return incr(key)
}
return err
}

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()

err := incr("key")
Expect(err).NotTo(HaveOccurred())
}()
}
wg.Wait()

n, err := client.Get("key").Int64()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(100)))
})
})
})

Expand Down
4 changes: 2 additions & 2 deletions multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ type Multi struct {
closed bool
}

// Watch marks the keys to be watched for conditional execution
// of a transaction.
// Watch creates new transaction and marks the keys to be watched
// for conditional execution of a transaction.
func (c *Client) Watch(keys ...string) (*Multi, error) {
tx := c.Multi()
if err := tx.Watch(keys...).Err(); err != nil {
Expand Down
61 changes: 41 additions & 20 deletions multi_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package redis_test

import (
"strconv"
"sync"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"

Expand All @@ -21,29 +24,47 @@ var _ = Describe("Multi", func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should exec", func() {
multi := client.Multi()
defer func() {
Expect(multi.Close()).NotTo(HaveOccurred())
}()
It("should Watch", func() {
var incr func(string) error

var (
set *redis.StatusCmd
get *redis.StringCmd
)
cmds, err := multi.Exec(func() error {
set = multi.Set("key", "hello", 0)
get = multi.Get("key")
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(2))
// Transactionally increments key using GET and SET commands.
incr = func(key string) error {
tx, err := client.Watch(key)
if err != nil {
return err
}
defer tx.Close()

Expect(set.Err()).NotTo(HaveOccurred())
Expect(set.Val()).To(Equal("OK"))
n, err := tx.Get(key).Int64()
if err != nil && err != redis.Nil {
return err
}

Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("hello"))
_, err = tx.Exec(func() error {
tx.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
if err == redis.TxFailedErr {
return incr(key)
}
return err
}

var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()

err := incr("key")
Expect(err).NotTo(HaveOccurred())
}()
}
wg.Wait()

n, err := client.Get("key").Int64()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(100)))
})

It("should discard", func() {
Expand Down

0 comments on commit ba44d4d

Please sign in to comment.