Skip to content

Commit

Permalink
Propagate context in Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jul 25, 2019
1 parent 6bc7daa commit 52ec525
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1023,6 +1023,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {

func (c *ClusterClient) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processPipeline,
}
pipe.init()
Expand Down Expand Up @@ -1220,6 +1221,7 @@ func (c *ClusterClient) checkMovedErr(
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processTxPipeline,
}
pipe.init()
Expand Down
4 changes: 1 addition & 3 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ func appendArgs(dst, src []interface{}) []interface{} {
}
}

for _, v := range src {
dst = append(dst, v)
}
dst = append(dst, src...)
return dst
}

Expand Down
3 changes: 2 additions & 1 deletion pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Pipeline struct {
cmdable
statefulCmdable

ctx context.Context
exec pipelineExecer

mu sync.Mutex
Expand Down Expand Up @@ -98,7 +99,7 @@ func (c *Pipeline) discard() error {
// Exec always returns list of commands and error of the first failed
// command if any.
func (c *Pipeline) Exec() ([]Cmder, error) {
return c.ExecContext(context.Background())
return c.ExecContext(c.ctx)
}

func (c *Pipeline) ExecContext(ctx context.Context) ([]Cmder, error) {
Expand Down
34 changes: 23 additions & 11 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
return nil, err
}

err = c.initConn(cn)
err = c.initConn(ctx, cn)
if err != nil {
_ = c.connPool.CloseConn(cn)
return nil, err
Expand Down Expand Up @@ -169,7 +169,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
return nil, err
}

err = c.initConn(cn)
err = c.initConn(ctx, cn)
if err != nil {
c.connPool.Remove(cn)
return nil, err
Expand Down Expand Up @@ -202,7 +202,7 @@ func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
}
}

func (c *baseClient) initConn(cn *pool.Conn) error {
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
if cn.Inited {
return nil
}
Expand All @@ -215,7 +215,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {
return nil
}

conn := newConn(c.opt, cn)
conn := newConn(ctx, c.opt, cn)
_, err := conn.Pipelined(func(pipe Pipeliner) error {
if c.opt.Password != "" {
pipe.Auth(c.opt.Password)
Expand Down Expand Up @@ -547,6 +547,7 @@ func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {

func (c *Client) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processPipeline,
}
pipe.init()
Expand All @@ -560,6 +561,7 @@ func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Client) TxPipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processTxPipeline,
}
pipe.init()
Expand Down Expand Up @@ -625,27 +627,35 @@ func (c *Client) PSubscribe(channels ...string) *PubSub {

//------------------------------------------------------------------------------

// Conn is like Client, but its pool contains single connection.
type Conn struct {
type conn struct {
baseClient
cmdable
statefulCmdable
}

func newConn(opt *Options, cn *pool.Conn) *Conn {
// Conn is like Client, but its pool contains single connection.
type Conn struct {
*conn
ctx context.Context
}

func newConn(ctx context.Context, opt *Options, cn *pool.Conn) *Conn {
c := Conn{
baseClient: baseClient{
opt: opt,
connPool: pool.NewSingleConnPool(cn),
conn: &conn{
baseClient: baseClient{
opt: opt,
connPool: pool.NewSingleConnPool(cn),
},
},
ctx: ctx,
}
c.cmdable = c.Process
c.statefulCmdable = c.Process
return &c
}

func (c *Conn) Process(cmd Cmder) error {
return c.ProcessContext(context.TODO(), cmd)
return c.ProcessContext(c.ctx, cmd)
}

func (c *Conn) ProcessContext(ctx context.Context, cmd Cmder) error {
Expand All @@ -658,6 +668,7 @@ func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {

func (c *Conn) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processPipeline,
}
pipe.init()
Expand All @@ -671,6 +682,7 @@ func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *Conn) TxPipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processTxPipeline,
}
pipe.init()
Expand Down
2 changes: 2 additions & 0 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {

func (c *Ring) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processPipeline,
}
pipe.init()
Expand All @@ -597,6 +598,7 @@ func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {

func (c *Ring) TxPipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processTxPipeline,
}
pipe.init()
Expand Down
5 changes: 3 additions & 2 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Tx) Watch(keys ...string) *StatusCmd {
args[1+i] = key
}
cmd := NewStatusCmd(args...)
c.Process(cmd)
_ = c.Process(cmd)
return cmd
}

Expand All @@ -105,13 +105,14 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
args[1+i] = key
}
cmd := NewStatusCmd(args...)
c.Process(cmd)
_ = c.Process(cmd)
return cmd
}

// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: c.processTxPipeline,
}
pipe.init()
Expand Down

0 comments on commit 52ec525

Please sign in to comment.