Skip to content

Commit

Permalink
Add TxPipeline.
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Dec 16, 2016
1 parent c6acf2e commit 865d501
Show file tree
Hide file tree
Showing 13 changed files with 566 additions and 579 deletions.
267 changes: 203 additions & 64 deletions cluster.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package redis

import (
"fmt"
"math/rand"
"sync"
"sync/atomic"
Expand All @@ -9,6 +10,7 @@ import (
"gopkg.in/redis.v5/internal"
"gopkg.in/redis.v5/internal/hashtag"
"gopkg.in/redis.v5/internal/pool"
"gopkg.in/redis.v5/internal/proto"
)

var errClusterNoNodes = internal.RedisError("redis: cluster has no nodes")
Expand Down Expand Up @@ -417,10 +419,6 @@ func (c *ClusterClient) Process(cmd Cmder) error {

var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
cmd.reset()
}

if ask {
pipe := node.Client.Pipeline()
pipe.Process(NewCmd("ASKING"))
Expand Down Expand Up @@ -655,111 +653,252 @@ func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
}

func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
var firstErr error
setFirstErr := func(err error) {
if firstErr == nil {
firstErr = err
}
}

state := c.state()
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
cmd.setErr(err)
setFirstErr(err)
continue
}
cmdsMap[node] = append(cmdsMap[node], cmd)
cmdsMap, err := c.mapCmdsByNode(cmds)
if err != nil {
return err
}

for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
for i := 0; i <= c.opt.MaxRedirects; i++ {
failedCmds := make(map[*clusterNode][]Cmder)

for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
setFirstErr(err)
continue
}

failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds)
err = c.pipelineProcessCmds(cn, cmds, failedCmds)
node.Client.putConn(cn, err, false)
if err != nil {
setFirstErr(err)
}
}

if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds
}

var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
}

func (c *ClusterClient) execClusterCmds(
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
state := c.state()
cmdsMap := make(map[*clusterNode][]Cmder)
for _, cmd := range cmds {
_, node, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
return nil, err
}
cmdsMap[node] = append(cmdsMap[node], cmd)
}
return cmdsMap, nil
}

func (c *ClusterClient) pipelineProcessCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) (map[*clusterNode][]Cmder, error) {
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := writeCmd(cn, cmds...); err != nil {
setCmdsErr(cmds, err)
return failedCmds, err
return err
}

// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)

return c.pipelineReadCmds(cn, cmds, failedCmds)
}

func (c *ClusterClient) pipelineReadCmds(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error
setFirstErr := func(err error) {
for _, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
continue
}

if firstErr == nil {
firstErr = err
}

err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}

// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
moved, ask, addr := internal.IsMovedError(cmd.Err())
if moved {
c.lazyReloadSlots()

for i, cmd := range cmds {
err := cmd.readReply(cn)
if err == nil {
node, err := c.nodes.Get(addr)
if err != nil {
return err
}

failedCmds[node] = append(failedCmds[node], cmd)
}
if ask {
node, err := c.nodes.Get(addr)
if err != nil {
return err
}

failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
}
return nil
}

func (c *ClusterClient) TxPipeline() *Pipeline {
pipe := Pipeline{
exec: c.txPipelineExec,
}
pipe.cmdable.process = pipe.Process
pipe.statefulCmdable.process = pipe.Process
return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}

func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
cmdsMap, err := c.mapCmdsBySlot(cmds)
if err != nil {
return err
}

for slot, cmds := range cmdsMap {
node, err := c.state().slotMasterNode(slot)
if err != nil {
setCmdsErr(cmds, err)
continue
}

if i == 0 && internal.IsRetryableError(err) {
node, err := c.nodes.Random()
if err != nil {
setFirstErr(err)
continue
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for i := 0; i <= c.opt.MaxRedirects; i++ {
failedCmds := make(map[*clusterNode][]Cmder)

for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
if err != nil {
setCmdsErr(cmds, err)
continue
}

err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.putConn(cn, err, false)
}

cmd.reset()
failedCmds[node] = append(failedCmds[node], cmds...)
if len(failedCmds) == 0 {
break
}
cmdsMap = failedCmds
}
}

var firstErr error
for _, cmd := range cmds {
if err := cmd.Err(); err != nil {
firstErr = err
break
}
}
return firstErr
}

moved, ask, addr := internal.IsMovedError(err)
if moved {
c.lazyReloadSlots()
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) (map[int][]Cmder, error) {
state := c.state()
cmdsMap := make(map[int][]Cmder)
for _, cmd := range cmds {
slot, _, err := c.cmdSlotAndNode(state, cmd)
if err != nil {
return nil, err
}
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap, nil
}

node, err := c.nodes.Get(addr)
if err != nil {
setFirstErr(err)
continue
}
func (c *ClusterClient) txPipelineProcessCmds(
node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
cn.SetWriteTimeout(c.opt.WriteTimeout)
if err := txPipelineWriteMulti(cn, cmds); err != nil {
setCmdsErr(cmds, err)
failedCmds[node] = cmds
return err
}

cmd.reset()
failedCmds[node] = append(failedCmds[node], cmd)
} else if ask {
node, err := c.nodes.Get(addr)
if err != nil {
setFirstErr(err)
continue
}
// Set read timeout for all commands.
cn.SetReadTimeout(c.opt.ReadTimeout)

cmd.reset()
failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
} else {
setFirstErr(err)
if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
return err
}

_, err := pipelineReadCmds(cn, cmds)
return err
}

func (c *ClusterClient) txPipelineReadQueued(
cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
) error {
var firstErr error

// Parse queued replies.
var statusCmd StatusCmd
if err := statusCmd.readReply(cn); err != nil && firstErr == nil {
firstErr = err
}

for _, cmd := range cmds {
err := statusCmd.readReply(cn)
if err == nil {
continue
}

cmd.setErr(err)
if firstErr == nil {
firstErr = err
}

err = c.checkMovedErr(cmd, failedCmds)
if err != nil && firstErr == nil {
firstErr = err
}
}

return failedCmds, firstErr
// Parse number of replies.
line, err := cn.Rd.ReadLine()
if err != nil {
if err == Nil {
err = TxFailedErr
}
return err
}

switch line[0] {
case proto.ErrorReply:
return proto.ParseErrorReply(line)
case proto.ArrayReply:
// ok
default:
err := fmt.Errorf("redis: expected '*', but got line %q", line)
return err
}

return firstErr
}
Loading

0 comments on commit 865d501

Please sign in to comment.