Skip to content
This repository has been archived by the owner on Mar 21, 2023. It is now read-only.

Commit

Permalink
More fixes to pipelining
Browse files Browse the repository at this point in the history
  • Loading branch information
cmullaparthi committed Aug 8, 2014
1 parent 502f73b commit ba6652f
Showing 1 changed file with 90 additions and 42 deletions.
132 changes: 90 additions & 42 deletions src/ibrowse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ stop() ->
%% respHeader() = {headerName(), headerValue()}
%% headerName() = string()
%% headerValue() = string()
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
%% response() = {ok, Status, ResponseHeaders, ResponseBody} | {ibrowse_req_id, req_id() } | {error, Reason}
%% req_id() = term()
%% ResponseBody = string() | {file, Filename}
%% Reason = term()
Expand Down Expand Up @@ -252,6 +252,11 @@ send_req(Url, Headers, Method, Body) ->
%% headers. Not quite sure why someone would want this, but one of my
%% users asked for it, so here it is. </li>
%%
%% <li> The <code>preserve_status_line</code> option is to get the raw status line as a custom header
%% in the response. The status line is returned as a tuple {ibrowse_status_line, Status_line_binary}
%% If both the <code>give_raw_headers</code> and <code>preserve_status_line</code> are specified
%% in a request, only the <code>give_raw_headers</code> is honoured. </li>
%%
%% <li> The <code>preserve_chunked_encoding</code> option enables the caller
%% to receive the raw data stream when the Transfer-Encoding of the server
%% response is Chunked.
Expand Down Expand Up @@ -336,7 +341,7 @@ send_req(Url, Headers, Method, Body, Options, Timeout) ->
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, 0);
Headers, Method, Body, Options_1, Timeout, Timeout, os:timestamp(), 0);
Err ->
{error, {url_parsing_failed, Err}}
end.
Expand All @@ -345,29 +350,41 @@ try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Try_count) when Try_count < 3 ->
Headers, Method, Body, Options_1, Timeout,
Ori_timeout, Req_start_time, Try_count) when Try_count =< 3 ->
ProcessOptions = get_value(worker_process_options, Options_1, []),
case ibrowse_lb:spawn_connection(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
ProcessOptions) of
{ok, Conn_Pid} ->
{ok, {_Pid_cur_spec_size, _, Conn_Pid}} ->
case do_send_req(Conn_Pid, Parsed_url, Headers,
Method, Body, Options_1, Timeout) of
{error, sel_conn_closed} ->
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1, Timeout, Try_count + 1);
Time_now = os:timestamp(),
Time_taken_so_far = trunc(round(timer:now_diff(Time_now, Req_start_time)/1000)),
Time_remaining = Ori_timeout - Time_taken_so_far,
Time_remaining_percent = trunc(round((Time_remaining/Ori_timeout)*100)),
%% io:format("~p -- Time_remaining: ~p (~p%)~n", [self(), Time_remaining, Time_remaining_percent]),
case (Time_remaining > 0) andalso (Time_remaining_percent >= 5) of
true ->
try_routing_request(Lb_pid, Parsed_url,
Max_sessions,
Max_pipeline_size,
{SSLOptions, IsSSL},
Headers, Method, Body, Options_1,
Time_remaining, Ori_timeout, Req_start_time, Try_count + 1);
false ->
{error, retry_later}
end;
Res ->
Res
end;
Err ->
Err
end;
try_routing_request(_, _, _, _, _, _, _, _, _, _, _) ->
try_routing_request(_, _, _, _, _, _, _, _, _, _, _, _, _) ->
{error, retry_later}.

merge_options(Host, Port, Options) ->
Expand Down Expand Up @@ -441,14 +458,29 @@ do_send_req(Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout) ->
Headers, Method, ensure_bin(Body),
Options, Timeout) of
{'EXIT', {timeout, _}} ->
P_info = case catch erlang:process_info(Conn_Pid, [messages, message_queue_len, backtrace]) of
[_|_] = Conn_Pid_info_list ->
Conn_Pid_info_list;
_ ->
process_info_not_available
end,
(catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call timeout.~nProcess info: ~p~n",
[[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], P_info])),
{error, req_timedout};
{'EXIT', {noproc, {gen_server, call, [Conn_Pid, _, _]}}} ->
{error, sel_conn_closed};
{'EXIT', {normal, _}} ->
{error, sel_conn_closed};
{'EXIT', {connection_closed, _}} ->
{'EXIT', {normal, _}} = Ex_rsn ->
(catch lager:error("{ibrowse_http_client, send_req, ~1000.p} gen_server call got ~1000.p~n",
[[Conn_Pid, Parsed_url, Headers, Method, Body, Options, Timeout], Ex_rsn])),
{error, req_timedout};
{error, X} when X == connection_closed;
X == {send_failed, {error, enotconn}};
X == {send_failed,{error,einval}};
X == {send_failed,{error,closed}};
X == connection_closing;
((X == connection_closed_no_retry) andalso ((Method == get) orelse (Method == head))) ->
{error, sel_conn_closed};
{error, connection_closed} ->
{error, connection_closed_no_retry} ->
{error, connection_closed};
{error, {'EXIT', {noproc, _}}} ->
{error, sel_conn_closed};
{'EXIT', Reason} ->
{error, {'EXIT', Reason}};
Expand Down Expand Up @@ -637,49 +669,65 @@ show_dest_status(Url) ->
%% included.
show_dest_status(Host, Port) ->
case get_metrics(Host, Port) of
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, _},
{Last_p_sz, _}}} ->
{Lb_pid, MsgQueueSize,
Tid, Size,
{{First_p_sz, First_p_sz},
{Last_p_sz, Last_p_sz}}} ->
io:format("Load Balancer Pid : ~p~n"
"LB process msg q size : ~p~n"
"LB ETS table id : ~p~n"
"Num Connections : ~p~n"
"Smallest pipeline : ~p:~p~n"
"Largest pipeline : ~p:~p~n",
"Smallest pipeline : ~p~n"
"Largest pipeline : ~p~n",
[Lb_pid, MsgQueueSize, Tid, Size,
First_p_sz, First_p_sz,
Last_p_sz, Last_p_sz
]);
First_p_sz, Last_p_sz]);
_Err ->
io:format("Metrics not available~n", [])
end.

get_metrics() ->
[get_metrics(Host, Port) || #lb_pid{host_port = {Host, Port}} <-
ets:tab2list(ibrowse_lb),
is_list(Host),
is_integer(Port)].
Dests = lists:filter(
fun(#lb_pid{host_port = {Host, Port}}) when is_list(Host),
is_integer(Port) ->
true;
(_) ->
false
end, ets:tab2list(ibrowse_lb)),
lists:foldl(
fun(#lb_pid{host_port = {X_host, X_port}}, X_acc) ->
case get_metrics(X_host, X_port) of
{_, _, _, _, _} = X_res ->
[X_res | X_acc];
_X_res ->
X_acc
end
end, [], Dests).

get_metrics(Host, Port) ->
case ets:lookup(ibrowse_lb, {Host, Port}) of
[] ->
no_active_processes;
[#lb_pid{pid = Lb_pid, ets_tid = Tid}] ->
MsgQueueSize = (catch process_info(Lb_pid, message_queue_len)),
try
Size = ets:info(Tid, size),
case Size of
0 ->
ok;
_ ->
{First_p_sz, _} = ets:first(Tid),
{Last_p_sz, _} = ets:last(Tid),
{Lb_pid, MsgQueueSize, Tid, Size,
{{First_p_sz, First_p_sz},
{Last_p_sz, Last_p_sz}}}
end
catch _:_ ->
not_available
case Tid of
undefined ->
{Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
_ ->
try
Size = ets:info(Tid, size),
case Size of
0 ->
{Lb_pid, MsgQueueSize, undefined, 0, {{0, 0}, {0, 0}}};
_ ->
{First_p_sz, _, _} = ets:first(Tid),
{Last_p_sz, _, _} = ets:last(Tid),
{Lb_pid, MsgQueueSize,
Tid, Size,
{{First_p_sz, First_p_sz}, {Last_p_sz, Last_p_sz}}}
end
catch _:_Err ->
not_available
end
end
end.

Expand Down

0 comments on commit ba6652f

Please sign in to comment.