Skip to content

Commit

Permalink
Fix Tx pipeline hook
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jan 9, 2021
1 parent b543ea9 commit 76fd0ea
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
6 changes: 5 additions & 1 deletion cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1256,10 +1256,13 @@ func (c *ClusterClient) TxPipelined(ctx context.Context, fn func(Pipeliner) erro
}

func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c._processTxPipeline)
return c.hooks.processTxPipeline(ctx, cmds, c._processTxPipeline)
}

func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) error {
// Trim multi .. exec.
cmds = cmds[1 : len(cmds)-1]

state, err := c.state.Get(ctx)
if err != nil {
setCmdsErr(cmds, err)
Expand Down Expand Up @@ -1295,6 +1298,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
if err == nil {
return
}

if attempt < c.opt.MaxRedirects {
if err := c.mapCmdsByNode(ctx, failedCmds, cmds); err != nil {
setCmdsErr(cmds, err)
Expand Down
8 changes: 4 additions & 4 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -820,14 +820,14 @@ var _ = Describe("ClusterClient", func() {

client.AddHook(&hook{
beforeProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: "))
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline")
return ctx, nil
},
afterProcessPipeline: func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG"))
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline")
return nil
},
Expand Down
1 change: 1 addition & 0 deletions internal/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (p *ConnPool) ReapStaleConns() (int, error) {
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()

p.freeTurn()

if cn != nil {
Expand Down

0 comments on commit 76fd0ea

Please sign in to comment.