A simple implementation of STOMP with Django.
In theory, it can work with any broker that supports STOMP, with no or only minor adjustments.
pip install django_stomp
Add django_stomp to your INSTALLED_APPS and you are done.
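As a minimal sketch, the relevant part of a settings module might look like the fragment below. The Django contrib apps are placeholders for whatever your project already lists; only the django_stomp entry comes from the step above.

```python
# settings.py (fragment). The contrib apps are placeholders for your own apps;
# the only entry required by the step above is "django_stomp".
INSTALLED_APPS = [
    "django.contrib.contenttypes",
    "django.contrib.auth",
    "django_stomp",
]
```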
Not yet fully available, but feel free to see our tests to get insights.
First, create a function that receives a parameter of type django_stomp.services.consumer.Payload. Suppose the module app.sample has the following content:
import logging

from django_stomp.services.consumer import Payload

logger = logging.getLogger(__name__)


def my_honest_logic(payload: Payload) -> None:
    logger.info("Yeah, I received a payload from django-stomp!")

    my_payload = payload.body
    my_header = payload.headers

    if my_payload.get("my-dict-key"):
        payload.ack()
    else:
        logger.info("To DLQ!")
        payload.nack()
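Because the callback only touches body, headers, ack() and nack(), its branching can be exercised without a broker. The sketch below is one way to do that under stated assumptions: make_fake_payload is a hypothetical helper built on unittest.mock, not part of django-stomp, and the Payload type hint is dropped so the example runs even when the library is not installed.

```python
import logging
from unittest.mock import Mock

logger = logging.getLogger(__name__)


def my_honest_logic(payload) -> None:
    # Same logic as the callback above, without the Payload type hint so the
    # sketch does not need django_stomp to be importable.
    logger.info("Yeah, I received a payload from django-stomp!")
    if payload.body.get("my-dict-key"):
        payload.ack()
    else:
        logger.info("To DLQ!")
        payload.nack()


def make_fake_payload(body: dict) -> Mock:
    # Hypothetical stand-in: it only mimics the attributes the callback uses
    # (body, headers, ack, nack); it is not the real Payload class.
    return Mock(body=body, headers={}, ack=Mock(), nack=Mock())


# A body containing the expected key should be acknowledged.
accepted = make_fake_payload({"my-dict-key": "some value"})
my_honest_logic(accepted)
accepted.ack.assert_called_once()
accepted.nack.assert_not_called()

# A body without the key should be rejected.
rejected = make_fake_payload({})
my_honest_logic(rejected)
rejected.nack.assert_called_once()
rejected.ack.assert_not_called()
```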
Next, provide the broker connection details by setting at least the following parameters:
- STOMP_SERVER_HOST
- STOMP_SERVER_PORT
- STOMP_USE_SSL
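For illustration, these three parameters could be set in the project settings as in the fragment below. The host, port and SSL values are assumptions chosen for the example; use the values your own broker exposes.

```python
# settings.py (fragment). All values below are illustrative assumptions.
STOMP_SERVER_HOST = "localhost"
# Assumed port of a TLS-enabled STOMP listener; use your broker's actual port.
STOMP_SERVER_PORT = 61614
# One of the truthy values this README lists for the flag.
STOMP_USE_SSL = True
```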
Then create the job by issuing the following command:
python manage.py pubsub "/queue/your-stuff" app.sample.my_honest_logic
That's it ✌️
Here is a list of parameters that you can set in your Django project settings:
STOMP_SERVER_HOST
The hostname of the STOMP server.
STOMP_SERVER_PORT
The STOMP server port used to allow STOMP connections.
STOMP_SERVER_USER
The client username to connect to a STOMP server.
STOMP_SERVER_PASSWORD
The client password to connect to a STOMP server.
STOMP_USE_SSL
Set to True, true, 1, T, t, Y or y to make all STOMP connections use an SSL/TLS tunnel.
STOMP_SERVER_STANDBY_HOST
The hostname of the STOMP standby server.
STOMP_SERVER_STANDBY_PORT
The STOMP standby server port used to allow STOMP connections.
STOMP_SERVER_VHOST
The virtual host used in the STOMP server.
STOMP_SUBSCRIPTION_ID
Used to identify the subscription in the connection between client and server. See the STOMP protocol specification for more information.
STOMP_OUTGOING_HEARTBEAT
A positive integer indicating the interval (in milliseconds) at which the client sends a frame to the server
to signal that it is still alive. Set to 0 to indicate that the client cannot send any heart-beat frames.
See the STOMP protocol specification for more information.
Defaults to 10000 ms.
STOMP_INCOMING_HEARTBEAT
A positive integer indicating how long (in milliseconds) the client waits for a frame from the server before
it stops assuming that the server is still alive. Set to 0 to indicate that the client does not want to
receive heart-beats. See the STOMP protocol specification for more information. Defaults to 10000 ms.
STOMP_WAIT_TO_CONNECT
A positive integer indicating how long the consumer waits before trying to reconnect when an exception
in the listener logic is not properly handled.
STOMP_DURABLE_TOPIC_SUBSCRIPTION
Set to True, true, 1, T, t, Y or y to make all STOMP topic subscriptions durable. See the RabbitMQ take on it
or the ActiveMQ documentation for more information.
STOMP_LISTENER_CLIENT_ID
A string that represents the client id for a durable subscriber or the listener prefix client id in a non-durable subscription in ActiveMQ.
STOMP_CORRELATION_ID_REQUIRED
A flag that indicates whether the correlation-id header is required. By default this flag is true
(a good practice with future troubleshooting in mind).
Set to False, false, 0, F, f, N or n to allow consuming messages without a correlation-id header.
If it is false, django-stomp generates a correlation-id header for the message automatically.
STOMP_PROCESS_MSG_ON_BACKGROUND
A flag that indicates whether a received message should be processed in the background, so that the
broker-consumer communication can still take place in the meantime.
Set to True, true, 1, T, t, Y or y to process messages in the background.
STOMP_PROCESS_MSG_WORKERS
Optional parameter that controls how many workers the pool that manages the background processing should create. If defined, this parameter must be an integer.
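To illustrate the general idea behind the last two parameters, here is a minimal sketch of handing messages to a worker pool so the receiving loop is not blocked by a slow callback. It uses only the standard library and is not django-stomp's actual implementation; WORKERS, slow_callback and receive_all are hypothetical names introduced for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Hypothetical value standing in for STOMP_PROCESS_MSG_WORKERS.
WORKERS = 2


def slow_callback(message: str) -> str:
    # Simulates a slow consumer callback.
    time.sleep(0.05)
    return f"processed {message}"


def receive_all(messages, pool):
    # The receiving loop only submits work and moves on, so it stays free
    # to handle other frames (heart-beats, for example) in the meantime.
    return [pool.submit(slow_callback, message) for message in messages]


with ThreadPoolExecutor(max_workers=WORKERS) as pool:
    futures = receive_all(["a", "b", "c"], pool)
    # Results are collected in submission order.
    results = [future.result() for future in futures]

print(results)
```

The trade-off of this design is that message handling becomes concurrent, so a callback used this way should be safe to run in more than one thread at a time.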
To run the tests against ActiveMQ, first start the broker:
docker-compose up -d broker-activemq
Or for RabbitMQ:
docker-compose up -d broker-rabbitmq
Finally, run the tests:
pipenv run tox
For every message that a django-stomp consumer receives, it opens a new DB connection if it needs one and keeps it open until the connection exceeds the maximum age defined by CONN_MAX_AGE or becomes unusable.
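CONN_MAX_AGE is a standard Django database option set per database alias. A settings fragment might look like the sketch below, where the engine, the database name and the 60-second value are assumptions made for the example.

```python
# settings.py (fragment). Engine, name and the 60-second value are assumptions.
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "NAME": "my_database",
        # Django option: maximum lifetime of a connection in seconds.
        # 0 disables connection reuse; None means no age limit.
        "CONN_MAX_AGE": 60,
    }
}
```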
- Currently, we assume that all dead-lettered messages are sent to a queue with the same name as the original destination but prefixed with "DLQ.", i.e., if your queue is /queue/some-queue, the dead letter destination is assumed to be /queue/DLQ.some-queue.
- Be cautious with the heartbeat functionality! If your consumer is slow, it could prevent the client from receiving and processing any heart-beat frame sent by the server, causing the client to terminate the connection due to a false-positive heartbeat timeout. You can work around it with the STOMP_PROCESS_MSG_ON_BACKGROUND parameter, which uses a thread pool to process the message.
- For RabbitMQ users: the django-stomp consumer always tries to connect to a durable queue, so if your queue is not durable, the RabbitMQ broker will not allow the subscription.
- For versions prior to 5.0.0: any database connection management on the consumer side is up to its callback. If you have a long-running consumer that relies on a DB connection, be sure to manage it properly; otherwise, if a connection becomes unusable, you will have to restart the consumer entirely just to set up a new DB connection.
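The dead letter naming convention from the first item in the list above can be sketched as a small function. assumed_dlq_destination is a hypothetical helper written for this example, not a function exported by django-stomp.

```python
def assumed_dlq_destination(destination: str) -> str:
    # Hypothetical helper: prefixes the last path segment with "DLQ.",
    # following the naming convention described in the list above.
    prefix, _, name = destination.rpartition("/")
    return f"{prefix}/DLQ.{name}"


print(assumed_dlq_destination("/queue/some-queue"))  # /queue/DLQ.some-queue
```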