Skip to content

Commit

Permalink
Priority support for MongoDB transport. Transport table update. Priority
Browse files Browse the repository at this point in the history
conversion unification.
  • Loading branch information
daevaorn committed May 8, 2014
1 parent ddb320e commit 3dd3d71
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 65 deletions.
51 changes: 26 additions & 25 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -81,31 +81,31 @@ and the `Wikipedia article about AMQP`_.
Transport Comparison
====================

+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
+---------------+----------+------------+------------+---------------+--------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
+---------------+----------+------------+------------+---------------+--------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
+---------------+----------+------------+------------+---------------+--------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
+---------------+----------+------------+------------+---------------+--------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+


.. [#f1] Declarations only kept in memory, so exchanges/queues
Expand All @@ -115,6 +115,7 @@ Transport Comparison
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
.. [#f3] AMQP Message priority support depends on broker implementation.
Documentation
-------------
Expand Down
50 changes: 27 additions & 23 deletions docs/userguide/connections.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,29 +145,31 @@ transport URL, or use ``amqp`` to have the fallback.
Transport Comparison
====================

+---------------+----------+------------+------------+---------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** |
+---------------+----------+------------+------------+---------------+
| *amqp* | Native | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) |
+---------------+----------+------------+------------+---------------+
| *mongodb* | Virtual | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ |
+---------------+----------+------------+------------+---------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No |
+---------------+----------+------------+------------+---------------+
+---------------+----------+------------+------------+---------------+--------------+
| **Client** | **Type** | **Direct** | **Topic** | **Fanout** | **Priority** |
+---------------+----------+------------+------------+---------------+--------------+
| *amqp* | Native | Yes | Yes | Yes | Yes [#f3]_ |
+---------------+----------+------------+------------+---------------+--------------+
| *redis* | Virtual | Yes | Yes | Yes (PUB/SUB) | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *mongodb* | Virtual | Yes | Yes | Yes | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *beanstalk* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *SQS* | Virtual | Yes | Yes [#f1]_ | Yes [#f2]_ | No |
+---------------+----------+------------+------------+---------------+--------------+
| *couchdb* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *zookeeper* | Virtual | Yes | Yes [#f1]_ | No | Yes |
+---------------+----------+------------+------------+---------------+--------------+
| *in-memory* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *django* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *sqlalchemy* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+
| *SLMQ* | Virtual | Yes | Yes [#f1]_ | No | No |
+---------------+----------+------------+------------+---------------+--------------+


.. [#f1] Declarations only kept in memory, so exchanges/queues
Expand All @@ -176,3 +178,5 @@ Transport Comparison
.. [#f2] Fanout supported via storing routing tables in SimpleDB.
Disabled by default, but can be enabled by using the
``supports_fanout`` transport option.
.. [#f3] AMQP Message priority support depends on broker implementation.
16 changes: 16 additions & 0 deletions kombu/tests/transport/virtual/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,22 @@ def test_queue_declare_passive(self):
with self.assertRaises(ChannelError):
self.channel.queue_declare(queue='21wisdjwqe', passive=True)

def test_get_message_priority(self):
def _message(priority):
return self.channel.prepare_message('the message with priority',
priority=priority)

self.assertEqual(self.channel._get_message_priority(_message(5)),
5)
self.assertEqual(self.channel._get_message_priority(_message(self.channel.min_priority - 10)),
self.channel.min_priority)
self.assertEqual(self.channel._get_message_priority(_message(self.channel.max_priority + 10)),
self.channel.max_priority)
self.assertEqual(self.channel._get_message_priority(_message('foobar')),
self.channel.default_priority)
self.assertEqual(self.channel._get_message_priority(_message(2), reverse=True),
self.channel.max_priority - 2)


class test_Transport(Case):

Expand Down
2 changes: 1 addition & 1 deletion kombu/transport/beanstalk.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def _parse_job(self, job):

def _put(self, queue, message, **kwargs):
extra = {}
priority = message['properties']['delivery_info']['priority']
priority = self._get_message_priority(message)
ttr = message['properties'].get('ttr')
if ttr is not None:
extra['ttr'] = ttr
Expand Down
9 changes: 6 additions & 3 deletions kombu/transport/mongodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ def _get(self, queue):
else:
msg = self.get_messages().find_and_modify(
query={'queue': queue},
sort={'_id': pymongo.ASCENDING},
sort=[('priority', pymongo.ASCENDING),
('_id', pymongo.ASCENDING)],
remove=True,
)

Expand All @@ -116,7 +117,9 @@ def _size(self, queue):

def _put(self, queue, message, **kwargs):
self.get_messages().insert({'payload': dumps(message),
'queue': queue})
'queue': queue,
'priority': self._get_message_priority(message,
reverse=True)})

def _purge(self, queue):
size = self._size(queue)
Expand Down Expand Up @@ -202,7 +205,7 @@ def _create_broadcast(self, database, options):
def _ensure_indexes(self):
'''Ensure indexes on collections.'''
self.get_messages().ensure_index(
[('queue', 1), ('_id', 1)], background=True,
[('queue', 1), ('priority', 1), ('_id', 1)], background=True,
)
self.get_broadcast().ensure_index([('queue', 1)])
self.get_routing().ensure_index([('queue', 1), ('exchange', 1)])
Expand Down
7 changes: 2 additions & 5 deletions kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -662,11 +662,8 @@ def priority(self, n):

def _put(self, queue, message, **kwargs):
"""Deliver message."""
try:
pri = max(min(int(
message['properties']['delivery_info']['priority']), 9), 0)
except (TypeError, ValueError, KeyError):
pri = 0
pri = self._get_message_priority(message)

with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))

Expand Down
20 changes: 19 additions & 1 deletion kombu/transport/virtual/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,11 @@ class Channel(AbstractChannel, base.StdChannel):
# List of options to transfer from :attr:`transport_options`.
from_transport_options = ('body_encoding', 'deadletter_queue')

# Priority defaults
default_priority = 0
min_priority = 0
max_priority = 9

def __init__(self, connection, **kwargs):
self.connection = connection
self._consumers = set()
Expand Down Expand Up @@ -653,7 +658,7 @@ def prepare_message(self, body, priority=None, content_type=None,
"""Prepare message data."""
properties = properties or {}
info = properties.setdefault('delivery_info', {})
info['priority'] = priority or 0
info['priority'] = priority or self.default_priority

return {'body': body,
'content-encoding': content_encoding,
Expand Down Expand Up @@ -723,6 +728,19 @@ def cycle(self):
self._reset_cycle()
return self._cycle

def _get_message_priority(self, message, reverse=False):
"""Gets priority from message and converts it to the bounds: [0, 9].
Higher value has more priority.
"""
try:
priority = max(min(int(message['properties']['delivery_info']['priority']),
self.max_priority),
self.min_priority)
except (TypeError, ValueError, KeyError):
priority = self.default_priority

return (self.max_priority - priority) if reverse else priority


class Management(base.Management):

Expand Down
8 changes: 1 addition & 7 deletions kombu/transport/zookeeper.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@

from . import virtual

MAX_PRIORITY = 9

try:
import kazoo
Expand Down Expand Up @@ -103,13 +102,8 @@ def _get_queue(self, queue_name):
return queue

def _put(self, queue, message, **kwargs):
try:
priority = message['properties']['delivery_info']['priority']
except KeyError:
priority = 0

queue = self._get_queue(queue)
queue.put(dumps(message), priority=(MAX_PRIORITY - priority))
queue.put(dumps(message), priority=self._get_message_priority(message, reverse=True))

def _get(self, queue):
queue = self._get_queue(queue)
Expand Down

0 comments on commit 3dd3d71

Please sign in to comment.