Skip to content

Commit

Permalink
Revert "Implemented -flush-wait-time option."
Browse files Browse the repository at this point in the history
This reverts commit 768adfc.
  • Loading branch information
mfp committed Nov 13, 2010
1 parent 8f7d7a6 commit cb8b4aa
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 41 deletions.
45 changes: 13 additions & 32 deletions mq_sqlite_persistence.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type t = {
mutable flush_alarm : unit Lwt.u;
max_msgs_in_mem : int;
mutable unacks : SSET.t;
flush_wait_time : float;
mutable flush_finished_signal : unit Lwt.t;
}

let count_unmaterialized_pending_acks db =
Expand Down Expand Up @@ -111,37 +109,24 @@ let unack db msg_id =
sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE msg_id = %s"
msg_id

let make
?(max_msgs_in_mem = max_int)
?(flush_period = 1.0)
?(flush_wait_time = 0.)
file =
let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) file =
let wait_flush, awaken_flush = Lwt.wait () in
let flush_finished, awaken_flush_finished = Lwt.wait () in
let t =
{ db = Sqlexpr_sqlite.open_db file; in_mem = Hashtbl.create 13;
in_mem_msgs = Hashtbl.create 13; ack_pending = SSET.empty;
flush_alarm = awaken_flush;
max_msgs_in_mem = max_msgs_in_mem;
unacks = SSET.empty;
flush_finished_signal = flush_finished;
flush_wait_time = flush_wait_time;
} in
let flush_period = max flush_period 0.005 in
let wait_flush_time () =
if flush_wait_time > 0. then Lwt_unix.sleep flush_wait_time
else return () in
let rec loop_flush wait_flush awaken_flush_finished =
Lwt.choose [Lwt_unix.sleep flush_period; wait_flush >>= wait_flush_time] >>
begin
let wait, awaken = Lwt.wait () in
let ff, awaken_ff = Lwt.wait () in
flush t;
t.flush_alarm <- awaken;
t.flush_finished_signal <- ff;
Lwt.wakeup awaken_flush_finished ();
loop_flush wait awaken_ff
end in
let rec loop_flush wait_flush =
Lwt.choose [Lwt_unix.sleep flush_period; wait_flush] >>
begin
let wait, awaken = Lwt.wait () in
flush t;
t.flush_alarm <- awaken;
loop_flush wait
end in
let rec loop_flush_unacks () =
lwt () = Lwt_unix.sleep 0.1 in
if not (SSET.is_empty t.unacks) then
Expand All @@ -153,7 +138,7 @@ let make
loop_flush_unacks ()
in
ignore
(try_lwt loop_flush wait_flush awaken_flush_finished
(try_lwt loop_flush wait_flush
with e -> puts "EXCEPTION IN FLUSHER: %s" (Printexc.to_string e);
return ());
ignore
Expand Down Expand Up @@ -205,13 +190,9 @@ let do_save_msg t sent msg =
in Hashtbl.add t.in_mem dest p
end;
Hashtbl.add t.in_mem_msgs msg.msg_id msg;
if Hashtbl.length t.in_mem_msgs < t.max_msgs_in_mem then
return ()
else begin
let wait = t.flush_finished_signal in
(try Lwt.wakeup t.flush_alarm (); with Invalid_argument _ -> ());
wait
end
if Hashtbl.length t.in_mem_msgs > t.max_msgs_in_mem then
Lwt.wakeup t.flush_alarm ();
return ()

let save_msg t ?low_priority msg =
do_save_msg t false msg
Expand Down
5 changes: 1 addition & 4 deletions mq_sqlite_persistence.mli
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@

include Mq_server.PERSISTENCE

val make :
?max_msgs_in_mem:int ->
?flush_period:float ->
?flush_wait_time:float -> string -> t
val make : ?max_msgs_in_mem:int -> ?flush_period:float -> string -> t

(* Used for testing *)
val auto_check_db : Format.formatter -> bool
Expand Down
6 changes: 1 addition & 5 deletions ocamlmq.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ let debug = ref false
let login = ref None
let passcode = ref None
let db = ref None
let max_in_mem = ref 10000
let max_in_mem = ref 100000
let flush_period = ref 1.
let flush_wait_ms = ref 0

let params =
Arg.align
Expand All @@ -24,8 +23,6 @@ let params =
"N Flush to disk when there are more than N msgs in mem (default: 100000)";
"-flush-period", Arg.Set_float flush_period,
"DT Flush period in seconds (default: 1.0)";
"-flush-wait-time", Arg.Set_int flush_wait_ms,
"N Wait for N milliseconds before flushing (default 0)";
"-debug", Arg.Set debug, " Write debug info to stderr.";
]

Expand All @@ -51,7 +48,6 @@ let () =
Mq_sqlite_persistence.make
~max_msgs_in_mem:!max_in_mem
~flush_period:!flush_period
~flush_wait_time:(float !flush_wait_ms *. 1e-3)
(Option.default "ocamlmq.db" !db)
in
if !debug then eprintf "Connected to database.\n%!";
Expand Down

0 comments on commit cb8b4aa

Please sign in to comment.