Skip to content

Commit

Permalink
Fix WrapProcess for Ring and Cluster. Add better example.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Nov 30, 2016
1 parent b148c1a commit 82f2163
Show file tree
Hide file tree
Showing 7 changed files with 124 additions and 61 deletions.
17 changes: 4 additions & 13 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ type Cmdable interface {
ClientKill(ipPort string) *StatusCmd
ClientList() *StringCmd
ClientPause(dur time.Duration) *BoolCmd
ClientSetName(name string) *BoolCmd
ConfigGet(parameter string) *SliceCmd
ConfigResetStat() *StatusCmd
ConfigSet(parameter, value string) *StatusCmd
Expand Down Expand Up @@ -241,14 +240,6 @@ type cmdable struct {
process func(cmd Cmder) error
}

// WrapProcess replaces the process func. It takes a function createWrapper
// which is supplied by the user. createWrapper takes the old process func as
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *cmdable) WrapProcess(createWrapper func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.process = createWrapper(c.process)
}

type statefulCmdable struct {
process func(cmd Cmder) error
}
Expand Down Expand Up @@ -1625,15 +1616,15 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
return cmd
}

// ClientSetName assigns a name to the one of many connections in the pool.
func (c *cmdable) ClientSetName(name string) *BoolCmd {
// ClientSetName assigns a name to the connection.
func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
cmd := NewBoolCmd("client", "setname", name)
c.process(cmd)
return cmd
}

// ClientGetName returns the name of the one of many connections in the pool.
func (c *Client) ClientGetName() *StringCmd {
// ClientGetName returns the name of the connection.
func (c *statefulCmdable) ClientGetName() *StringCmd {
cmd := NewStringCmd("client", "getname")
c.process(cmd)
return cmd
Expand Down
47 changes: 31 additions & 16 deletions commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@ var _ = Describe("Commands", func() {

Describe("server", func() {

// It("should Auth", func() {
// auth := client.Auth("password")
// Expect(auth.Err()).To(MatchError("ERR Client sent AUTH, but no password is set"))
// Expect(auth.Val()).To(Equal(""))
// })
It("should Auth", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
pipe.Auth("password")
return nil
})
Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
})

It("should Echo", func() {
echo := client.Echo("hello")
pipe := client.Pipeline()
echo := pipe.Echo("hello")
_, err := pipe.Exec()
Expect(err).NotTo(HaveOccurred())

Expect(echo.Err()).NotTo(HaveOccurred())
Expect(echo.Val()).To(Equal("hello"))
})
Expand All @@ -44,11 +50,15 @@ var _ = Describe("Commands", func() {
Expect(ping.Val()).To(Equal("PONG"))
})

// It("should Select", func() {
// sel := client.Select(1)
// Expect(sel.Err()).NotTo(HaveOccurred())
// Expect(sel.Val()).To(Equal("OK"))
// })
It("should Select", func() {
pipe := client.Pipeline()
sel := pipe.Select(1)
_, err := pipe.Exec()
Expect(err).NotTo(HaveOccurred())

Expect(sel.Err()).NotTo(HaveOccurred())
Expect(sel.Val()).To(Equal("OK"))
})

It("should BgRewriteAOF", func() {
Skip("flaky test")
Expand Down Expand Up @@ -84,13 +94,18 @@ var _ = Describe("Commands", func() {
})

It("should ClientSetName and ClientGetName", func() {
isSet, err := client.ClientSetName("theclientname").Result()
pipe := client.Pipeline()
set := pipe.ClientSetName("theclientname")
get := pipe.ClientGetName()
_, err := pipe.Exec()
Expect(err).NotTo(HaveOccurred())
Expect(isSet).To(BeTrue())

val, err := client.ClientGetName().Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("theclientname"))
Expect(set.Err()).NotTo(HaveOccurred())
Expect(set.Val()).To(BeTrue())

Expect(get.Err()).NotTo(HaveOccurred())
Expect(get.Val()).To(Equal("theclientname"))

})

It("should ConfigGet", func() {
Expand Down
59 changes: 59 additions & 0 deletions example_instrumentation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package redis_test

import (
"fmt"
"sync/atomic"
"time"

redis "gopkg.in/redis.v5"
)

func Example_instrumentation() {
ring := redis.NewRing(&redis.RingOptions{
Addrs: map[string]string{
"shard1": ":6379",
},
})
ring.ForEachShard(func(client *redis.Client) error {
wrapRedisProcess(client)
return nil
})

for {
ring.Ping()
}
}

func wrapRedisProcess(client *redis.Client) {
const precision = time.Microsecond
var count, avgDur uint32

go func() {
for _ = range time.Tick(3 * time.Second) {
n := atomic.LoadUint32(&count)
dur := time.Duration(atomic.LoadUint32(&avgDur)) * precision
fmt.Printf("%s: processed=%d avg_dur=%s\n", client, n, dur)
}
}()

client.WrapProcess(func(oldProcess func(redis.Cmder) error) func(redis.Cmder) error {
return func(cmd redis.Cmder) error {
start := time.Now()
err := oldProcess(cmd)
dur := time.Since(start)

const decay = float64(1) / 100
ms := float64(dur / precision)
for {
avg := atomic.LoadUint32(&avgDur)
newAvg := uint32((1-decay)*float64(avg) + decay*ms)
if atomic.CompareAndSwapUint32(&avgDur, avg, newAvg) {
break
}
}
atomic.AddUint32(&count, 1)

return err
}
})
}
18 changes: 0 additions & 18 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,21 +331,3 @@ func ExampleScanCmd_Iterator() {
panic(err)
}
}

func ExampleClient_instrumentation() {
client := redis.NewClient(&redis.Options{
Addr: ":6379",
})
client.WrapProcess(func(oldProcess func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
start := time.Now()
err := oldProcess(cmd)
if err != nil {
fmt.Printf("command %s failed: %s", cmd, err)
} else {
fmt.Printf("command %q took %s", cmd, time.Since(start))
}
return err
}
})
}
2 changes: 1 addition & 1 deletion pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ var _ = Describe("Pipelining", func() {
const N = 1000

pipeline := client.Pipeline()
wg := &sync.WaitGroup{}
var wg sync.WaitGroup
wg.Add(N)
for i := 0; i < N; i++ {
go func() {
Expand Down
16 changes: 16 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type baseClient struct {
connPool pool.Pooler
opt *Options

process func(Cmder) error
onClose func() error // hook called when client is closed
}

Expand Down Expand Up @@ -78,6 +79,21 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
}

func (c *baseClient) Process(cmd Cmder) error {
if c.process != nil {
return c.process(cmd)
}
return c.defaultProcess(cmd)
}

// WrapProcess replaces the process func. It takes a function createWrapper
// which is supplied by the user. createWrapper takes the old process func as
// an input and returns the new wrapper process func. createWrapper should
// use call the old process func within the new process func.
func (c *baseClient) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
c.process = fn(c.defaultProcess)
}

func (c *baseClient) defaultProcess(cmd Cmder) error {
for i := 0; i <= c.opt.MaxRetries; i++ {
if i > 0 {
cmd.reset()
Expand Down
26 changes: 13 additions & 13 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ func (c *Ring) addClient(name string, cl *Client) {
c.mu.Unlock()
}

func (c *Ring) shardByKey(key string) (*Client, error) {
func (c *Ring) shardByKey(key string) (*ringShard, error) {
key = hashtag.Key(key)

c.mu.RLock()
Expand All @@ -246,27 +246,27 @@ func (c *Ring) shardByKey(key string) (*Client, error) {
return nil, errRingShardsDown
}

cl := c.shards[name].Client
shard := c.shards[name]
c.mu.RUnlock()
return cl, nil
return shard, nil
}

func (c *Ring) randomShard() (*Client, error) {
func (c *Ring) randomShard() (*ringShard, error) {
return c.shardByKey(strconv.Itoa(rand.Int()))
}

func (c *Ring) shardByName(name string) (*Client, error) {
func (c *Ring) shardByName(name string) (*ringShard, error) {
if name == "" {
return c.randomShard()
}

c.mu.RLock()
cl := c.shards[name].Client
shard := c.shards[name]
c.mu.RUnlock()
return cl, nil
return shard, nil
}

func (c *Ring) cmdShard(cmd Cmder) (*Client, error) {
func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
cmdInfo := c.cmdInfo(cmd.arg(0))
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
if firstKey == "" {
Expand All @@ -276,12 +276,12 @@ func (c *Ring) cmdShard(cmd Cmder) (*Client, error) {
}

func (c *Ring) Process(cmd Cmder) error {
cl, err := c.cmdShard(cmd)
shard, err := c.cmdShard(cmd)
if err != nil {
cmd.setErr(err)
return err
}
return cl.baseClient.Process(cmd)
return shard.Client.Process(cmd)
}

// rebalance removes dead shards from the Ring.
Expand Down Expand Up @@ -384,7 +384,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
resetCmds(cmds)
}

client, err := c.shardByName(name)
shard, err := c.shardByName(name)
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
Expand All @@ -393,7 +393,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
continue
}

cn, _, err := client.conn()
cn, _, err := shard.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
if firstErr == nil {
Expand All @@ -403,7 +403,7 @@ func (c *Ring) pipelineExec(cmds []Cmder) (firstErr error) {
}

retry, err := execCmds(cn, cmds)
client.putConn(cn, err, false)
shard.Client.putConn(cn, err, false)
if err == nil {
continue
}
Expand Down

0 comments on commit 82f2163

Please sign in to comment.