Skip to content

Commit

Permalink
pipeline now has its own interface "Pipelineable"
Browse files Browse the repository at this point in the history
  • Loading branch information
felipejfc committed May 1, 2017
1 parent 4c6e2ad commit 6fca4d5
Show file tree
Hide file tree
Showing 16 changed files with 70 additions and 42 deletions.
2 changes: 1 addition & 1 deletion bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func BenchmarkPipeline(b *testing.B) {

b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Set("key", "hello", 0)
pipe.Expire("key", time.Second)
return nil
Expand Down
8 changes: 4 additions & 4 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
}
}

func (c *ClusterClient) Pipeline() *Pipeline {
func (c *ClusterClient) Pipeline() Pipelineable {
pipe := Pipeline{
exec: c.pipelineExec,
}
Expand All @@ -683,7 +683,7 @@ func (c *ClusterClient) Pipeline() *Pipeline {
return &pipe
}

func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *ClusterClient) Pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}

Expand Down Expand Up @@ -797,7 +797,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]C
}

// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
func (c *ClusterClient) TxPipeline() *Pipeline {
func (c *ClusterClient) TxPipeline() Pipelineable {
pipe := Pipeline{
exec: c.txPipelineExec,
}
Expand All @@ -806,7 +806,7 @@ func (c *ClusterClient) TxPipeline() *Pipeline {
return &pipe
}

func (c *ClusterClient) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *ClusterClient) TxPipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.TxPipeline().pipelined(fn)
}

Expand Down
14 changes: 7 additions & 7 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ var _ = Describe("ClusterClient", func() {
return err
}

_, err = tx.Pipelined(func(pipe *redis.Pipeline) error {
_, err = tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
Expand Down Expand Up @@ -449,7 +449,7 @@ var _ = Describe("ClusterClient", func() {

Describe("Pipeline", func() {
BeforeEach(func() {
pipe = client.Pipeline()
pipe = client.Pipeline().(*redis.Pipeline)
})

AfterEach(func() {
Expand All @@ -461,7 +461,7 @@ var _ = Describe("ClusterClient", func() {

Describe("TxPipeline", func() {
BeforeEach(func() {
pipe = client.TxPipeline()
pipe = client.TxPipeline().(*redis.Pipeline)
})

AfterEach(func() {
Expand Down Expand Up @@ -544,7 +544,7 @@ var _ = Describe("ClusterClient without nodes", func() {
})

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand All @@ -571,7 +571,7 @@ var _ = Describe("ClusterClient without valid nodes", func() {
})

It("pipeline returns an error", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand All @@ -594,7 +594,7 @@ var _ = Describe("ClusterClient timeout", func() {
})

It("Pipeline timeouts", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand All @@ -612,7 +612,7 @@ var _ = Describe("ClusterClient timeout", func() {

It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
_, err := tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand Down
13 changes: 11 additions & 2 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ func formatSec(dur time.Duration) int64 {
}

type Cmdable interface {
Pipeline() *Pipeline
Pipelined(fn func(*Pipeline) error) ([]Cmder, error)
Pipeline() Pipelineable
Pipelined(fn func(Pipelineable) error) ([]Cmder, error)

Echo(message interface{}) *StringCmd
Ping() *StatusCmd
Expand Down Expand Up @@ -237,6 +237,15 @@ type Cmdable interface {
Command() *CommandsInfoCmd
}

type StatefulCmdable interface {
Auth(password string) *StatusCmd
Select(index int) *StatusCmd
ClientSetName(name string) *BoolCmd
ClientGetName() *StringCmd
ReadOnly() *StatusCmd
ReadWrite() *StatusCmd
}

var _ Cmdable = (*Client)(nil)
var _ Cmdable = (*Tx)(nil)
var _ Cmdable = (*Ring)(nil)
Expand Down
2 changes: 1 addition & 1 deletion commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var _ = Describe("Commands", func() {
Describe("server", func() {

It("should Auth", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Auth("password")
return nil
})
Expand Down
6 changes: 3 additions & 3 deletions example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func ExampleClient_Scan() {

func ExampleClient_Pipelined() {
var incr *redis.IntCmd
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
incr = pipe.Incr("pipelined_counter")
pipe.Expire("pipelined_counter", time.Hour)
return nil
Expand Down Expand Up @@ -187,7 +187,7 @@ func ExampleClient_Pipeline() {

func ExampleClient_TxPipelined() {
var incr *redis.IntCmd
_, err := client.TxPipelined(func(pipe *redis.Pipeline) error {
_, err := client.TxPipelined(func(pipe redis.Pipelineable) error {
incr = pipe.Incr("tx_pipelined_counter")
pipe.Expire("tx_pipelined_counter", time.Hour)
return nil
Expand Down Expand Up @@ -226,7 +226,7 @@ func ExampleClient_Watch() {
return err
}

_, err = tx.Pipelined(func(pipe *redis.Pipeline) error {
_, err = tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
return nil
})
Expand Down
21 changes: 20 additions & 1 deletion pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,17 @@ import (

type pipelineExecer func([]Cmder) error

type Pipelineable interface {
Cmdable
StatefulCmdable
Process(cmd Cmder) error
Close() error
Discard() error
discard() error
Exec() ([]Cmder, error)
pipelined(fn func(Pipelineable) error) ([]Cmder, error)
}

// Pipeline implements pipelining as described in
// http://redis.io/topics/pipelining. It's safe for concurrent use
// by multiple goroutines.
Expand Down Expand Up @@ -78,11 +89,19 @@ func (c *Pipeline) Exec() ([]Cmder, error) {
return cmds, c.exec(cmds)
}

func (c *Pipeline) pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *Pipeline) pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
if err := fn(c); err != nil {
return nil, err
}
cmds, err := c.Exec()
_ = c.Close()
return cmds, err
}

func (c *Pipeline) Pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.pipelined(fn)
}

func (c *Pipeline) Pipeline() Pipelineable {
return c
}
4 changes: 2 additions & 2 deletions pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ = Describe("pipelining", func() {

It("supports block style", func() {
var get *redis.StringCmd
cmds, err := client.Pipelined(func(pipe *redis.Pipeline) error {
cmds, err := client.Pipelined(func(pipe redis.Pipelineable) error {
get = pipe.Get("foo")
return nil
})
Expand Down Expand Up @@ -63,7 +63,7 @@ var _ = Describe("pipelining", func() {

Describe("Pipeline", func() {
BeforeEach(func() {
pipe = client.Pipeline()
pipe = client.Pipeline().(*redis.Pipeline)
})

assertPipeline()
Expand Down
2 changes: 1 addition & 1 deletion pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var _ = Describe("pool", func() {
var ping *redis.StatusCmd

err := client.Watch(func(tx *redis.Tx) error {
cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipelineable) error {
ping = pipe.Ping()
return nil
})
Expand Down
2 changes: 1 addition & 1 deletion race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ var _ = Describe("races", func() {
num, err := strconv.ParseInt(val, 10, 64)
Expect(err).NotTo(HaveOccurred())

cmds, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
cmds, err := tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
return nil
})
Expand Down
8 changes: 4 additions & 4 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *baseClient) initConn(cn *pool.Conn) error {

// Temp client for Auth and Select.
client := newClient(c.opt, pool.NewSingleConnPool(cn))
_, err := client.Pipelined(func(pipe *Pipeline) error {
_, err := client.Pipelined(func(pipe Pipelineable) error {
if c.opt.Password != "" {
pipe.Auth(c.opt.Password)
}
Expand Down Expand Up @@ -324,11 +324,11 @@ func (c *Client) PoolStats() *PoolStats {
}
}

func (c *Client) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *Client) Pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}

func (c *Client) Pipeline() *Pipeline {
func (c *Client) Pipeline() Pipelineable {
pipe := Pipeline{
exec: c.pipelineExecer(c.pipelineProcessCmds),
}
Expand All @@ -337,7 +337,7 @@ func (c *Client) Pipeline() *Pipeline {
return &pipe
}

func (c *Client) TxPipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *Client) TxPipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.TxPipeline().pipelined(fn)
}

Expand Down
6 changes: 3 additions & 3 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ var _ = Describe("Client", func() {

It("should close Tx without closing the client", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
_, err := tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand Down Expand Up @@ -232,7 +232,7 @@ var _ = Describe("Client timeout", func() {
})

It("Pipeline timeouts", func() {
_, err := client.Pipelined(func(pipe *redis.Pipeline) error {
_, err := client.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand Down Expand Up @@ -263,7 +263,7 @@ var _ = Describe("Client timeout", func() {

It("Tx Pipeline timeouts", func() {
err := client.Watch(func(tx *redis.Tx) error {
_, err := tx.Pipelined(func(pipe *redis.Pipeline) error {
_, err := tx.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func (c *Ring) Close() error {
return firstErr
}

func (c *Ring) Pipeline() *Pipeline {
func (c *Ring) Pipeline() Pipelineable {
pipe := Pipeline{
exec: c.pipelineExec,
}
Expand All @@ -390,7 +390,7 @@ func (c *Ring) Pipeline() *Pipeline {
return &pipe
}

func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *Ring) Pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}

Expand Down
6 changes: 3 additions & 3 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ var _ = Describe("Redis Ring", func() {
keys = append(keys, string(key))
}

_, err := ring.Pipelined(func(pipe *redis.Pipeline) error {
_, err := ring.Pipelined(func(pipe redis.Pipelineable) error {
for _, key := range keys {
pipe.Set(key, "value", 0).Err()
}
Expand All @@ -153,7 +153,7 @@ var _ = Describe("Redis Ring", func() {
})

It("supports hash tags", func() {
_, err := ring.Pipelined(func(pipe *redis.Pipeline) error {
_, err := ring.Pipelined(func(pipe redis.Pipelineable) error {
for i := 0; i < 100; i++ {
pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
}
Expand Down Expand Up @@ -184,7 +184,7 @@ var _ = Describe("empty Redis Ring", func() {
})

It("pipeline returns an error", func() {
_, err := ring.Pipelined(func(pipe *redis.Pipeline) error {
_, err := ring.Pipelined(func(pipe redis.Pipelineable) error {
pipe.Ping()
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (c *Tx) Unwatch(keys ...string) *StatusCmd {
return cmd
}

func (c *Tx) Pipeline() *Pipeline {
func (c *Tx) Pipeline() Pipelineable {
pipe := Pipeline{
exec: c.pipelineExecer(c.txPipelineProcessCmds),
}
Expand All @@ -94,6 +94,6 @@ func (c *Tx) Pipeline() *Pipeline {
// Exec always returns list of commands. If transaction fails
// TxFailedErr is returned. Otherwise Exec returns error of the first
// failed command or nil.
func (c *Tx) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
func (c *Tx) Pipelined(fn func(Pipelineable) error) ([]Cmder, error) {
return c.Pipeline().pipelined(fn)
}
Loading

0 comments on commit 6fca4d5

Please sign in to comment.