Skip to content

Commit

Permalink
Handle BigCouch nodes with required user auth (2600hz#6491)
Browse files Browse the repository at this point in the history
* Reply back to kz_tasks_trigger even when auto compaction is disabled

* Let user know ASAP if an auto compaction job is running

* Handle BigCouch nodes with required user auth
  • Loading branch information
harenson authored May 1, 2020
1 parent 73a6ee0 commit ef4f6cc
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 48 deletions.
20 changes: 9 additions & 11 deletions applications/tasks/src/modules/kt_compaction_reporter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -432,20 +432,18 @@ code_change(_OldVsn, State, _Extra) ->
%%------------------------------------------------------------------------------
-spec stats_to_status_fold(kz_term:ne_binary(), compaction_stats(), [kz_term:proplist()]) ->
[kz_term:proplist()].
stats_to_status_fold(_CallId, Stats = #{'queued_dbs' := QueuedDBs}, Acc) ->
stats_to_status_fold(_CallId, Stats = #{'found_dbs' := FoundDBs}, Acc) ->
Keys = ['id', 'found_dbs', 'compacted_dbs', 'queued_dbs', 'skipped_dbs', 'current_db',
'found_shards', 'compacted_shards', 'recovered_disk', 'pid', 'node', 'started'],
ToBin = fun(Something) -> kz_term:to_binary(Something) end,
StatsProp = [{ToBin(Key), ToBin(maps:get(Key, Stats))} || Key <- Keys],
case QueuedDBs =:= 0 of
'true' ->
%% This happens when it finishes compacting on the first node so now
%% `found_dbs = compacted_dbs + skipped_dbs' which means `queued_dbs = 0' and
%% also it means it is still compacting on other nodes otherwise this status
%% were not being build.
MsgProp = [{<<"msg">>, <<"Still compacting on other nodes">>}],
StatsProp = [{kz_term:to_binary(Key), kz_term:to_binary(maps:get(Key, Stats))} || Key <- Keys],
case FoundDBs of
0 ->
%% If compaction is running and `found_dbs=0' means it let this module know a new
%% compaction job is running but it is missing to report the dbs to be compacted.
%% Which means it is still loading dbs (sorting).
MsgProp = [{<<"NOTE">>, <<"Still listing/sorting databases.">>}],
[StatsProp ++ MsgProp | Acc];
'false' ->
_ ->
[StatsProp | Acc]
end.

Expand Down
81 changes: 44 additions & 37 deletions applications/tasks/src/modules/kt_compactor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
,compact_db/1
,compact_node/1

%% Used to handle auto_compaction triggers. Check init/0 for more info.
,do_compact_db/1
]).

-export([browse_dbs_for_triggers/1]).
%% Functions meant to be used only by this module. Exported ONLY because they are used as callback
%% functions for the tasks_bindings module. Check `init/0' for more info.
-export([browse_dbs_for_triggers/1
,do_compact_db/1
]).

%% Triggerables
-export([help/1, help/2, help/3
Expand Down Expand Up @@ -84,20 +86,18 @@ init(_) ->
_ = kapps_maintenance:refresh(kazoo_couch:get_admin_dbs()),
set_node_defaults(AdminNodes),

_ = case ?COMPACT_AUTOMATICALLY of
'false' -> lager:info("node ~s not configured to compact automatically", [node()]);
'true' ->
_ = tasks_bindings:bind(?TRIGGER_AUTO_COMPACTION
,?MODULE
,'browse_dbs_for_triggers'
),
lager:info("node ~s configured to compact automatically", [node()]),
%% Need to use `do_compact_db/1' instead of `compact_db/1' because the
%% the former uses `?HEUR_RATIO' for heuristic and the latter ignores
%% heuristic and doesn't allow to improve auto compaction job exec time.
_ = tasks_bindings:bind(?TRIGGER_ALL_DBS, ?MODULE, 'do_compact_db')
end,

case ?COMPACT_AUTOMATICALLY of
'false' -> lager:info("node ~s not configured to compact automatically", [node()]);
'true' -> lager:info("node ~s configured to compact automatically", [node()])
end,

%% Always bind to auto_compaction events, let `browse_dbs_for_triggers/[1,2]' decide whether or
%% not it has to take care of events. Avoids having to restart tasks app every time auto
%% compaction is enabled/disabled.
_ = tasks_bindings:bind(?TRIGGER_AUTO_COMPACTION, ?MODULE, 'browse_dbs_for_triggers'),
%% `do_compact_db/1' uses `?HEUR_RATIO' which allows to improve auto compaction job exec time.
%% `do_compact_db/1' should not receive any AMQP events if auto compaction is disabled.
_ = tasks_bindings:bind(?TRIGGER_ALL_DBS, ?MODULE, 'do_compact_db'),
_ = tasks_bindings:bind(<<"tasks.help">>, ?MODULE, 'help'),
_ = tasks_bindings:bind(<<"tasks."?CATEGORY".output_header">>, ?MODULE, 'output_header'),
tasks_bindings:bind_actions(<<"tasks."?CATEGORY>>, ?MODULE, ?ACTIONS).
Expand Down Expand Up @@ -437,11 +437,13 @@ get_node_connections(Node, #server{options=Options}) ->
NodeAdminPort = ?NODE_ADMIN_PORT(Node),

lager:debug("getting connection information for ~s, ~p and ~p", [Host, NodeAPIPort, NodeAdminPort]),
C1 = couchbeam:server_connection(Hostname, NodeAPIPort, <<"">>, Options),
C2 = couchbeam:server_connection(Hostname, NodeAdminPort, <<"">>, Options),
#{'username' := Username, 'password' := Password} = props:get_value('connection_map', Options),
ConnOpts = [{'basic_auth', {Username, Password}}],
APIConn = couchbeam:server_connection(Hostname, NodeAPIPort, <<"">>, ConnOpts),
AdminConn = couchbeam:server_connection(Hostname, NodeAdminPort, <<"">>, ConnOpts),

try {kz_couch_util:connection_info(C1),
kz_couch_util:connection_info(C2)
try {kz_couch_util:connection_info(APIConn),
kz_couch_util:connection_info(AdminConn)
}
of
{{'error', 'timeout'}, _} ->
Expand Down Expand Up @@ -525,8 +527,10 @@ sort_by_disk_size(DbsSizes) when is_list(DbsSizes) ->
sort_by_disk_size({_UnencDb1, {DiskSize1, _}}, {_UnencDb2, {DiskSize2, _}}) ->
DiskSize1 > DiskSize2;
sort_by_disk_size({_UnencDb1, {_DiskSize1, _}}, {_UnencDb2, _Else}) -> %% Else = 'not_found' | 'undefined'
%% Failed to get disk size information for db2.
'true';
sort_by_disk_size({_UnencDb1, _Else}, {_UnencDb2, {_DiskSize2, _}}) -> %% Else = 'not_found' | 'undefined'
%% Failed to get disk size information for db1.
'false'.

-spec track_job(kz_term:ne_binary(), function(), [term()]) -> rows().
Expand Down Expand Up @@ -560,21 +564,31 @@ supid_to_jobtype(SupId) ->
%%------------------------------------------------------------------------------
%% @doc Entry point for starting the automatic compaction job.
%%
%% This functions gets triggered by the `browse_dbs_ref' based on `browse_dbs_timer'
%% function. By default it triggers the action 1 day after the timer starts.
%% This function gets triggered by `kz_tasks_trigger's `browse_dbs_ref' timer.
%% By default it triggers the action 1 day after the timer starts.
%% @end
%%------------------------------------------------------------------------------
-spec browse_dbs_for_triggers(atom() | reference()) -> 'ok'.
browse_dbs_for_triggers(Ref) ->
browse_dbs_for_triggers(Ref, ?COMPACT_AUTOMATICALLY),
gen_server:cast('kz_tasks_trigger', {'cleanup_finished', Ref}).

-spec browse_dbs_for_triggers(atom() | reference(), boolean()) -> 'ok'.
browse_dbs_for_triggers(Ref, 'false') ->
%% Avoid kz_tasks_trigger wait forever for a reply from this process when triggering
%% the auto compaction job on a node with auto compaction disabled.
lager:info("automatic compaction is disabled on this node, skipping ~p trigger", [Ref]);
browse_dbs_for_triggers(Ref, 'true') ->
CallId = build_compaction_callid(<<"cleanup_pass">>),
kz_log:put_callid(CallId),
lager:info("starting cleanup pass of databases"),
Dbs = maybe_list_and_sort_dbs_for_compaction(?COMPACT_AUTOMATICALLY, CallId),
'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId),
Dbs = list_and_sort_dbs_for_compaction(),
'ok' = kt_compaction_reporter:set_job_dbs(CallId, Dbs),
_Counter = lists:foldl(fun trigger_db_cleanup/2, {length(Dbs), 1}, Dbs),
'ok' = kt_compaction_reporter:stop_tracking_job(CallId),
kz_log:put_callid('undefined'), % Reset callid
lager:info("pass completed for ~p", [Ref]),
gen_server:cast('kz_tasks_trigger', {'cleanup_finished', Ref}).
lager:info("pass completed for ~p", [Ref]).

-spec build_compaction_callid(kz_term:ne_binary()) -> kz_term:ne_binary().
build_compaction_callid(JobTypeBin) ->
Expand All @@ -599,19 +613,12 @@ trigger_db_cleanup(Db, {TotalDbs, Counter}) ->
cleanup_pass(Db),
{TotalDbs, Counter + 1}.

-spec maybe_list_and_sort_dbs_for_compaction(boolean(), kz_term:ne_binary()) ->
[kz_term:ne_binary()].
maybe_list_and_sort_dbs_for_compaction('true', CallId) ->
lager:debug("auto compaction enabled, getting databases list and sorting them by disk size"),
-spec list_and_sort_dbs_for_compaction() -> [db_and_sizes()].
list_and_sort_dbs_for_compaction() ->
lager:debug("getting databases list and sorting them by disk size"),
Sorted = get_all_dbs_and_sort_by_disk(),
lager:debug("finished listing and sorting databases (~p found)", [length(Sorted)]),
'ok' = kt_compaction_reporter:start_tracking_job(self(), node(), CallId, Sorted),
Sorted;
maybe_list_and_sort_dbs_for_compaction('false', _CallId) ->
lager:debug("auto compaction disabled, skip sorting dbs by size"),
{'ok', Dbs} = kz_datamgr:db_info(),
lager:debug("finished listing databases (~p found)", [length(Dbs)]),
Dbs.
Sorted.

-spec cleanup_pass(kz_term:ne_binary()) -> boolean().
cleanup_pass(Db) ->
Expand Down

0 comments on commit ef4f6cc

Please sign in to comment.