Small footprint, easy to use STOMP message broker designed for task queues and IPC

mfp/ocamlmq

ocamlmq is a STOMP message broker with features that make it especially
suitable for implementing task queues and communication between subsystems:

* persistent queues, scaling over arbitrarily large numbers of queues with
  constant memory usage (i.e. supports millions of queues)
* queued messages need not fit in memory
* strong durability guarantees: messages can be guaranteed to have been saved
  to disk (and fsync'ed) by the time the sender gets a message receipt
* message priorities
* per-subscription prefetch limit for queue messages
* error handling and ACK timeout: if a subscriber doesn't ACK a message
  after a (message-specific) timeout, the message will be sent to another
  subscriber. Messages are also resent automatically if a subscriber dies and
  its connection expires.
* topic subscriptions (messages broadcast to all subscribers) with
  prefix matching
* support for high numbers of subscriptions: millions of subscriptions
  pose no problem
* simple extensions within the STOMP protocol to report the number of messages
  in a queue and the number of subscribers to a queue or topic
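The ACK-timeout rule in the list above can be pictured as a small in-memory tracker of delivered-but-unacknowledged messages. This is a minimal sketch under stated assumptions: the type `pending` and the functions `on_deliver`, `on_ack` and `expired` are hypothetical and are not how ocamlmq itself stores unacknowledged messages.

```ocaml
(* Hypothetical tracker for delivered-but-unacknowledged queue messages.
   It only illustrates the ACK-timeout rule; it is not ocamlmq's code. *)
type pending = {
  msg_id : string;
  subscriber : string;  (* connection the message was sent to *)
  deadline : float;     (* delivery time + message-specific ack-timeout *)
}

let unacked : (string, pending) Hashtbl.t = Hashtbl.create 16

(* Record a delivery; [ack_timeout] is in seconds, like the "ack-timeout" header. *)
let on_deliver ~msg_id ~subscriber ~ack_timeout ~now =
  Hashtbl.replace unacked msg_id
    { msg_id; subscriber; deadline = now +. ack_timeout }

(* An ACK removes the message from the candidates for redelivery. *)
let on_ack ~msg_id = Hashtbl.remove unacked msg_id

(* Messages whose deadline has passed should go to another subscriber. *)
let expired ~now =
  Hashtbl.fold
    (fun _ p acc -> if p.deadline <= now then p :: acc else acc)
    unacked []
```

A periodic timer would call `expired` and hand the results back to the queue for redelivery; the sketch leaves that part out.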

ocamlmq is written in OCaml, in ~1400 lines of code.  It is easy to extend and
fairly efficient.  The server is abstracted over a storage backend; there are
currently two backends:
* PostgreSQL (150 LoC)
* SQLite with in-memory caching (350 LoC)

Scalability
===========

ocamlmq has been designed to support millions of queues and topic
subscriptions without undue memory usage.  This table summarizes the time
complexity of some STOMP operations:

      SEND to queue           O(log subscribers)
      SEND to topic           O(subscribers + log (total subs))
      SUBSCRIBE to queue      O(log (total subs))
      SUBSCRIBE to topic      O(log subscribers)
      ACK                     O(1)              

ocamlmq typically needs around 150 bytes per subscription, so 1 million
subscriptions will not take much more than 150 MB of memory.  No extra memory
is needed for queues, so you can use lots of them without worrying about memory
usage.
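As a back-of-the-envelope check of the figure above, here is a minimal sketch; `subscription_memory_mb` is a hypothetical helper, and the 150-byte default is the typical per-subscription cost quoted in this section, not a measured constant.

```ocaml
(* Estimate memory (in MB, i.e. 10^6 bytes) used by [subscriptions]
   subscriptions, assuming ~150 bytes each as stated above.
   Queues themselves add no per-queue memory. *)
let subscription_memory_mb ?(bytes_per_sub = 150) subscriptions =
  float_of_int (subscriptions * bytes_per_sub) /. 1_000_000.0

let () =
  Printf.printf "%.0f MB\n" (subscription_memory_mb 1_000_000)
```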


Limitations
===========

ocamlmq works well in the intended use case (persistent queues and transient
topic destinations, with possibly many queues and subscriptions), but it has
some limitations which preclude its use in other domains:
* ocamlmq is not designed to scale beyond several hundred to a few thousand
  simultaneous connections (it will work, but performance will be affected)
* there is no flow control for topic messages (in the intended use case, topic
  messages are assumed to be relatively small and processed fast)
* messages are limited to 16 MB on 32-bit platforms
* ocamlmq does not support very high message rates (it delivers only
  ~60K messages/second on a 3GHz AMD64 box)

If you need complex routing rules, scalability to many thousand simultaneous
connections or other _enterprise_ messaging features, you'll be better served
by AMQP or JMS brokers. ActiveMQ, in particular, is very configurable, so
it'll most likely fit the bill if memory consumption and scaling to many
subscriptions are not a concern.

Building
========

You'll need a working OCaml environment plus the following libraries:
* Lwt
* extlib
* ocaml-sqlite3
* csv
* estring

Additionally, ocamlmq requires PostgreSQL both at compile- and run-time.

Just do

  $ omake

Running
=======

ocamlmq's configuration is given via the command line:

Usage: ocamlmq [options] [sqlite3 database (default: ocamlmq.db)]
  -port PORT         Port to listen at (default: 61613).
  -login LOGIN       Login expected in CONNECT.
  -passcode PASSCODE Passcode expected in CONNECT.
  -maxmsgs N         Flush to disk when there are more than N msgs in mem (default: 100000)
  -flush-period DT   Flush period in seconds (default: 1.0)
  -flush-wait-time N Wait for N milliseconds before flushing (default 0)
  -debug             Write debug info to stderr.
  -help              Display this list of options
  --help             Display this list of options

ocamlmq stores the messages in memory and flushes to disk (with full fsync)
when either:
* more than [maxmsgs] messages have been received since the last flush
* or it's been [flush-period] seconds since the last flush
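The two flush conditions above can be modelled as a single predicate. This is a minimal sketch: `flush_config` and `should_flush` are hypothetical names whose fields mirror the -maxmsgs and -flush-period options; it is not ocamlmq's actual code.

```ocaml
(* Hypothetical model of the flush rule; not ocamlmq's code. *)
type flush_config = {
  maxmsgs : int;         (* flush once MORE than this many msgs are pending *)
  flush_period : float;  (* flush at least this often, in seconds *)
}

(* [pending]: messages received since the last flush;
   [last_flush] and [now]: timestamps in seconds. *)
let should_flush cfg ~pending ~last_flush ~now =
  pending > cfg.maxmsgs || now -. last_flush >= cfg.flush_period

let () =
  let defaults = { maxmsgs = 100_000; flush_period = 1.0 } in
  (* 10 msgs pending, 0.2 s since the last flush: keep buffering *)
  Printf.printf "%b\n" (should_flush defaults ~pending:10 ~last_flush:0.0 ~now:0.2);
  (* -maxmsgs 0: a single pending message already triggers a flush *)
  Printf.printf "%b\n"
    (should_flush { defaults with maxmsgs = 0 } ~pending:1 ~last_flush:0.0 ~now:0.2)
```

With `maxmsgs = 0`, the first condition holds as soon as one message is pending, which is the per-message fsync behaviour of -maxmsgs 0.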

You can force a full fsync after each message with -maxmsgs 0; this will be
slow, since the disk is hit for every message carrying a "receipt" header
(messages without one can still be saved asynchronously).

If the ocamlmq server is receiving messages concurrently from several clients,
you can increase the overall throughput *while* still ensuring that each message
is saved (and fsync'ed) to disk before the reply is sent to the client by using
-flush-wait-time. For example, -flush-wait-time 50 will wait for 50 ms after
[maxmsgs] messages have been received; any further messages received from other
connections in the meantime are flushed together once the flush-wait-time has
elapsed. This increases throughput at the expense of latency.

Note that [flush-wait-time] only increases throughput if messages are being
received concurrently through several connections: if there are only a few of
them, a [flush-wait-time] value that is too high will *decrease* the overall
throughput. In particular, if there is only one connection, the throughput will
be *less than*

   (1000 / flush-wait-time (in ms)) messages per second
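The single-connection bound follows from a simple model, stated here as an assumption rather than taken from ocamlmq's code: each client sends one message, waits for its receipt, and the receipt is only issued after a flush that happens flush-wait-time ms later. Each connection then completes at most one message per flush-wait-time, so n connections give at most n * 1000 / flush-wait-time msg/s. `throughput_upper_bound` is a hypothetical helper.

```ocaml
(* Upper bound on throughput (msg/s) under the model above:
   each connection completes at most one message per flush_wait_ms.
   The time spent in fsync itself is ignored. *)
let throughput_upper_bound ~connections ~flush_wait_ms =
  float_of_int connections *. 1000.0 /. flush_wait_ms

let () =
  List.iter
    (fun (c, w) ->
      Printf.printf "flush-wait-time=%.0f ms, connections=%d: <= %.0f msg/s\n"
        w c (throughput_upper_bound ~connections:c ~flush_wait_ms:w))
    [ (1, 20.0); (1, 50.0); (10, 20.0) ]
```

For one connection this gives 50 msg/s at 20 ms and 20 msg/s at 50 ms; the measured ~43 and ~19 msg/s in the figures that follow stay below it. At high concurrency the bound becomes much looser, presumably because other costs such as the fsync itself start to dominate.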

Here are some figures, taken on an oldish dual-core AMD Athlon 64 X2 with a
7200 RPM SATA disk. In all cases, ocamlmq is run with -maxmsgs 0, and messages
are received concurrently through the number of connections given in the
"concurrency" column.

 flush-wait-time (ms)  concurrency         throughput
------------------------------------------------------
         0                     1           ~260 msg/s
         0                    10           ~260 msg/s
         0                    100          ~260 msg/s
        20                     1            ~43 msg/s
        20                     2            ~85 msg/s
        20                    10           ~390 msg/s
        20                    50           ~820 msg/s
        20                   100          ~1380 msg/s
        20                   200          ~1480 msg/s
        50                     1            ~19 msg/s
        50                     2            ~37 msg/s
        50                    10           ~177 msg/s
        50                    50           ~600 msg/s
        50                   100          ~1150 msg/s
        50                   200          ~1400 msg/s

For comparison, here is the throughput with other -maxmsgs values:

 maxmsgs        throughput
 ----------------------------
   5000         >12000 msg/s
 "infty"        ~17000 msg/s

STOMP protocol specifics
========================
ocamlmq uses the STOMP protocol as specified in

  http://stomp.codehaus.org/Protocol

and uses a trailing newline (after the NUL byte) to delimit frames.
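To make the framing rule concrete, here is a minimal sketch of splitting received bytes into frames on the NUL-plus-newline terminator. `split_frames` is a hypothetical helper; it ignores content-length headers and assumes frame bodies never contain a NUL byte.

```ocaml
(* Split [data] into complete frames, each terminated by "\000\n".
   Returns the frames (terminator stripped) and any trailing partial
   data, which should be kept until more bytes arrive. *)
let split_frames data =
  let len = String.length data in
  let rec loop start acc =
    match String.index_from_opt data start '\000' with
    | Some i when i + 1 < len && data.[i + 1] = '\n' ->
        loop (i + 2) (String.sub data start (i - start) :: acc)
    | _ -> (List.rev acc, String.sub data start (len - start))
  in
  loop 0 []
```

A NUL at the very end of the buffer, whose newline has not arrived yet, is treated as part of an incomplete frame and left in the remainder.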

SEND
----
The ACK timeout period, after which queue messages are sent to another
subscriber, can be specified in the "ack-timeout" header (as a float in
seconds), e.g.

    SEND
    destination:/queue/test
    ack-timeout:3.14

    just testing
    ^@
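A client-side sketch that produces the frame above, with the optional "ack-timeout" header and an optional "receipt" header (needed when the sender wants confirmation that the message was saved). `send_frame` is hypothetical, and formatting the timeout with `%g` is an assumption, not something ocamlmq prescribes.

```ocaml
(* Build a SEND frame; optional headers are only emitted when given.
   The frame ends with a NUL byte followed by a newline. *)
let send_frame ?ack_timeout ?receipt ~destination body =
  let headers =
    [ Some ("destination", destination);
      Option.map (fun t -> ("ack-timeout", Printf.sprintf "%g" t)) ack_timeout;
      Option.map (fun r -> ("receipt", r)) receipt ]
    |> List.filter_map (fun h -> h)
  in
  let b = Buffer.create 128 in
  Buffer.add_string b "SEND\n";
  List.iter (fun (k, v) -> Printf.bprintf b "%s:%s\n" k v) headers;
  Buffer.add_char b '\n';  (* blank line separates headers from body *)
  Buffer.add_string b body;
  Buffer.add_string b "\000\n";
  Buffer.contents b
```

For example, `send_frame ~destination:"/queue/test" ~ack_timeout:3.14 "just testing"` yields the frame shown above.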


SUBSCRIBE
---------

Clients can subscribe to topics (/topic/xxx) which have broadcast, non-durable
semantics, or to queues (/queue/xxx), which are persistent. It is also
possible to subscribe to all the topics matching a prefix, by using
/topic/someprefix* as the destination.

The prefetch limit (max. number of unacknowledged messages allowed by the
server) can be specified in the "prefetch" header.
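The subscription rules above can be sketched as follows. `topic_matches` and `subscribe_frame` are hypothetical helpers: the linear prefix check only illustrates which topics a `/topic/someprefix*` subscription covers, not how ocamlmq indexes its subscriptions.

```ocaml
(* Does subscription [pattern] cover topic [destination]?
   "/topic/foo*" matches every topic starting with "/topic/foo";
   any other pattern must match exactly. *)
let topic_matches ~pattern destination =
  let n = String.length pattern in
  if n > 0 && pattern.[n - 1] = '*' then (
    let prefix = String.sub pattern 0 (n - 1) in
    String.length destination >= n - 1
    && String.sub destination 0 (n - 1) = prefix)
  else pattern = destination

(* SUBSCRIBE frame with an optional "prefetch" limit,
   terminated by NUL + newline. *)
let subscribe_frame ?prefetch destination =
  let prefetch_header =
    match prefetch with
    | Some n -> Printf.sprintf "prefetch:%d\n" n
    | None -> ""
  in
  Printf.sprintf "SUBSCRIBE\ndestination:%s\n%s\n\000\n" destination prefetch_header
```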

BEGIN / COMMIT
--------------
They are not implemented, since they are ill-specified.

Control messages
================
A client can send messages to special "control" destinations and obtain the
response in the message receipt (i.e., nothing is returned unless the
"receipt" header is set):

  /control/count-msgs/queue/name-of-the-queue
    
    returns the number of messages in the "num-messages" header

  /control/count-subscribers/queue/name-of-the-queue

    returns the number of subscribers in the "num-subscribers" header

  /control/count-subscribers/topic/name-of-the-topic

    returns the number of subscribers in the "num-subscribers" header
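A minimal client-side sketch of a control query: build the SEND frame (with the required "receipt" header) and read the answer from the RECEIPT frame. `control_request` and `header_value` are hypothetical helpers, and the RECEIPT layout used in the test (a "receipt-id" header next to "num-messages") is an assumption based on generic STOMP, not on ocamlmq's source.

```ocaml
(* Build a SEND to /control/<path>.  The "receipt" header is needed:
   without it ocamlmq returns nothing. *)
let control_request ~receipt path =
  Printf.sprintf "SEND\ndestination:/control/%s\nreceipt:%s\n\n\000\n" path receipt

(* Extract header [name] from a received frame: header lines sit
   between the command line and the first empty line. *)
let header_value frame name =
  let rec find = function
    | [] | "" :: _ -> None
    | line :: rest -> (
        match String.index_opt line ':' with
        | Some i when String.sub line 0 i = name ->
            Some (String.sub line (i + 1) (String.length line - i - 1))
        | _ -> find rest)
  in
  match String.split_on_char '\n' frame with
  | _command :: headers -> find headers
  | [] -> None
```

For example, after sending `control_request ~receipt:"r1" "count-msgs/queue/jobs"`, applying `header_value` to the returned frame with `"num-messages"` gives the queue length as a string.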
