Skip to content

Commit

Permalink
don't block in gen_listener terminate (2600hz#6163)
Browse files Browse the repository at this point in the history
  • Loading branch information
lazedo authored Nov 17, 2019
1 parent b412a5a commit 54e63d2
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
12 changes: 8 additions & 4 deletions core/kazoo_amqp/src/gen_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -800,11 +800,15 @@ terminate(Reason, #state{module=Module
,federators=Fs
,consumer_tags=Tags
}) ->
_ = (catch(lists:foreach(fun kz_amqp_util:basic_cancel/1, Tags))),
_Terminated = (catch Module:terminate(Reason, ModuleState)),
_ = (catch kz_amqp_channel:release()),
Terminated = (catch Module:terminate(Reason, ModuleState)),
kz_amqp_assignments:release_consumer(Tags),
_ = [listener_federator:stop(F) || {_Broker, F} <- Fs],
lager:debug("~s terminated (~p): ~p", [Module, Reason, _Terminated]).
maybe_log_module_terminate(Module, Reason, Terminated).

-spec maybe_log_module_terminate(module(), any(), any()) -> 'ok'.
maybe_log_module_terminate(_Module, _Reason, 'ok') -> 'ok';
maybe_log_module_terminate(Module, Reason, Terminated) ->
lager:debug("~s terminated (~p): ~p", [Module, Reason, Terminated]).

%%------------------------------------------------------------------------------
%% @doc Convert process state when code is changed.
Expand Down
29 changes: 27 additions & 2 deletions core/kazoo_amqp/src/kz_amqp_assignments.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
-export([request_channel/2]).
-export([add_channel/3]).
-export([release/1]).
-export([release_consumer/1, release_consumer/2, release_consumer/3]).
-export([init/1
,handle_call/3
,handle_cast/2
Expand Down Expand Up @@ -717,9 +718,11 @@ handle_down_msg(Matches, _Pid, Reason) ->
lists:foreach(fun(M) -> handle_down_match(M, Reason) end, Matches).

-spec handle_down_match(down_match(), any()) -> 'ok'.
handle_down_match({'consumer', _}
handle_down_match({'consumer', #kz_amqp_assignment{consumer=Consumer}}
,'shutdown'
) -> 'ok';
) ->
Pattern = #kz_amqp_assignment{consumer=Consumer, _='_'},
release_assignments(ets:match_object(?TAB, Pattern, 1));
handle_down_match({'consumer', #kz_amqp_assignment{consumer=Consumer}=Assignment}
,_Reason
) ->
Expand Down Expand Up @@ -935,3 +938,25 @@ release_handlers({[#kz_amqp_assignment{channel=Channel}]
release_handlers(ets:match(Continuation));
release_handlers({[#kz_amqp_assignment{}], Continuation}) ->
release_handlers(ets:match(Continuation)).

-spec release_consumer(kz_term:ne_binaries()) -> 'ok'.
release_consumer(Tags) ->
case kz_amqp_channel:is_consumer_channel_valid() of
'true' -> release_consumer(self(), kz_amqp_channel:consumer_channel(), Tags);
'false' -> gen_server:cast(?SERVER, {'release_assignments', self()})
end.

-spec release_consumer(pid(), kz_term:ne_binaries()) -> 'ok'.
release_consumer(Channel, Tags) ->
case is_process_alive(Channel) of
'true' -> release_consumer(self(), Channel, Tags);
'false' -> gen_server:cast(?SERVER, {'release_assignments', self()})
end.

-spec release_consumer(pid(), pid(), kz_term:ne_binaries()) -> 'ok'.
release_consumer(Consumer, Channel, Tags) ->
amqp_channel:unregister_return_handler(Channel),
amqp_channel:unregister_confirm_handler(Channel),
amqp_channel:unregister_flow_handler(Channel),
_ = [amqp_channel:cast( Channel, #'basic.cancel'{consumer_tag=Tag}) || Tag <- Tags],
gen_server:cast(?SERVER, {'release_assignments', Consumer}).
5 changes: 4 additions & 1 deletion core/kazoo_amqp/src/kz_amqp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,10 @@ remove_consumer_pid() ->
-spec consumer_channel() -> pid() | kz_amqp_assignment() | {'error', 'timeout'}.
consumer_channel() ->
case get('$kz_amqp_consumer_channel') of
'undefined' -> kz_amqp_assignments:get_channel();
'undefined' ->
#kz_amqp_assignment{channel=Channel} = kz_amqp_assignments:get_channel(),
put('$kz_amqp_consumer_channel', Channel),
Channel;
Channel -> Channel
end.

Expand Down

0 comments on commit 54e63d2

Please sign in to comment.