-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmq_impl.ml
170 lines (140 loc) · 6.57 KB
/
mq_impl.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
(* Copyright (c) 2009 Mauricio Fernández <[email protected]> *)
open Mq
module Make
(C : Mq_concurrency.THREAD)
(M : HIGH_LEVEL with type 'a thread = 'a C.t) =
struct
open C
class virtual ['tx] mq =
object
method virtual disconnect : unit M.thread
method virtual reconnect : unit M.thread
method virtual transaction_begin : 'tx M.thread
method virtual transaction_commit : 'tx -> unit M.thread
method virtual transaction_commit_all : unit M.thread
method virtual transaction_abort : 'tx -> unit M.thread
method virtual transaction_abort_all : unit M.thread
method virtual receive_msg : received_msg M.thread
method virtual ack_msg : ?transaction:'tx -> received_msg -> unit M.thread
method virtual ack : ?transaction:'tx -> string -> unit M.thread
method virtual send :
?transaction:'tx -> ?ack_timeout:float ->
destination:string -> string -> unit M.thread
method virtual send_no_ack :
?transaction:'tx -> ?ack_timeout:float ->
destination:string -> string -> unit M.thread
method virtual topic_send :
?transaction:'tx -> destination:string -> string -> unit M.thread
method virtual topic_send_no_ack :
?transaction:'tx -> destination:string -> string -> unit M.thread
method virtual create_queue : string -> unit M.thread
method virtual subscribe_queue : ?auto_delete:bool -> string -> unit M.thread
method virtual unsubscribe_queue : string -> unit M.thread
method virtual subscribe_topic : string -> unit M.thread
method virtual unsubscribe_topic : string -> unit M.thread
method virtual queue_size : string -> Int64.t option M.thread
method virtual queue_subscribers : string -> int option M.thread
method virtual topic_subscribers : string -> int option M.thread
end
module Tset = Set.Make(struct type t = M.transaction let compare = compare end)
type subscription = Queue of string | Topic of string
module Sset = Set.Make(struct type t = subscription let compare = compare end)
class simple_queue ?prefetch ~login ~passcode addr =
object(self)
inherit [M.transaction] mq
val mutable conn = None
val mutable subs = Sset.empty
method disconnect = match conn with
None -> return ()
| Some c -> conn <- None; M.disconnect c
method private reopen_conn =
let do_set_conn () =
M.connect ?prefetch ~login ~passcode addr >>= fun c ->
conn <- Some c;
self#with_conn
(fun c -> iter_serial
(function
Queue q -> M.subscribe_queue c q
| Topic t -> M.subscribe_topic c t)
(Sset.elements subs)) in
let rec set_conn () =
catch
(fun () -> do_set_conn ())
(function
Message_queue_error (_, _, Connection_error (Connection_refused | Closed)) ->
C.sleep 1. >>= fun () ->
set_conn ()
| e -> fail e)
in match conn with
None -> set_conn ()
| Some c -> self#disconnect >>= fun () -> set_conn ()
method reconnect = self#reopen_conn
method private with_conn : 'a. (M.connection -> 'a t) -> 'a t = fun f ->
let rec doit c =
catch
(fun () -> f c)
(function
(* FIXME: retry only N times? *)
Message_queue_error (Retry, _, _) -> doit c
| Message_queue_error (Reconnect, _, _) ->
self#reopen_conn >>= fun () -> self#with_conn f
| e -> fail e)
in match conn with
None -> self#reopen_conn >>= fun () -> self#with_conn f
| Some c -> doit c
method transaction_begin = self#with_conn M.transaction_begin
method transaction_commit tx = self#with_conn (fun c -> M.transaction_commit c tx)
method transaction_commit_all = self#with_conn M.transaction_commit_all
method transaction_abort tx = self#with_conn (fun c -> M.transaction_abort c tx)
method transaction_abort_all = self#with_conn M.transaction_abort_all
method receive_msg = self#with_conn M.receive_msg
method ack_msg ?transaction msg =
self#with_conn (fun c -> M.ack_msg c ?transaction msg)
method ack ?transaction msgid =
self#with_conn (fun c -> M.ack c ?transaction msgid)
method private aux_send f :
?transaction:M.transaction -> destination:string -> string -> unit M.thread =
fun ?transaction ~destination body ->
self#with_conn (fun c -> f c ?transaction ~destination body)
method send ?transaction ?ack_timeout ~destination body =
self#with_conn (fun c -> M.send c ?ack_timeout ?transaction ~destination body)
method send_no_ack ?transaction ?ack_timeout ~destination body =
self#with_conn
(fun c -> M.send_no_ack c ?ack_timeout ?transaction ~destination body)
method topic_send = self#aux_send M.topic_send
method topic_send_no_ack = self#aux_send M.topic_send_no_ack
method create_queue s = self#with_conn (fun c -> M.create_queue c s)
method queue_size s = self#with_conn (fun c -> M.queue_size c s)
method topic_subscribers s = self#with_conn (fun c -> M.topic_subscribers c s)
method queue_subscribers s = self#with_conn (fun c -> M.queue_subscribers c s)
method subscribe_queue ?(auto_delete = false) s =
self#with_conn (fun c -> M.subscribe_queue ~auto_delete c s) >>= fun () ->
subs <- Sset.add (Queue s) subs;
return ()
method unsubscribe_queue s =
subs <- Sset.remove (Queue s) subs;
match conn with
None -> return ()
| Some c ->
(* ignore any errors, since we have already removed it from the
* set of subscriptions, and won't be resubscribed to on reconn *)
catch (fun () -> M.unsubscribe_queue c s) (fun _ -> return ())
method subscribe_topic s =
self#with_conn (fun c -> M.subscribe_topic c s) >>= fun () ->
subs <- Sset.add (Topic s) subs;
return ()
method unsubscribe_topic s =
subs <- Sset.remove (Topic s) subs;
match conn with
None -> return ()
| Some c ->
(* ignore any errors, since we have already removed it from the
* set of subscriptions, and won't be resubscribed to on reconn *)
catch (fun () -> M.unsubscribe_topic c s) (fun _ -> return ())
end
let make_tcp_mq ?prefetch ~login ~passcode addr port =
new simple_queue ?prefetch ~login ~passcode
(Unix.ADDR_INET (Unix.inet_addr_of_string addr, port))
let make_unix_mq ?prefetch ~login ~passcode path =
new simple_queue ?prefetch ~login ~passcode (Unix.ADDR_UNIX path)
end