From cb8b4aa49f2e5f956fe74adb515af182d415658c Mon Sep 17 00:00:00 2001 From: Mauricio Fernandez Date: Sat, 13 Nov 2010 19:34:31 +0100 Subject: [PATCH] Revert "Implemented -flush-wait-time option." This reverts commit 768adfc57cdabc28f0999b176c59961c9b60e101. --- mq_sqlite_persistence.ml | 45 +++++++++++---------------------------- mq_sqlite_persistence.mli | 5 +---- ocamlmq.ml | 6 +----- 3 files changed, 15 insertions(+), 41 deletions(-) diff --git a/mq_sqlite_persistence.ml b/mq_sqlite_persistence.ml index 9680e54..db59959 100644 --- a/mq_sqlite_persistence.ml +++ b/mq_sqlite_persistence.ml @@ -23,8 +23,6 @@ type t = { mutable flush_alarm : unit Lwt.u; max_msgs_in_mem : int; mutable unacks : SSET.t; - flush_wait_time : float; - mutable flush_finished_signal : unit Lwt.t; } let count_unmaterialized_pending_acks db = @@ -111,37 +109,24 @@ let unack db msg_id = sqlc"UPDATE ocamlmq_msgs SET ack_pending = 0 WHERE msg_id = %s" msg_id -let make - ?(max_msgs_in_mem = max_int) - ?(flush_period = 1.0) - ?(flush_wait_time = 0.) - file = +let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) file = let wait_flush, awaken_flush = Lwt.wait () in - let flush_finished, awaken_flush_finished = Lwt.wait () in let t = { db = Sqlexpr_sqlite.open_db file; in_mem = Hashtbl.create 13; in_mem_msgs = Hashtbl.create 13; ack_pending = SSET.empty; flush_alarm = awaken_flush; max_msgs_in_mem = max_msgs_in_mem; unacks = SSET.empty; - flush_finished_signal = flush_finished; - flush_wait_time = flush_wait_time; } in let flush_period = max flush_period 0.005 in - let wait_flush_time () = - if flush_wait_time > 0. then Lwt_unix.sleep flush_wait_time - else return () in - let rec loop_flush wait_flush awaken_flush_finished = - Lwt.choose [Lwt_unix.sleep flush_period; wait_flush >>= wait_flush_time] >> - begin - let wait, awaken = Lwt.wait () in - let ff, awaken_ff = Lwt.wait () in - flush t; - t.flush_alarm <- awaken; - t.flush_finished_signal <- ff; - Lwt.wakeup awaken_flush_finished (); - loop_flush wait awaken_ff - end in + let rec loop_flush wait_flush = + Lwt.choose [Lwt_unix.sleep flush_period; wait_flush] >> + begin + let wait, awaken = Lwt.wait () in + flush t; + t.flush_alarm <- awaken; + loop_flush wait + end in let rec loop_flush_unacks () = lwt () = Lwt_unix.sleep 0.1 in if not (SSET.is_empty t.unacks) then @@ -153,7 +138,7 @@ let make loop_flush_unacks () in ignore - (try_lwt loop_flush wait_flush awaken_flush_finished + (try_lwt loop_flush wait_flush with e -> puts "EXCEPTION IN FLUSHER: %s" (Printexc.to_string e); return ()); ignore @@ -205,13 +190,9 @@ let do_save_msg t sent msg = in Hashtbl.add t.in_mem dest p end; Hashtbl.add t.in_mem_msgs msg.msg_id msg; - if Hashtbl.length t.in_mem_msgs < t.max_msgs_in_mem then - return () - else begin - let wait = t.flush_finished_signal in - (try Lwt.wakeup t.flush_alarm (); with Invalid_argument _ -> ()); - wait - end + if Hashtbl.length t.in_mem_msgs > t.max_msgs_in_mem then + Lwt.wakeup t.flush_alarm (); + return () let save_msg t ?low_priority msg = do_save_msg t false msg diff --git a/mq_sqlite_persistence.mli b/mq_sqlite_persistence.mli index 4027f6c..99f2d4a 100644 --- a/mq_sqlite_persistence.mli +++ b/mq_sqlite_persistence.mli @@ -1,10 +1,7 @@ include Mq_server.PERSISTENCE -val make : - ?max_msgs_in_mem:int -> - ?flush_period:float -> - ?flush_wait_time:float -> string -> t +val make : ?max_msgs_in_mem:int -> ?flush_period:float -> string -> t (* Used for testing *) val auto_check_db : Format.formatter -> bool diff --git a/ocamlmq.ml b/ocamlmq.ml index def3c77..208bf65 100644 --- a/ocamlmq.ml +++ b/ocamlmq.ml @@ -10,9 +10,8 @@ let debug = ref false let login = ref None let passcode = ref None let db = ref None -let max_in_mem = ref 10000 +let max_in_mem = ref 100000 let flush_period = ref 1. -let flush_wait_ms = ref 0 let params = Arg.align @@ -24,8 +23,6 @@ let params = "N Flush to disk when there are more than N msgs in mem (default: 100000)"; "-flush-period", Arg.Set_float flush_period, "DT Flush period in seconds (default: 1.0)"; - "-flush-wait-time", Arg.Set_int flush_wait_ms, - "N Wait for N milliseconds before flushing (default 0)"; "-debug", Arg.Set debug, " Write debug info to stderr."; ] @@ -51,7 +48,6 @@ let () = Mq_sqlite_persistence.make ~max_msgs_in_mem:!max_in_mem ~flush_period:!flush_period - ~flush_wait_time:(float !flush_wait_ms *. 1e-3) (Option.default "ocamlmq.db" !db) in if !debug then eprintf "Connected to database.\n%!";