From 5231dedf7c716e584473bd1c3de5ee883c5f32b6 Mon Sep 17 00:00:00 2001 From: Robert Erdin Date: Fri, 16 Nov 2018 17:30:17 +0000 Subject: [PATCH 1/2] Add support for arbitrary queue arguments --- event_consumer/handlers.py | 9 +++++++-- event_consumer/types.py | 4 ++-- tests/base.py | 1 + 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/event_consumer/handlers.py b/event_consumer/handlers.py index 657b7d8..212de96 100644 --- a/event_consumer/handlers.py +++ b/event_consumer/handlers.py @@ -54,7 +54,8 @@ def _validate_registration(register_key): # type: (QueueRegistration) -> None def message_handler(routing_keys, # type: Union[str, Iterable] queue=None, # type: Optional[str] - exchange=DEFAULT_EXCHANGE # type: str + exchange=DEFAULT_EXCHANGE, # type: str + **kwargs ): # type: (...) -> Callable[[Callable], Any] """ @@ -78,6 +79,7 @@ def message_handler(routing_keys, # type: Union[str, Iterable] default queue name without prepending `QUEUE_NAME_PREFIX`. exchange: The AMQP exchange config to use. This is a *key name* in the `settings.EXCHANGES` dict. + queue_arguments: Arbitrary arguments to be passed to the *primary* queue at creation time. Returns: Callable: function decorator @@ -120,7 +122,7 @@ def decorator(f): # type: (Callable) -> Callable # kombu.Consumer has no concept of routing-key (only queue name) so # so handler registrations must be unique on queue+exchange (otherwise # messages from the queue would be randomly sent to the duplicate handlers) - register_key = QueueRegistration(routing_key, queue_name, exchange) + register_key = QueueRegistration(routing_key, queue_name, exchange, kwargs.get('queue_arguments', {})) _validate_registration(register_key) REGISTRY[register_key] = f @@ -181,6 +183,7 @@ def get_handlers(self, channel): queue_registration.routing_key, queue_registration.queue, queue_registration.exchange, + queue_registration.queue_arguments, func, backoff_func=settings.BACKOFF_FUNC, ) @@ -203,6 +206,7 @@ def __init__(self, routing_key, # type: str queue, # type: str exchange, # type: str + queue_arguments, #type: Dict[str, str] func, # type: Callable[[Any], Any] backoff_func=None # type: Optional[Callable[[int], float]] ): @@ -230,6 +234,7 @@ def __init__(self, exchange=self.exchanges[exchange], routing_key=self.routing_key, channel=self.channel, + queue_arguments=queue_arguments, ) self.retry_queue = kombu.Queue( diff --git a/event_consumer/types.py b/event_consumer/types.py index adf074a..35531a8 100644 --- a/event_consumer/types.py +++ b/event_consumer/types.py @@ -1,8 +1,8 @@ -from typing import NamedTuple +from typing import NamedTuple, Dict # Used by handlers.REGISTRY as keys QueueRegistration = NamedTuple( 'QueueRegistration', - [('routing_key', str), ('queue', str), ('exchange', str)] + [('routing_key', str), ('queue', str), ('exchange', str), ('queue_arguments', Dict[str, str])] ) diff --git a/tests/base.py b/tests/base.py index ff0d9fc..2b2a4d2 100644 --- a/tests/base.py +++ b/tests/base.py @@ -41,6 +41,7 @@ def setUp(self): routing_key=self.routing_key, queue=self.routing_key, exchange=self.exchange, + queue_arguments={}, func=lambda body: None, backoff_func=lambda attempt: 0, ) From 55e4e8ec1651c0d8c8e7ec08a1b16794c4dc238f Mon Sep 17 00:00:00 2001 From: Anentropic Date: Tue, 20 Nov 2018 13:29:02 +0000 Subject: [PATCH 2/2] Refactor support for queue arguments, add test --- event_consumer/handlers.py | 51 +++++++++------ event_consumer/types.py | 21 ++++-- tests/test_consumer_step.py | 127 +++++++++++++++++++++++++++++------- 3 files changed, 151 insertions(+), 48 deletions(-) diff --git a/event_consumer/handlers.py b/event_consumer/handlers.py index 212de96..18d674b 100644 --- a/event_consumer/handlers.py +++ b/event_consumer/handlers.py @@ -20,7 +20,7 @@ from event_consumer.conf import settings from event_consumer.errors import InvalidQueueRegistration, NoExchange, PermanentFailure -from event_consumer.types import QueueRegistration +from event_consumer.types import HandlerRegistration, QueueKey if settings.USE_DJANGO: from django.core.signals import request_finished @@ -30,19 +30,18 @@ # Maps routing-keys to handlers -REGISTRY = {} # type: Dict[QueueRegistration, Callable] +REGISTRY = {} # type: Dict[QueueKey, HandlerRegistration] DEFAULT_EXCHANGE = 'default' -def _validate_registration(register_key): # type: (QueueRegistration) -> None +def _validate_registration(register_key): # type: (QueueKey) -> None """ Raises: InvalidQueueRegistration """ global REGISTRY - existing = {(r.queue, r.exchange) for r in REGISTRY.keys()} - if (register_key.queue, register_key.exchange) in existing: + if register_key in REGISTRY: raise InvalidQueueRegistration( 'Attempted duplicate registrations for messages with the queue name ' '"{queue}" and exchange "{exchange}"'.format( @@ -55,7 +54,7 @@ def _validate_registration(register_key): # type: (QueueRegistration) -> None def message_handler(routing_keys, # type: Union[str, Iterable] queue=None, # type: Optional[str] exchange=DEFAULT_EXCHANGE, # type: str - **kwargs + queue_arguments=None, # Optional[Dict[str, object]] ): # type: (...) -> Callable[[Callable], Any] """ @@ -79,7 +78,8 @@ def message_handler(routing_keys, # type: Union[str, Iterable] default queue name without prepending `QUEUE_NAME_PREFIX`. exchange: The AMQP exchange config to use. This is a *key name* in the `settings.EXCHANGES` dict. - queue_arguments: Arbitrary arguments to be passed to the *primary* queue at creation time. + queue_arguments: Arbitrary arguments to be passed to the *primary* queue + at creation time. Returns: Callable: function decorator @@ -113,6 +113,8 @@ def process_message(body): "separate handlers for each routing key in this case." ) + queue_arguments = queue_arguments or {} + def decorator(f): # type: (Callable) -> Callable global REGISTRY @@ -122,11 +124,22 @@ def decorator(f): # type: (Callable) -> Callable # kombu.Consumer has no concept of routing-key (only queue name) so # so handler registrations must be unique on queue+exchange (otherwise # messages from the queue would be randomly sent to the duplicate handlers) - register_key = QueueRegistration(routing_key, queue_name, exchange, kwargs.get('queue_arguments', {})) + register_key = QueueKey(queue=queue_name, exchange=exchange) _validate_registration(register_key) - REGISTRY[register_key] = f - _logger.debug('registered: %s to handler: %s.%s', register_key, f.__module__, f.__name__) + handler_registration = HandlerRegistration( + routing_key=routing_key, + queue_arguments=queue_arguments, + handler=f, + ) + REGISTRY[register_key] = handler_registration + + _logger.debug( + 'registered: %s to handler: %s.%s', + register_key, + f.__module__, + f.__name__ + ) return f @@ -148,7 +161,7 @@ class AMQPRetryConsumerStep(bootsteps.StartStopStep): def __init__(self, *args, **kwargs): self.handlers = [] # type: List[AMQPRetryHandler] - self._tasks = kwargs.pop('tasks', REGISTRY) # type: Dict[QueueRegistration, Callable] + self._tasks = kwargs.pop('tasks', REGISTRY) # type: Dict[QueueKey, HandlerRegistration] super(AMQPRetryConsumerStep, self).__init__(*args, **kwargs) def start(self, c): @@ -179,15 +192,15 @@ def _close(self, c, cancel_consumers=True): def get_handlers(self, channel): return [ AMQPRetryHandler( - channel, - queue_registration.routing_key, - queue_registration.queue, - queue_registration.exchange, - queue_registration.queue_arguments, - func, + channel=channel, + routing_key=handler_registration.routing_key, + queue=queue_key.queue, + exchange=queue_key.exchange, + queue_arguments=handler_registration.queue_arguments, + func=handler_registration.handler, backoff_func=settings.BACKOFF_FUNC, ) - for queue_registration, func in self._tasks.items() + for queue_key, handler_registration in self._tasks.items() ] @@ -206,7 +219,7 @@ def __init__(self, routing_key, # type: str queue, # type: str exchange, # type: str - queue_arguments, #type: Dict[str, str] + queue_arguments, # type: Dict[str, str] func, # type: Callable[[Any], Any] backoff_func=None # type: Optional[Callable[[int], float]] ): diff --git a/event_consumer/types.py b/event_consumer/types.py index 35531a8..fb18ed6 100644 --- a/event_consumer/types.py +++ b/event_consumer/types.py @@ -1,8 +1,21 @@ -from typing import NamedTuple, Dict +from typing import NamedTuple, Dict, Callable # Used by handlers.REGISTRY as keys -QueueRegistration = NamedTuple( - 'QueueRegistration', - [('routing_key', str), ('queue', str), ('exchange', str), ('queue_arguments', Dict[str, str])] +QueueKey = NamedTuple( + 'QueueKey', + [ + ('queue', str), + ('exchange', str), + ] +) + + +HandlerRegistration = NamedTuple( + 'HandlerRegistration', + [ + ('routing_key', str), + ('queue_arguments', Dict[str, str]), + ('handler', Callable), + ] ) diff --git a/tests/test_consumer_step.py b/tests/test_consumer_step.py index 6e922d2..c5697cd 100644 --- a/tests/test_consumer_step.py +++ b/tests/test_consumer_step.py @@ -7,7 +7,7 @@ from event_consumer import handlers as ec from event_consumer.conf import settings from event_consumer.errors import InvalidQueueRegistration -from event_consumer.types import QueueRegistration +from event_consumer.types import QueueKey def test_get_handlers_with_defaults(): @@ -27,8 +27,15 @@ def f2(body): assert len(reg) == 2 - assert f1 is reg[QueueRegistration('my.routing.key1', 'my.routing.key1', 'default')] - assert f2 is reg[QueueRegistration('my.routing.key2', 'my.routing.key2', 'default')] + handler_reg1 = reg[QueueKey(queue='my.routing.key1', exchange='default')] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} + + handler_reg2 = reg[QueueKey(queue='my.routing.key2', exchange='default')] + assert handler_reg2.handler is f2 + assert handler_reg2.routing_key == 'my.routing.key2' + assert handler_reg2.queue_arguments == {} step = ec.AMQPRetryConsumerStep(None) handlers = step.get_handlers(channel=mock.MagicMock()) @@ -40,8 +47,8 @@ def f2(body): assert len(handler.consumer.queues) == 1 assert len(handler.consumer.callbacks) == 1 assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = (handler.routing_key, handler.queue, handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key] + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler @override_settings(settings, QUEUE_NAME_PREFIX='myapp:') @@ -68,12 +75,19 @@ def f2(body): assert len(reg) == 2 - assert f1 is reg[ - QueueRegistration('my.routing.key1', 'myapp:my.routing.key1', 'custom') + handler_reg1 = reg[ + QueueKey(queue='myapp:my.routing.key1', exchange='custom') ] - assert f2 is reg[ - QueueRegistration('my.routing.key2', 'myapp:my.routing.key2', 'custom') + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} + + handler_reg2 = reg[ + QueueKey(queue='myapp:my.routing.key2', exchange='custom') ] + assert handler_reg2.handler is f2 + assert handler_reg2.routing_key == 'my.routing.key2' + assert handler_reg2.queue_arguments == {} step = ec.AMQPRetryConsumerStep(None) handlers = step.get_handlers(channel=mock.MagicMock()) @@ -85,8 +99,8 @@ def f2(body): assert len(handler.consumer.queues) == 1 assert len(handler.consumer.callbacks) == 1 assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = (handler.routing_key, handler.queue, handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key] + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler @override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}}) @@ -117,9 +131,27 @@ def f3(body): return None assert len(reg) == 3 - assert f1 is reg[QueueRegistration('my.routing.key1', 'my.queue1', 'my.exchange1')] - assert f2 is reg[QueueRegistration('my.routing.key2', 'my.queue2', 'my.exchange1')] - assert f3 is reg[QueueRegistration('my.routing.key2', 'my.queue2', 'my.exchange2')] + + handler_reg1 = reg[ + QueueKey(queue='my.queue1', exchange='my.exchange1') + ] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} + + handler_reg2 = reg[ + QueueKey(queue='my.queue2', exchange='my.exchange1') + ] + assert handler_reg2.handler is f2 + assert handler_reg2.routing_key == 'my.routing.key2' + assert handler_reg2.queue_arguments == {} + + handler_reg3 = reg[ + QueueKey(queue='my.queue2', exchange='my.exchange2') + ] + assert handler_reg3.handler is f3 + assert handler_reg3.routing_key == 'my.routing.key2' + assert handler_reg3.queue_arguments == {} step = ec.AMQPRetryConsumerStep(None) handlers = step.get_handlers(channel=mock.MagicMock()) @@ -131,8 +163,37 @@ def f3(body): assert len(handler.consumer.queues) == 1 assert len(handler.consumer.callbacks) == 1 assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = (handler.routing_key, handler.queue, handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key] + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler + + +def test_get_handlers_with_queue_arguments(): + """ + Should build handlers from tasks decorated with `@message_handler` + and pass the `queue_arguments` through to `kombu.Queue` constructor. + """ + with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: + + @message_handler('my.routing.key1', queue_arguments={'x-fake-header': 'wtf'}) + def f1(body): + return None + + handler_reg1 = reg[QueueKey(queue='my.routing.key1', exchange='default')] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {'x-fake-header': 'wtf'} + + with mock.patch('kombu.Queue'): + step = ec.AMQPRetryConsumerStep(None) + handlers = step.get_handlers(channel=mock.MagicMock()) + + assert len(reg) == 1 + assert len(handlers) == len(reg) + + handler = handlers[0] + assert isinstance(handler, ec.AMQPRetryHandler) + assert isinstance(handler.worker_queue, mock.MagicMock) + handler.worker_queue.queue_arguments = {'x-fake-header': 'wtf'} def test_get_handlers_no_exchange(): @@ -168,7 +229,12 @@ def f2(body): assert len(reg) == 1 - assert f1 is reg[QueueRegistration('my.routing.key1', 'custom_queue', 'custom')] + handler_reg1 = reg[ + QueueKey(queue='custom_queue', exchange='custom') + ] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} step = ec.AMQPRetryConsumerStep(None) handlers = step.get_handlers(channel=mock.MagicMock()) @@ -180,15 +246,14 @@ def f2(body): assert len(handler.consumer.queues) == 1 assert len(handler.consumer.callbacks) == 1 assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = (handler.routing_key, handler.queue, handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key] + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler @override_settings(settings, EXCHANGES={'my.exchange1': {}, 'my.exchange2': {}}) def test_get_handlers_with_multiple_routes(*mocks): """ - Should build handlers from tasks decorated with `@message_handler` - using the specified routing key, queue and exchange + Can connect the handler to multiple routing keys, each having a queue. """ with mock.patch.object(ec, 'REGISTRY', new=dict()) as reg: @@ -203,8 +268,20 @@ def f1(body): return None assert len(reg) == 2 - assert f1 is reg[QueueRegistration('my.routing.key1', 'my.routing.key1', 'default')] - assert f1 is reg[QueueRegistration('my.routing.key2', 'my.routing.key2', 'default')] + + handler_reg1 = reg[ + QueueKey(queue='my.routing.key1', exchange='default') + ] + assert handler_reg1.handler is f1 + assert handler_reg1.routing_key == 'my.routing.key1' + assert handler_reg1.queue_arguments == {} + + handler_reg2 = reg[ + QueueKey(queue='my.routing.key2', exchange='default') + ] + assert handler_reg2.handler is f1 + assert handler_reg2.routing_key == 'my.routing.key2' + assert handler_reg2.queue_arguments == {} step = ec.AMQPRetryConsumerStep(None) handlers = step.get_handlers(channel=mock.MagicMock()) @@ -216,5 +293,5 @@ def f1(body): assert len(handler.consumer.queues) == 1 assert len(handler.consumer.callbacks) == 1 assert isinstance(handler.consumer.callbacks[0], ec.AMQPRetryHandler) - key = (handler.routing_key, handler.queue, handler.exchange) - assert handler.consumer.callbacks[0].func is reg[key] + key = QueueKey(queue=handler.queue, exchange=handler.exchange) + assert handler.consumer.callbacks[0].func is reg[key].handler