forked from redis/go-redis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pubsub.go
129 lines (105 loc) · 2.67 KB
/
pubsub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package redis
import (
"fmt"
"sync"
)
// PubSubClient is a client for Redis publish/subscribe traffic. Messages
// read from the connection are delivered on a single channel that every
// subscribe call returns.
type PubSubClient struct {
	// BaseClient is embedded; newPubSubClient sets it up with a
	// single-connection pool built from the parent client's pool.
	*BaseClient
	// ch carries every Message produced by consumeMessages.
	ch chan *Message
	// once guards the start of the consumeMessages goroutine so that it is
	// launched at most once per PubSubClient.
	once sync.Once
}
// newPubSubClient builds a PubSubClient on top of client. The new client
// gets its own BaseClient whose pool is a single-connection pool created
// from client.ConnPool, reuses client.InitConn, and owns a fresh unbuffered
// message channel. The returned error is always nil.
func newPubSubClient(client *Client) (*PubSubClient, error) {
	base := &BaseClient{
		ConnPool: NewSingleConnPool(client.ConnPool, false),
		InitConn: client.InitConn,
	}

	pubsub := &PubSubClient{
		BaseClient: base,
		ch:         make(chan *Message),
	}
	return pubsub, nil
}
// PubSubClient returns a new PubSubClient derived from c. It forwards the
// result of newPubSubClient unchanged.
func (c *Client) PubSubClient() (*PubSubClient, error) {
	pubsub, err := newPubSubClient(c)
	return pubsub, err
}
// Publish builds a PUBLISH request for message on channel, hands it to
// c.Process, and returns the request.
//
// NOTE(review): presumably the returned IntReq exposes the server's reply
// once processed; confirm against IntReq and Process.
func (c *Client) Publish(channel, message string) *IntReq {
	publish := NewIntReq("PUBLISH", channel, message)
	c.Process(publish)
	return publish
}
// Message is one event delivered on a PubSubClient's channel. Which fields
// are populated depends on the kind of reply it was decoded from; when Err
// is non-nil it describes a read or decode failure.
type Message struct {
	// Name is the reply kind: "subscribe", "unsubscribe", "psubscribe",
	// "punsubscribe", "message" or "pmessage".
	Name string
	// Channel is the channel name carried by the reply.
	Channel string
	// ChannelPattern is the matched pattern; set only for "pmessage".
	ChannelPattern string
	// Message is the payload; set only for "message" and "pmessage".
	Message string
	// Number is the integer carried by (p)subscribe/(p)unsubscribe replies.
	// NOTE(review): presumably the count of active subscriptions; confirm
	// against the Redis protocol documentation.
	Number int64
	// Err is set when the reply could not be read or understood.
	Err error
}
// consumeMessages reads pub/sub replies from conn in an endless loop and
// sends one *Message per reply on c.ch. It never returns and never closes
// c.ch, so it is meant to run in its own goroutine.
//
// A reply that cannot be read is delivered as a Message with only Err set,
// after which reading resumes. A reply that is read but does not have the
// shape its kind requires is reported the same way instead of panicking the
// goroutine.
func (c *PubSubClient) consumeMessages(conn *Conn) {
	req := NewMultiBulkReq()

	for {
		replyI, err := req.ParseReply(conn.Rd)
		if err != nil {
			c.ch <- &Message{Err: err}
			continue
		}

		msg, err := parsePubSubReply(replyI)
		if err != nil {
			// Decode failures travel on the same channel as read failures.
			msg = &Message{Err: err}
		}
		c.ch <- msg
	}
}

// parsePubSubReply converts one parsed multi-bulk reply into a Message.
//
// The reply must be a non-empty []interface{} whose first element is the
// reply kind as a string. The remaining elements are validated per kind:
//
//	subscribe, unsubscribe, psubscribe, punsubscribe: channel string, number int64
//	message:  channel string, payload string
//	pmessage: pattern string, channel string, payload string
//
// It returns an error, and a nil Message, when the reply is too short, an
// element has an unexpected type, or the kind is not one of the above.
func parsePubSubReply(replyI interface{}) (*Message, error) {
	reply, ok := replyI.([]interface{})
	if !ok || len(reply) == 0 {
		return nil, fmt.Errorf("redis: unexpected pubsub reply: %#v", replyI)
	}
	name, ok := reply[0].(string)
	if !ok {
		return nil, fmt.Errorf("redis: unexpected pubsub reply: %#v", replyI)
	}

	malformed := func() error {
		return fmt.Errorf("redis: malformed %q pubsub reply: %v", name, reply)
	}

	switch name {
	case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
		if len(reply) < 3 {
			return nil, malformed()
		}
		channel, okChannel := reply[1].(string)
		number, okNumber := reply[2].(int64)
		if !okChannel || !okNumber {
			return nil, malformed()
		}
		return &Message{Name: name, Channel: channel, Number: number}, nil

	case "message":
		if len(reply) < 3 {
			return nil, malformed()
		}
		channel, okChannel := reply[1].(string)
		payload, okPayload := reply[2].(string)
		if !okChannel || !okPayload {
			return nil, malformed()
		}
		return &Message{Name: name, Channel: channel, Message: payload}, nil

	case "pmessage":
		if len(reply) < 4 {
			return nil, malformed()
		}
		pattern, okPattern := reply[1].(string)
		channel, okChannel := reply[2].(string)
		payload, okPayload := reply[3].(string)
		if !okPattern || !okChannel || !okPayload {
			return nil, malformed()
		}
		return &Message{
			Name:           name,
			ChannelPattern: pattern,
			Channel:        channel,
			Message:        payload,
		}, nil

	default:
		// Message text kept as-is for callers that may match on it.
		return nil, fmt.Errorf("Unsupported message name: %q.", name)
	}
}
// subscribe sends cmd followed by channels as one multi-bulk request on the
// client's connection and returns the channel on which messages arrive.
//
// The first call that writes its request successfully starts the
// consumeMessages goroutine on that connection; later calls reuse it. If
// obtaining the connection or writing the request fails, the error is
// returned with a nil channel.
func (c *PubSubClient) subscribe(cmd string, channels ...string) (chan *Message, error) {
	args := make([]string, 0, len(channels)+1)
	args = append(args, cmd)
	args = append(args, channels...)
	request := NewMultiBulkReq(args...)

	cn, err := c.conn()
	if err != nil {
		return nil, err
	}

	err = c.WriteReq(request.Req(), cn)
	if err != nil {
		return nil, err
	}

	startConsumer := func() {
		go c.consumeMessages(cn)
	}
	c.once.Do(startConsumer)

	return c.ch, nil
}
// Subscribe sends a SUBSCRIBE command for the given channels and returns
// the channel on which this client delivers its messages. The same channel
// is returned by every Subscribe and PSubscribe call on c.
func (c *PubSubClient) Subscribe(channels ...string) (chan *Message, error) {
	const cmd = "SUBSCRIBE"
	return c.subscribe(cmd, channels...)
}
// PSubscribe sends a PSUBSCRIBE command for the given patterns and returns
// the channel on which this client delivers its messages. The same channel
// is returned by every Subscribe and PSubscribe call on c.
func (c *PubSubClient) PSubscribe(patterns ...string) (chan *Message, error) {
	const cmd = "PSUBSCRIBE"
	return c.subscribe(cmd, patterns...)
}
// unsubscribe sends cmd followed by channels as one multi-bulk request on
// the client's connection. It only writes the request and does not read a
// reply itself. It returns the error from obtaining the connection or from
// the write, whichever occurs first.
func (c *PubSubClient) unsubscribe(cmd string, channels ...string) error {
	args := make([]string, 0, len(channels)+1)
	args = append(args, cmd)
	args = append(args, channels...)
	request := NewMultiBulkReq(args...)

	cn, err := c.conn()
	if err == nil {
		err = c.WriteReq(request.Req(), cn)
	}
	return err
}
// Unsubscribe sends an UNSUBSCRIBE command for the given channels and
// returns any error from writing it.
func (c *PubSubClient) Unsubscribe(channels ...string) error {
	const cmd = "UNSUBSCRIBE"
	return c.unsubscribe(cmd, channels...)
}
// PUnsubscribe sends a PUNSUBSCRIBE command for the given patterns and
// returns any error from writing it.
func (c *PubSubClient) PUnsubscribe(patterns ...string) error {
	const cmd = "PUNSUBSCRIBE"
	return c.unsubscribe(cmd, patterns...)
}