Skip to content

Commit

Permalink
v0.15~preview.125.03+403
Browse files Browse the repository at this point in the history
  • Loading branch information
aalekseyev committed Aug 27, 2021
1 parent 07b5f4c commit 209af88
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 114 deletions.
1 change: 1 addition & 0 deletions async.opam
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ depends: [
"core"
"core_kernel"
"ppx_jane"
"ppx_log"
"textutils"
"dune" {>= "2.0.0"}
]
Expand Down
18 changes: 12 additions & 6 deletions async_rpc/src/rpc_transport.ml
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,27 @@ module Unix_writer = struct
;;
end

(* unfortunately, copied from reader0.ml *)
let default_max_message_size = 100 * 1024 * 1024
let default_max_message_size =
Lazy.from_fun (fun () ->
match Sys.getenv "ASYNC_RPC_MAX_MESSAGE_SIZE" with
| None ->
(* unfortunately, copied from reader0.ml *)
100 * 1024 * 1024
| Some max_message_size -> Int.of_string max_message_size)
;;

module Reader = struct
include Kernel_transport.Reader

let of_reader ?(max_message_size = default_max_message_size) reader =
let of_reader ?(max_message_size = force default_max_message_size) reader =
pack (module Unix_reader) (Unix_reader.create ~reader ~max_message_size)
;;
end

module Writer = struct
include Kernel_transport.Writer

let of_writer ?(max_message_size = default_max_message_size) writer =
let of_writer ?(max_message_size = force default_max_message_size) writer =
pack (module Unix_writer) (Unix_writer.create ~writer ~max_message_size)
;;
end
Expand Down Expand Up @@ -235,7 +241,7 @@ module Tcp = struct
?backlog
?drop_incoming_connections
?time_source
?(max_message_size = default_max_message_size)
?(max_message_size = force default_max_message_size)
?(make_transport = default_transport_maker)
?(auth = fun _ -> true)
?(on_handler_error = `Ignore)
Expand Down Expand Up @@ -281,7 +287,7 @@ module Tcp = struct
;;
let connect
?(max_message_size = default_max_message_size)
?(max_message_size = force default_max_message_size)
?(make_transport = default_transport_maker)
?(tcp_connect_timeout =
Async_rpc_kernel.Async_rpc_kernel_private.default_handshake_timeout)
Expand Down
15 changes: 7 additions & 8 deletions async_rpc/src/rpc_transport_low_latency.ml
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,13 @@ module Config = struct
[@@deriving sexp]

let validate t =
if
t.initial_buffer_size <= 0
|| t.max_message_size <= 0
|| t.initial_buffer_size > t.max_buffer_size
|| t.max_message_size > t.max_buffer_size
|| t.buffering_threshold_in_bytes < 0
|| t.start_batching_after_num_messages < 0
|| Time_ns.Span.( <= ) t.write_timeout Time_ns.Span.zero
if t.initial_buffer_size <= 0
|| t.max_message_size <= 0
|| t.initial_buffer_size > t.max_buffer_size
|| t.max_message_size > t.max_buffer_size
|| t.buffering_threshold_in_bytes < 0
|| t.start_batching_after_num_messages < 0
|| Time_ns.Span.( <= ) t.write_timeout Time_ns.Span.zero
then
failwiths
~here:[%here]
Expand Down
9 changes: 4 additions & 5 deletions async_rpc/test-bin/rpc_connection_inspector.ml
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ let copy conn_number desc reader writer =
let len = (len : Bin_prot.Nat0.t :> int) in
let disp_len = min 16 len in
Sexp.List
(Sexp.Atom (sprintf "len=%d" len)
::
List.init disp_len ~f:(fun i ->
let x = Char.to_int buf.{!pos_ref + i} in
Sexp.Atom (sprintf "%02x" x))
((Sexp.Atom (sprintf "len=%d" len)
:: List.init disp_len ~f:(fun i ->
let x = Char.to_int buf.{!pos_ref + i} in
Sexp.Atom (sprintf "%02x" x)))
@ if len > disp_len then [ Atom "..." ] else [])
in
[%sexp (msg : data P.Message.t)])
Expand Down
92 changes: 92 additions & 0 deletions async_rpc/test/import.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
open Core
open Async

let () = Backtrace.elide := true
let max_message_size = 1_000_000

let test ~make_transport ~imp1 ~imp2 ~state1 ~state2 ~f () =
let%bind `Reader r1, `Writer w2 = Unix.pipe (Info.of_string "rpc_test 1") in
let%bind `Reader r2, `Writer w1 = Unix.pipe (Info.of_string "rpc_test 2") in
let t1 = make_transport (r1, w1) in
let t2 = make_transport (r2, w2) in
let s imp =
if List.length imp > 0
then
Some
(Rpc.Implementations.create_exn
~implementations:imp
~on_unknown_rpc:`Close_connection)
else None
in
let s1 = s imp1 in
let s2 = s imp2 in
let conn1_ivar = Ivar.create () in
let f2_done =
Async_rpc_kernel.Rpc.Connection.with_close
?implementations:s2
t2
~dispatch_queries:(fun conn2 ->
let%bind conn1 = Ivar.read conn1_ivar in
f conn1 conn2)
~connection_state:(fun _ -> state2)
~on_handshake_error:`Raise
in
Async_rpc_kernel.Rpc.Connection.with_close
?implementations:s1
t1
~dispatch_queries:(fun conn1 ->
Ivar.fill conn1_ivar conn1;
f2_done)
~connection_state:(fun _ -> state1)
~on_handshake_error:`Raise
;;

let test1 ~make_transport ~imp ~state ~f =
test ~make_transport ~imp1:imp ~state1:state ~imp2:[] ~state2:() ~f
;;

module Pipe_count_error = struct
type t = [ `Argument_must_be_positive ] [@@deriving bin_io]
end

let pipe_count_rpc =
Rpc.Pipe_rpc.create
~name:"pipe_count"
~version:0
~bin_query:Int.bin_t
~bin_response:Int.bin_t
~bin_error:Pipe_count_error.bin_t
()
;;

let pipe_wait_rpc =
Rpc.Pipe_rpc.create
~name:"pipe_wait"
~version:0
~bin_query:Unit.bin_t
~bin_response:Unit.bin_t
~bin_error:Unit.bin_t
()
;;

let pipe_count_imp =
Rpc.Pipe_rpc.implement pipe_count_rpc (fun () n ->
if n < 0
then return (Error `Argument_must_be_positive)
else (
let pipe_r, pipe_w = Pipe.create () in
upon
(Deferred.List.iter (List.init n ~f:Fn.id) ~how:`Sequential ~f:(fun i ->
Pipe.write pipe_w i))
(fun () -> Pipe.close pipe_w);
return (Ok pipe_r)))
;;

let pipe_wait_imp ivar =
Rpc.Pipe_rpc.implement pipe_wait_rpc (fun () () ->
let pipe_r, pipe_w = Pipe.create () in
(Pipe.write pipe_w ()
>>> fun () ->
Ivar.read ivar >>> fun () -> Pipe.write pipe_w () >>> fun () -> Pipe.close pipe_w);
return (Ok pipe_r))
;;
38 changes: 38 additions & 0 deletions async_rpc/test/test_max_message_size.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
open! Core
open Async
open Import

(* This test must be in a file by itself because of lazy evaluation of the environment
variable *)
let%expect_test "default_max_message_size" =
Unix.putenv ~key:"ASYNC_RPC_MAX_MESSAGE_SIZE" ~data:"1";
let make_transport_default_size (fd_r, fd_w) : Rpc.Transport.t =
{ reader = Reader.create fd_r |> Rpc.Transport.Reader.of_reader
; writer = Writer.create fd_w |> Rpc.Transport.Writer.of_writer
}
in
let%map () =
match%map
Monitor.try_with (fun () ->
test1
~make_transport:make_transport_default_size
~imp:[ pipe_count_imp ]
~state:()
~f:(fun _ conn -> Rpc.Pipe_rpc.dispatch_exn pipe_count_rpc conn 1)
())
with
| Error exn -> print_s ([%sexp_of: Exn.t] exn)
| _ -> ()
in
[%expect
{|
(monitor.ml.Error
("Message cannot be sent"
((reason (Message_too_big ((size 7) (max_message_size 1))))
(connection
((description <created-directly>)
(writer
((t ((file_descr _) (info (writer "rpc_test 1")) (kind Fifo)))
(max_message_size 1)))))))
("<backtrace elided in test>")) |}]
;;
1 change: 1 addition & 0 deletions async_rpc/test/test_max_message_size.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
(*_ This signature is deliberately empty. *)
90 changes: 0 additions & 90 deletions async_rpc/test/test_rpc.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,96 +4,6 @@ open! Async
open! Import
module Debug = Async_kernel_private.Debug

let () = Backtrace.elide := true
let max_message_size = 1_000_000

let test ~make_transport ~imp1 ~imp2 ~state1 ~state2 ~f () =
let%bind `Reader r1, `Writer w2 = Unix.pipe (Info.of_string "rpc_test 1") in
let%bind `Reader r2, `Writer w1 = Unix.pipe (Info.of_string "rpc_test 2") in
let t1 = make_transport (r1, w1) in
let t2 = make_transport (r2, w2) in
let s imp =
if List.length imp > 0
then
Some
(Rpc.Implementations.create_exn
~implementations:imp
~on_unknown_rpc:`Close_connection)
else None
in
let s1 = s imp1 in
let s2 = s imp2 in
let conn1_ivar = Ivar.create () in
let f2_done =
Async_rpc_kernel.Rpc.Connection.with_close
?implementations:s2
t2
~dispatch_queries:(fun conn2 ->
let%bind conn1 = Ivar.read conn1_ivar in
f conn1 conn2)
~connection_state:(fun _ -> state2)
~on_handshake_error:`Raise
in
Async_rpc_kernel.Rpc.Connection.with_close
?implementations:s1
t1
~dispatch_queries:(fun conn1 ->
Ivar.fill conn1_ivar conn1;
f2_done)
~connection_state:(fun _ -> state1)
~on_handshake_error:`Raise
;;

let test1 ~make_transport ~imp ~state ~f =
test ~make_transport ~imp1:imp ~state1:state ~imp2:[] ~state2:() ~f
;;

module Pipe_count_error = struct
type t = [ `Argument_must_be_positive ] [@@deriving bin_io]
end

let pipe_count_rpc =
Rpc.Pipe_rpc.create
~name:"pipe_count"
~version:0
~bin_query:Int.bin_t
~bin_response:Int.bin_t
~bin_error:Pipe_count_error.bin_t
()
;;

let pipe_wait_rpc =
Rpc.Pipe_rpc.create
~name:"pipe_wait"
~version:0
~bin_query:Unit.bin_t
~bin_response:Unit.bin_t
~bin_error:Unit.bin_t
()
;;

let pipe_count_imp =
Rpc.Pipe_rpc.implement pipe_count_rpc (fun () n ->
if n < 0
then return (Error `Argument_must_be_positive)
else (
let pipe_r, pipe_w = Pipe.create () in
upon
(Deferred.List.iter (List.init n ~f:Fn.id) ~how:`Sequential ~f:(fun i ->
Pipe.write pipe_w i))
(fun () -> Pipe.close pipe_w);
return (Ok pipe_r)))
;;

let pipe_wait_imp ivar =
Rpc.Pipe_rpc.implement pipe_wait_rpc (fun () () ->
let pipe_r, pipe_w = Pipe.create () in
(Pipe.write pipe_w ()
>>> fun () ->
Ivar.read ivar >>> fun () -> Pipe.write pipe_w () >>> fun () -> Pipe.close pipe_w);
return (Ok pipe_r))
;;

let make_tests ~make_transport ~transport_name =
List.mapi
~f:(fun i f -> sprintf "rpc-%s-%d" transport_name i, f)
Expand Down
5 changes: 3 additions & 2 deletions src/async.ml
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ open! Async_kernel

(** {2 Async_kernel} *)

include Async_kernel
(** @open *)
include Async_kernel (** @open *)

module Deferred = struct
include Deferred
Expand Down Expand Up @@ -44,3 +43,5 @@ module Expect_test_config = Expect_test_config_with_unit_expect

module Expect_test_config_with_unit_expect_or_error =
Expect_test_config_with_unit_expect_or_error

module Ppx_log_syntax = Ppx_log_syntax
2 changes: 1 addition & 1 deletion src/dune
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
(library (name async) (public_name async)
(libraries async_command async_kernel async_quickcheck async_rpc async_unix
core_kernel.core)
core_kernel.core ppx_log.types)
(preprocess (pps ppx_jane)))
5 changes: 3 additions & 2 deletions src/expect_test_config_with_unit_expect.mli
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
(** An alternative to [Async.Expect_test_config] in which [%expect] has type [unit] rather
than [unit Deferred.t]. This lets one write:
(** The default expect test config in code which has [open Async].
[%expect] has type [unit] rather than [unit Deferred.t]. This lets one write:
{[
[%expect {| |};
Expand Down
Loading

0 comments on commit 209af88

Please sign in to comment.