Skip to content

Commit

Permalink
Added support for queue/topic subscriber count control message.
Browse files Browse the repository at this point in the history
  • Loading branch information
mfp committed Dec 13, 2009
1 parent d7fd9ff commit 9f33267
Show file tree
Hide file tree
Showing 6 changed files with 28 additions and 3 deletions.
2 changes: 2 additions & 0 deletions mq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,6 @@ sig
val unsubscribe_topic : connection -> string -> unit thread

val queue_size : connection -> string -> Int64.t option thread
val queue_subscribers : connection -> string -> int option thread
val topic_subscribers : connection -> string -> int option thread
end
2 changes: 2 additions & 0 deletions mq_adapter_base.ml
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,6 @@ struct
end

let queue_size conn queue = return None
let queue_subscribers conn queue = return None
let topic_subscribers conn topic = return None
end
4 changes: 4 additions & 0 deletions mq_impl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ struct
method virtual unsubscribe_topic : string -> unit M.thread

method virtual queue_size : string -> Int64.t option M.thread
method virtual queue_subscribers : string -> int option M.thread
method virtual topic_subscribers : string -> int option M.thread
end

module Tset = Set.Make(struct type t = M.transaction let compare = compare end)
Expand Down Expand Up @@ -127,6 +129,8 @@ struct
method create_queue s = self#with_conn (fun c -> M.create_queue c s)

method queue_size s = self#with_conn (fun c -> M.queue_size c s)
method topic_subscribers s = self#with_conn (fun c -> M.topic_subscribers c s)
method queue_subscribers s = self#with_conn (fun c -> M.queue_subscribers c s)

method subscribe_queue ?(auto_delete = false) s =
self#with_conn (fun c -> M.subscribe_queue ~auto_delete c s) >>= fun () ->
Expand Down
4 changes: 4 additions & 0 deletions mq_impl.mli
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ module Make :
method virtual unsubscribe_queue : string -> unit M.thread
method virtual unsubscribe_topic : string -> unit M.thread
method virtual queue_size : string -> Int64.t option M.thread
method virtual queue_subscribers : string -> int option M.thread
method virtual topic_subscribers : string -> int option M.thread
end

class simple_queue :
Expand Down Expand Up @@ -80,6 +82,8 @@ module Make :
method unsubscribe_queue : string -> unit M.thread
method unsubscribe_topic : string -> unit M.thread
method queue_size : string -> Int64.t option M.thread
method queue_subscribers : string -> int option M.thread
method topic_subscribers : string -> int option M.thread
end

val make_tcp_message_queue :
Expand Down
17 changes: 14 additions & 3 deletions mq_ocamlmq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,29 @@ struct
open CONC
include Mq_adapter_base.Make_STOMP(CONC)

let queue_size conn queue =
let control_msg_aux conn dest f field =
let c = conn.c_conn in
let rid = B.receipt_id () in
B.expect_receipt c rid;
B.send_no_ack c
~headers:["receipt", rid]
~destination:("/control/count-msgs/queue/" ^ queue) "" >>= fun () ->
~destination:("/control/" ^ dest) "" >>= fun () ->
B.receive_receipt c rid >>= fun r ->
try
return (Some (Int64.of_string (List.assoc "num-messages" r.B.r_headers)))
return (Some (f (List.assoc field r.B.r_headers)))
with _ -> return None

let queue_size conn queue =
control_msg_aux conn ("count-msgs/queue/" ^ queue) Int64.of_string "num-messages"

let queue_subscribers conn queue =
control_msg_aux conn
("count-subscribers/queue/" ^ queue) int_of_string "num-subscribers"

let topic_subscribers conn topic =
control_msg_aux conn
("count-subscribers/topic/" ^ topic) int_of_string "num-subscribers"

let timeout_headers =
Option.map_default (fun timeout -> ["ack-timeout", string_of_float timeout]) []

Expand Down
2 changes: 2 additions & 0 deletions mq_rabbitmq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -136,4 +136,6 @@ struct
return ()

let queue_size conn queue = return None
let queue_subscribers conn queue = return None
let topic_subscribers conn topic = return None
end

0 comments on commit 9f33267

Please sign in to comment.