Skip to content

Commit

Permalink
fax storage and blocking gen calls (2600hz#2940)
Browse files Browse the repository at this point in the history
* cleanup log

* request a new rev for each save

* rework fax receiver so it doesn't block on save
  • Loading branch information
lazedo authored and k-anderson committed Dec 8, 2016
1 parent 7147345 commit 8b54668
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 100 deletions.
2 changes: 1 addition & 1 deletion applications/fax/src/fax_jobs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ lock_account_job(Doc, JObj, Jobs) ->
case kz_datamgr:save_doc(?KZ_FAXES_DB, UpdatedDoc, [{'rev', kz_doc:revision(Doc)}]) of
{'ok', _U} ->

lager:debug("UPDATED LOCKING JOB ~s / ~p", [kz_doc:id(_U), _U]),
lager:debug("fax job ~s locked for executing", [kz_doc:id(Doc)]),
[JObj | Jobs];
{'error', Error} ->
lager:debug("failed to lock jobid ~s", [kz_doc:id(Doc), Error]),
Expand Down
218 changes: 127 additions & 91 deletions applications/fax/src/fax_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
%% API
-export([new_request/2]).

-export([start_link/2]).
-export([start_link/3]).

-export([cancel/1]).

%% gen_listener callbacks
-export([init/1
Expand All @@ -24,14 +26,12 @@
,code_change/3
]).

-export([handle_fax_event/2
,handle_execute_complete/2
]).

-include("fax.hrl").

-define(SERVER, ?MODULE).

-define(POLL_INTERVAL, 5000).

-record(state, {
call :: kapps_call:call()
,action = 'receive' :: 'receive' | 'transmit'
Expand All @@ -48,26 +48,27 @@
,fax_status :: api_object()
,page = 0 ::integer()
,status :: binary()
,monitor :: pid_ref()
}).
-type state() :: #state{}.

-type handle_cast_return() :: {'noreply', state()} |
{'stop', atom(), state()}.

-define(BINDINGS(CALL), [{'call', [{'callid', kapps_call:call_id(CALL)}
,{'restrict_to', [<<"CHANNEL_EXECUTE_COMPLETE">>
,<<"CHANNEL_FAX_STATUS">>
,{'restrict_to', [<<"CHANNEL_FAX_STATUS">>
,<<"CHANNEL_DESTROY">>
]}
]}
,{'self', []}
]).

-define(RESPONDERS, [{{?MODULE, 'handle_execute_complete'}
,[{<<"call_event">>, <<"CHANNEL_EXECUTE_COMPLETE">>}]
}
,{{?MODULE, 'handle_fax_event'}
-define(RESPONDERS, [{fun handle_fax_event/2
,[{<<"call_event">>, <<"CHANNEL_FAX_STATUS">>}]
}
,{fun handle_channel_event/2
,[{<<"call_event">>, <<"CHANNEL_DESTROY">>}]
}
]).

%%%===================================================================
Expand All @@ -77,28 +78,28 @@
%%--------------------------------------------------------------------
%% @doc Starts the server
%%--------------------------------------------------------------------
-spec start_link(kapps_call:call(), kz_json:object()) -> startlink_ret().
start_link(Call, JObj) ->
gen_listener:start_link(?SERVER
-spec start_link(kapps_call:call(), kz_json:object(), fax_storage()) -> startlink_ret().
start_link(Call, JObj, Storage) ->
gen_listener:start_link({'local', kz_util:to_atom(Storage#fax_storage.id, 'true')}
,?MODULE
,[{'bindings', ?BINDINGS(Call)}
,{'responders', ?RESPONDERS}
]
,[Call, JObj]
,[Call, JObj, Storage]
).

-spec handle_execute_complete(kz_json:object(), kz_proplist()) -> 'ok'.
handle_execute_complete(JObj, Props) ->
AppName = kz_json:get_value(<<"Application-Name">>, JObj),
AppResp = kz_json:get_value(<<"Application-Response">>, JObj),
Srv = props:get_value('server', Props),
gen_server:cast(Srv, {'exec_completed', AppName, AppResp, JObj}).

-spec handle_fax_event(kz_json:object(), kz_proplist()) -> 'ok'.
handle_fax_event(JObj, Props) ->
Srv = props:get_value('server', Props),
Event = kz_json:get_value(<<"Application-Event">>, JObj),
gen_server:cast(Srv, {'fax_status', Event , JObj}).

-spec handle_channel_event(kz_json:object(), kz_proplist()) -> 'ok'.
handle_channel_event(JObj, Props) ->
Srv = props:get_value('server', Props),
Event = kz_api:event_name(JObj),
gen_server:cast(Srv, {'channel_event', Event , JObj}).

%%%===================================================================
%%% gen_listener callbacks
%%%===================================================================
Expand All @@ -114,8 +115,8 @@ handle_fax_event(JObj, Props) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
-spec init([kapps_call:call() | kz_json:object()]) -> {'ok', state()}.
init([Call, JObj]) ->
-spec init([kapps_call:call() | kz_json:object() | fax_storage()]) -> {'ok', state()}.
init([Call, JObj, Storage]) ->
kapps_call:put_callid(Call),
gen_listener:cast(self(), 'start_action'),
{'ok', #state{call = Call
Expand All @@ -124,6 +125,8 @@ init([Call, JObj]) ->
,faxbox_id = kz_json:get_value(<<"FaxBox-ID">>, JObj)
,fax_option = kz_json:get_value(<<"Fax-T38-Option">>, JObj, 'false')
,account_id = kapps_call:account_id(Call)
,fax_id=Storage#fax_storage.id
,storage=Storage
}}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -193,19 +196,40 @@ handle_cast({'fax_status', <<"result">>, JObj}, State) ->
handle_cast({'fax_status', Event, _JObj}, State) ->
lager:debug("fax status not handled - ~s",[Event]),
{'noreply', State};
handle_cast({'exec_completed', <<"receive_fax">>, Result, _JObj}, State) ->
lager:debug("Fax Receive Result ~s",[Result]),
{'noreply', State};
handle_cast({'exec_completed', App, Result, _JObj}, State) ->
lager:debug("Fax exec not handled - ~s / ~s ",[App,Result]),
{'noreply', State};
handle_cast({'gen_listener', {'created_queue', QueueName}}, State) ->
lager:debug("worker discovered queue name ~s", [QueueName]),
{'noreply', State};
handle_cast({'gen_listener',{'is_consuming',_}}, State) ->
start_receive_fax(State);

handle_cast('store_document', State) ->
{'noreply', State#state{status= <<"storing document">>}, ?POLL_INTERVAL};
handle_cast({'store_attachment', FaxDoc}, #state{monitor={_, Ref}}=State) ->
erlang:demonitor(Ref),
{'noreply', State#state{fax_doc=FaxDoc
,status= <<"storing document">>
}, ?POLL_INTERVAL};
handle_cast('success', State) ->
notify_success(State),
{'stop', 'normal', State};

handle_cast('cancel', #state{fax_result='undefined'} = State) ->
lager:warning("canceling active fax receiver"),
{'stop', 'normal', State};

handle_cast('cancel', State) ->
lager:warning("canceling fax receiver"),
{'stop', 'normal', State};

handle_cast({'channel_event', <<"CHANNEL_DESTROY">>, _JObj}, #state{monitor='undefined'} = State) ->
lager:warning("received channel destroy for fax receiver"),
{'stop', 'normal', State};

handle_cast({'channel_event', <<"CHANNEL_DESTROY">>, _JObj}, State) ->
{'noreply', State, ?POLL_INTERVAL};

handle_cast(_Msg, State) ->
lager:debug("unhandled cast: ~p", [_Msg]),
lager:debug("unhandled cast : ~p", [_Msg]),
{'noreply', State}.

%%--------------------------------------------------------------------
Expand All @@ -219,11 +243,17 @@ handle_cast(_Msg, State) ->
%% @end
%%--------------------------------------------------------------------
-spec handle_info(any(), state()) -> handle_info_ret_state(state()).
handle_info({'DOWN', _Ref, 'process', Pid, 'normal'}, State) ->
lager:debug("handler ~p down normally, request is done", [Pid]),
{'stop', 'normal', State};
handle_info({'DOWN', Ref, 'process', Pid, 'normal'}, #state{monitor={Pid, Ref}}=State) ->
{'noreply', State, ?POLL_INTERVAL};
handle_info({'DOWN', Ref, 'process', Pid, Reason}, #state{monitor={Pid, Ref}}=State) ->
lager:debug("process ~p down '~p', retrying", [Pid, Reason]),
{'noreply', State, ?POLL_INTERVAL};
handle_info('timeout', #state{fax_doc='undefined'}=State) ->
{'noreply', State#state{monitor=store_document(State)}};
handle_info('timeout', #state{}=State) ->
{'noreply', State#state{monitor=store_attachment(State)}};
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
lager:debug("unhandled info: ~p", [_Info]),
{'noreply', State}.

-spec handle_event(kz_json:object(), state()) -> gen_listener:handle_event_return().
Expand All @@ -243,8 +273,7 @@ handle_event(_JObj, _State) ->
%%--------------------------------------------------------------------
-spec terminate(any(), state()) -> 'ok'.
terminate(_Reason, #state{call=Call}) ->
kapps_call_command:hangup(Call),
lager:debug("fax request terminating: ~p", [_Reason]).
kapps_call_command:hangup(Call).

%%--------------------------------------------------------------------
%% @private
Expand All @@ -271,22 +300,14 @@ get_action(JObj) ->
-spec start_receive_fax(state()) -> {'noreply', state()}.
start_receive_fax(#state{call=Call
,fax_option=ReceiveFlag
,fax_id=FaxId
}=State) ->
kapps_call:put_callid(Call),
Storage = get_fax_storage(Call),
Props = [{<<"Fax-Doc-ID">>, Storage#fax_storage.id}
,{<<"Fax-Doc-DB">>, Storage#fax_storage.db}
],
NewCall = kapps_call:kvs_store_proplist(Props, Call),
NewState = maybe_update_fax_settings(State#state{storage=Storage
,fax_id=Storage#fax_storage.id
,call=NewCall
}),
NewState = maybe_update_fax_settings(State),
ResourceFlag = kapps_call:custom_channel_var(<<"Resource-Fax-Option">>, Call),
LocalFile = get_fs_filename(NewState),
send_status(NewState, list_to_binary(["New Fax from ", kapps_call:caller_id_number(Call)]), ?FAX_START, 'undefined'),
kapps_call_command:answer(Call),
lager:debug("receive fax t.38 ~p / ~p", [ResourceFlag, ReceiveFlag]),
lager:debug("receive fax ~s - t.38 ~p / ~p", [FaxId, ResourceFlag, ReceiveFlag]),
kapps_call_command:receive_fax(ResourceFlag, ReceiveFlag, LocalFile, Call),
{'noreply', NewState}.

Expand Down Expand Up @@ -432,56 +453,57 @@ overridden_fax_identity(Call, JObj) ->
end_receive_fax(JObj, #state{call=Call}=State) ->
kapps_call_command:hangup(Call),
case kz_json:is_true([<<"Application-Data">>,<<"Fax-Success">>], JObj, 'false') of
'true' -> maybe_store_fax(JObj, State);
'true' -> end_receive_fax(State#state{fax_result=JObj});
'false' ->
notify_failure(JObj, State),
{'stop', 'normal', State}
end.

-spec maybe_store_fax(kz_json:object(), state()) -> handle_cast_return().
maybe_store_fax(JObj, #state{storage=#fax_storage{id=FaxId}}=State) ->
case store_fax(JObj, State) of
{'ok', FaxDoc} ->
lager:debug("fax stored successfully into ~s / ~s", [kz_doc:account_db(FaxDoc), FaxId]),
store_attachment(State#state{fax_doc=FaxDoc, fax_result=JObj});
{'error', Error} ->
lager:debug("store fax other resp: ~p", [Error]),
notify_failure(JObj, Error, State),
{'stop', 'normal', State}
end.
-spec end_receive_fax(state()) -> handle_cast_return().
end_receive_fax(#state{}=State) ->
{'noreply', State#state{monitor=store_document(State)}}.

-spec store_document(state()) -> pid_ref().
store_document(#state{}=State) ->
kz_util:spawn_monitor(fun store_document/2, [self(), State]).

-spec store_fax(kz_json:object(), state() ) ->
{'ok', kz_json:object()} |
{'error', any()}.
store_fax(JObj, #state{storage=#fax_storage{attachment_id=_AttachmentId}
}=State) ->
-spec store_document(pid(), state() ) -> 'ok'.
store_document(Pid, #state{fax_result=JObj
,storage=#fax_storage{id=FaxId}
}=State) ->
case create_fax_doc(JObj, State) of
{'ok', _Doc} = OK -> OK;
Error -> Error
{'ok', FaxDoc} ->
lager:debug("fax document stored successfully into ~s / ~s", [kz_doc:account_db(FaxDoc), FaxId]),
gen_server:cast(Pid, {'store_attachment', FaxDoc});
{'error', Error} ->
lager:debug("error storing fax document ~s: ~p", [FaxId, Error]),
gen_server:cast(Pid, 'store_document')
end.

-spec get_fs_filename(state()) -> ne_binary().
get_fs_filename(#state{storage=#fax_storage{attachment_id=AttachmentId}}) ->
LocalPath = kapps_config:get_binary(?CONFIG_CAT, <<"fax_file_path">>, <<"/tmp/">>),
<<LocalPath/binary, AttachmentId/binary>>.
-spec store_attachment(state()) -> pid_ref().
store_attachment(#state{}=State) ->
kz_util:spawn_monitor(fun store_attachment/2, [self(), State]).

-spec store_attachment(state()) -> handle_cast_return().
store_attachment(#state{call=Call
,fax_result=FaxResultObj
,storage=#fax_storage{attachment_id=AttachmentId}
,fax_doc=FaxDoc
}=State) ->
-spec store_attachment(pid(), state()) -> 'ok'.
store_attachment(Pid, #state{call=Call
,storage=#fax_storage{attachment_id=AttachmentId}
,fax_doc=FaxDoc
,fax_id=FaxId
}=State) ->
FaxUrlFun = fun() -> kz_media_url:store(FaxDoc, AttachmentId) end,
FaxFile = get_fs_filename(State),
case kapps_call_command:store_file(FaxFile, FaxUrlFun, Call) of
'ok' ->
notify_success(FaxResultObj, State),
{'stop', 'normal', State};
{'error', Error} ->
notify_failure(FaxResultObj, Error, State),
{'stop', 'normal', State}
lager:debug("fax attachment stored successfully into ~s / ~s / ~s", [kz_doc:account_db(FaxDoc), FaxId, AttachmentId]),
gen_server:cast(Pid, 'success');
{'error', _} -> gen_server:cast(Pid, 'store_attachment')
end.

-spec get_fs_filename(state()) -> ne_binary().
get_fs_filename(#state{storage=#fax_storage{attachment_id=AttachmentId}}) ->
LocalPath = kapps_config:get_binary(?CONFIG_CAT, <<"fax_file_path">>, <<"/tmp/">>),
<<LocalPath/binary, AttachmentId/binary>>.

-spec create_fax_doc(kz_json:object(), state()) ->
{'ok', kz_json:object()} |
{'error', any()}.
Expand Down Expand Up @@ -596,14 +618,15 @@ notify_failure(JObj, Reason, #state{call=Call
]),
kapi_notifications:publish_fax_inbound_error(Message).

-spec notify_success(kz_json:object(), state()) -> 'ok'.
notify_success(JObj, #state{call=Call
,owner_id=OwnerId
,faxbox_id=FaxBoxId
,fax_notify=Notify
,account_id=AccountId
,storage=#fax_storage{id=FaxId, db=FaxDb}
}=State) ->
-spec notify_success(state()) -> 'ok'.
notify_success(#state{call=Call
,owner_id=OwnerId
,faxbox_id=FaxBoxId
,fax_notify=Notify
,account_id=AccountId
,fax_result=JObj
,storage=#fax_storage{id=FaxId, db=FaxDb}
}=State) ->
Data = kz_json:get_value(<<"Application-Data">>, JObj, kz_json:new()),
Status = <<"Fax Successfuly received">>,
send_status(State, Status, ?FAX_END, Data),
Expand Down Expand Up @@ -656,4 +679,17 @@ send_status(#state{call=Call
-spec new_request(kz_json:object(), kz_proplist()) -> sup_startchild_ret().
new_request(JObj, _Props) ->
'true' = kapi_fax:req_v(JObj),
fax_requests_sup:new(kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)), JObj).
Call = kapps_call:from_json(kz_json:get_value(<<"Call">>, JObj)),
Storage = get_fax_storage(Call),
Props = [{<<"Fax-Doc-ID">>, Storage#fax_storage.id}
,{<<"Fax-Doc-DB">>, Storage#fax_storage.db}
],
NewCall = kapps_call:kvs_store_proplist(Props, Call),
fax_requests_sup:new(NewCall, JObj, Storage).

-spec cancel(ne_binary()) -> 'ok'.
cancel(Id) ->
case whereis(kz_util:to_atom(Id, 'true')) of
'undefined' -> io:format("job ~s not found", [Id]);
Pid -> gen_server:cast(Pid, 'cancel')
end.
8 changes: 4 additions & 4 deletions applications/fax/src/fax_requests_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

%% API
-export([start_link/0]).
-export([new/2]).
-export([new/3]).
-export([workers/0]).

%% Supervisor callbacks
Expand All @@ -36,9 +36,9 @@
start_link() ->
supervisor:start_link({'local', ?SERVER}, ?MODULE, []).

-spec new(kapps_call:call(), kz_json:object()) -> sup_startchild_ret().
new(Call, JObj) ->
supervisor:start_child(?SERVER, [Call, JObj]).
-spec new(kapps_call:call(), kz_json:object(), fax_storage()) -> sup_startchild_ret().
new(Call, JObj, Storage) ->
supervisor:start_child(?SERVER, [Call, JObj, Storage]).

-spec workers() -> pids().
workers() ->
Expand Down
Loading

0 comments on commit 8b54668

Please sign in to comment.