Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/do #838

Merged
merged 2 commits into from
Aug 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
- Added Options.MinIdleConns.
- Added Options.MaxConnAge.
- PoolStats.FreeConns is renamed to PoolStats.IdleConns.
- Add Client.Do to simplify creating custom commands.

## v6.13

Expand Down
11 changes: 9 additions & 2 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,13 @@ func (c *ClusterClient) Close() error {
return c.nodes.Close()
}

// Do creates a Cmd from the args and processes the cmd.
func (c *ClusterClient) Do(args ...interface{}) *Cmd {
cmd := NewCmd(args...)
c.Process(cmd)
return cmd
}

func (c *ClusterClient) WrapProcess(
fn func(oldProcess func(Cmder) error) func(Cmder) error,
) {
Expand Down Expand Up @@ -1242,7 +1249,7 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
cmdsMap = failedCmds
}

return firstCmdsErr(cmds)
return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
Expand Down Expand Up @@ -1424,7 +1431,7 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
}
}

return firstCmdsErr(cmds)
return cmdsFirstErr(cmds)
}

func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
Expand Down
76 changes: 72 additions & 4 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type Cmder interface {
readTimeout() *time.Duration

Err() error
fmt.Stringer
}

func setCmdsErr(cmds []Cmder, e error) {
Expand All @@ -36,7 +35,7 @@ func setCmdsErr(cmds []Cmder, e error) {
}
}

func firstCmdsErr(cmds []Cmder) error {
func cmdsFirstErr(cmds []Cmder) error {
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
return err
Expand Down Expand Up @@ -167,8 +166,77 @@ func (cmd *Cmd) Result() (interface{}, error) {
return cmd.val, cmd.err
}

func (cmd *Cmd) String() string {
return cmdString(cmd, cmd.val)
func (cmd *Cmd) String() (string, error) {
if cmd.err != nil {
return "", cmd.err
}
switch val := cmd.val.(type) {
case string:
return val, nil
default:
err := fmt.Errorf("redis: unexpected type=%T for String", val)
return "", err
}
}

func (cmd *Cmd) Int64() (int64, error) {
if cmd.err != nil {
return 0, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return val, nil
case string:
return strconv.ParseInt(val, 10, 64)
default:
err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
return 0, err
}
}

func (cmd *Cmd) Uint64() (uint64, error) {
if cmd.err != nil {
return 0, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return uint64(val), nil
case string:
return strconv.ParseUint(val, 10, 64)
default:
err := fmt.Errorf("redis: unexpected type=%T for Uint64", val)
return 0, err
}
}

func (cmd *Cmd) Float64() (float64, error) {
if cmd.err != nil {
return 0, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return float64(val), nil
case string:
return strconv.ParseFloat(val, 64)
default:
err := fmt.Errorf("redis: unexpected type=%T for Float64", val)
return 0, err
}
}

func (cmd *Cmd) Bool() (bool, error) {
if cmd.err != nil {
return false, cmd.err
}
switch val := cmd.val.(type) {
case int64:
return val != 0, nil
case string:
return strconv.ParseBool(val)
default:
err := fmt.Errorf("redis: unexpected type=%T for Bool", val)
return false, err
}
}

func (cmd *Cmd) readReply(cn *pool.Conn) error {
Expand Down
12 changes: 6 additions & 6 deletions example_instrumentation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
)

func Example_instrumentation() {
cl := redis.NewClient(&redis.Options{
redisdb := redis.NewClient(&redis.Options{
Addr: ":6379",
})
cl.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
redisdb.WrapProcess(func(old func(cmd redis.Cmder) error) func(cmd redis.Cmder) error {
return func(cmd redis.Cmder) error {
fmt.Printf("starting processing: <%s>\n", cmd)
err := old(cmd)
Expand All @@ -19,17 +19,17 @@ func Example_instrumentation() {
}
})

cl.Ping()
redisdb.Ping()
// Output: starting processing: <ping: >
// finished processing: <ping: PONG>
}

func ExamplePipeline_instrumentation() {
client := redis.NewClient(&redis.Options{
redisdb := redis.NewClient(&redis.Options{
Addr: ":6379",
})

client.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error {
redisdb.WrapProcessPipeline(func(old func([]redis.Cmder) error) func([]redis.Cmder) error {
return func(cmds []redis.Cmder) error {
fmt.Printf("pipeline starting processing: %v\n", cmds)
err := old(cmds)
Expand All @@ -38,7 +38,7 @@ func ExamplePipeline_instrumentation() {
}
})

client.Pipelined(func(pipe redis.Pipeliner) error {
redisdb.Pipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
pipe.Ping()
return nil
Expand Down
Loading