Skip to content


Listener federators and memory improvements (2600hz#6298)
Browse files Browse the repository at this point in the history
* Listener federators and memory improvements

Prior, when a gen_listener needed to federate its queue and bindings
to other zones, it would start_link the listener_federator (and trap
exits). However, should a listener_federator process actually exit,
the parent gen_listener would receive the EXIT message but failed to
have a handle_info clause to restart the process.

Introduce a sofo supervisor for starting listener_federator processes
and restart them if they die unexpectedly (non-normal exits).

Secondly, introduce memory consumption checks and GC gen_listeners
when they exceed 100K in total_heap_size. In addition, copy the binary
payloads as they arrive off the AMQP channel has shown to decelerate
the memory growth as the original binary can be garbage collected
sooner and not count against the gen_listener's "old" heap.

All told, these manual GC runs appear to be minimal and only affect
long-running and busy gen_listeners; even fewer runs needed to release
memory after the binary:copy/1 introduction.

* just ok

* handle state transition so Payload can be throw out sooner

* right state

* fun police

* monitor parent and terminate if parent dies
  • Loading branch information
jamesaimonetti authored Feb 10, 2020
1 parent 3b0f623 commit dc29520
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 76 deletions.
190 changes: 136 additions & 54 deletions core/kazoo_amqp/src/gen_listener.erl
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@

Expand Down Expand Up @@ -105,9 +106,10 @@


Expand All @@ -122,7 +124,7 @@

-type module_state() :: any().

-type federator_listener() :: {kz_term:ne_binary(), pid()}.
-type federator_listener() :: {kz_term:ne_binary(), {pid(), reference()}}.
-type federator_listeners() :: [federator_listener()].

-record(state, {queue :: kz_term:api_binary()
Expand Down Expand Up @@ -241,6 +243,14 @@ start_link(Name, Module, Params, InitArgs, Options) when is_atom(Module),
gen_server:start_link(Name, ?MODULE, [Module, Params, InitArgs], Options).

-spec stop(kz_types:server_ref()) -> 'ok'.
stop(Server) ->

-spec start_listener(pid(), kz_term:proplist()) -> 'ok'.
start_listener(Srv, Params) ->
gen_server:cast(Srv, {'start_listener', Params}).

-spec queue_name(kz_types:server_ref()) -> kz_term:api_ne_binary().
queue_name(Srv) -> gen_server:call(Srv, 'queue_name').

Expand Down Expand Up @@ -391,6 +401,10 @@ rm_binding(Srv, Binding, Props) ->
federated_event(Srv, JObj, Props) ->
gen_server:cast(Srv, {'federated_event', JObj, Props}).

-spec notify_of_federator_listener(pid(), {kz_term:ne_binary(), pid()}) -> 'ok'.
notify_of_federator_listener(Srv, {_Broker, _Pid}=Child) ->
gen_server:cast(Srv, {'federator_listener', Child}).

-spec execute(kz_types:server_ref(), module(), atom(), [any()]) -> 'ok'.
execute(Srv, Module, Function, Args) ->
gen_server:cast(Srv, {'$execute', Module, Function, Args}).
Expand Down Expand Up @@ -535,8 +549,9 @@ handle_cast({'federated_event', JObj, Props}, #state{params=Params}=State) ->
Deliver = props:get_value('deliver', Props),
Basic = props:get_value('basic', Props),
case props:is_true('spawn_handle_event', Params, 'false') of
'true' -> kz_process:spawn(fun distribute_event/3, [JObj, {Deliver, Basic}, State]),
{'noreply', State};
'true' ->
kz_process:spawn(fun distribute_event/3, [JObj, {Deliver, Basic}, State]),
{'noreply', State};
'false' -> distribute_event(JObj, {Deliver, Basic}, State)
handle_cast({'$execute', Module, Function, Args}
Expand All @@ -559,23 +574,23 @@ handle_cast({'$execute', Module, Function, Args}=Msg
) ->
erlang:apply(Module, Function, Args),
_ = [?MODULE:cast(Federator, Msg)
|| {_Broker, Federator} <- Federators
|| {_Broker, {Federator, _Ref}} <- Federators
{'noreply', State};
handle_cast({'$execute', Function, Args}=Msg
) ->
erlang:apply(Function, Args),
_ = [?MODULE:cast(Federator, Msg)
|| {_Broker, Federator} <- Federators
|| {_Broker, {Federator, _Ref}} <- Federators
{'noreply', State};
handle_cast({'$execute', Function}=Msg
) ->
_ = [?MODULE:cast(Federator, Msg)
|| {_Broker, Federator} <- Federators
|| {_Broker, {Federator, _Ref}} <- Federators
{'noreply', State};
handle_cast({'$client_cast', Message}, State) ->
Expand All @@ -601,38 +616,48 @@ handle_cast({'pause_consumers'}, #state{is_consuming='true', consumer_tags=Tags}
{'noreply', State};
handle_cast({'resume_consumers'}, #state{queue='undefined'}=State) ->
{'noreply', State};
handle_cast({'resume_consumers'}, #state{is_consuming='false'
}=State) ->
) ->
ConsumeOptions = props:get_value('consume_options', Params, []),
_ = start_consumer(Q, maybe_configure_auto_ack(ConsumeOptions, AutoAck)),
_ = [start_consumer(Q1, maybe_configure_auto_ack(props:get_value('consume_options', P, []), AutoAck))
|| {Q1, {_, P}} <- OtherQueues
{'noreply', State};
handle_cast({'federator_is_consuming', Broker, 'true'}, State) ->
lager:info("federator for ~p is consuming, waiting on: ~p", [Broker, State#state.waiting_federators]),

Filter = fun(X) ->
kz_amqp_connections:broker_available_connections(X) > 0

Waiting = lists:filter(Filter, State#state.waiting_federators),

lager:info("available waiting brokers: ~p", [Waiting]),

case lists:subtract(Waiting, [Broker]) of
[] ->
lager:info("all waiting federators are available!"),
lager:info("federator for ~s is consuming, all waiting federators are available!", [Broker]),
handle_module_cast({?MODULE, {'federators_consuming', 'true'}}, State);

Remaining ->
lager:info("still waiting for federators: ~p", [Remaining]),
lager:info("federator for ~s is consuming, still waiting for federators: ~p", [Broker, Remaining]),
{'noreply', State#state{waiting_federators = Remaining}}
handle_cast({'federator_listener', {Broker, Pid}}, #state{federators=Fs}=State) ->
case lists:keytake(Pid, 2, Fs) of
'false' ->
Ref = monitor('process', Pid),
lager:info("federator ~p(~p) on broker ~s started", [Pid, Ref, Broker]),

{'noreply', State#state{federators=[{Broker, {Pid, Ref}} | Fs]}};
{'value', _FL, _Feds} ->
lager:debug("ignoring federator_listener message about ~p; have ~p already"
,[Pid, _FL]
{'noreply', State}
handle_cast(Message, State) ->
handle_module_cast(Message, State).

Expand Down Expand Up @@ -681,14 +706,18 @@ handle_info({#'basic.deliver'{}=BD
,#state{params=Params, auto_ack=AutoAck}=State
) ->

_ = case AutoAck of
'true' -> (catch kz_amqp_util:basic_ack(BD));
'false' -> 'ok'
case props:is_true('spawn_handle_event', Params, 'false') of
'false' -> handle_event(Payload, {CT, CE}, {BD, Basic}, State);
'true' -> kz_process:spawn(fun handle_event/4, [Payload, {CT, CE}, {BD, Basic}, State]),
{'noreply', State}
'false' ->
Reply = handle_event(binary:copy(Payload), {CT, CE}, {BD, Basic}, State),
'true' ->
kz_process:spawn(fun handle_event/4, [Payload, {CT, CE}, {BD, Basic}, State]),
{'noreply', State}
Expand All @@ -699,7 +728,7 @@ handle_info({#'basic.return'{}=BR
) ->
handle_return(Payload, {CT, CE}, BR, State);
handle_return(binary:copy(Payload), {CT, CE}, BR, State);
) ->
Expand Down Expand Up @@ -741,6 +770,8 @@ handle_info({'kz_amqp_channel', {Client, Ref, 'consumer_tags'}}, #state{consumer
{'noreply', State};
handle_info(?CALLBACK_TIMEOUT_MSG, State) ->
handle_callback_info('timeout', State);
handle_info({'DOWN', Ref, 'process', Pid, Reason}, State) ->
handle_down({Pid, Ref}, Reason, State);
handle_info(Message, State) ->
handle_callback_info(Message, State).

Expand Down Expand Up @@ -824,7 +855,7 @@ terminate(Reason, #state{module=Module
}) ->
Terminated = (catch Module:terminate(Reason, ModuleState)),
_ = [listener_federator:stop(F) || {_Broker, F} <- Fs],
_ = [listener_federator:stop(F) || {_Broker, {F, _Ref}} <- Fs],
maybe_log_module_terminate(Module, Reason, Terminated).

-spec maybe_log_module_terminate(module(), any(), any()) -> 'ok'.
Expand All @@ -844,6 +875,18 @@ code_change(_OldVersion, State, _Extra) ->
%%% Internal functions

%% @doc
%% @end
handle_down({Pid, Ref}, Reason, #state{federators=Fs}=State) ->
case lists:keytake({Pid, Ref}, 2, Fs) of
'false' -> handle_callback_info({'DOWN', Ref, 'process', Pid, Reason}, State);
{'value', {_Broker, _PidRef}, UpdatedFs} ->
lager:warning("federator ~p to ~s down: ~p", [_PidRef, _Broker, Reason]),
{'noreply', State#state{federators=UpdatedFs}}

%% @doc
%% @end
Expand Down Expand Up @@ -1176,14 +1219,18 @@ handle_module_call(Request, From, #state{module=Module
_ = stop_timer(OldRef),
try Module:handle_call(Request, From, ModuleState) of
{'reply', Reply, ModuleState1} ->
{'reply', Reply
{'reply', Reply, ModuleState1, 'hibernate'} ->
{'reply', Reply
{'reply', Reply, ModuleState1, Timeout} ->
Expand Down Expand Up @@ -1306,39 +1353,44 @@ is_federated_binding(Props) ->
props:get_value('federate', Props) =:= 'true'.

-spec update_federated_bindings(state()) -> state().
update_federated_bindings(#state{}=State) ->
update_federated_bindings(State, kz_amqp_connections:federated_brokers()).

update_federated_bindings(#state{bindings=[{Binding, _Props}|_]}=State, []) ->
lager:debug("no federated brokers to connect to, skipping federating binding '~s'", [Binding]),
update_federated_bindings(#state{bindings=[{Binding, Props}|_]
}=State) ->
case kz_amqp_connections:federated_brokers() of
[] ->
lager:debug("no federated brokers to connect to, skipping federating binding '~s'", [Binding]),
FederatedBrokers ->
NonFederatedProps = props:delete('federate', Props),
{_Existing, New} = broker_connections(Fs, FederatedBrokers),
'ok' = update_existing_listeners_bindings(Fs, Binding, NonFederatedProps),
{'ok', NewListeners} = start_new_listeners(New, Binding, NonFederatedProps, State),
State#state{federators=NewListeners ++ Fs, waiting_federators=New ++ State#state.waiting_federators}
) ->
NonFederatedProps = props:delete('federate', Props),
{_Existing, New} = broker_connections(Fs, FederatedBrokers),
'ok' = update_existing_listeners_bindings(Fs, Binding, NonFederatedProps),
{'ok', NewListeners} = start_new_listeners(New, Binding, NonFederatedProps, State),
State#state{federators=NewListeners ++ Fs
,waiting_federators=New ++ State#state.waiting_federators

-spec maybe_remove_federated_binding(binding(), kz_term:proplist(), state()) -> 'ok'.
maybe_remove_federated_binding(Binding, Props, State) ->
maybe_remove_federated_binding(is_federated_binding(Props), Binding, Props, State).

-spec maybe_remove_federated_binding(boolean(), binding(), kz_term:proplist(), state()) -> 'ok'.
maybe_remove_federated_binding('true', Binding, Props, #state{federators=Fs}) when Fs =/= [] ->
maybe_remove_federated_binding(_Flag, _Binding, _Props, #state{federators=[]}) -> 'ok';
maybe_remove_federated_binding('true', Binding, Props, #state{federators=Fs}) ->
NonFederatedProps = props:delete('federate', Props),
remove_federated_binding(Fs, Binding, NonFederatedProps);

maybe_remove_federated_binding(_Flag, _Binging, _Props, _State) ->
maybe_remove_federated_binding(_Flag, _Binging, _Props, _State) -> 'ok'.

-spec broker_connections(federator_listeners(), kz_term:ne_binaries()) ->
{kz_term:ne_binaries(), kz_term:ne_binaries()}.
broker_connections(Listeners, Brokers) ->
lists:partition(fun(Broker) ->
props:get_value(Broker, Listeners) =/= 'undefined'
end, Brokers).

-spec start_new_listeners(kz_term:ne_binaries(), binding_module(), kz_term:proplist(), state()) ->
{'ok', federator_listeners()}.
Expand All @@ -1350,9 +1402,10 @@ start_new_listeners(Brokers, Binding, Props, State) ->
-spec start_new_listener(kz_term:ne_binary(), binding_module(), kz_term:proplist(), state()) -> federator_listener().
start_new_listener(Broker, Binding, Props, #state{params=Ps}) ->
FederateParams = create_federated_params({Binding, Props}, Ps),
{'ok', Pid} = listener_federator:start_link(self(), Broker, FederateParams),
lager:debug("started federated listener on broker ~s: ~p", [Broker, Pid]),
{Broker, Pid}.
{'ok', Pid} = kz_amqp_federated_listeners_sup:start_child(self(), Broker, FederateParams),
Ref = monitor('process', Pid),
lager:debug("started federated listener on broker ~s: ~p(~p)", [Broker, Pid, Ref]),
{Broker, {Pid, Ref}}.

-spec remove_federated_binding(federator_listeners(), binding_module(), kz_term:proplist()) -> 'ok'.
remove_federated_binding(Listeners, Binding, Props) ->
Expand All @@ -1369,7 +1422,7 @@ update_existing_listeners_bindings(Listeners, Binding, Props) ->

-spec update_existing_listener_bindings(federator_listener(), binding_module(), kz_term:proplist()) -> 'ok'.
update_existing_listener_bindings({_Broker, Pid}, Binding, Props) ->
update_existing_listener_bindings({_Broker, {Pid, _Ref}}, Binding, Props) ->
lager:debug("updating listener ~p with ~s", [Pid, Binding]),
?MODULE:add_binding(Pid, Binding, Props).

Expand Down Expand Up @@ -1535,10 +1588,39 @@ maybe_add_broker_connection(Broker, Count) when Count =:= 0 ->
maybe_add_broker_connection(Broker, _Count) ->
kz_amqp_channel:requisition(self(), Broker).

-spec start_listener(pid(), kz_term:proplist()) -> 'ok'.
start_listener(Srv, Params) ->
gen_server:cast(Srv, {'start_listener', Params}).

maybe_configure_auto_ack(Props, 'false') -> Props;
maybe_configure_auto_ack(Props, 'true') ->
[{'no_ack', 'false'} | props:delete('no_ack', Props)].

%% @doc force GC of process if total_heap_size exceeds 10K
%% In production, when compacting kz_amqp_workers and other
%% gen_listener processes, we see a reduction in total_heap_size to
%% about 4K on most processes.
%% Old heap is used for "long-lived" terms. One of the issues was that
%% when a binary Payload was received from AMQP, we would decode it
%% directly which causes refc binaries to be put in the gen_listener's
%% heap, keeping the binary alive. By using binary:copy/1 before
%% decoding, Payload can be freed sooner. I suspect that Payload
%% itself is a refc into the larger AMQP binary payload parsed by the
%% AMQP client library.
%% It is possible that apps may load more memory into their state
%% which would cause gen_listener's state to exceed 10K in the steady
%% state. We advise app developers to not do this :)
-spec maybe_gc(Reply) -> Reply.
maybe_gc(Reply) ->
maybe_gc(process_info(self(), ['total_heap_size'])
,100 * ?BYTES_K

maybe_gc([{'total_heap_size', Words}], MaxBytes) ->
maybe_gc(kz_term:words_to_bytes(Words), MaxBytes);
maybe_gc(Heap, Max) when Heap > Max ->
[{'total_heap_size', NewHeapWords}] = process_info(self(), ['total_heap_size']),
NewHeap = kz_term:words_to_bytes(NewHeapWords),
lager:debug("new heap size ~p (delta ~p)", [NewHeap, Heap-NewHeap]);
maybe_gc(_Heap, _Max) -> 'ok'.

0 comments on commit dc29520

Please sign in to comment.