forked from celery/kombu
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_SLMQ.py
29 lines (24 loc) · 921 Bytes
/
test_SLMQ.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from funtests import transport
from nose import SkipTest
import os
class test_SLMQ(transport.TransportCase):
    """Functional test case for the ``SLMQ`` transport.

    This subclass of ``transport.TransportCase`` only supplies settings
    and two connection hooks; it defines no test methods of its own.
    """

    # Class-level settings, presumably consumed by transport.TransportCase;
    # their exact semantics are defined by that base class -- TODO confirm.
    transport = "SLMQ"
    prefix = "slmq"
    event_loop_max = 100
    message_size_limit = 4192
    reliable_purge = False

    #: does not guarantee FIFO order, even in simple cases.
    suppress_disorder_warning = True

    def before_connect(self):
        """Raise ``SkipTest`` if a required environment variable is absent.

        The variables are checked in a fixed order, so the message names
        the first one that is missing from ``os.environ``.
        """
        required = (
            "SLMQ_ACCOUNT",
            "SL_USERNAME",
            "SL_API_KEY",
            "SLMQ_HOST",
            "SLMQ_SECURE",
        )
        for envvar in required:
            if envvar not in os.environ:
                raise SkipTest("Missing envvar %s" % envvar)

    def after_connect(self, connection):
        """No-op hook: nothing is done with *connection*."""