Skip to content

Commit

Permalink
AMBARI-23330. Bump up ws4py websocket lib version from 0.4.2 to 0.5.1…
Browse files Browse the repository at this point in the history
… to fix connection drop (aonishuk)
  • Loading branch information
aonishuk committed Mar 22, 2018
1 parent 71915a4 commit 5a75b79
Show file tree
Hide file tree
Showing 17 changed files with 1,231 additions and 46 deletions.
4 changes: 2 additions & 2 deletions ambari-common/src/main/python/ambari_ws4py/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
# * Neither the name of ambari_ws4py nor the names of its
# * Neither the name of ws4py nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
Expand All @@ -30,7 +30,7 @@
import logging.handlers as handlers

__author__ = "Sylvain Hellegouarch"
__version__ = "0.4.2"
__version__ = "0.5.1"
__all__ = ['WS_KEY', 'WS_VERSION', 'configure_logger', 'format_addresses']

WS_KEY = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
Expand Down
126 changes: 126 additions & 0 deletions ambari-common/src/main/python/ambari_ws4py/async_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
# -*- coding: utf-8 -*-
__doc__ = """
WebSocket implementation that relies on two new Python
features:
* asyncio to provide the high-level interface above transports
* yield from to delegate to the reading stream whenever more
bytes are required
You can use these implementations in that context
and benefit from those features whilst using ws4py.
Strictly speaking this module probably doesn't have to
be called async_websocket but it feels this will be its typical
usage and is probably more readable than
delegated_generator_websocket_on_top_of_asyncio.py
"""
import asyncio
import types

from ambari_ws4py.websocket import WebSocket as _WebSocket
from ambari_ws4py.messaging import Message

__all__ = ['WebSocket', 'EchoWebSocket']

class WebSocket(_WebSocket):
def __init__(self, proto):
"""
A :pep:`3156` ready websocket handler that works
well in a coroutine-aware loop such as the one provided
by the asyncio module.
The provided `proto` instance is a
:class:`asyncio.Protocol` subclass instance that will
be used internally to read and write from the
underlying transport.
Because the base :class:`ws4py.websocket.WebSocket`
class is still coupled a bit to the socket interface,
we have to override a little more than necessary
to play nice with the :pep:`3156` interface. Hopefully,
some day this will be cleaned out.
"""
_WebSocket.__init__(self, None)
self.started = False
self.proto = proto

@property
def local_address(self):
"""
Local endpoint address as a tuple
"""
if not self._local_address:
self._local_address = self.proto.reader.transport.get_extra_info('sockname')
if len(self._local_address) == 4:
self._local_address = self._local_address[:2]
return self._local_address

@property
def peer_address(self):
"""
Peer endpoint address as a tuple
"""
if not self._peer_address:
self._peer_address = self.proto.reader.transport.get_extra_info('peername')
if len(self._peer_address) == 4:
self._peer_address = self._peer_address[:2]
return self._peer_address

def once(self):
"""
The base class directly is used in conjunction with
the :class:`ws4py.manager.WebSocketManager` which is
not actually used with the asyncio implementation
of ws4py. So let's make it clear it shan't be used.
"""
raise NotImplemented()

def close_connection(self):
"""
Close the underlying transport
"""
@asyncio.coroutine
def closeit():
yield from self.proto.writer.drain()
self.proto.writer.close()
asyncio.async(closeit())

def _write(self, data):
"""
Write to the underlying transport
"""
@asyncio.coroutine
def sendit(data):
self.proto.writer.write(data)
yield from self.proto.writer.drain()
asyncio.async(sendit(data))

@asyncio.coroutine
def run(self):
"""
Coroutine that runs until the websocket
exchange is terminated. It also calls the
`opened()` method to indicate the exchange
has started.
"""
self.started = True
try:
self.opened()
reader = self.proto.reader
while True:
data = yield from reader.read(self.reading_buffer_size)
if not self.process(data):
return False
finally:
self.terminate()

return True

class EchoWebSocket(WebSocket):
def received_message(self, message):
"""
Automatically sends back the provided ``message`` to
its originating endpoint.
"""
self.send(message.data, message.is_binary)
25 changes: 15 additions & 10 deletions ambari-common/src/main/python/ambari_ws4py/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

class WebSocketBaseClient(WebSocket):
def __init__(self, url, protocols=None, extensions=None,
heartbeat_freq=None, ssl_options=None, headers=None):
heartbeat_freq=None, ssl_options=None, headers=None, exclude_headers=None):
"""
A websocket client that implements :rfc:`6455` and provides a simple
interface to communicate with a websocket server.
Expand All @@ -36,23 +36,23 @@ def __init__(self, url, protocols=None, extensions=None,
.. code-block:: python
>>> from websocket.client import WebSocketBaseClient
>>> from ambari_ws4py.client import WebSocketBaseClient
>>> ws = WebSocketBaseClient('ws://localhost/ws')
Here is an example for a TCP client over SSL:
.. code-block:: python
>>> from websocket.client import WebSocketBaseClient
>>> from ambari_ws4py.client import WebSocketBaseClient
>>> ws = WebSocketBaseClient('wss://localhost/ws')
Finally an example of a Unix-domain connection:
.. code-block:: python
>>> from websocket.client import WebSocketBaseClient
>>> from ambari_ws4py.client import WebSocketBaseClient
>>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock')
Note that in this case, the initial Upgrade request
Expand All @@ -61,7 +61,7 @@ def __init__(self, url, protocols=None, extensions=None,
.. code-block:: python
>>> from websocket.client import WebSocketBaseClient
>>> from ambari_ws4py.client import WebSocketBaseClient
>>> ws = WebSocketBaseClient('ws+unix:///tmp/my.sock')
>>> ws.resource = '/ws'
>>> ws.connect()
Expand All @@ -78,6 +78,8 @@ def __init__(self, url, protocols=None, extensions=None,
self.resource = None
self.ssl_options = ssl_options or {}
self.extra_headers = headers or []
self.exclude_headers = exclude_headers or []
self.exclude_headers = [x.lower() for x in self.exclude_headers]

if self.scheme == "wss":
# Prevent check_hostname requires server_hostname (ref #187)
Expand Down Expand Up @@ -211,7 +213,7 @@ def connect(self):
# default port is now 443; upgrade self.sender to send ssl
self.sock = ssl.wrap_socket(self.sock, **self.ssl_options)
self._is_secure = True

self.sock.settimeout(10.0)
self.sock.connect(self.bind_addr)

Expand Down Expand Up @@ -258,14 +260,15 @@ def handshake_headers(self):
('Sec-WebSocket-Key', self.key.decode('utf-8')),
('Sec-WebSocket-Version', str(max(WS_VERSION)))
]

if self.protocols:
headers.append(('Sec-WebSocket-Protocol', ','.join(self.protocols)))

if self.extra_headers:
headers.extend(self.extra_headers)

if not any(x for x in headers if x[0].lower() == 'origin'):
if not any(x for x in headers if x[0].lower() == 'origin') and \
'origin' not in self.exclude_headers:

scheme, url = self.url.split(":", 1)
parsed = urlsplit(url, scheme="http")
Expand All @@ -278,6 +281,8 @@ def handshake_headers(self):
origin = origin + ':' + str(parsed.port)
headers.append(('Origin', origin))

headers = [x for x in headers if x[0].lower() not in self.exclude_headers]

return headers

@property
Expand Down Expand Up @@ -329,10 +334,10 @@ def process_handshake_header(self, headers):
raise HandshakeError("Invalid challenge response: %s" % value)

elif header == b'sec-websocket-protocol':
protocols = ','.join(value)
protocols.extend([x.strip() for x in value.split(b',')])

elif header == b'sec-websocket-extensions':
extensions = ','.join(value)
extensions.extend([x.strip() for x in value.split(b',')])

return protocols, extensions

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
__all__ = ['WebSocketClient']

class WebSocketClient(WebSocketBaseClient):
def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None, ssl_options=None, headers=None):
def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None, ssl_options=None, headers=None, exclude_headers=None):
"""
WebSocket client that executes the
:meth:`run() <ambari_ws4py.websocket.WebSocket.run>` into a gevent greenlet.
:meth:`run() <ws4py.websocket.WebSocket.run>` into a gevent greenlet.
.. code-block:: python
Expand Down Expand Up @@ -41,7 +41,7 @@ def outgoing():
gevent.joinall(greenlets)
"""
WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq,
ssl_options=ssl_options, headers=headers)
ssl_options=ssl_options, headers=headers, exclude_headers=exclude_headers)
self._th = Greenlet(self.run)

self.messages = Queue()
Expand Down Expand Up @@ -75,18 +75,22 @@ def closed(self, code, reason=None):
# to wait for
self.messages.put(StopIteration)

def receive(self):
def receive(self, block=True):
"""
Returns messages that were stored into the
`messages` queue and returns `None` when the
websocket is terminated or closed.
`block` is passed though the gevent queue `.get()` method, which if
True will block until an item in the queue is available. Set this to
False if you just want to check the queue, which will raise an
Empty exception you need to handle if there is no message to return.
"""
# If the websocket was terminated and there are no messages
# left in the queue, return None immediately otherwise the client
# will block forever
if self.terminated and self.messages.empty():
return None
message = self.messages.get()
message = self.messages.get(block=block)
if message is StopIteration:
return None
return message
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

class WebSocketClient(WebSocketBaseClient):
def __init__(self, url, protocols=None, extensions=None, heartbeat_freq=None,
ssl_options=None, headers=None):
ssl_options=None, headers=None, exclude_headers=None):
"""
.. code-block:: python
Expand All @@ -32,7 +32,7 @@ def received_message(self, m):
"""
WebSocketBaseClient.__init__(self, url, protocols, extensions, heartbeat_freq,
ssl_options, headers=headers)
ssl_options, headers=headers, exclude_headers=exclude_headers)
self._th = threading.Thread(target=self.run, name='WebSocketClient')
self._th.daemon = True

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class TornadoWebSocketClient(WebSocketBaseClient):
def __init__(self, url, protocols=None, extensions=None,
io_loop=None, ssl_options=None, headers=None):
io_loop=None, ssl_options=None, headers=None, exclude_headers=None):
"""
.. code-block:: python
Expand All @@ -32,7 +32,7 @@ def closed(self, code, reason=None):
ioloop.IOLoop.instance().start()
"""
WebSocketBaseClient.__init__(self, url, protocols, extensions,
ssl_options=ssl_options, headers=headers)
ssl_options=ssl_options, headers=headers, exclude_headers=exclude_headers)
if self.scheme == "wss":
self.sock = ssl.wrap_socket(self.sock, do_handshake_on_connect=False, **self.ssl_options)
self._is_secure = True
Expand Down
2 changes: 1 addition & 1 deletion ambari-common/src/main/python/ambari_ws4py/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
__doc__ = """
This compatibility module is inspired by the one found
in CherryPy. It provides a common entry point for the various
functions and types that are used with ambari_ws4py but which
functions and types that are used with ws4py but which
differ from Python 2.x to Python 3.x
There are likely better ways for some of them so feel
Expand Down
10 changes: 5 additions & 5 deletions ambari-common/src/main/python/ambari_ws4py/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ def add(self, websocket):
"""
Manage a new websocket.
First calls its :meth:`opened() <ambari_ws4py.websocket.WebSocket.opened>`
First calls its :meth:`opened() <ws4py.websocket.WebSocket.opened>`
method and register its socket against the poller
for reading events.
"""
Expand All @@ -261,7 +261,7 @@ def remove(self, websocket):
"""
Remove the given ``websocket`` from the manager.
This does not call its :meth:`closed() <ambari_ws4py.websocket.WebSocket.closed>`
This does not call its :meth:`closed() <ws4py.websocket.WebSocket.closed>`
method as it's out-of-band by your application
or from within the manager's run loop.
"""
Expand Down Expand Up @@ -292,8 +292,8 @@ def run(self):
call related websockets' `once` method to
read and process the incoming data.
If the :meth:`once() <ambari_ws4py.websocket.WebSocket.once>`
method returns a `False` value, its :meth:`terminate() <ambari_ws4py.websocket.WebSocket.terminate>`
If the :meth:`once() <ws4py.websocket.WebSocket.once>`
method returns a `False` value, its :meth:`terminate() <ws4py.websocket.WebSocket.terminate>`
method is also applied to properly close
the websocket and its socket is unregistered from the poller.
Expand Down Expand Up @@ -335,7 +335,7 @@ def run(self):

def close_all(self, code=1001, message='Server is shutting down'):
"""
Execute the :meth:`close() <ambari_ws4py.websocket.WebSocket.close>`
Execute the :meth:`close() <ws4py.websocket.WebSocket.close>`
method of each registered websockets to initiate the closing handshake.
It doesn't wait for the handshake to complete properly.
"""
Expand Down
2 changes: 1 addition & 1 deletion ambari-common/src/main/python/ambari_ws4py/messaging.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def single(self, mask=False):

def fragment(self, first=False, last=False, mask=False):
"""
Returns a :class:`ambari_ws4py.framing.Frame` bytes.
Returns a :class:`ws4py.framing.Frame` bytes.
The behavior depends on the given flags:
Expand Down
Empty file.
Loading

0 comments on commit 5a75b79

Please sign in to comment.