Skip to content

Commit

Permalink
Revert wait_for to pre-1.3.0 implementation using a custom loop. Closes
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Jun 3, 2017
1 parent c750a26 commit d85568a
Show file tree
Hide file tree
Showing 10 changed files with 81 additions and 78 deletions.
11 changes: 9 additions & 2 deletions CHANGES
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
x.y.z ()
=====

Fixed:

- Revert `wait_for` implementation to pre-`1.3.0`, using a custom `select` loop (#453)

1.3.1 (28-05-2017)
=====

New:

- Allow any tags allowed in @"encoder.encoder.export"@ settings in vorbis streams (#418)
- Allow any tags allowed in `"encoder.encoder.export"` settings in vorbis streams (#418)

- Allow @"audio/mp3"@ mime-type for mp3 in file resolution protocol. (#451)
- Allow `"audio/mp3"` mime-type for mp3 in file resolution protocol. (#451)

Fixed:

Expand Down
2 changes: 1 addition & 1 deletion src/lang/lang_builtins.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1798,7 +1798,7 @@ let () =
in
let timed_out =
try
Tutils.wait_for [`Read out_pipe; `Delay timeout] ;
Tutils.wait_for (`Read out_pipe) timeout ;
(-1.)
with Tutils.Timeout f ->
Process_handler.kill p;
Expand Down
2 changes: 1 addition & 1 deletion src/sources/harbor_input.ml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ struct
try
let fd = Harbor.file_descr_of_socket socket in
(* Wait for `Read event on socket. *)
Tutils.wait_for ~log [`Read fd; `Delay timeout];
Tutils.wait_for ~log (`Read fd) timeout;
(* Now read. *)
relay_read socket len
with
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_source.ml
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ struct
| Some (socket,read,_) ->
begin
try
Http.wait_for ~log [`Read socket; `Delay timeout];
Http.wait_for ~log (`Read socket) timeout;
read len
with e -> self#log#f 2 "Error while reading from socket: \
%s" (Printexc.to_string e);
Expand Down
22 changes: 9 additions & 13 deletions src/tools/http.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ module type Transport_t =
sig
type connection
type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]
val default_port : int
val connect : ?bind_address:string -> string -> int -> connection
val wait_for : ?log:(string -> unit) -> event list -> unit
val wait_for : ?log:(string -> unit) -> event -> float -> unit
val write: connection -> Bytes.t -> int -> int -> int
val read: connection -> Bytes.t -> int -> int -> int
val disconnect: connection -> unit
Expand All @@ -19,10 +18,9 @@ module Unix_transport : Transport_t with type connection = Unix.file_descr =
struct
type connection = Unix.file_descr
type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]
exception Socket
let connect ?bind_address host port =
Expand Down Expand Up @@ -64,10 +62,9 @@ sig
val string_of_error : error -> string
type connection
type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]
type uri = {
host: string;
Expand All @@ -86,7 +83,7 @@ sig
val disconnect : connection -> unit
val read : connection -> Bytes.t -> int -> int -> int
val write : connection -> Bytes.t -> int -> int -> int
val wait_for : ?log:(string -> unit) -> event list -> unit
val wait_for : ?log:(string -> unit) -> event -> float -> unit
type status = string * int * string
type headers = (string*string) list
val read_crlf : ?log:(string -> unit) -> ?max:int -> ?count:int ->
Expand Down Expand Up @@ -159,10 +156,9 @@ struct
type connection = Transport.connection

type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]

type uri = {
Expand Down Expand Up @@ -287,7 +283,7 @@ struct
Pcre.get_substring s 1

let read_with_timeout ?(log=fun _ -> ()) ~timeout socket buflen =
Transport.wait_for ~log [`Read socket; `Delay timeout];
Transport.wait_for ~log (`Read socket) timeout;
match buflen with
| Some buflen ->
let buf = Bytes.create buflen in
Expand Down Expand Up @@ -326,7 +322,7 @@ struct
(* This is quite ridiculous but we have
* no way to know how much data is available
* in the socket.. *)
Transport.wait_for ~log [`Read socket; `Delay timeout];
Transport.wait_for ~log (`Read socket) timeout;
let h = Transport.read socket c 0 1 in
if h < 1 then
stop := true
Expand Down Expand Up @@ -367,7 +363,7 @@ struct
let request ?(log=fun _ -> ()) ~timeout socket request =
if
let len = String.length request in
Transport.wait_for ~log [`Write socket; `Delay timeout];
Transport.wait_for ~log (`Write socket) timeout;
Transport.write socket request 0 len < len
then
raise Socket ;
Expand Down
10 changes: 4 additions & 6 deletions src/tools/http.mli
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@ module type Transport_t =
sig
type connection
type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]
val default_port : int
val connect : ?bind_address:string -> string -> int -> connection
val wait_for : ?log:(string -> unit) -> event list -> unit
val wait_for : ?log:(string -> unit) -> event -> float -> unit
val write: connection -> Bytes.t -> int -> int -> int
val read: connection -> Bytes.t -> int -> int -> int
val disconnect: connection -> unit
Expand All @@ -27,10 +26,9 @@ module type Http_t =
type connection

type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]

type uri = {
Expand Down Expand Up @@ -76,7 +74,7 @@ module type Http_t =
val write : connection -> Bytes.t -> int -> int -> int

(** Wait until Read and/or Write will be non-blocking (mostly..) *)
val wait_for : ?log:(string -> unit) -> event list -> unit
val wait_for : ?log:(string -> unit) -> event -> float -> unit

(** Status of a request:
* version of the HTTP protocol, status number and status message. *)
Expand Down
17 changes: 8 additions & 9 deletions src/tools/https.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ struct
type connection = Ssl.socket

type event = [
| `Delay of float
| `Write of connection
| `Read of connection
| `Exception of connection
| `Both of connection
]

let default_port = 443
Expand Down Expand Up @@ -36,14 +35,14 @@ struct

let disconnect = Ssl.shutdown

let wait_for ?log events =
let events = List.map (function
| `Read s -> `Read (Ssl.file_descr_of_socket s)
| `Write s -> `Write (Ssl.file_descr_of_socket s)
| `Exception s -> `Exception (Ssl.file_descr_of_socket s)
| `Delay f -> `Delay f) events
let wait_for ?log event timeout =
let event =
match event with
| `Read s -> `Read (Ssl.file_descr_of_socket s)
| `Write s -> `Write (Ssl.file_descr_of_socket s)
| `Both s -> `Both (Ssl.file_descr_of_socket s)
in
Tutils.wait_for ?log events
Tutils.wait_for ?log event timeout

let read = Ssl.read

Expand Down
14 changes: 7 additions & 7 deletions src/tools/https_secure_transport.ml
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ struct
SecureTransport.close h.ctx;
Unix.close h.sock

let wait_for ?log events =
let events = List.map (function
| `Read s -> `Read (Ssl.file_descr_of_socket s)
| `Write s -> `Write (Ssl.file_descr_of_socket s)
| `Exception s -> `Exception (Ssl.file_descr_of_socket s)
| `Delay f -> `Delay f) events
let wait_for ?log event timeout =
let event =
match event with
| `Read s -> `Read (Ssl.file_descr_of_socket s)
| `Write s -> `Write (Ssl.file_descr_of_socket s)
| `Both s -> `Both (Ssl.file_descr_of_socket s)
in
Tutils.wait_for ?log events
Tutils.wait_for ?log event timeout

let read {ctx} buf ofs len =
SecureTransport.read ctx buf ofs len
Expand Down
69 changes: 33 additions & 36 deletions src/tools/tutils.ml
Original file line number Diff line number Diff line change
Expand Up @@ -323,45 +323,42 @@ let error_translator =

let () = Utils.register_error_translator error_translator

(* Wait some events: [`Read socket], [`Write socket] or [`Delay timeout]
* Raises [Timeout elapsed_time] if timeout is reached. *)
let wait_for ~log events =
let timed_out = ref None in
let m = Mutex.create () in
let c = Condition.create () in
let handler = mutexify m (fun l ->
List.iter (function
| `Delay _ ->
timed_out := Some (Unix.gettimeofday ())
| _ -> ()) l;
Condition.signal c;
[])
type event = [
| `Read of Unix.file_descr
| `Write of Unix.file_descr
| `Both of Unix.file_descr
]

(* Wait for [`Read socker], [`Write socket] or [`Both socket] for at most
* [timeout] seconds on the given [socket]. Raises [Timeout elapsed_time]
* if timeout is reached. *)
let wait_for ?(log=fun _ -> ()) event timeout =
let current_time =
Unix.gettimeofday ()
in
let task = {
Duppy.Task.
priority = Non_blocking;
events = events;
handler = handler
} in
let start_time =
Mutex.lock m;
Duppy.Task.add scheduler task;
let ret = Unix.gettimeofday () in
Condition.wait c m;
ret
let max_time =
Unix.gettimeofday () +. timeout
in
match !timed_out with
| Some t ->
let r, w =
match event with
| `Read socket -> [socket],[]
| `Write socket -> [],[socket]
| `Both socket -> [socket],[socket]
in
let rec wait t =
let l,l',_ = Unix.select r w [] t in
if l=[] && l'=[] then begin
let new_time = Unix.gettimeofday () in
if new_time >= max_time then
begin
log "Timeout reached!" ;
raise (Timeout (t -. start_time))
| _ -> ()

let wait_for ?(log=fun _ -> ()) events =
if has_started() then
wait_for ~log events
else
log "cannot wait_for if scheduler hasn't started!"

raise (Timeout (new_time -. current_time))
end
else
wait (min 1. (max_time -. new_time))
end
in
wait (min 1. timeout)

(** Wait for some thread to crash *)
let run = ref true
Expand Down
10 changes: 8 additions & 2 deletions src/tools/tutils.mli
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,15 @@ val mutexify : Mutex.t -> ('a -> 'b) -> ('a -> 'b)

exception Timeout of float

(* Wait some events: [`Read socket], [`Write socket] or [`Delay timeout]
type event = [
| `Read of Unix.file_descr
| `Write of Unix.file_descr
| `Both of Unix.file_descr
]

(* Wait some events: [`Read socket], [`Write socket] or [`Both timeout]
* Raises [Timeout elapsed_time] if timeout is reached. *)
val wait_for : ?log:(string -> unit) -> Duppy.Task.event list -> unit
val wait_for : ?log:(string -> unit) -> event -> float -> unit

(** [finalize ~k f] calls [f] and returns it result,
* and always executes [k], even when [f] raises an exception. *)
Expand Down

0 comments on commit d85568a

Please sign in to comment.