Skip to content

Commit

Permalink
Mq_sqlite_persistence: write messages to stderr.
Browse files Browse the repository at this point in the history
  • Loading branch information
mfp committed Nov 12, 2010
1 parent 5f62008 commit 792515e
Showing 1 changed file with 10 additions and 9 deletions.
19 changes: 10 additions & 9 deletions mq_sqlite_persistence.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,18 @@ let count_unmaterialized_pending_acks db =
let count_acked_messages db =
select_one db sqlc"SELECT @L{COUNT(*)} FROM acked_msgs"

let pr fmt = ksprintf (eprintf "%s%!") fmt
let puts fmt = ksprintf prerr_endline fmt

let flush_acked_msgs ?(verbose = false) db =
if verbose then
printf "Flushing %Ld ACKs\n%!" (count_acked_messages db);
if verbose then puts "Flushing %Ld ACK" (count_acked_messages db);
execute db
sqlc"DELETE FROM ocamlmq_msgs WHERE msg_id IN (SELECT * FROM acked_msgs)";
execute db sqlc"DELETE FROM acked_msgs"

let materialize_pending_acks ?(verbose = false) db =
if verbose then
printf "Materializing %Ld pending ACKs in DB\n%!"
(count_unmaterialized_pending_acks db);
puts "Materializing %Ld pending ACKs in DB" (count_unmaterialized_pending_acks db);
execute db sqlc"UPDATE ocamlmq_msgs SET ack_pending = 1
WHERE msg_id IN (SELECT msg_id FROM pending_acks)";
execute db sqlc"DELETE FROM pending_acks"
Expand All @@ -58,12 +59,12 @@ let rec flush t =
unmaterialized_ack_pendings <> 0L || acked_msgs <> 0L
then begin
flushed := true;
printf "Flushing to disk: %d msgs, %d + %Ld pending ACKS, %Ld ACKS%!"
pr "Flushing to disk: %d msgs, %d + %Ld pending ACKS, %Ld ACKS"
in_mem_msgs ack_pending unmaterialized_ack_pendings acked_msgs;
do_flush t db;
end
end;
if !flushed then printf " (%8.5fs)\n%!" (Unix.gettimeofday () -. t0)
if !flushed then puts " (%8.5fs)" (Unix.gettimeofday () -. t0)

and do_flush t db =
Hashtbl.iter
Expand Down Expand Up @@ -121,18 +122,18 @@ let make ?(max_msgs_in_mem = max_int) ?(flush_period = 1.0) file =
if not (SSET.is_empty t.unacks) then
transaction t.db
(fun db ->
printf "UnACKing %d messages in DB\n%!" (SSET.cardinal t.unacks);
puts "UnACKing %d messages in DB" (SSET.cardinal t.unacks);
SSET.iter (unack db) t.unacks);
t.unacks <- SSET.empty;
loop_flush_unacks ()
in
ignore
(try_lwt loop_flush wait_flush
with e -> printf "EXCEPTION IN FLUSHER: %s\n%!" (Printexc.to_string e);
with e -> puts "EXCEPTION IN FLUSHER: %s" (Printexc.to_string e);
return ());
ignore
(try_lwt loop_flush_unacks ()
with e -> printf "EXCEPTION IN UNACK FLUSHER: %s\n%!" (Printexc.to_string e);
with e -> puts "EXCEPTION IN UNACK FLUSHER: %s" (Printexc.to_string e);
return ());
t

Expand Down

0 comments on commit 792515e

Please sign in to comment.