Skip to content

Commit

Permalink
Move Publish channel to cmdable. Remove method that was deprecated in…
Browse files Browse the repository at this point in the history
… v3.
  • Loading branch information
vmihailenco committed Jul 21, 2016
1 parent eca5d02 commit 4210c09
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 34 deletions.
7 changes: 7 additions & 0 deletions commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -1618,6 +1618,13 @@ func (c *cmdable) DebugObject(key string) *StringCmd {

//------------------------------------------------------------------------------

// Publish posts the message to the channel.
func (c *cmdable) Publish(channel, message string) *IntCmd {
cmd := NewIntCmd("PUBLISH", channel, message)
c.process(cmd)
return cmd
}

func (c *cmdable) PubSubChannels(pattern string) *StringSliceCmd {
args := []interface{}{"pubsub", "channels"}
if pattern != "*" {
Expand Down
4 changes: 2 additions & 2 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ var _ = Describe("pool", func() {
connPool.(*pool.ConnPool).DialLimiter = nil

perform(1000, func(id int) {
pubsub := client.PubSub()
Expect(pubsub.Subscribe()).NotTo(HaveOccurred())
pubsub, err := client.Subscribe()
Expect(err).NotTo(HaveOccurred())
Expect(pubsub.Close()).NotTo(HaveOccurred())
})

Expand Down
29 changes: 0 additions & 29 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,6 @@ import (
"gopkg.in/redis.v4/internal/pool"
)

// Posts a message to the given channel.
func (c *Client) Publish(channel, message string) *IntCmd {
req := NewIntCmd("PUBLISH", channel, message)
c.Process(req)
return req
}

// PubSub implements Pub/Sub commands as described in
// http://redis.io/topics/pubsub. It's NOT safe for concurrent use by
// multiple goroutines.
Expand All @@ -29,28 +22,6 @@ type PubSub struct {
nsub int // number of active subscriptions
}

// Deprecated. Use Subscribe/PSubscribe instead.
func (c *Client) PubSub() *PubSub {
return &PubSub{
base: baseClient{
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
},
}
}

// Subscribes the client to the specified channels.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
pubsub := c.PubSub()
return pubsub, pubsub.Subscribe(channels...)
}

// Subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
pubsub := c.PubSub()
return pubsub, pubsub.PSubscribe(channels...)
}

func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, err := c.base.conn()
if err != nil {
Expand Down
21 changes: 21 additions & 0 deletions redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,24 @@ func (c *Client) pipelineExec(cmds []Cmder) error {
}
return retErr
}

func (c *Client) pubSub() *PubSub {
return &PubSub{
base: baseClient{
opt: c.opt,
connPool: pool.NewStickyConnPool(c.connPool.(*pool.ConnPool), false),
},
}
}

// Subscribe subscribes the client to the specified channels.
func (c *Client) Subscribe(channels ...string) (*PubSub, error) {
pubsub := c.pubSub()
return pubsub, pubsub.Subscribe(channels...)
}

// PSubscribe subscribes the client to the given patterns.
func (c *Client) PSubscribe(channels ...string) (*PubSub, error) {
pubsub := c.pubSub()
return pubsub, pubsub.PSubscribe(channels...)
}
10 changes: 7 additions & 3 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ var _ = Describe("Client", func() {
})

It("should close pubsub without closing the client", func() {
pubsub := client.PubSub()
pubsub, err := client.Subscribe()
Expect(pubsub.Close()).NotTo(HaveOccurred())

_, err := pubsub.Receive()
_, err = pubsub.Receive()
Expect(err).To(MatchError("redis: client is closed"))
Expect(client.Ping().Err()).NotTo(HaveOccurred())
})
Expand Down Expand Up @@ -90,8 +90,12 @@ var _ = Describe("Client", func() {
})

It("should close pubsub when client is closed", func() {
pubsub := client.PubSub()
pubsub, err := client.Subscribe()
Expect(client.Close()).NotTo(HaveOccurred())

_, err = pubsub.Receive()
Expect(err).To(HaveOccurred())

Expect(pubsub.Close()).NotTo(HaveOccurred())
})

Expand Down

0 comments on commit 4210c09

Please sign in to comment.