Skip to content

Commit

Permalink
Conference related updates (2600hz#6147)
Browse files Browse the repository at this point in the history
* Conference related updates

note when collection times out

bring the parent's call-id along

when a federated listener is spun up, it defaulted to all 0s for the
log ID, making it hard to correlate the listener to the parent
gen_listener.

Add the zone tag to the call-id of the parent to help associate the
listener but also differentiate it from other federated listeners.

* diaspora
  • Loading branch information
jamesaimonetti authored and icehess committed Nov 11, 2019
1 parent f697858 commit c3de305
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
37 changes: 26 additions & 11 deletions applications/crossbar/src/modules/cb_conferences.erl
Original file line number Diff line number Diff line change
Expand Up @@ -930,22 +930,30 @@ request_conference_details(ConferenceId) ->
{'error', _E} ->
lager:debug("unable to lookup conference details: ~p", [_E]),
kz_json:new();
{_, JObjs} -> find_conference_details(JObjs)
{'ok', JObjs} -> find_conference_details(JObjs);
{'timeout', JObjs} ->
lager:info("failed to hear from all expected nodes, using what was received"),
find_conference_details(JObjs)
end.

-spec find_conference_details(kz_json:objects()) -> kz_json:object().
find_conference_details(JObjs) ->
ValidRespones = [JObj || JObj <- JObjs, kapi_conference:search_resp_v(JObj)],
case lists:sort(fun(A, B) ->
run_time(A) > run_time(B)
end
,ValidRespones
)
of
ValidResponses = [JObj || JObj <- JObjs, kapi_conference:search_resp_v(JObj)],
case find_most_recent_conference(ValidResponses) of
'undefined' -> kz_json:new();
Latest -> Latest
end.

-spec find_most_recent_conference(kz_json:objects()) -> kz_term:api_object().
find_most_recent_conference(ValidResponses) ->
case lists:sort(fun sort_by_runtime/2, ValidResponses) of
[Latest|_] -> Latest;
_Else -> kz_json:new()
[] -> 'undefined'
end.

-spec sort_by_runtime(kz_json:object(), kz_json:object()) -> boolean().
sort_by_runtime(A, B) -> run_time(A) > run_time(B).

%%%=============================================================================
%%% Utility functions
%%%=============================================================================
Expand Down Expand Up @@ -1001,10 +1009,17 @@ search_conferences(Context) ->
lager:debug("error searching conferences for account ~s: ~p", [AccountId, _E]),
cb_context:store(Context, 'conferences', kz_json:new());
{'ok', JObjs} ->
Res = lists:foldl(fun search_conferences_fold/2, kz_json:new(), JObjs),
cb_context:store(Context, 'conferences', Res)
handle_search_resp(Context, JObjs);
{'timeout', JObjs} ->
lager:info("failed to hear from all expected nodes, using what was received"),
handle_search_resp(Context, JObjs)
end.

-spec handle_search_resp(cb_context:context(), kz_json:objects()) -> cb_context:context().
handle_search_resp(Context, JObjs) ->
Res = lists:foldl(fun search_conferences_fold/2, kz_json:new(), JObjs),
cb_context:store(Context, 'conferences', Res).

-spec search_conferences_fold(kz_json:object(), kz_json:object()) ->
kz_json:object().
search_conferences_fold(JObj, Acc) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,16 +104,16 @@ init([Node, ConferenceId, InstanceId]) ->
%%------------------------------------------------------------------------------
-spec handle_call(any(), kz_term:pid_ref(), state()) -> kz_types:handle_call_ret_state(state()).
handle_call(_Request, _From, State) ->
{reply, ok, State}.
{'reply', 'ok', State}.

%%------------------------------------------------------------------------------
%% @doc Handling cast messages.
%% @end
%%------------------------------------------------------------------------------
-spec handle_cast(any(), state()) -> kz_types:handle_cast_ret_state(state()).
handle_cast({'gen_listener',{'created_queue',_QueueName}}, State) ->
handle_cast({'gen_listener', {'created_queue', _QueueName}}, State) ->
{'noreply', State};
handle_cast({'gen_listener',{'is_consuming',_IsConsuming}}, State) ->
handle_cast({'gen_listener', {'is_consuming', _IsConsuming}}, State) ->
{'noreply', State};
handle_cast(_Msg, State) ->
{'noreply', State}.
Expand Down
11 changes: 8 additions & 3 deletions core/kazoo_amqp/src/listener_federator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@
%%------------------------------------------------------------------------------
-spec start_link(pid(), kz_term:ne_binary(), kz_term:proplist()) -> kz_types:startlink_ret().
start_link(Parent, Broker, Params) ->
gen_listener:start_link(?SERVER, Params, [Parent, Broker]).
ParentCallId = kz_log:get_callid(),
gen_listener:start_link(?SERVER, Params, [Parent, ParentCallId, Broker]).

-spec broker(kz_types:server_ref()) -> kz_term:ne_binary().
broker(Pid) ->
Expand All @@ -66,10 +67,14 @@ stop(Pid) ->
%% @end
%%------------------------------------------------------------------------------
-spec init([pid() | kz_term:ne_binary()]) -> {'ok', state()}.
init([Parent, Broker]=L) ->
lager:debug("federating listener ~p on broker ~s", L),
init([Parent, ParentCallId, Broker]=L) ->
lager:debug("federating listener ~p(~s) on broker ~s", L),
_ = kz_amqp_channel:consumer_broker(Broker),
Zone = kz_term:to_binary(kz_amqp_connections:broker_zone(Broker)),

CallId = kz_binary:join([ParentCallId, Zone], <<"-">>),
kz_log:put_callid(CallId),

{'ok', #state{parent=Parent
,broker=Broker
,zone=Zone
Expand Down

0 comments on commit c3de305

Please sign in to comment.