From cc39ac060865c61db1dfb32de12cf7cc08411726 Mon Sep 17 00:00:00 2001 From: Daniel Finke Date: Thu, 4 Jul 2019 20:19:34 +0000 Subject: [PATCH] PISTON-841: performance improvements: acdc_queue_handler:presence_probe (#5916) * PISTON-841: performance improvements: acdc_queue_handler:presence_probe - filter out Usernames that are not 32 char hex binary (so can't be queue presence probes) - serialize get_account_by_realm lookup to allow cache to warm when Kamailio nodes are restarted and many probes come in due to stale presence cache in Kamailio - request queue_size from queue manager as AMQP queue size will give wrong result due to unacked msgs in consumers * PISTON-821: make fmt * PISTON-841: omit unneeded undefined clause for 32 char hex check --- .../acdc/src/acdc_presence_realm_lookup.erl | 127 ++++++++++++++++++ applications/acdc/src/acdc_queue_handler.erl | 67 +++++---- applications/acdc/src/acdc_queue_manager.erl | 7 + applications/acdc/src/acdc_sup.erl | 1 + 4 files changed, 168 insertions(+), 34 deletions(-) create mode 100644 applications/acdc/src/acdc_presence_realm_lookup.erl diff --git a/applications/acdc/src/acdc_presence_realm_lookup.erl b/applications/acdc/src/acdc_presence_realm_lookup.erl new file mode 100644 index 00000000000..d917d261f9f --- /dev/null +++ b/applications/acdc/src/acdc_presence_realm_lookup.erl @@ -0,0 +1,127 @@ +%%%----------------------------------------------------------------------------- +%%% @copyright (C) 2019-, Voxter Communications Inc +%%% @doc Serialize requests to look up an account ID by realm in order to reduce +%%% overhead when Kamailio nodes are restarted and presence probes are performed +%%% for all new registrations +%%% +%%% @end +%%% @author Daniel Finke +%%%----------------------------------------------------------------------------- +-module(acdc_presence_realm_lookup). + +-behaviour(gen_server). + +%% API +-export([start_link/0 + ,lookup/1 + ]). + +%% gen_server callbacks +-export([init/1 + ,handle_call/3 + ,handle_cast/2 + ,handle_info/2 + ,terminate/2 + ,code_change/3 + ]). + +-include("acdc.hrl"). + +-define(SERVER, ?MODULE). + +-record(state, {}). +-type state() :: #state{}. + +%%%============================================================================= +%%% API +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Starts the server +%% +%% @end +%%------------------------------------------------------------------------------ +-spec start_link() -> kz_types:startlink_ret(). +start_link() -> + gen_server:start_link({'local', ?SERVER}, ?MODULE, [], []). + +%%------------------------------------------------------------------------------ +%% @doc Look up the account ID corresponding to a given realm +%% +%% @end +%%------------------------------------------------------------------------------ +-spec lookup(kz_term:ne_binary()) -> kz_term:ne_binary() | 'not_found'. +lookup(Realm) -> + gen_server:call(?SERVER, {'lookup', Realm}). + +%%%============================================================================= +%%% gen_server callbacks +%%%============================================================================= + +%%------------------------------------------------------------------------------ +%% @doc Initializes the server +%% +%% @end +%%------------------------------------------------------------------------------ +-spec init([]) -> {'ok', state()}. +init([]) -> + {'ok', #state{}}. + +%%------------------------------------------------------------------------------ +%% @doc Handling call messages +%% +%% @end +%%------------------------------------------------------------------------------ +-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()). +handle_call({'lookup', Realm}, _, State) -> + case kapps_util:get_account_by_realm(Realm) of + {'ok', AcctDb} -> + AccountId = kz_util:format_account_id(AcctDb, 'raw'), + {'reply', AccountId, State}; + _ -> {'reply', 'not_found', State} + end; +handle_call(_Request, _From, State) -> + {'reply', 'ok', State}. + +%%------------------------------------------------------------------------------ +%% @doc Handling cast messages +%% +%% @end +%%------------------------------------------------------------------------------ +-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()). +handle_cast(_Msg, State) -> + {'noreply', State}. + +%%------------------------------------------------------------------------------ +%% @doc Handling all non call/cast messages +%% +%% @end +%%------------------------------------------------------------------------------ +-spec handle_info(any(), state()) -> kz_types:handle_info_ret_state(state()). +handle_info(_Info, State) -> + {'noreply', State}. + +%%------------------------------------------------------------------------------ +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +%% +%% @end +%%------------------------------------------------------------------------------ +-spec terminate(any(), state()) -> 'ok'. +terminate(_Reason, _State) -> + 'ok'. + +%%------------------------------------------------------------------------------ +%% @doc Convert process state when code is changed +%% +%% @end +%%------------------------------------------------------------------------------ +-spec code_change(any(), state(), any()) -> {'ok', state()}. +code_change(_OldVsn, State, _Extra) -> + {'ok', State}. + +%%%============================================================================= +%%% Internal functions +%%%============================================================================= diff --git a/applications/acdc/src/acdc_queue_handler.erl b/applications/acdc/src/acdc_queue_handler.erl index 44a6fa3b94e..9e08dc2ac0b 100644 --- a/applications/acdc/src/acdc_queue_handler.erl +++ b/applications/acdc/src/acdc_queue_handler.erl @@ -105,49 +105,48 @@ handle_queue_change(_, AccountId, QueueId, ?DOC_DELETED) -> acdc_queue_sup:stop(P) end. +%%------------------------------------------------------------------------------ +%% @doc Handle presence probes for queues by filtering out those that cannot be +%% queue probes (do not look like 32 character hex binaries) and those that do +%% not correspond to an existing account ID/queue ID pair +%% +%% @end +%%------------------------------------------------------------------------------ -spec handle_presence_probe(kz_json:object(), kz_term:proplist()) -> 'ok'. handle_presence_probe(JObj, _Props) -> 'true' = kapi_presence:probe_v(JObj), - Realm = kz_json:get_value(<<"Realm">>, JObj), - case kapps_util:get_account_by_realm(Realm) of - {'ok', AcctDb} -> - AccountId = kz_util:format_account_id(AcctDb, 'raw'), - maybe_respond_to_presence_probe(JObj, AccountId); - _ -> 'ok' + Username = kz_json:get_ne_binary_value(<<"Username">>, JObj), + case potentially_queue_presence_probe(Username) of + 'true' -> + Realm = kz_json:get_ne_binary_value(<<"Realm">>, JObj), + maybe_respond_to_presence_probe( + acdc_presence_realm_lookup:lookup(Realm) + ,Username + ); + 'false' -> 'ok' end. -maybe_respond_to_presence_probe(JObj, AcctId) -> - case kz_json:get_value(<<"Username">>, JObj) of +-spec potentially_queue_presence_probe(kz_term:api_binary()) -> boolean(). +potentially_queue_presence_probe(<<_:32/binary>>) -> 'true'; +potentially_queue_presence_probe(_) -> 'false'. + +-spec maybe_respond_to_presence_probe(kz_term:ne_binary() | 'not_found', kz_term:ne_binary()) -> 'ok'. +maybe_respond_to_presence_probe('not_found', _) -> 'ok'; +maybe_respond_to_presence_probe(AcctId, QueueId) -> + case acdc_queues_sup:find_queue_supervisor(AcctId, QueueId) of 'undefined' -> 'ok'; - QueueId -> - update_probe(JObj - ,acdc_queues_sup:find_queue_supervisor(AcctId, QueueId) - ,AcctId, QueueId - ) + QueueSup -> + Manager = acdc_queue_sup:manager(QueueSup), + update_probe(Manager, AcctId, QueueId) end. -update_probe(_JObj, 'undefined', _, _) -> 'ok'; -update_probe(JObj, _Sup, AcctId, QueueId) -> - case kapi_acdc_queue:queue_size(AcctId, QueueId) of +-spec update_probe(kz_term:api_pid(), kz_term:ne_binary(), kz_term:ne_binary()) -> 'ok'. +update_probe(Manager, AcctId, QueueId) -> + case acdc_queue_manager:queue_size(Manager) of 0 -> lager:debug("no calls in queue, ignore!"), - send_probe(JObj, ?PRESENCE_GREEN); - N when is_integer(N), N > 0 -> + acdc_util:presence_update(AcctId, QueueId, ?PRESENCE_GREEN); + N -> lager:debug("~b calls in queue, redify!", [N]), - send_probe(JObj, ?PRESENCE_RED_FLASH); - _E -> - lager:debug("unhandled size return: ~p", [_E]) + acdc_util:presence_update(AcctId, QueueId, ?PRESENCE_RED_FLASH) end. - -send_probe(JObj, State) -> - To = <<(kz_json:get_value(<<"Username">>, JObj))/binary - ,"@" - ,(kz_json:get_value(<<"Realm">>, JObj))/binary - >>, - PresenceUpdate = - [{<<"State">>, State} - ,{<<"Presence-ID">>, To} - ,{<<"Call-ID">>, kz_term:to_hex_binary(crypto:hash('md5', To))} - | kz_api:default_headers(?APP_NAME, ?APP_VERSION) - ], - kapi_presence:publish_update(PresenceUpdate). diff --git a/applications/acdc/src/acdc_queue_manager.erl b/applications/acdc/src/acdc_queue_manager.erl index e1b222c2036..2cc70844956 100644 --- a/applications/acdc/src/acdc_queue_manager.erl +++ b/applications/acdc/src/acdc_queue_manager.erl @@ -23,6 +23,7 @@ ,handle_queue_member_remove/2 ,are_agents_available/1 ,handle_config_change/2 + ,queue_size/1 ,should_ignore_member_call/3, should_ignore_member_call/4 ,up_next/2 ,config/1 @@ -241,6 +242,10 @@ handle_queue_member_remove(JObj, Prop) -> handle_config_change(Srv, JObj) -> gen_listener:cast(Srv, {'update_queue_config', JObj}). +-spec queue_size(kz_types:server_ref()) -> kz_term:non_neg_integer(). +queue_size(Srv) -> + gen_listener:call(Srv, 'queue_size'). + -spec should_ignore_member_call(kz_types:server_ref(), kapps_call:call(), kz_json:object()) -> boolean(). should_ignore_member_call(Srv, Call, CallJObj) -> should_ignore_member_call(Srv @@ -332,6 +337,8 @@ init(Super, AccountId, QueueId, QueueJObj) -> %% @end %%------------------------------------------------------------------------------ -spec handle_call(any(), kz_term:pid_ref(), mgr_state()) -> kz_types:handle_call_ret_state(mgr_state()). +handle_call('queue_size', _, #state{current_member_calls=Calls}=State) -> + {'reply', length(Calls), State}; handle_call({'should_ignore_member_call', {AccountId, QueueId, CallId}=K}, _, #state{ignored_member_calls=Dict ,account_id=AccountId ,queue_id=QueueId diff --git a/applications/acdc/src/acdc_sup.erl b/applications/acdc/src/acdc_sup.erl index c9bffb5c327..e834d0f7bbb 100644 --- a/applications/acdc/src/acdc_sup.erl +++ b/applications/acdc/src/acdc_sup.erl @@ -19,6 +19,7 @@ -define(SERVER, ?MODULE). -define(CHILDREN, [?CACHE(?CACHE_NAME) + ,?WORKER('acdc_presence_realm_lookup') ,?SUPER('acdc_recordings_sup') ,?SUPER('acdc_agents_sup') ,?SUPER('acdc_queues_sup')