Skip to content

Commit

Permalink
Change Tx.Pipeline and Tx.TxPipeline meaning
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Jan 12, 2020
1 parent 08dad1e commit dd4ef4e
Show file tree
Hide file tree
Showing 8 changed files with 36 additions and 30 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## v7 WIP

- Tx.Pipeline now returns a non-transactional pipeline. Use Tx.TxPipeline for a transactional pipeline.
- WrapProcess is replaced with more convenient AddHook that has access to context.Context.
- WithContext now can not be used to create a shallow copy of the client.
- New methods ProcessContext, DoContext, and ExecContext.
Expand Down
4 changes: 2 additions & 2 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ var _ = Describe("ClusterClient", func() {
return err
}

_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
Expand Down Expand Up @@ -1009,7 +1009,7 @@ var _ = Describe("ClusterClient timeout", func() {

It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func ExampleClient_Watch() {
n++

// runs only if the watched keys remain unchanged
_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
// pipe handles the error case
pipe.Set(key, n, 0)
return nil
Expand Down
2 changes: 1 addition & 1 deletion race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ var _ = Describe("races", func() {
num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred())

cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = Describe("Client", func() {

It("should close Tx without closing the client", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand Down Expand Up @@ -286,7 +286,7 @@ var _ = Describe("Client timeout", func() {

It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand Down
16 changes: 8 additions & 8 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ var _ = Describe("Ring watch", func() {
return err
}

_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
Expand Down Expand Up @@ -285,7 +285,7 @@ var _ = Describe("Ring watch", func() {

It("should discard", func() {
err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key1", "hello1", 0)
pipe.Discard()
pipe.Set("key2", "hello2", 0)
Expand All @@ -308,7 +308,7 @@ var _ = Describe("Ring watch", func() {

It("returns no error when there are no commands", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
return err
}, "key")
Expect(err).NotTo(HaveOccurred())
Expand All @@ -322,7 +322,7 @@ var _ = Describe("Ring watch", func() {
const N = 20000

err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < N; i++ {
pipe.Incr("key")
}
Expand Down Expand Up @@ -358,7 +358,7 @@ var _ = Describe("Ring watch", func() {
num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred())

cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil
})
Expand All @@ -380,7 +380,7 @@ var _ = Describe("Ring watch", func() {

It("should close Tx without closing the client", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand All @@ -396,7 +396,7 @@ var _ = Describe("Ring watch", func() {
var ping *redis.StatusCmd

err := ring.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
ping = pipe.Ping()
return nil
})
Expand Down Expand Up @@ -443,7 +443,7 @@ var _ = Describe("Ring Tx timeout", func() {

It("Tx Pipeline timeouts", func() {
err := ring.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand Down
27 changes: 16 additions & 11 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,36 +116,41 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
return cmd
}

// Pipeline creates a new pipeline. It is more convenient to use Pipelined.
func (c *Tx) Pipeline() Pipeliner {
pipe := Pipeline{
ctx: c.ctx,
exec: func(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processPipeline)
},
}
pipe.init()
return &pipe
}

// Pipelined executes commands queued in the fn in a transaction.
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}

// TxPipelined executes commands queued in the fn in a transaction.
//
// When using WATCH, EXEC will execute commands only if the watched keys
// were not modified, allowing for a check-and-set mechanism.
//
// Exec always returns list of commands. If transaction fails
// TxFailedErr is returned. Otherwise Exec returns an error of the first
// failed command or nil.
func (c *Tx) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(fn)
}

// TxPipelined is an alias for Pipelined.
func (c *Tx) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipelined(fn)
return c.TxPipeline().Pipelined(fn)
}

// TxPipeline is an alias for Pipeline.
// TxPipeline creates a new pipeline. Usually it is more convenient to use TxPipelined.
func (c *Tx) TxPipeline() Pipeliner {
return c.Pipeline()
pipe := Pipeline{
ctx: c.ctx,
exec: func(ctx context.Context, cmds []Cmder) error {
return c.hooks.processPipeline(ctx, cmds, c.baseClient.processTxPipeline)
},
}
pipe.init()
return &pipe
}
10 changes: 5 additions & 5 deletions tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ var _ = Describe("Tx", func() {
return err
}

_, err = tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
Expand Down Expand Up @@ -66,7 +66,7 @@ var _ = Describe("Tx", func() {

It("should discard", func() {
err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Set("key1", "hello1", 0)
pipe.Discard()
pipe.Set("key2", "hello2", 0)
Expand All @@ -89,7 +89,7 @@ var _ = Describe("Tx", func() {

It("returns no error when there are no commands", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(redis.Pipeliner) error { return nil })
_, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
return err
})
Expect(err).NotTo(HaveOccurred())
Expand All @@ -103,7 +103,7 @@ var _ = Describe("Tx", func() {
const N = 20000

err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
for i := 0; i < N; i++ {
pipe.Incr("key")
}
Expand Down Expand Up @@ -133,7 +133,7 @@ var _ = Describe("Tx", func() {

do := func() error {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe redis.Pipeliner) error {
_, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
pipe.Ping()
return nil
})
Expand Down

0 comments on commit dd4ef4e

Please sign in to comment.