Skip to content

Commit

Permalink
Merge pull request esl#3 from psytale/more_xmpp_errors
Browse files Browse the repository at this point in the history
Reduction of escalus crashes.
  • Loading branch information
dcorbacho committed Oct 2, 2014
2 parents a07db6e + cb2cbcd commit b790b91
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 55 deletions.
81 changes: 48 additions & 33 deletions src/escalus_bosh.erl
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@
terminated = false,
event_client,
client,
on_reply}).
on_reply,
timeout}).

%%%===================================================================
%%% API
Expand Down Expand Up @@ -240,27 +241,33 @@ init([Args, Owner]) ->
Port = proplists:get_value(port, Args, 5280),
Path = proplists:get_value(path, Args, <<"/http-bind">>),
Wait = proplists:get_value(bosh_wait, Args, ?DEFAULT_WAIT),
Timeout = proplists:get_value(timeout, Args, infinity),
EventClient = proplists:get_value(event_client, Args),
HostStr = host_to_list(Host),
OnReplyFun = proplists:get_value(on_reply, Args, fun(_) -> ok end),
OnConnectFun = proplists:get_value(on_connect, Args, fun(_) -> ok end),
{MS, S, MMS} = now(),
InitRid = MS * 1000000 * 1000000 + S * 1000000 + MMS,
{ok, Parser} = exml_stream:new_parser(),
{ok, Client} = fusco_cp:start_link({HostStr, Port, false},
[{on_connect, OnConnectFun}],
case fusco_cp:start_link({HostStr, Port, false},
[{on_connect, OnConnectFun},
{connect_timeout, Timeout}],
%% Max two connections as per BOSH rfc
2),
{ok, #state{owner = Owner,
url = Path,
parser = Parser,
rid = InitRid,
keepalive = proplists:get_value(keepalive, Args, true),
wait = Wait,
event_client = EventClient,
client = Client,
on_reply = OnReplyFun}}.

2) of
{ok, Client} ->
{ok, #state{owner = Owner,
url = Path,
parser = Parser,
rid = InitRid,
keepalive = proplists:get_value(keepalive, Args, true),
wait = Wait,
event_client = EventClient,
client = Client,
on_reply = OnReplyFun,
timeout = Timeout}};
_Error ->
{stop, normal}
end.

handle_call(get_transport, _From, State) ->
{reply, transport(State), State};
Expand Down Expand Up @@ -320,20 +327,25 @@ handle_cast(reset_parser, #state{parser = Parser} = State) ->


%% Handle async HTTP request replies.
handle_info({http_reply, Ref, Body, Transport}, S) ->
handle_info({http_reply, Ref, Body, Transport}, #state{owner = Owner} = S) ->
NewRequests = lists:keydelete(Ref, 1, S#state.requests),
{ok, #xmlel{attrs=Attrs} = XmlBody} = exml:parse(Body),
NS = handle_data(XmlBody, S#state{requests = NewRequests}),
NNS = case {detect_type(Attrs), NS#state.keepalive, NS#state.requests == []}
of
{streamend, _, _} -> close_requests(NS#state{terminated=true});
{_, false, _} -> NS;
{_, true, true} -> send(Transport,
empty_body(NS#state.rid, NS#state.sid),
NS);
{_, true, false} -> NS
end,
{noreply, NNS};
case exml:parse(Body) of
{ok, #xmlel{attrs=Attrs} = XmlBody} ->
NS = handle_data(XmlBody, S#state{requests = NewRequests}),
NNS = case {detect_type(Attrs), NS#state.keepalive, NS#state.requests == []}
of
{streamend, _, _} -> close_requests(NS#state{terminated=true});
{_, false, _} -> NS;
{_, true, true} -> send(Transport,
empty_body(NS#state.rid, NS#state.sid),
NS);
{_, true, false} -> NS
end,
{noreply, NNS};
_Error ->
Owner ! {error, parse_error},
{noreply, S}
end;
handle_info(_, State) ->
{noreply, State}.

Expand All @@ -351,11 +363,11 @@ code_change(_OldVsn, State, _Extra) ->
%%% Helpers
%%%===================================================================

request(#client{socket = {Client, Path}}, Body, OnReplyFun) ->
request(#client{socket = {Client, Path}}, Body, OnReplyFun, Timeout) ->
Headers = [{<<"Content-Type">>, <<"text/xml; charset=utf-8">>}],
Reply =
fusco_cp:request(Client, Path, "POST", Headers, exml:to_iolist(Body),
2, infinity),
2, Timeout),
OnReplyFun(Reply),
case Reply of
{ok, {_Status, _Headers, RBody, _Size, _Time}} ->
Expand All @@ -375,11 +387,13 @@ send(_, _, _, #state{terminated = true} = S) ->
%% Sending anything to a terminated session is pointless.
%% We leave it in its current state to pick up any pending replies.
S;
send(Transport, Body, NewRid, #state{requests = Requests, on_reply = OnReplyFun} = S) ->
send(Transport, Body, NewRid, #state{requests = Requests,
on_reply = OnReplyFun,
timeout = Timeout} = S) ->
Ref = make_ref(),
Self = self(),
AsyncReq = fun() ->
case request(Transport, Body, OnReplyFun) of
case request(Transport, Body, OnReplyFun, Timeout) of
{ok, Reply} ->
Self ! {http_reply, Ref, Reply, Transport};
_Error ->
Expand All @@ -392,8 +406,9 @@ send(Transport, Body, NewRid, #state{requests = Requests, on_reply = OnReplyFun}
sync_send(_, _, S=#state{terminated = true}) ->
%% Sending anything to a terminated session is pointless. We're done.
{ok, already_terminated, S};
sync_send(Transport, Body, S=#state{on_reply = OnReplyFun}) ->
case request(Transport, Body, OnReplyFun) of
sync_send(Transport, Body, S=#state{on_reply = OnReplyFun,
timeout = Timeout}) ->
case request(Transport, Body, OnReplyFun, Timeout) of
{ok, Reply} ->
{ok, Reply, S#state{rid = S#state.rid+1}};
Error ->
Expand Down
2 changes: 1 addition & 1 deletion src/escalus_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ connection_step(Step, {Conn, Props, Features}) ->
apply(Fun, [Conn, Props, Features])
end
catch
Error ->
_:Error ->
(Conn#client.module):stop(Conn),
throw({connection_step_failed, {Step, Conn, Props, Features}, Error})
end.
Expand Down
53 changes: 32 additions & 21 deletions src/escalus_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ init([Args, Owner]) ->
Host = proplists:get_value(host, Args, <<"localhost">>),
Port = proplists:get_value(port, Args, 5222),
EventClient = proplists:get_value(event_client, Args),
Timeout = proplists:get_value(timeout, Args, infinity),

OnReplyFun = proplists:get_value(on_reply, Args, fun(_) -> ok end),
OnRequestFun = proplists:get_value(on_request, Args, fun(_) -> ok end),
Expand All @@ -151,16 +152,20 @@ init([Args, Owner]) ->
end,

Address = host_to_inet(Host),
Opts = [binary, {active, once}],
{ok, Socket} = do_connect(Address, Port, Opts, OnConnectFun),
{ok, Parser} = exml_stream:new_parser(),
{ok, #state{owner = Owner,
socket = Socket,
parser = Parser,
sm_state = SM,
event_client = EventClient,
on_reply = OnReplyFun,
on_request = OnRequestFun}}.
Opts = [binary, {active, once}, {send_timeout, Timeout}],
case do_connect(Address, Port, Opts, OnConnectFun, Timeout) of
{ok, Socket} ->
{ok, Parser} = exml_stream:new_parser(),
{ok, #state{owner = Owner,
socket = Socket,
parser = Parser,
sm_state = SM,
event_client = EventClient,
on_reply = OnReplyFun,
on_request = OnRequestFun}};
_Error ->
{stop, normal}
end.

handle_call(get_sm_h, _From, #state{sm_state = {_, H, _}} = State) ->
{reply, H, State};
Expand Down Expand Up @@ -250,22 +255,28 @@ code_change(_OldVsn, State, _Extra) ->
handle_data(Socket, Data, #state{parser = Parser,
socket = Socket,
compress = Compress,
on_reply = OnReplyFun} = State) ->
on_reply = OnReplyFun,
owner = Owner} = State) ->
OnReplyFun({erlang:byte_size(Data)}),
{ok, NewParser, Stanzas} =
case Compress of
Parsed = case Compress of
false ->
exml_stream:parse(Parser, Data);
{zlib, {Zin,_}} ->
Decompressed = iolist_to_binary(zlib:inflate(Zin, Data)),
exml_stream:parse(Parser, Decompressed)
end,
NewState = State#state{parser = NewParser},
case State#state.active of
true ->
forward_to_owner(Stanzas, NewState);
false ->
store_reply(Stanzas, NewState)
case Parsed of
{ok, NewParser, Stanzas} ->
NewState = State#state{parser = NewParser},
case State#state.active of
true ->
forward_to_owner(Stanzas, NewState);
false ->
store_reply(Stanzas, NewState)
end;
_Error ->
Owner ! {error, parse_error},
State
end.

forward_to_owner(Stanzas0, #state{owner = Owner,
Expand Down Expand Up @@ -402,9 +413,9 @@ send_stream_end(#state{socket = Socket, ssl = Ssl, compress = Compress}) ->
gen_tcp:send(Socket, exml:to_iolist(StreamEnd))
end.

do_connect(Address, Port, Opts, OnConnectFun) ->
do_connect(Address, Port, Opts, OnConnectFun, Timeout) ->
TimeB = os:timestamp(),
Reply = gen_tcp:connect(Address, Port, Opts),
Reply = gen_tcp:connect(Address, Port, Opts, Timeout),
TimeA = os:timestamp(),
ConnectionTime = timer:now_diff(TimeA, TimeB),
case Reply of
Expand Down

0 comments on commit b790b91

Please sign in to comment.