-
Notifications
You must be signed in to change notification settings - Fork 10
/
sse.py
175 lines (141 loc) · 6.22 KB
/
sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
from queue import Queue
from collections import defaultdict, namedtuple
# A subscriber is the queue its event stream reads from, plus the opaque
# `properties` value supplied at subscription time.
Subscriber = namedtuple('Subscriber', 'queue, properties')


class Publisher(object):
    """
    Contains a list of subscribers that can receive updates.

    Each subscriber can have its own private data and may subscribe to
    different channels.
    """

    # Sentinel put on a subscriber's queue to end its event stream. It is
    # recognised by identity (`is`), never by value.
    END_STREAM = {}

    def __init__(self):
        """
        Creates a new publisher with an empty list of subscribers.
        """
        # Maps channel name -> list of Subscriber tuples. Being a defaultdict,
        # merely looking up an unknown channel creates an empty list for it.
        self.subscribers_by_channel = defaultdict(list)

    def _get_subscribers_lists(self, channel):
        """
        Yields the (mutable) subscriber list of every requested channel.

        `channel` is either a single channel name or an iterable of names.
        """
        if isinstance(channel, str):
            yield self.subscribers_by_channel[channel]
        else:
            for channel_name in channel:
                yield self.subscribers_by_channel[channel_name]

    def get_subscribers(self, channel='default channel'):
        """
        Returns a generator of all subscribers in the given channel.

        `channel` can either be a channel name (e.g. "secret room") or a list
        of channel names (e.g. "['chat', 'global messages']"). It defaults to
        the channel named "default channel".
        """
        for subscriber_list in self._get_subscribers_lists(channel):
            yield from subscriber_list

    @staticmethod
    def _encode_event(data):
        """
        Returns the list of text chunks that make up one event for `data`.

        `data` is converted with `str`; every line of the result becomes one
        "data: ...\\n" chunk and a final "\\n" chunk terminates the event.
        """
        # An event stream treats CR, LF and CRLF all as line terminators, so
        # CR and CRLF are normalised to LF first. Otherwise a stray "\r"
        # inside the payload would start a new, attacker-controlled field.
        text = str(data).replace('\r\n', '\n').replace('\r', '\n')
        chunks = ['data: {}\n'.format(line) for line in text.split('\n')]
        chunks.append('\n')
        return chunks

    def _publish_single(self, data, queue):
        """
        Publishes a single piece of data to a single user. Data is encoded as
        required.
        """
        for chunk in self._encode_event(data):
            queue.put(chunk)

    def publish(self, data, channel='default channel'):
        """
        Publishes data to all subscribers of the given channel.

        `channel` can either be a channel name (e.g. "secret room") or a list
        of channel names (e.g. "['chat', 'global messages']"). It defaults to
        the channel named "default channel".

        If data is callable, the return of `data(properties)` will be published
        instead, for the `properties` object of each subscriber. This allows
        for customized events. A falsy return value (e.g. `None`) means
        nothing is sent to that subscriber.
        """
        if callable(data):
            for subscriber in self.get_subscribers(channel):
                value = data(subscriber.properties)
                if value:
                    self._publish_single(value, subscriber.queue)
        else:
            # `str` is applied exactly once here rather than once per
            # subscriber: the result is shared between subscribers and is not
            # prone to time differences.
            chunks = self._encode_event(data)
            for subscriber in self.get_subscribers(channel):
                for chunk in chunks:
                    subscriber.queue.put(chunk)

    def subscribe(self, channel='default channel', properties=None,
                  initial_data=None):
        """
        Subscribes to the channel, returning an infinite generator of
        Server-Sent-Events.

        `channel` can either be a channel name (e.g. "secret room") or a list
        of channel names (e.g. "['chat', 'global messages']"). It defaults to
        the channel named "default channel".

        If `properties` is passed, these will be used for differentiation if a
        callable object is published (see `Publisher.publish`). When omitted,
        an empty dict is stored instead.

        If the list `initial_data` is passed, all data there will be sent
        before the regular channel process starts.
        """
        queue = Queue()
        # Only a missing value gets the default; falsy-but-real properties
        # (0, '') are stored untouched so `unsubscribe` can match them later.
        if properties is None:
            properties = {}
        subscriber = Subscriber(queue, properties)

        if initial_data is not None:
            for data in initial_data:
                self._publish_single(data, queue)

        for subscribers_list in self._get_subscribers_lists(channel):
            subscribers_list.append(subscriber)

        # Registration above happens immediately; only reading from the queue
        # is deferred until the returned generator is iterated.
        return self._make_generator(queue)

    def unsubscribe(self, channel='default channel', properties=None):
        """
        Removes subscribers from the given channel(s).

        `channel` can either be a channel name (e.g. "secret room") or a list
        of channel names (e.g. "['chat', 'global messages']"). It defaults to
        the channel named "default channel".

        If `properties` is None, then all subscribers will be removed from
        selected channel(s). If properties are provided then these are used to
        filter which subscribers are removed. Only the subscribers exactly
        matching the properties are unsubscribed.

        Nothing is put on the removed subscribers' queues, so their event
        streams are not ended by this call.
        """
        for subscribers_list in self._get_subscribers_lists(channel):
            # Slice assignment mutates the list stored in the mapping in place.
            if properties is None:
                subscribers_list[:] = []
            else:
                subscribers_list[:] = [
                    subscriber for subscriber in subscribers_list
                    if subscriber.properties != properties
                ]

    def _make_generator(self, queue):
        """
        Returns a generator that reads data from the queue, emitting data
        events, while the Publisher.END_STREAM value is not received.
        """
        while True:
            data = queue.get()  # blocks until something is published
            if data is Publisher.END_STREAM:
                return
            yield data

    def close(self):
        """
        Closes all active subscriptions.
        """
        for channel in self.subscribers_by_channel.values():
            # A subscriber registered on several channels receives one
            # END_STREAM per channel; its generator stops at the first one.
            for queue, _ in channel:
                queue.put(Publisher.END_STREAM)
            channel.clear()
if __name__ == '__main__':
    # Starts an example chat application.
    # Run this module and point your browser to http://localhost:5000
    #
    # `html.escape` replaces `cgi.escape`, which was removed in Python 3.8
    # (the whole `cgi` module is gone as of Python 3.13).
    import html

    import flask

    publisher = Publisher()
    app = flask.Flask(__name__, static_folder='static', static_url_path='')

    @app.route('/publish', methods=['POST'])
    def publish():
        """Broadcasts the posted chat message to everyone but its sender."""
        sender_username = flask.request.form['username']
        chat_message = flask.request.form['message']

        # Both fields come straight from the request, so they are escaped
        # before being embedded in markup. Unlike `cgi.escape`, `html.escape`
        # also escapes quote characters by default.
        template = '<strong>{}</strong>: {}'
        full_message = template.format(html.escape(sender_username),
                                       html.escape(chat_message))

        def m(subscriber_username):
            # Implicitly returns None for the sender; `Publisher.publish`
            # skips falsy values, so senders do not get their own message.
            if subscriber_username != sender_username:
                return full_message

        publisher.publish(m)
        return ''

    @app.route('/subscribe')
    def subscribe():
        """Opens an event stream, using the username as subscriber properties."""
        username = flask.request.args.get('username')
        return flask.Response(publisher.subscribe(properties=username),
                              content_type='text/event-stream')

    @app.route('/')
    def root():
        """Serves the chat page from the static folder."""
        return app.send_static_file('chat.html')

    app.run(debug=True, threaded=True)