Skip to content

Commit

Permalink
Merge pull request #13 from depop/queue-argument-support
Browse files Browse the repository at this point in the history
Add support for arbitrary queue arguments
  • Loading branch information
anentropic authored Dec 4, 2018
2 parents 700b968 + 55e4e8e commit 9034be8
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 45 deletions.
50 changes: 34 additions & 16 deletions event_consumer/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

from event_consumer.conf import settings
from event_consumer.errors import InvalidQueueRegistration, NoExchange, PermanentFailure
from event_consumer.types import QueueRegistration
from event_consumer.types import HandlerRegistration, QueueKey

if settings.USE_DJANGO:
from django.core.signals import request_finished
Expand All @@ -30,19 +30,18 @@


# Maps routing-keys to handlers
REGISTRY = {} # type: Dict[QueueRegistration, Callable]
REGISTRY = {} # type: Dict[QueueKey, HandlerRegistration]

DEFAULT_EXCHANGE = 'default'


def _validate_registration(register_key): # type: (QueueRegistration) -> None
def _validate_registration(register_key): # type: (QueueKey) -> None
"""
Raises:
InvalidQueueRegistration
"""
global REGISTRY
existing = {(r.queue, r.exchange) for r in REGISTRY.keys()}
if (register_key.queue, register_key.exchange) in existing:
if register_key in REGISTRY:
raise InvalidQueueRegistration(
'Attempted duplicate registrations for messages with the queue name '
'"{queue}" and exchange "{exchange}"'.format(
Expand All @@ -54,7 +53,8 @@ def _validate_registration(register_key): # type: (QueueRegistration) -> None

def message_handler(routing_keys, # type: Union[str, Iterable]
queue=None, # type: Optional[str]
exchange=DEFAULT_EXCHANGE # type: str
exchange=DEFAULT_EXCHANGE, # type: str
queue_arguments=None, # Optional[Dict[str, object]]
):
# type: (...) -> Callable[[Callable], Any]
"""
Expand All @@ -78,6 +78,8 @@ def message_handler(routing_keys, # type: Union[str, Iterable]
default queue name without prepending `QUEUE_NAME_PREFIX`.
exchange: The AMQP exchange config to use. This is a *key name*
in the `settings.EXCHANGES` dict.
queue_arguments: Arbitrary arguments to be passed to the *primary* queue
at creation time.
Returns:
Callable: function decorator
Expand Down Expand Up @@ -111,6 +113,8 @@ def process_message(body):
"separate handlers for each routing key in this case."
)

queue_arguments = queue_arguments or {}

def decorator(f): # type: (Callable) -> Callable
global REGISTRY

Expand All @@ -120,11 +124,22 @@ def decorator(f): # type: (Callable) -> Callable
# kombu.Consumer has no concept of routing-key (only queue name) so
# so handler registrations must be unique on queue+exchange (otherwise
# messages from the queue would be randomly sent to the duplicate handlers)
register_key = QueueRegistration(routing_key, queue_name, exchange)
register_key = QueueKey(queue=queue_name, exchange=exchange)
_validate_registration(register_key)

REGISTRY[register_key] = f
_logger.debug('registered: %s to handler: %s.%s', register_key, f.__module__, f.__name__)
handler_registration = HandlerRegistration(
routing_key=routing_key,
queue_arguments=queue_arguments,
handler=f,
)
REGISTRY[register_key] = handler_registration

_logger.debug(
'registered: %s to handler: %s.%s',
register_key,
f.__module__,
f.__name__
)

return f

Expand All @@ -146,7 +161,7 @@ class AMQPRetryConsumerStep(bootsteps.StartStopStep):

def __init__(self, *args, **kwargs):
self.handlers = [] # type: List[AMQPRetryHandler]
self._tasks = kwargs.pop('tasks', REGISTRY) # type: Dict[QueueRegistration, Callable]
self._tasks = kwargs.pop('tasks', REGISTRY) # type: Dict[QueueKey, HandlerRegistration]
super(AMQPRetryConsumerStep, self).__init__(*args, **kwargs)

def start(self, c):
Expand Down Expand Up @@ -177,14 +192,15 @@ def _close(self, c, cancel_consumers=True):
def get_handlers(self, channel):
return [
AMQPRetryHandler(
channel,
queue_registration.routing_key,
queue_registration.queue,
queue_registration.exchange,
func,
channel=channel,
routing_key=handler_registration.routing_key,
queue=queue_key.queue,
exchange=queue_key.exchange,
queue_arguments=handler_registration.queue_arguments,
func=handler_registration.handler,
backoff_func=settings.BACKOFF_FUNC,
)
for queue_registration, func in self._tasks.items()
for queue_key, handler_registration in self._tasks.items()
]


Expand All @@ -203,6 +219,7 @@ def __init__(self,
routing_key, # type: str
queue, # type: str
exchange, # type: str
queue_arguments, # type: Dict[str, str]
func, # type: Callable[[Any], Any]
backoff_func=None # type: Optional[Callable[[int], float]]
):
Expand Down Expand Up @@ -230,6 +247,7 @@ def __init__(self,
exchange=self.exchanges[exchange],
routing_key=self.routing_key,
channel=self.channel,
queue_arguments=queue_arguments,
)

self.retry_queue = kombu.Queue(
Expand Down
21 changes: 17 additions & 4 deletions event_consumer/types.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
from typing import NamedTuple
from typing import NamedTuple, Dict, Callable


# Used by handlers.REGISTRY as keys
QueueRegistration = NamedTuple(
'QueueRegistration',
[('routing_key', str), ('queue', str), ('exchange', str)]
QueueKey = NamedTuple(
'QueueKey',
[
('queue', str),
('exchange', str),
]
)


HandlerRegistration = NamedTuple(
'HandlerRegistration',
[
('routing_key', str),
('queue_arguments', Dict[str, str]),
('handler', Callable),
]
)
1 change: 1 addition & 0 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def setUp(self):
routing_key=self.routing_key,
queue=self.routing_key,
exchange=self.exchange,
queue_arguments={},
func=lambda body: None,
backoff_func=lambda attempt: 0,
)
Expand Down
127 changes: 102 additions & 25 deletions tests/test_consumer_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from event_consumer import handlers as ec
from event_consumer.conf import settings
from event_consumer.errors import InvalidQueueRegistration
from event_consumer.types import QueueRegistration
from event_consumer.types import QueueKey


def test_get_handlers_with_defaults():
Expand All @@ -27,8 +27,15 @@ def f2(body):

assert len(reg) == 2

assert f1 is reg[QueueRegistration('my.routing.key1', 'my.routing.key1', 'default')]
assert f2 is reg[QueueRegistration('my.routing.key2', 'my.routing.key2', 'default')]
handler_reg1 = reg[QueueKey(queue='my.routing.key1', exchange='default')]
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {}

handler_reg2 = reg[QueueKey(queue='my.routing.key2', exchange='default')]
assert handler_reg2.handler is f2
assert handler_reg2.routing_key == 'my.routing.key2'
assert handler_reg2.queue_arguments == {}

step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())
Expand All @@ -40,8 +47,8 @@ def f2(body):
assert len(handler.consumer.queues) == 1
assert len(handler.consumer.callbacks) == 1
assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler)
key = (handler.routing_key, handler.queue, handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key]
key = QueueKey(queue=handler.queue, exchange=handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key].handler


@override_settings(settings, QUEUE_NAME_PREFIX='myapp:')
Expand All @@ -68,12 +75,19 @@ def f2(body):

assert len(reg) == 2

assert f1 is reg[
QueueRegistration('my.routing.key1', 'myapp:my.routing.key1', 'custom')
handler_reg1 = reg[
QueueKey(queue='myapp:my.routing.key1', exchange='custom')
]
assert f2 is reg[
QueueRegistration('my.routing.key2', 'myapp:my.routing.key2', 'custom')
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {}

handler_reg2 = reg[
QueueKey(queue='myapp:my.routing.key2', exchange='custom')
]
assert handler_reg2.handler is f2
assert handler_reg2.routing_key == 'my.routing.key2'
assert handler_reg2.queue_arguments == {}

step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())
Expand All @@ -85,8 +99,8 @@ def f2(body):
assert len(handler.consumer.queues) == 1
assert len(handler.consumer.callbacks) == 1
assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler)
key = (handler.routing_key, handler.queue, handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key]
key = QueueKey(queue=handler.queue, exchange=handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key].handler


@override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}})
Expand Down Expand Up @@ -117,9 +131,27 @@ def f3(body):
return None

assert len(reg) == 3
assert f1 is reg[QueueRegistration('my.routing.key1', 'my.queue1', 'my.exchange1')]
assert f2 is reg[QueueRegistration('my.routing.key2', 'my.queue2', 'my.exchange1')]
assert f3 is reg[QueueRegistration('my.routing.key2', 'my.queue2', 'my.exchange2')]

handler_reg1 = reg[
QueueKey(queue='my.queue1', exchange='my.exchange1')
]
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {}

handler_reg2 = reg[
QueueKey(queue='my.queue2', exchange='my.exchange1')
]
assert handler_reg2.handler is f2
assert handler_reg2.routing_key == 'my.routing.key2'
assert handler_reg2.queue_arguments == {}

handler_reg3 = reg[
QueueKey(queue='my.queue2', exchange='my.exchange2')
]
assert handler_reg3.handler is f3
assert handler_reg3.routing_key == 'my.routing.key2'
assert handler_reg3.queue_arguments == {}

step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())
Expand All @@ -131,8 +163,37 @@ def f3(body):
assert len(handler.consumer.queues) == 1
assert len(handler.consumer.callbacks) == 1
assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler)
key = (handler.routing_key, handler.queue, handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key]
key = QueueKey(queue=handler.queue, exchange=handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key].handler


def test_get_handlers_with_queue_arguments():
"""
Should build handlers from tasks decorated with `@message_handler`
and pass the `queue_arguments` through to `kombu.Queue` constructor.
"""
with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg:

@message_handler('my.routing.key1', queue_arguments={'x-fake-header': 'wtf'})
def f1(body):
return None

handler_reg1 = reg[QueueKey(queue='my.routing.key1', exchange='default')]
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {'x-fake-header': 'wtf'}

with mock.patch('kombu.Queue'):
step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())

assert len(reg) == 1
assert len(handlers) == len(reg)

handler = handlers[0]
assert isinstance(handler, ec.AMQPRetryHandler)
assert isinstance(handler.worker_queue, mock.MagicMock)
handler.worker_queue.queue_arguments = {'x-fake-header': 'wtf'}


def test_get_handlers_no_exchange():
Expand Down Expand Up @@ -168,7 +229,12 @@ def f2(body):

assert len(reg) == 1

assert f1 is reg[QueueRegistration('my.routing.key1', 'custom_queue', 'custom')]
handler_reg1 = reg[
QueueKey(queue='custom_queue', exchange='custom')
]
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {}

step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())
Expand All @@ -180,15 +246,14 @@ def f2(body):
assert len(handler.consumer.queues) == 1
assert len(handler.consumer.callbacks) == 1
assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler)
key = (handler.routing_key, handler.queue, handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key]
key = QueueKey(queue=handler.queue, exchange=handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key].handler


@override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}})
def test_get_handlers_with_multiple_routes(*mocks):
"""
Should build handlers from tasks decorated with `@message_handler`
using the specified routing key, queue and exchange
Can connect the handler to multiple routing keys, each having a queue.
"""
with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg:

Expand All @@ -203,8 +268,20 @@ def f1(body):
return None

assert len(reg) == 2
assert f1 is reg[QueueRegistration('my.routing.key1', 'my.routing.key1', 'default')]
assert f1 is reg[QueueRegistration('my.routing.key2', 'my.routing.key2', 'default')]

handler_reg1 = reg[
QueueKey(queue='my.routing.key1', exchange='default')
]
assert handler_reg1.handler is f1
assert handler_reg1.routing_key == 'my.routing.key1'
assert handler_reg1.queue_arguments == {}

handler_reg2 = reg[
QueueKey(queue='my.routing.key2', exchange='default')
]
assert handler_reg2.handler is f1
assert handler_reg2.routing_key == 'my.routing.key2'
assert handler_reg2.queue_arguments == {}

step = ec.AMQPRetryConsumerStep(None)
handlers = step.get_handlers(channel=mock.MagicMock())
Expand All @@ -216,5 +293,5 @@ def f1(body):
assert len(handler.consumer.queues) == 1
assert len(handler.consumer.callbacks) == 1
assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler)
key = (handler.routing_key, handler.queue, handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key]
key = QueueKey(queue=handler.queue, exchange=handler.exchange)
assert handler.consumer.callbacks[0].func is reg[key].handler

0 comments on commit 9034be8

Please sign in to comment.