Skip to content

Commit

Permalink
KAZOO-543: add start_link with explicit list of queues to log into
Browse files Browse the repository at this point in the history
  • Loading branch information
James Aimonetti committed Apr 22, 2013
1 parent 8af97be commit 89c26c1
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 22 deletions.
4 changes: 3 additions & 1 deletion whistle_apps/apps/acdc/src/acdc_agent.erl
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ start_link(Supervisor, AgentJObj) ->
start_link(Supervisor, AgentJObj, AcctId, AgentId, Queues).
start_link(Supervisor, AgentJObj, AcctId, AgentId, Queues) ->
case acdc_util:agent_status(AcctId, AgentId) of
<<"logout">> -> {'error', 'logged_out'};
<<"logout">> ->
lager:debug("agent ~s in ~s is logged out, ignoring", [AgentId, AcctId]),
{'error', 'logged_out'};
_S ->
lager:debug("start bindings for ~s(~s) in ~s", [AcctId, AgentId, _S]),
gen_listener:start_link(?MODULE
Expand Down
44 changes: 26 additions & 18 deletions whistle_apps/apps/acdc/src/acdc_agent_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
-include_lib("eunit/include/eunit.hrl").

%% API
-export([start_link/2, start_link/3, start_link/4
-export([start_link/2, start_link/3, start_link/4, start_link/5
,call_event/4
,member_connect_req/2
,member_connect_win/2
Expand Down Expand Up @@ -262,23 +262,26 @@ status(FSM) -> gen_fsm:sync_send_event(FSM, 'status').
-spec start_link(ne_binary(), ne_binary(), pid(), wh_proplist()) -> startlink_ret().

start_link(Supervisor, AgentJObj) when is_pid(Supervisor) ->
start_link(wh_json:get_value(<<"pvt_account_id">>, AgentJObj)
,wh_json:get_value(<<"_id">>, AgentJObj)
,Supervisor
,[]
,'false'
).
pvt_start_link(wh_json:get_value(<<"pvt_account_id">>, AgentJObj)
,wh_json:get_value(<<"_id">>, AgentJObj)
,Supervisor
,[]
,'false'
).
start_link(Supervisor, ThiefCall, _QueueId) ->
start_link(whapps_call:account_id(ThiefCall)
,whapps_call:owner_id(ThiefCall)
,Supervisor
,[]
,'true'
).
pvt_start_link(whapps_call:account_id(ThiefCall)
,whapps_call:owner_id(ThiefCall)
,Supervisor
,[]
,'true'
).
start_link(AcctId, AgentId, Supervisor, Props) ->
start_link(AcctId, AgentId, Supervisor, Props, 'false').
pvt_start_link(AcctId, AgentId, Supervisor, Props, 'false').

start_link(AcctId, AgentId, Supervisor, Props, IsThief) ->
start_link(Supervisor, _AgentJObj, AcctId, AgentId, _Queues) ->
pvt_start_link(AcctId, AgentId, Supervisor, [], 'false').

pvt_start_link(AcctId, AgentId, Supervisor, Props, IsThief) ->
gen_fsm:start_link(?MODULE, [AcctId, AgentId, Supervisor, Props, IsThief], []).

new_endpoint(FSM, EP) ->
Expand Down Expand Up @@ -517,14 +520,21 @@ ready({'member_connect_win', JObj}, #state{agent_proc=Srv
lager:debug("trying to ring agent ~s on ~s to connect to caller in queue ~s", [AgentId, AgentCallId, QueueId]),

case get_endpoints(OrigEPs, Srv, Call, AgentId) of
{'error', 'no_endpoints'} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent:logout_agent(Srv),
acdc_stats:agent_inactive(AcctId, AgentId),
acdc_agent:member_connect_retry(Srv, JObj),
{'next_state', 'ready', State};
{'error', _E} ->
lager:debug("can't to take the call, skip me: ~p", [_E]),
lager:debug("can't take the call, skip me: ~p", [_E]),
acdc_agent:member_connect_retry(Srv, JObj),
{'next_state', 'ready', State#state{connect_failures=CF+1}};
{'ok', []} ->
lager:info("agent ~s has no endpoints assigned; logging agent out", [AgentId]),
acdc_agent:logout_agent(Srv),
acdc_stats:agent_inactive(AcctId, AgentId),
acdc_agent:member_connect_retry(Srv, JObj),
{'next_state', 'ready', State};
{'ok', UpdatedEPs} ->
acdc_agent:bridge_to_member(Srv, Call, JObj, UpdatedEPs, CDRUrl, RecordingUrl),
Expand Down Expand Up @@ -1618,8 +1628,6 @@ maybe_remove_endpoint(EPId, EPs, AcctId, Srv) ->
get_endpoints(OrigEPs, Srv, Call, AgentId) ->
case catch acdc_util:get_endpoints(Call, AgentId) of
[] ->
lager:debug("no endpoints for this agent, going down"),
acdc_agent:stop(Srv),
{'error', 'no_endpoints'};
[_|_]=EPs ->
AcctId = whapps_call:account_id(Call),
Expand Down
8 changes: 7 additions & 1 deletion whistle_apps/apps/acdc/src/acdc_agent_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,14 @@ handle_status_update(JObj, _Props) ->
end.

maybe_agent_queue_change(AcctId, AgentId, <<"login_queue">>, QueueId) ->
lager:debug("queue login for agent ~s into ~s", [AgentId, QueueId]),
update_agent(acdc_agents_sup:find_agent_supervisor(AcctId, AgentId)
,QueueId
,fun acdc_agent:add_acdc_queue/2
,AcctId, AgentId
);
maybe_agent_queue_change(AcctId, AgentId, <<"logout_queue">>, QueueId) ->
lager:debug("queue logout for agent ~s into ~s", [AgentId, QueueId]),
update_agent(acdc_agents_sup:find_agent_supervisor(AcctId, AgentId)
,QueueId
,fun acdc_agent:rm_acdc_queue/2
Expand All @@ -71,14 +73,18 @@ maybe_agent_queue_change(_AcctId, _AgentId, _Evt, _QueueId) ->
lager:debug("unhandled evt: ~s for ~s", [_Evt, _QueueId]).

update_agent('undefined', QueueId, _F, AcctId, AgentId) ->
lager:debug("new agent process needs starting"),
{'ok', AgentJObj} = couch_mgr:open_cache_doc(wh_util:format_account_id(AcctId, 'encoded')
,AgentId
),
lager:debug("agent loaded"),
acdc_stats:agent_active(AcctId, AgentId),
acdc_agents_sup:new(AcctId, AgentId, AgentJObj, [QueueId]);
update_agent(Super, Q, F, _, _) when is_pid(Super) ->
lager:debug("agent super ~p", [Super]),
F(acdc_agent_sup:agent(Super), Q).

update_agent('undefined', _QueueId, _F) -> 'ok';
update_agent('undefined', _QueueId, _F) -> lager:debug("agent's supervisor not around, ignoring for queue ~s", [_QueueId]);
update_agent(Super, Q, F) when is_pid(Super) ->
F(acdc_agent_sup:agent(Super), Q).

Expand Down
5 changes: 3 additions & 2 deletions whistle_apps/apps/acdc/src/acdc_agent_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
-include("acdc.hrl").

%% API
-export([start_link/1, start_link/2
-export([start_link/1, start_link/2, start_link/4
,agent/1
,fsm/1, start_fsm/3, start_fsm/4
,stop/1
Expand Down Expand Up @@ -41,8 +41,9 @@
-spec start_link(wh_json:object()) -> startlink_ret().
-spec start_link(whapps_call:call(), ne_binary()) -> startlink_ret().
start_link(AgentJObj) -> supervisor:start_link(?MODULE, [AgentJObj]).

start_link(ThiefCall, QueueId) -> supervisor:start_link(?MODULE, [ThiefCall, QueueId]).
start_link(AcctId, AgentId, AgentJObj, Queues) ->
supervisor:start_link(?MODULE, [AcctId, AgentId, AgentJObj, Queues]).

-spec stop(pid()) -> 'ok' | {'error', 'not_found'}.
stop(Supervisor) -> supervisor:terminate_child('acdc_agents_sup', Supervisor).
Expand Down

0 comments on commit 89c26c1

Please sign in to comment.