Skip to content

Commit

Permalink
[Fix #kazoo-5636] Split kz_account_crawler tasks into task modules (2…
Browse files Browse the repository at this point in the history
…600hz#4358)

* [kazoo-5636] Fix dialyzer warnings

* [Fix #kazoo-5636] Split kz_account_crawler tasks into task modules and trigger this new tasks along doodle checks

* [Fix #kazoo-5636] Add fixes according to Lazedo's PR review

* [Fix #kazoo-5636] Add fix according to circle-ci checks
  • Loading branch information
harenson authored and lazedo committed Nov 22, 2017
1 parent 8702e17 commit 7edd54c
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 242 deletions.
8 changes: 8 additions & 0 deletions applications/doodle/src/doodle_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
-behaviour(application).

-include("doodle.hrl").
-define(ACCOUNT_CRAWLER_BINDING, <<"tasks.account_crawler">>).

-export([start/2, stop/1]).

Expand All @@ -29,6 +30,10 @@ start(_Type, _Args) ->
{'error', Err} -> lager:error("default sms is undefined and cannot read default from file: ~p", [Err]);
JObj -> kapps_config:set(?CONFIG_CAT, <<"reschedule">>, JObj)
end,
lager:debug("Start listening for tasks.account_crawler trigger"),
_ = kazoo_bindings:bind(?ACCOUNT_CRAWLER_BINDING,
'doodle_maintenance',
'start_check_sms_by_account'),
doodle_sup:start_link().

%%--------------------------------------------------------------------
Expand All @@ -37,6 +42,9 @@ start(_Type, _Args) ->
%%--------------------------------------------------------------------
-spec stop(any()) -> any().
stop(_State) ->
_ = kazoo_bindings:unbind(?ACCOUNT_CRAWLER_BINDING,
'doodle_maintenance',
'start_check_sms_by_account'),
'ok'.

-spec declare_exchanges() -> 'ok'.
Expand Down
2 changes: 1 addition & 1 deletion applications/doodle/src/doodle_maintenance.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ check_pending_sms_for_offnet_delivery(AccountId) ->
replay_sms(AccountId, JObjs) ->
lager:debug("starting sms offnet delivery for account ~s", [AccountId]),
F = fun (JObj) ->
doodle_util:replay_sms(AccountId, kz_doc:id(JObj)),
_ = doodle_util:replay_sms(AccountId, kz_doc:id(JObj)),
timer:sleep(200)
end,
lists:foreach(F, JObjs).
Expand Down
245 changes: 5 additions & 240 deletions applications/tasks/src/kz_account_crawler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,6 @@
-define(TIME_BETWEEN_WHOLE_CRAWLS,
kapps_config:get_integer(?CONFIG_CAT, <<"cycle_delay_time_ms">>, 5 * ?MILLISECONDS_IN_MINUTE)).

-define(SHOULD_CRAWL_FOR_FIRST_OCCURRENCE,
kapps_config:get_is_true(?CONFIG_CAT, <<"should_crawl_for_first_occurrence">>, 'true')).

-define(SHOULD_CRAWL_FOR_LOW_BALANCE,
kapps_config:get_is_true(?CONFIG_CAT, <<"should_crawl_for_low_balance">>, 'true')).

-define(LOW_BALANCE_REPEAT,
kapps_config:get_integer(?CONFIG_CAT, <<"low_balance_repeat_s">>, 1 * ?SECONDS_IN_DAY)).

%%%===================================================================
%%% API
%%%===================================================================
Expand All @@ -67,7 +58,7 @@ check(Account)
AccountId = kz_util:format_account_id(Account),
case kz_datamgr:open_doc(?KZ_ACCOUNTS_DB, AccountId) of
{'ok', AccountJObj} ->
process_account(AccountId, kz_doc:account_db(AccountJObj), AccountJObj);
process_account(AccountId, AccountJObj);
{'error', _R} ->
lager:warning("unable to open account definition for ~s: ~p", [AccountId, _R])
end;
Expand Down Expand Up @@ -210,239 +201,13 @@ check_then_process_account(AccountId, {'ok', AccountJObj}) ->
'true' ->
lager:debug("not processing account ~p (soft-destroyed)", [AccountId]);
'false' ->
process_account(AccountId, kz_doc:account_db(AccountJObj), AccountJObj)
process_account(AccountId, AccountJObj)
end;
check_then_process_account(AccountId, {'error', _R}) ->
lager:warning("unable to open account definition for ~s: ~p", [AccountId, _R]).

-spec process_account(ne_binary(), ne_binary(), kz_account:doc()) -> 'ok'.
process_account(AccountId, AccountDb, AccountJObj) ->
-spec process_account(ne_binary(), kz_account:doc()) -> 'ok'.
process_account(AccountId, AccountJObj) ->
lager:debug("account crawler processing account ~s", [AccountId]),
_ = maybe_test_for_initial_occurrences(AccountId, AccountDb, AccountJObj),
_ = maybe_test_for_low_balance(AccountId, AccountJObj),
_ = doodle_maintenance:start_check_sms_by_account(AccountId, AccountJObj),
'ok'.

%%% Initial Occurrence

-spec maybe_test_for_initial_occurrences(ne_binary(), ne_binary(), kz_account:doc()) -> 'ok'.
maybe_test_for_initial_occurrences(AccountId, AccountDb, AccountJObj) ->
case ?SHOULD_CRAWL_FOR_FIRST_OCCURRENCE of
'false' -> 'ok';
'true' ->
maybe_test_for_registrations(AccountId, AccountJObj),
maybe_test_for_initial_call(AccountId, AccountDb, AccountJObj)
end.

%% First registration
-spec maybe_test_for_registrations(ne_binary(), kz_account:doc()) -> 'ok'.
maybe_test_for_registrations(AccountId, AccountJObj) ->
Realm = kz_account:realm(AccountJObj),
case Realm =:= 'undefined'
orelse kz_account:sent_initial_registration(AccountJObj)
of
'true' -> 'ok';
'false' -> test_for_registrations(AccountId, Realm)
end.

-spec test_for_registrations(ne_binary(), ne_binary()) -> 'ok'.
test_for_registrations(AccountId, Realm) ->
lager:debug("looking for any registrations in realm ~s", [Realm]),
Reg = [{<<"Realm">>, Realm}
,{<<"Fields">>, [<<"Account-ID">>]}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
case kapps_util:amqp_pool_collect(Reg
,fun kapi_registration:publish_query_req/1
,{'ecallmgr', fun kapi_registration:query_resp_v/1}
)
of
{'error', _} -> 'ok';
{_, JObjs} ->
case lists:any(fun kapi_registration:query_resp_v/1, JObjs) of
'false' -> 'ok';
'true' ->
lager:debug("found initial registration for account ~s (~s)", [AccountId, Realm]),
handle_initial_registration(AccountId)
end
end.

-spec handle_initial_registration(ne_binary()) -> 'ok'.
handle_initial_registration(AccountId) ->
case kz_account:fetch(AccountId) of
{'ok', AccountJObj} -> notify_initial_registration(AccountJObj);
_E -> 'ok'
end.

-spec notify_initial_registration(kz_account:doc()) -> 'ok'.
notify_initial_registration(AccountJObj) ->
UpdatedAccountJObj = kz_account:set_initial_registration_sent(AccountJObj, 'true'),
_ = kz_util:account_update(UpdatedAccountJObj),
Req = [{<<"Account-ID">>, kz_doc:id(AccountJObj)}
,{<<"Occurrence">>, <<"registration">>}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapps_notify_publisher:cast(Req, fun kapi_notifications:publish_first_occurrence/1).

%% First Call
-spec maybe_test_for_initial_call(ne_binary(), ne_binary(), kz_account:doc()) -> 'ok'.
maybe_test_for_initial_call(AccountId, AccountDb, AccountJObj) ->
case kz_account:sent_initial_call(AccountJObj) of
'true' -> 'ok';
'false' ->
lager:debug("looking for initial call in account ~s", [AccountId]),
test_for_initial_call(AccountId, AccountDb)
end.

-spec test_for_initial_call(ne_binary(), ne_binary()) -> 'ok'.
test_for_initial_call(AccountId, AccountDb) ->
ViewOptions = [{'key', <<"cdr">>}
,{'limit', 1}
],
case kz_datamgr:get_results(AccountDb, <<"maintenance/listing_by_type">>, ViewOptions) of
{'ok', [_|_]} ->
lager:debug("found initial call in account ~s", [AccountId]),
handle_initial_call(AccountId);
_Else -> 'ok'
end.

-spec handle_initial_call(ne_binary()) -> 'ok'.
handle_initial_call(AccountId) ->
case kz_account:fetch(AccountId) of
{'ok', AccountJObj} -> notify_initial_call(AccountJObj);
_ -> 'ok'
end.

-spec notify_initial_call(kz_account:doc()) -> 'ok'.
notify_initial_call(AccountJObj) ->
UpdatedAccountJObj = kz_account:set_initial_call_sent(AccountJObj, 'true'),
_ = kz_util:account_update(UpdatedAccountJObj),
Req = [{<<"Account-ID">>, kz_doc:id(AccountJObj)}
,{<<"Occurrence">>, <<"call">>}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapps_notify_publisher:cast(Req, fun kapi_notifications:publish_first_occurrence/1).

%%% Low balance check

-spec maybe_test_for_low_balance(ne_binary(), kz_account:doc()) -> 'ok'.
maybe_test_for_low_balance(AccountId, AccountJObj) ->
case ?SHOULD_CRAWL_FOR_LOW_BALANCE of
'false' -> 'ok';
'true' -> test_for_low_balance(AccountId, AccountJObj, 3)
end.

-spec test_for_low_balance(ne_binary(), kz_account:doc(), 0..3) -> 'ok'.
test_for_low_balance(_AccountId, _AccountJObj, 0) ->
lager:debug("max try to get account ~s current balance", [_AccountId]);
test_for_low_balance(AccountId, AccountJObj, Loop) ->
case wht_util:current_balance(AccountId) of
{'error', 'timeout'} ->
test_for_low_balance(AccountId, AccountJObj, Loop - 1);
{'error', _R} -> 'ok';
{'ok', CurrentBalance} ->
maybe_notify_for_low_balance(AccountJObj, CurrentBalance),
maybe_topup_account(AccountJObj, CurrentBalance)
end.

-spec maybe_notify_for_low_balance(kz_account:doc(), kz_transaction:units()) -> 'ok'.
maybe_notify_for_low_balance(AccountJObj, CurrentBalance) ->
AccountId = kz_account:id(AccountJObj),
Threshold = kz_account:low_balance_threshold(AccountJObj),
lager:info("checking if account ~s balance $~w is below notification threshold $~w"
,[AccountId, wht_util:units_to_dollars(CurrentBalance), Threshold]),
case is_balance_below_notify_threshold(CurrentBalance, Threshold) of
'false' -> maybe_reset_low_balance_sent(AccountJObj);
'true' -> maybe_low_balance_notify(AccountJObj, CurrentBalance)
end.

-spec is_balance_below_notify_threshold(kz_transaction:units(), number()) -> boolean().
is_balance_below_notify_threshold(CurrentBalance, Threshold) ->
CurrentBalance =< wht_util:dollars_to_units(Threshold).

-spec maybe_topup_account(kz_account:doc(), kz_transaction:units()) -> 'ok'.
maybe_topup_account(AccountJObj, CurrentBalance) ->
AccountId = kz_account:id(AccountJObj),
lager:info("checking topup for account ~s with balance $~w"
,[AccountId, wht_util:units_to_dollars(CurrentBalance)]),
case kz_topup:init(AccountId, CurrentBalance) of
'ok' ->
maybe_reset_low_balance_sent(AccountJObj),
lager:info("topup successful for ~s", [AccountId]);
{'error', topup_disabled} -> 'ok';
{'error', 'balance_above_threshold'} -> 'ok';
{'error', _Error} ->
lager:error("topup failed for ~s: ~p", [AccountId, _Error])
end.

-spec maybe_reset_low_balance_sent(kz_account:doc()) -> 'ok'.
maybe_reset_low_balance_sent(AccountJObj) ->
case kz_account:low_balance_sent(AccountJObj)
orelse kz_account:low_balance_tstamp(AccountJObj) =/= 'undefined'
of
'true' -> reset_low_balance_sent(AccountJObj);
'false' -> 'ok'
end.

-spec reset_low_balance_sent(kz_account:doc()) -> 'ok'.
reset_low_balance_sent(AccountJObj0) ->
lager:debug("resetting low balance sent"),
AccountJObj1 = kz_account:reset_low_balance_sent(AccountJObj0),
AccountJObj2 = kz_account:remove_low_balance_tstamp(AccountJObj1),
_ = kz_util:account_update(AccountJObj2),
'ok'.

-spec maybe_low_balance_notify(kz_account:doc(), kz_transaction:units()) -> 'ok'.
maybe_low_balance_notify(AccountJObj, CurrentBalance) ->
case kz_account:low_balance_enabled_exists(AccountJObj) of
'false' ->
lager:debug("low balance notification enabled key not present, using deprecated check"),
maybe_low_balance_notify_deprecated(AccountJObj, CurrentBalance);
'true' ->
maybe_low_balance_notify(AccountJObj, CurrentBalance, kz_account:low_balance_enabled(AccountJObj))
end.

-spec maybe_low_balance_notify(kz_account:doc(), kz_transaction:units(), boolean()) -> 'ok'.
maybe_low_balance_notify(_AccountJObj, _CurrentBalance, 'false') ->
lager:debug("low balance notification disabled");
maybe_low_balance_notify(AccountJObj, CurrentBalance, 'true') ->
lager:debug("low balance notification enabled"),
case kz_account:low_balance_tstamp(AccountJObj) of
LowBalanceSent when is_number(LowBalanceSent) ->
Cycle = ?LOW_BALANCE_REPEAT,
Diff = kz_time:now_s() - LowBalanceSent,
case Diff >= Cycle of
'true' -> notify_of_low_balance(AccountJObj, CurrentBalance);
'false' ->
lager:debug("low balance alert sent ~w seconds ago, repeats every ~w", [Diff, Cycle])
end;
_Else -> notify_of_low_balance(AccountJObj, CurrentBalance)
end.

-spec maybe_low_balance_notify_deprecated(kz_account:doc(), kz_transaction:units()) -> 'ok'.
maybe_low_balance_notify_deprecated(AccountJObj, CurrentBalance) ->
case kz_account:low_balance_sent(AccountJObj) of
'true' -> lager:debug("low balance alert already sent");
'false' -> notify_of_low_balance(AccountJObj, CurrentBalance)
end.

-spec notify_of_low_balance(kz_account:doc(), kz_transaction:units()) -> 'ok'.
notify_of_low_balance(AccountJObj, CurrentBalance) ->
AccountId = kz_account:id(AccountJObj),
lager:debug("sending low balance alert for account ~s with balance ~w"
,[AccountId, CurrentBalance]),

Req = [{<<"Account-ID">>, AccountId}
,{<<"Current-Balance">>, CurrentBalance}
| kz_api:default_headers(?APP_NAME, ?APP_VERSION)
],
kapps_notify_publisher:cast(Req, fun kapi_notifications:publish_low_balance/1),

update_account_low_balance_sent(AccountJObj).

-spec update_account_low_balance_sent(kz_account:doc()) -> 'ok'.
update_account_low_balance_sent(AccountJObj0) ->
AccountJObj1 = kz_account:set_low_balance_sent(AccountJObj0),
AccountJObj2 = kz_account:set_low_balance_tstamp(AccountJObj1),
_ = kz_util:account_update(AccountJObj2),
_ = tasks_bindings:pmap(<<"tasks.account_crawler">>, [AccountId, AccountJObj]),
'ok'.
2 changes: 1 addition & 1 deletion applications/tasks/src/kz_tasks_trigger.erl
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ browse_dbs_for_triggers(Ref) ->

-spec cleanup_pass(ne_binary()) -> boolean().
cleanup_pass(Db) ->
tasks_bindings:map(db_to_trigger(Db), Db),
_ = tasks_bindings:map(db_to_trigger(Db), Db),
erlang:garbage_collect(self()).

-spec db_to_trigger(ne_binary()) -> ne_binary().
Expand Down
Loading

0 comments on commit 7edd54c

Please sign in to comment.