Skip to content


PISTON-841: performance improvements: acdc_queue_handler:presence_pro…
Browse files Browse the repository at this point in the history
…be (2600hz#5916)

* PISTON-841: performance improvements: acdc_queue_handler:presence_probe

    - filter out Usernames that are not 32 char hex binary (so can't be
      queue presence probes)
    - serialize get_account_by_realm lookup to allow cache to warm when
      Kamailio nodes are restarted and many probes come in due to stale
      presence cache in Kamailio
    - request queue_size from queue manager as AMQP queue size will give
      wrong result due to unacked msgs in consumers

* PISTON-821: make fmt

* PISTON-841: omit unneeded undefined clause for 32 char hex check
  • Loading branch information
danielfinke authored and jamesaimonetti committed Jul 18, 2019
1 parent a6db64d commit cc39ac0
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 34 deletions.
127 changes: 127 additions & 0 deletions applications/acdc/src/acdc_presence_realm_lookup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
%%% @copyright (C) 2019-, Voxter Communications Inc
%%% @doc Serialize requests to look up an account ID by realm in order to reduce
%%% overhead when Kamailio nodes are restarted and presence probes are performed
%%% for all new registrations
%%% @end
%%% @author Daniel Finke


%% API

%% gen_server callbacks


-define(SERVER, ?MODULE).

-record(state, {}).
-type state() :: #state{}.

%%% API

%% @doc Starts the server
%% @end
-spec start_link() -> kz_types:startlink_ret().
start_link() ->
gen_server:start_link({'local', ?SERVER}, ?MODULE, [], []).

%% @doc Look up the account ID corresponding to a given realm
%% @end
-spec lookup(kz_term:ne_binary()) -> kz_term:ne_binary() | 'not_found'.
lookup(Realm) ->
gen_server:call(?SERVER, {'lookup', Realm}).

%%% gen_server callbacks

%% @doc Initializes the server
%% @end
-spec init([]) -> {'ok', state()}.
init([]) ->
{'ok', #state{}}.

%% @doc Handling call messages
%% @end
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call({'lookup', Realm}, _, State) ->
case kapps_util:get_account_by_realm(Realm) of
{'ok', AcctDb} ->
AccountId = kz_util:format_account_id(AcctDb, 'raw'),
{'reply', AccountId, State};
_ -> {'reply', 'not_found', State}
handle_call(_Request, _From, State) ->
{'reply', 'ok', State}.

%% @doc Handling cast messages
%% @end
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast(_Msg, State) ->
{'noreply', State}.

%% @doc Handling all non call/cast messages
%% @end
-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()).
handle_info(_Info, State) ->
{'noreply', State}.

%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% @end
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, _State) ->

%% @doc Convert process state when code is changed
%% @end
-spec code_change(any(), state(), any()) -> {'ok', state()}.
code_change(_OldVsn, State, _Extra) ->
{'ok', State}.

%%% Internal functions
67 changes: 33 additions & 34 deletions applications/acdc/src/acdc_queue_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,49 +105,48 @@ handle_queue_change(_, AccountId, QueueId, ?DOC_DELETED) ->

%% @doc Handle presence probes for queues by filtering out those that cannot be
%% queue probes (do not look like 32 character hex binaries) and those that do
%% not correspond to an existing account ID/queue ID pair
%% @end
-spec handle_presence_probe(kz_json:object(), kz_term:proplist()) -> 'ok'.
handle_presence_probe(JObj, _Props) ->
'true' = kapi_presence:probe_v(JObj),
Realm = kz_json:get_value(<<"Realm">>, JObj),
case kapps_util:get_account_by_realm(Realm) of
{'ok', AcctDb} ->
AccountId = kz_util:format_account_id(AcctDb, 'raw'),
maybe_respond_to_presence_probe(JObj, AccountId);
_ -> 'ok'
Username = kz_json:get_ne_binary_value(<<"Username">>, JObj),
case potentially_queue_presence_probe(Username) of
'true' ->
Realm = kz_json:get_ne_binary_value(<<"Realm">>, JObj),
'false' -> 'ok'

maybe_respond_to_presence_probe(JObj, AcctId) ->
case kz_json:get_value(<<"Username">>, JObj) of
-spec potentially_queue_presence_probe(kz_term:api_binary()) -> boolean().
potentially_queue_presence_probe(<<_:32/binary>>) -> 'true';
potentially_queue_presence_probe(_) -> 'false'.

-spec maybe_respond_to_presence_probe(kz_term:ne_binary() | 'not_found', kz_term:ne_binary()) -> 'ok'.
maybe_respond_to_presence_probe('not_found', _) -> 'ok';
maybe_respond_to_presence_probe(AcctId, QueueId) ->
case acdc_queues_sup:find_queue_supervisor(AcctId, QueueId) of
'undefined' -> 'ok';
QueueId ->
,acdc_queues_sup:find_queue_supervisor(AcctId, QueueId)
,AcctId, QueueId
QueueSup ->
Manager = acdc_queue_sup:manager(QueueSup),
update_probe(Manager, AcctId, QueueId)

update_probe(_JObj, 'undefined', _, _) -> 'ok';
update_probe(JObj, _Sup, AcctId, QueueId) ->
case kapi_acdc_queue:queue_size(AcctId, QueueId) of
-spec update_probe(kz_term:api_pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'.
update_probe(Manager, AcctId, QueueId) ->
case acdc_queue_manager:queue_size(Manager) of
0 ->
lager:debug("no calls in queue, ignore!"),
send_probe(JObj, ?PRESENCE_GREEN);
N when is_integer(N), N > 0 ->
acdc_util:presence_update(AcctId, QueueId, ?PRESENCE_GREEN);
N ->
lager:debug("~b calls in queue, redify!", [N]),
send_probe(JObj, ?PRESENCE_RED_FLASH);
_E ->
lager:debug("unhandled size return: ~p", [_E])
acdc_util:presence_update(AcctId, QueueId, ?PRESENCE_RED_FLASH)

send_probe(JObj, State) ->
To = <<(kz_json:get_value(<<"Username">>, JObj))/binary
,(kz_json:get_value(<<"Realm">>, JObj))/binary
PresenceUpdate =
[{<<"State">>, State}
,{<<"Presence-ID">>, To}
,{<<"Call-ID">>, kz_term:to_hex_binary(crypto:hash('md5', To))}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
7 changes: 7 additions & 0 deletions applications/acdc/src/acdc_queue_manager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
,should_ignore_member_call/3, should_ignore_member_call/4
Expand Down Expand Up @@ -241,6 +242,10 @@ handle_queue_member_remove(JObj, Prop) ->
handle_config_change(Srv, JObj) ->
gen_listener:cast(Srv, {'update_queue_config', JObj}).

-spec queue_size(kz_types:server_ref()) -> kz_term:non_neg_integer().
queue_size(Srv) ->
gen_listener:call(Srv, 'queue_size').

-spec should_ignore_member_call(kz_types:server_ref(), kapps_call:call(), kz_json:object()) -> boolean().
should_ignore_member_call(Srv, Call, CallJObj) ->
Expand Down Expand Up @@ -332,6 +337,8 @@ init(Super, AccountId, QueueId, QueueJObj) ->
%% @end
-spec handle_call(any(), kz_term:pid_ref(), mgr_state()) -> kz_types:handle_call_ret_state(mgr_state()).
handle_call('queue_size', _, #state{current_member_calls=Calls}=State) ->
{'reply', length(Calls), State};
handle_call({'should_ignore_member_call', {AccountId, QueueId, CallId}=K}, _, #state{ignored_member_calls=Dict
Expand Down
1 change: 1 addition & 0 deletions applications/acdc/src/acdc_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
-define(SERVER, ?MODULE).

Expand Down

0 comments on commit cc39ac0

Please sign in to comment.