Skip to content

Commit

Permalink
Merge branch 'feature/transport-refactor'
Browse files Browse the repository at this point in the history
  • Loading branch information
mitsuhiko committed Nov 30, 2016
2 parents 908f371 + 71c9676 commit ebb3924
Show file tree
Hide file tree
Showing 16 changed files with 62 additions and 64 deletions.
3 changes: 3 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ Version 6.0.0
* Do not require `sys.argv` to be available any more.
* Tags defined on a logging handler will now be merged with individual log record's tags.
* Added explicit support for multidicts in the django client.
* Refactored transports to support multiple URLs. This might affect
you if you have custom subclasses of those. The main change is that
the URL parameter moved from the constructor into the `send` method.

Version 5.32.0
--------------
Expand Down
4 changes: 2 additions & 2 deletions raven/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,10 +691,10 @@ def failed_send(e):
try:
transport = self.remote.get_transport()
if transport.async:
transport.async_send(data, headers, self._successful_send,
transport.async_send(url, data, headers, self._successful_send,
failed_send)
else:
transport.send(data, headers)
transport.send(url, data, headers)
self._successful_send()
except Exception as e:
if self.raise_send_errors:
Expand Down
4 changes: 1 addition & 3 deletions raven/conf/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,14 +60,12 @@ def __unicode__(self):
def is_active(self):
return all([self.base_url, self.project, self.public_key, self.secret_key])

# TODO(dcramer): we dont want transports bound to a URL
def get_transport(self):
if not self.store_endpoint:
return

if not hasattr(self, '_transport'):
parsed = urlparse(self.store_endpoint)
self._transport = self._transport_cls(parsed, **self.options)
self._transport = self._transport_cls(**self.options)
return self._transport

def get_public_dsn(self):
Expand Down
4 changes: 2 additions & 2 deletions raven/transport/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class Transport(object):
async = False
scheme = []

def send(self, data, headers):
def send(self, url, data, headers):
"""
You need to override this to do something with the actual
data. Usually - this is sending to a server
Expand All @@ -41,7 +41,7 @@ class AsyncTransport(Transport):

async = True

def async_send(self, data, headers, success_cb, error_cb):
def async_send(self, url, data, headers, success_cb, error_cb):
"""
Override this method for asynchronous transports. Call
`success_cb()` if the send succeeds or `error_cb(exception)`
Expand Down
17 changes: 8 additions & 9 deletions raven/transport/eventlet.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,26 @@ class EventletHTTPTransport(HTTPTransport):

scheme = ['eventlet+http', 'eventlet+https']

def __init__(self, parsed_url, pool_size=100, **kwargs):
def __init__(self, pool_size=100, **kwargs):
if not has_eventlet:
raise ImportError('EventletHTTPTransport requires eventlet.')
super(EventletHTTPTransport, self).__init__(parsed_url, **kwargs)
# remove the eventlet+ from the protocol, as it is not a real protocol
self._url = self._url.split('+', 1)[-1]
super(EventletHTTPTransport, self).__init__(**kwargs)

def _send_payload(self, payload):
req = eventlet_urllib2.Request(self._url, headers=payload[1])
url, data, headers = payload
req = eventlet_urllib2.Request(url, headers=headers)
try:
if sys.version_info < (2, 6):
response = eventlet_urllib2.urlopen(req, payload[0]).read()
response = eventlet_urllib2.urlopen(req, data).read()
else:
response = eventlet_urllib2.urlopen(req, payload[0],
response = eventlet_urllib2.urlopen(req, data,
self.timeout).read()
return response
except Exception as err:
return err

def send(self, data, headers):
def send(self, url, data, headers):
"""
Spawn an async request to a remote webserver.
"""
eventlet.spawn(self._send_payload, (data, headers))
eventlet.spawn(self._send_payload, (url, data, headers))
8 changes: 4 additions & 4 deletions raven/transport/gevent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,23 @@ class GeventedHTTPTransport(AsyncTransport, HTTPTransport):

scheme = ['gevent+http', 'gevent+https']

def __init__(self, parsed_url, maximum_outstanding_requests=100, *args, **kwargs):
def __init__(self, maximum_outstanding_requests=100, *args, **kwargs):
if not has_gevent:
raise ImportError('GeventedHTTPTransport requires gevent.')

self._lock = Semaphore(maximum_outstanding_requests)

super(GeventedHTTPTransport, self).__init__(parsed_url, *args, **kwargs)
super(GeventedHTTPTransport, self).__init__(*args, **kwargs)

def async_send(self, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
"""
Spawn an async request to a remote webserver.
"""
# this can be optimized by making a custom self.send that does not
# read the response since we don't use it.
self._lock.acquire()
return gevent.spawn(
super(GeventedHTTPTransport, self).send, data, headers
super(GeventedHTTPTransport, self).send, url, data, headers
).link(lambda x: self._done(x, success_cb, failure_cb))

def _done(self, greenlet, success_cb, failure_cb, *args):
Expand Down
9 changes: 3 additions & 6 deletions raven/transport/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
class HTTPTransport(Transport):
scheme = ['sync+http', 'sync+https']

def __init__(self, parsed_url, timeout=defaults.TIMEOUT, verify_ssl=True,
def __init__(self, timeout=defaults.TIMEOUT, verify_ssl=True,
ca_certs=defaults.CA_BUNDLE):
self._parsed_url = parsed_url
self._url = parsed_url.geturl().rsplit('+', 1)[-1]

if isinstance(timeout, string_types):
timeout = int(timeout)
if isinstance(verify_ssl, string_types):
Expand All @@ -32,11 +29,11 @@ def __init__(self, parsed_url, timeout=defaults.TIMEOUT, verify_ssl=True,
self.verify_ssl = verify_ssl
self.ca_certs = ca_certs

def send(self, data, headers):
def send(self, url, data, headers):
"""
Sends a request to a remote webserver using HTTP POST.
"""
req = urllib2.Request(self._url, headers=headers)
req = urllib2.Request(url, headers=headers)

try:
response = urlopen(
Expand Down
4 changes: 2 additions & 2 deletions raven/transport/requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ def __init__(self, *args, **kwargs):

super(RequestsHTTPTransport, self).__init__(*args, **kwargs)

def send(self, data, headers):
def send(self, url, data, headers):
if self.verify_ssl:
# If SSL verification is enabled use the provided CA bundle to
# perform the verification.
self.verify_ssl = self.ca_certs
requests.post(self._url, data=data, headers=headers,
requests.post(url, data=data, headers=headers,
verify=self.verify_ssl, timeout=self.timeout)
8 changes: 4 additions & 4 deletions raven/transport/threaded.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,14 @@ def get_worker(self):
self._worker = AsyncWorker()
return self._worker

def send_sync(self, data, headers, success_cb, failure_cb):
def send_sync(self, url, data, headers, success_cb, failure_cb):
try:
super(ThreadedHTTPTransport, self).send(data, headers)
super(ThreadedHTTPTransport, self).send(url, data, headers)
except Exception as e:
failure_cb(e)
else:
success_cb()

def async_send(self, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
self.get_worker().queue(
self.send_sync, data, headers, success_cb, failure_cb)
self.send_sync, url, data, headers, success_cb, failure_cb)
8 changes: 4 additions & 4 deletions raven/transport/threaded_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ def get_worker(self):
self._worker = AsyncWorker()
return self._worker

def send_sync(self, data, headers, success_cb, failure_cb):
def send_sync(self, url, data, headers, success_cb, failure_cb):
try:
super(ThreadedRequestsHTTPTransport, self).send(data, headers)
super(ThreadedRequestsHTTPTransport, self).send(url, data, headers)
except Exception as e:
failure_cb(e)
else:
success_cb()

def async_send(self, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
self.get_worker().queue(
self.send_sync, data, headers, success_cb, failure_cb)
self.send_sync, url, data, headers, success_cb, failure_cb)
10 changes: 5 additions & 5 deletions raven/transport/tornado.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ class TornadoHTTPTransport(AsyncTransport, HTTPTransport):

scheme = ['tornado+http', 'tornado+https']

def __init__(self, parsed_url, *args, **kwargs):
def __init__(self, *args, **kwargs):
if not has_tornado:
raise ImportError('TornadoHTTPTransport requires tornado.')

super(TornadoHTTPTransport, self).__init__(parsed_url, *args, **kwargs)
super(TornadoHTTPTransport, self).__init__(*args, **kwargs)

def async_send(self, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
kwargs = dict(method='POST', headers=headers, body=data)
kwargs["validate_cert"] = self.verify_ssl
kwargs["connect_timeout"] = self.timeout
Expand All @@ -41,12 +41,12 @@ def async_send(self, data, headers, success_cb, failure_cb):
client = AsyncHTTPClient()
kwargs['callback'] = None

future = client.fetch(self._url, **kwargs)
future = client.fetch(url, **kwargs)
ioloop.IOLoop.current().add_future(future, partial(self.handler, success_cb, failure_cb))
else:
client = HTTPClient()
try:
client.fetch(self._url, **kwargs)
client.fetch(url, **kwargs)
success_cb()
except Exception as e:
failure_cb(e)
Expand Down
8 changes: 4 additions & 4 deletions raven/transport/twisted.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,21 @@
class TwistedHTTPTransport(AsyncTransport, HTTPTransport):
scheme = ['twisted+http', 'twisted+https']

def __init__(self, parsed_url, *args, **kwargs):
def __init__(self, *args, **kwargs):
if not has_twisted:
raise ImportError('TwistedHTTPTransport requires twisted.web.')

super(TwistedHTTPTransport, self).__init__(parsed_url, *args, **kwargs)
super(TwistedHTTPTransport, self).__init__(*args, **kwargs)

# Import reactor as late as possible.
from twisted.internet import reactor

# Use a persistent connection pool.
self._agent = Agent(reactor, pool=HTTPConnectionPool(reactor))

def async_send(self, data, headers, success_cb, failure_cb):
def async_send(self, url, data, headers, success_cb, failure_cb):
d = self._agent.request(
b"POST", self._url,
b"POST", url,
bodyProducer=FileBodyProducer(io.BytesIO(data)),
headers=Headers(dict((k, [v]) for k, v in headers.items()))
)
Expand Down
4 changes: 2 additions & 2 deletions tests/base/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,13 @@ def test_async_send_remote_failover(self, should_try, get_transport):

# test recovery
client.send_remote('http://example.com/api/1/store/', client.encode({}))
success_cb = async_send.call_args[0][2]
success_cb = async_send.call_args[0][3]
success_cb()
self.assertEquals(client.state.status, client.state.ONLINE)

# test delayed raise of error
client.send_remote('http://example.com/api/1/store/', client.encode({}))
failure_cb = async_send.call_args[0][3]
failure_cb = async_send.call_args[0][4]
failure_cb(Exception())
self.assertEquals(client.state.status, client.state.ERROR)

Expand Down
8 changes: 4 additions & 4 deletions tests/transport/requests/test_threaded_requests.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@ def __init__(self, *args, **kwargs):
self.events = []
self.send_delay = 0

def send_sync(self, data, headers, success_cb, failure_cb):
def send_sync(self, url, data, headers, success_cb, failure_cb):
# delay sending the message, to allow us to test that the shutdown
# hook waits correctly
time.sleep(self.send_delay)

self.events.append((data, headers, success_cb, failure_cb))
self.events.append((url, data, headers, success_cb, failure_cb))


class ThreadedTransportTest(TestCase):
Expand All @@ -38,11 +38,11 @@ def test_does_send(self, send):

def test_shutdown_waits_for_send(self):
url = urlparse(self.url)
transport = DummyThreadedScheme(url)
transport = DummyThreadedScheme()
transport.send_delay = 0.5

data = self.client.build_msg('raven.events.Message', message='foo')
transport.async_send(data, None, None, None)
transport.async_send(url, data, None, None, None)

time.sleep(0.1)

Expand Down
7 changes: 4 additions & 3 deletions tests/transport/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ class DummyScheme(Transport):

scheme = ['mock']

def __init__(self, parsed_url, timeout=5):
self._parsed_url = parsed_url
def __init__(self, timeout=5):
self.timeout = timeout

def send(self, data, headers):
def send(self, url, data, headers):
"""
Sends a request to a remote webserver
"""
self._url = url
self._data = data
self._headers = headers

Expand Down Expand Up @@ -56,6 +56,7 @@ def test_custom_transport(self):
c.send(**data)

mock_cls = c._transport_cache['mock://some_username:some_password@localhost:8143/1'].get_transport()
print(mock_cls.__dict__)

expected_message = zlib.decompress(c.encode(data))
actual_message = zlib.decompress(mock_cls._data)
Expand Down
Loading

0 comments on commit ebb3924

Please sign in to comment.