Skip to content

Commit

Permalink
Remove costly 'appendIfNotExists' and 'remove' call from PubSub (redi…
Browse files Browse the repository at this point in the history
…s#743)

* remove costly 'appendIfNotExists' and 'remove' call from pubsub
  • Loading branch information
superkinglabs authored and vmihailenco committed Mar 14, 2018
1 parent 17b6cf1 commit 877867d
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 36 deletions.
28 changes: 28 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1409,3 +1409,31 @@ func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
}
return append(nodes, node)
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
for _, e := range es {
for _, s := range ss {
if s == e {
continue loop
}
}
ss = append(ss, e)
}
return ss
}

func remove(ss []string, es ...string) []string {
if len(es) == 0 {
return ss[:0]
}
for _, e := range es {
for i, s := range ss {
if s == e {
ss = append(ss[:i], ss[i+1:]...)
break
}
}
}
return ss
}
70 changes: 34 additions & 36 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ type PubSub struct {

mu sync.Mutex
cn *pool.Conn
channels []string
patterns []string
channels map[string]struct{}
patterns map[string]struct{}
closed bool

cmd *Cmd
Expand Down Expand Up @@ -67,12 +67,24 @@ func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
func (c *PubSub) resubscribe(cn *pool.Conn) error {
var firstErr error
if len(c.channels) > 0 {
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
channels := make([]string, len(c.channels))
i := 0
for channel := range c.channels {
channels[i] = channel
i++
}
if err := c._subscribe(cn, "subscribe", channels...); err != nil && firstErr == nil {
firstErr = err
}
}
if len(c.patterns) > 0 {
if err := c._subscribe(cn, "psubscribe", c.patterns...); err != nil && firstErr == nil {
patterns := make([]string, len(c.patterns))
i := 0
for pattern := range c.patterns {
patterns[i] = pattern
i++
}
if err := c._subscribe(cn, "psubscribe", patterns...); err != nil && firstErr == nil {
firstErr = err
}
}
Expand Down Expand Up @@ -132,7 +144,12 @@ func (c *PubSub) Close() error {
func (c *PubSub) Subscribe(channels ...string) error {
c.mu.Lock()
err := c.subscribe("subscribe", channels...)
c.channels = appendIfNotExists(c.channels, channels...)
if c.channels == nil {
c.channels = make(map[string]struct{})
}
for _, channel := range channels {
c.channels[channel] = struct{}{}
}
c.mu.Unlock()
return err
}
Expand All @@ -142,7 +159,12 @@ func (c *PubSub) Subscribe(channels ...string) error {
func (c *PubSub) PSubscribe(patterns ...string) error {
c.mu.Lock()
err := c.subscribe("psubscribe", patterns...)
c.patterns = appendIfNotExists(c.patterns, patterns...)
if c.patterns == nil {
c.patterns = make(map[string]struct{})
}
for _, pattern := range patterns {
c.patterns[pattern] = struct{}{}
}
c.mu.Unlock()
return err
}
Expand All @@ -152,7 +174,9 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
err := c.subscribe("unsubscribe", channels...)
c.channels = remove(c.channels, channels...)
for _, channel := range channels {
delete(c.channels, channel)
}
c.mu.Unlock()
return err
}
Expand All @@ -162,7 +186,9 @@ func (c *PubSub) Unsubscribe(channels ...string) error {
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
err := c.subscribe("punsubscribe", patterns...)
c.patterns = remove(c.patterns, patterns...)
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
c.mu.Unlock()
return err
}
Expand Down Expand Up @@ -371,31 +397,3 @@ func (c *PubSub) Channel() <-chan *Message {
})
return c.ch
}

func appendIfNotExists(ss []string, es ...string) []string {
loop:
for _, e := range es {
for _, s := range ss {
if s == e {
continue loop
}
}
ss = append(ss, e)
}
return ss
}

func remove(ss []string, es ...string) []string {
if len(es) == 0 {
return ss[:0]
}
for _, e := range es {
for i, s := range ss {
if s == e {
ss = append(ss[:i], ss[i+1:]...)
break
}
}
}
return ss
}

0 comments on commit 877867d

Please sign in to comment.