Skip to content

Commit

Permalink
Create PubSub channel once
Browse files Browse the repository at this point in the history
  • Loading branch information
vmihailenco committed Oct 30, 2017
1 parent 3524815 commit 15f14b8
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ type PubSub struct {
closed bool

cmd *Cmd

chOnce sync.Once
ch chan *Message
}

func (c *PubSub) conn() (*pool.Conn, error) {
Expand Down Expand Up @@ -346,24 +349,27 @@ func (c *PubSub) receiveMessage(timeout time.Duration) (*Message, error) {
}
}

// Channel returns a channel for concurrently receiving messages.
// The channel is closed with PubSub.
// Channel returns a Go channel for concurrently receiving messages.
// The channel is closed with PubSub. Receive or ReceiveMessage APIs
// can not be used after channel is created.
func (c *PubSub) Channel() <-chan *Message {
ch := make(chan *Message, 100)
go func() {
for {
msg, err := c.ReceiveMessage()
if err != nil {
if err == pool.ErrClosed {
break
c.chOnce.Do(func() {
c.ch = make(chan *Message, 100)
go func() {
for {
msg, err := c.ReceiveMessage()
if err != nil {
if err == pool.ErrClosed {
break
}
continue
}
continue
c.ch <- msg
}
ch <- msg
}
close(ch)
}()
return ch
close(c.ch)
}()
})
return c.ch
}

func appendIfNotExists(ss []string, es ...string) []string {
Expand Down

0 comments on commit 15f14b8

Please sign in to comment.