Skip to content

Commit

Permalink
Background task to export CDRs to CSV (2600hz#5883)
Browse files Browse the repository at this point in the history
Large CDR exports via API frequently run afoul of timeout limits,
pagination key issues, etc. Instead, create a task and let it run in
the background while the API returns quickly. Then the task ID can be
polled for task completion and fetching of the CSV.
  • Loading branch information
jamesaimonetti authored and lazedo committed Jul 6, 2019
1 parent 68d2c04 commit d168b49
Show file tree
Hide file tree
Showing 28 changed files with 763 additions and 312 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,4 @@ core/kazoo_proper/priv/mp3.mp3
.kerl
/doc/engineering/.org/proper.pdf
/doc/engineering/.org/proper.tex
/todo.org
181 changes: 26 additions & 155 deletions applications/crossbar/src/modules/cb_cdrs.erl
Original file line number Diff line number Diff line change
Expand Up @@ -52,51 +52,6 @@
-define(KEY_UTC_OFFSET, <<"utc_offset">>).
-define(KEY_CCV, <<"custom_channel_vars">>).

%% CSV/JSON column table for CDR listings.
%% Each entry pairs the column name (used as the CSV header / JSON key) with
%% the 3-arity fun that renders that column; the funs are invoked as
%% F(JObj, Timestamp, Context) by normalize_cdr. Headers and rows are both
%% built by walking this list front to back, so the list order is the column
%% order of the output.
-define(COLUMNS
,[{<<"id">>, fun col_id/3}
,{<<"call_id">>, fun col_call_id/3}
,{<<"caller_id_number">>, fun col_caller_id_number/3}
,{<<"caller_id_name">>, fun col_caller_id_name/3}
,{<<"callee_id_number">>, fun col_callee_id_number/3}
,{<<"callee_id_name">>, fun col_callee_id_name/3}
,{<<"duration_seconds">>, fun col_duration_seconds/3}
,{<<"billing_seconds">>, fun col_billing_seconds/3}
,{<<"timestamp">>, fun col_timestamp/3}
,{<<"hangup_cause">>, fun col_hangup_cause/3}
,{<<"other_leg_call_id">>, fun col_other_leg_call_id/3}
,{<<"owner_id">>, fun col_owner_id/3}
,{<<"to">>, fun col_to/3}
,{<<"from">>, fun col_from/3}
,{<<"direction">>, fun col_call_direction/3}
,{<<"request">>, fun col_request/3}
,{<<"authorizing_id">>, fun col_authorizing_id/3}
,{<<"cost">>, fun col_customer_cost/3}
%% New fields
,{<<"dialed_number">>, fun col_dialed_number/3}
,{<<"calling_from">>, fun col_calling_from/3}
,{<<"datetime">>, fun col_pretty_print/3}
,{<<"unix_timestamp">>, fun col_unix_timestamp/3}
,{<<"rfc_1036">>, fun col_rfc1036/3}
,{<<"iso_8601">>, fun col_iso8601/3}
,{<<"iso_8601_combined">>, fun col_iso8601_combined/3}
,{<<"call_type">>, fun col_account_call_type/3}
,{<<"rate">>, fun col_rate/3}
,{<<"rate_name">>, fun col_rate_name/3}
,{<<"bridge_id">>, fun col_bridge_id/3}
,{<<"recording_url">>, fun col_recording_url/3}
,{<<"media_recordings">>, fun col_media_recordings/3}
,{<<"media_server">>, fun col_media_server/3}
,{<<"call_priority">>, fun col_call_priority/3}
,{<<"interaction_id">>, fun col_interaction_id/3}
]).

%% Extra columns appended after ?COLUMNS by csv_rows/1 when the request
%% context has 'is_reseller' set to 'true'.
-define(COLUMNS_RESELLER
,[{<<"reseller_cost">>, fun col_reseller_cost/3}
,{<<"reseller_call_type">>, fun col_reseller_call_type/3}
]).

%% A column renderer: receives the CDR document, the timestamp computed by
%% normalize_cdr (the CDR timestamp minus its duration) and the request
%% context, and returns the cell value.
%% NOTE(review): several col_* funs pass `<<>>' as their default and
%% col_media_recordings/3 returns a list, so the declared ne_binary() return
%% looks narrower than what is actually produced - confirm before relying on it.
-type csv_column_fun() :: fun((kz_json:object(), kz_time:gregorian_seconds(), cb_context:context()) -> kz_term:ne_binary()).

%%%=============================================================================
%%% API
%%%=============================================================================
Expand Down Expand Up @@ -449,13 +404,28 @@ normalize_cdr(Context, <<"json">>, Result) ->
Duration = kzd_cdrs:duration_seconds(JObj, 0),
Timestamp = kzd_cdrs:timestamp(JObj, 0) - Duration,

kz_json:from_list([{K, F(JObj, Timestamp, Context)} || {K, F} <- csv_rows(Context)]);
RowMappers = props:replace_value(<<"datetime">>, fun col_pretty_print/3, csv_rows(Context)),

kz_json:from_list([{K, apply_row_mapper(F, JObj, Timestamp, Context)} || {K, F} <- RowMappers]);
normalize_cdr(Context, <<"csv">>, Result) ->
JObj = kz_json:get_json_value(<<"doc">>, Result),
Duration = kzd_cdrs:duration_seconds(JObj, 0),
Timestamp = kzd_cdrs:timestamp(JObj, 0) - Duration,

<<(kz_binary:join([F(JObj, Timestamp, Context) || {_, F} <- csv_rows(Context)], <<",">>))/binary, "\r\n">>.
RowMappers = props:replace_value(<<"datetime">>, fun col_pretty_print/3, csv_rows(Context)),

<<(kz_binary:join([apply_row_mapper(F, JObj, Timestamp, Context)
|| {_, F} <- RowMappers
]
,<<",">>
))/binary
,"\r\n"
>>.

%% Invoke a row mapper on a CDR. Mappers of arity 2 are given the CDR and the
%% timestamp only; mappers of arity 3 are additionally given the request
%% context. Any other mapper fails with function_clause.
apply_row_mapper(Mapper, CDR, Timestamp, _Context) when is_function(Mapper, 2) ->
    Mapper(CDR, Timestamp);
apply_row_mapper(Mapper, CDR, Timestamp, Context) when is_function(Mapper, 3) ->
    Mapper(CDR, Timestamp, Context).

-spec maybe_add_csv_header(cb_context:context(), kz_term:ne_binary(), kz_json:objects() | kz_term:binaries()) -> cb_context:context().
maybe_add_csv_header(Context, _, []) ->
Expand All @@ -467,124 +437,20 @@ maybe_add_csv_header(Context, <<"csv">>, [Head | Tail]=Data) ->
'true' ->
cb_context:set_resp_data(Context, Data);
'false' ->
CSVHeader = kz_binary:join([K || {K, _Fun} <- csv_rows(Context)], <<",">>),
CSVHeader = kz_binary:join([K || {K, _Fun} <- csv_rows(Context)]
,<<",">>
),
cb_context:set_resp_data(Context, [<<CSVHeader/binary, "\r\n", Head/binary>> | Tail])
end.

-spec csv_rows(cb_context:context()) -> [{kz_term:ne_binary(), csv_column_fun()}].
csv_rows(Context) ->
case cb_context:fetch(Context, 'is_reseller', 'false') of
'false' -> ?COLUMNS;
'true' -> ?COLUMNS ++ ?COLUMNS_RESELLER
end.

%% Column renderers; every function below has the shape of csv_column_fun().
%% Each accessor is handed `<<>>' as its default value.
col_id(CDR, _Timestamp, _Context) ->
    kz_doc:id(CDR, <<>>).

col_call_id(CDR, _Timestamp, _Context) ->
    kzd_cdrs:call_id(CDR, <<>>).

col_caller_id_number(CDR, _Timestamp, _Context) ->
    kzd_cdrs:caller_id_number(CDR, <<>>).

col_caller_id_name(CDR, _Timestamp, _Context) ->
    kzd_cdrs:caller_id_name(CDR, <<>>).

col_callee_id_number(CDR, _Timestamp, _Context) ->
    kzd_cdrs:callee_id_number(CDR, <<>>).

col_callee_id_name(CDR, _Timestamp, _Context) ->
    kzd_cdrs:callee_id_name(CDR, <<>>).

col_duration_seconds(CDR, _Timestamp, _Context) ->
    kzd_cdrs:duration_seconds(CDR, <<>>).

col_billing_seconds(CDR, _Timestamp, _Context) ->
    kzd_cdrs:billing_seconds(CDR, <<>>).

col_timestamp(_CDR, Timestamp, _Context) ->
    kz_term:to_binary(Timestamp).

col_hangup_cause(CDR, _Timestamp, _Context) ->
    kzd_cdrs:hangup_cause(CDR, <<>>).

col_other_leg_call_id(CDR, _Timestamp, _Context) ->
    kzd_cdrs:other_leg_call_id(CDR, <<>>).

col_owner_id(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"owner_id">>], CDR, <<>>).

col_to(CDR, _Timestamp, _Context) ->
    kzd_cdrs:to(CDR, <<>>).

col_from(CDR, _Timestamp, _Context) ->
    kzd_cdrs:from(CDR, <<>>).

col_call_direction(CDR, _Timestamp, _Context) ->
    kzd_cdrs:call_direction(CDR, <<>>).

col_request(CDR, _Timestamp, _Context) ->
    kzd_cdrs:request(CDR, <<>>).
%% Render the authorizing id custom channel var, but blank it out when it is
%% exactly equal to the account id custom channel var (both default to `<<>>').
col_authorizing_id(CDR, _Timestamp, _Context) ->
    AccountId = kz_json:get_value([?KEY_CCV, <<"account_id">>], CDR, <<>>),
    case kz_json:get_value([?KEY_CCV, <<"authorizing_id">>], CDR, <<>>) of
        AccountId -> <<>>;
        AuthorizingId -> AuthorizingId
    end.
%% Cost and derived-value column renderers (shape: csv_column_fun()).
col_customer_cost(CDR, _Timestamp, _Context) ->
    kz_term:to_binary(customer_cost(CDR)).

col_dialed_number(CDR, _Timestamp, _Context) ->
    dialed_number(CDR).

col_calling_from(CDR, _Timestamp, _Context) ->
    calling_from(CDR).

%% Shift the timestamp by the request's UTC offset (if any) before formatting.
col_pretty_print(_CDR, Timestamp, Context) ->
    Shifted = handle_utc_time_offset(Timestamp
                                    ,cb_context:req_value(Context, ?KEY_UTC_OFFSET)
                                    ),
    pretty_print_datetime(Shifted).

col_unix_timestamp(_CDR, Timestamp, _Context) ->
    UnixSeconds = kz_time:gregorian_seconds_to_unix_seconds(Timestamp),
    kz_term:to_binary(UnixSeconds).

col_rfc1036(_CDR, Timestamp, _Context) ->
    kz_time:rfc1036(Timestamp).

col_iso8601(_CDR, Timestamp, _Context) ->
    kz_date:to_iso8601_extended(Timestamp).

col_iso8601_combined(_CDR, Timestamp, _Context) ->
    kz_time:iso8601(Timestamp).

col_account_call_type(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"account_billing">>], CDR, <<>>).

col_rate(CDR, _Timestamp, _Context) ->
    Units = kz_json:get_value([?KEY_CCV, <<"rate">>], CDR, 0),
    kz_term:to_binary(kz_currency:units_to_dollars(Units)).

col_rate_name(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"rate_name">>], CDR, <<>>).

col_bridge_id(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"bridge_id">>], CDR, <<>>).

col_recording_url(CDR, _Timestamp, _Context) ->
    kz_json:get_value([<<"recording_url">>], CDR, <<>>).

col_media_recordings(CDR, _Timestamp, _Context) ->
    format_recordings(CDR).

col_media_server(CDR, _Timestamp, _Context) ->
    kzd_cdrs:media_server(CDR, <<>>).

col_call_priority(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"call_priority">>], CDR, <<>>).

%% Reseller-only columns.
col_reseller_cost(CDR, _Timestamp, _Context) ->
    kz_term:to_binary(reseller_cost(CDR)).

col_reseller_call_type(CDR, _Timestamp, _Context) ->
    kz_json:get_value([?KEY_CCV, <<"reseller_billing">>], CDR, <<>>).

col_interaction_id(CDR, _Timestamp, _Context) ->
    kzd_cdrs:interaction_id(CDR, <<>>).

%% Format a datetime (or gregorian seconds, which are first converted to a
%% datetime) as a zero-padded "YYYY-MM-DD HH:MM:SS" binary.
-spec pretty_print_datetime(kz_time:datetime() | kz_time:gregorian_seconds()) -> kz_term:ne_binary().
pretty_print_datetime({{Year, Month, Day}, {Hour, Minute, Second}}) ->
    Formatted = io_lib:format("~4..0w-~2..0w-~2..0w ~2..0w:~2..0w:~2..0w"
                             ,[Year, Month, Day, Hour, Minute, Second]
                             ),
    iolist_to_binary(Formatted);
pretty_print_datetime(Seconds) when is_integer(Seconds) ->
    pretty_print_datetime(calendar:gregorian_seconds_to_datetime(Seconds)).
kzd_cdrs:csv_headers(cb_context:fetch(Context, 'is_reseller', 'false')).

%% Add the requested UTC offset (in seconds) to Timestamp; an 'undefined'
%% offset leaves the timestamp untouched. Other offsets are converted with
%% kz_term:to_number/1 before being added.
-spec handle_utc_time_offset(kz_time:gregorian_seconds(), kz_term:api_integer()) -> kz_time:gregorian_seconds().
handle_utc_time_offset(Timestamp, UTCSecondsOffset) ->
    case UTCSecondsOffset of
        'undefined' -> Timestamp;
        Offset -> Timestamp + kz_term:to_number(Offset)
    end.

%% Fetch the media_recordings custom channel var (default: empty list) and
%% make sure the result is a list by wrapping a non-list value.
-spec format_recordings(kz_json:object()) -> kz_term:binaries().
format_recordings(JObj) ->
    Recordings = kz_json:get_value([?KEY_CCV, <<"media_recordings">>], JObj, []),
    case is_list(Recordings) of
        'true' -> Recordings;
        'false' -> [Recordings]
    end.

%% The part before the first "@" of the request (inbound calls) or of the
%% `to' value (outbound calls). Any other call direction fails with
%% case_clause.
-spec dialed_number(kzd_cdrs:doc()) -> binary().
dialed_number(CDR) ->
    URI = case kzd_cdrs:call_direction(CDR) of
              <<"inbound">> -> kzd_cdrs:request(CDR, <<>>);
              <<"outbound">> -> kzd_cdrs:to(CDR, <<>>)
          end,
    [User | _] = binary:split(URI, <<"@">>),
    User.

%% The calling party: the caller id number for inbound calls, or the part
%% before the first "@" of the from URI for outbound calls. Any other call
%% direction fails with case_clause.
-spec calling_from(kz_json:object()) -> binary().
calling_from(CDR) ->
    Direction = kzd_cdrs:call_direction(CDR),
    case Direction of
        <<"outbound">> ->
            [User | _] = binary:split(kzd_cdrs:from_uri(CDR, <<>>), <<"@">>),
            User;
        <<"inbound">> ->
            kzd_cdrs:caller_id_number(CDR, <<>>)
    end.

%% Cost charged to the customer account for this CDR.
%% Only CDRs whose `account_billing' custom channel var is `per_minute' are
%% costed via kapps_call_util:call_cost/1; everything else is reported as 0.
%% The spec is non_neg_integer() (not pos_integer()) because 0 is a regular
%% return value here.
%% NOTE(review): assumes kapps_call_util:call_cost/1 never returns a negative
%% amount - confirm against its spec.
-spec customer_cost(kz_json:object()) -> non_neg_integer().
customer_cost(JObj) ->
    CCVs = kzd_cdrs:custom_channel_vars(JObj, kz_json:new()),
    case kz_json:get_ne_binary_value(<<"account_billing">>, CCVs) of
        <<"per_minute">> -> kapps_call_util:call_cost(JObj);
        _ -> 0
    end.

%% Cost charged to the reseller for this CDR.
%% Only CDRs whose `reseller_billing' custom channel var is `per_minute' are
%% costed via kapps_call_util:call_cost/1; everything else is reported as 0.
%% The spec is non_neg_integer() (not pos_integer()) because 0 is a regular
%% return value here.
%% NOTE(review): assumes kapps_call_util:call_cost/1 never returns a negative
%% amount - confirm against its spec.
-spec reseller_cost(kz_json:object()) -> non_neg_integer().
reseller_cost(JObj) ->
    CCVs = kzd_cdrs:custom_channel_vars(JObj, kz_json:new()),

    case kz_json:get_ne_binary_value([<<"reseller_billing">>], CCVs) of
        <<"per_minute">> -> kapps_call_util:call_cost(JObj);
        _ -> 0
    end.

%%------------------------------------------------------------------------------
%% @doc Load a CDR document from the database
%% @end
Expand Down Expand Up @@ -635,3 +501,8 @@ load_legs(Id, Context) ->
kz_json:objects().
%% Append the "doc" of a view result to the end of the accumulated legs,
%% preserving the order in which results are visited.
normalize_leg_view_results(ViewResult, Legs) ->
    LegDoc = kz_json:get_json_value(<<"doc">>, ViewResult),
    Legs ++ [LegDoc].

%% Render the "datetime" column: shift the timestamp by the UTC offset found
%% in the request (if any), then format it with kz_time:pretty_print_datetime/1.
-spec col_pretty_print(kz_json:object(), kz_time:gregorian_seconds(), cb_context:context()) -> kz_term:ne_binary().
col_pretty_print(_JObj, Timestamp, Context) ->
    Shifted = handle_utc_time_offset(Timestamp
                                    ,cb_context:req_value(Context, ?KEY_UTC_OFFSET)
                                    ),
    kz_time:pretty_print_datetime(Shifted).
2 changes: 1 addition & 1 deletion applications/crossbar/src/modules/cb_tasks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -529,7 +529,7 @@ read_attachment(TaskId, Context, AccountId) ->
,requested_attachment_name(Context)
);
_Status ->
lager:debug("reading ~s failed: ~p", [_Status]),
lager:debug("reading ~s failed: ~p", [TaskId, _Status]),
ReadContext
end.

Expand Down
7 changes: 4 additions & 3 deletions applications/tasks/doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ The function must return a valid instance of the type `kz_tasks:return()`:
* `kz_csv:row()`: the row to write (useful if `output_header(TaskName)` was implemented).
* `[kz_csv:row()]`: this is only supported for `noinput` tasks. Writes more than 1 row to output.
* `{ok, Data}`: nothing is written to output and `Data` will be passed to the function on next call.
* `{file, Path}`: The file to upload has been produced "out of band" and should be uploaded to the task doc.
* `{ToWrite, Data}`: where `ToWrite` is either a `kz_csv:row()` or `[kz_csv:row()]`. Writes them to output & will pass `Data` on next call.
* `{binary(), Data}`: writes the binary string to output & will pass `Data` on next call.
* `{Error, Data}`: attempts to write `Error` as an error to output & will pass `Data` on next call.
Expand Down Expand Up @@ -126,9 +127,9 @@ There are a number of triggers you can use in your module's `init/0`:
### Triggers

- Cron-like
- Minutely
- Hourly
- Daily
- Minutely, on the minute
- Hourly, on the hour
- Daily, at 00:00
- Database
- Account DBs
- Account MODBs
Expand Down
9 changes: 9 additions & 0 deletions applications/tasks/doc/cdrs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# CDR Task

Accounts with large volumes of CDRs often find the API insufficient for downloading a month's worth of CDRs for billing purposes. This task creates the CDR CSV in the background and makes it retrievable from the task once it has finished, rather than hoping the dataset can be processed before the API timeout occurs.

## Available Tasks

### Dump

Dumps the current month's CDRs to a CSV file.
47 changes: 31 additions & 16 deletions applications/tasks/src/kz_task_worker_noinput.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

-define(OUT(TaskId), kz_tasks_scheduler:output_path(TaskId)).


%%%=============================================================================
%%% API
%%%=============================================================================
Expand Down Expand Up @@ -52,8 +51,8 @@ start(TaskId, API, ExtraArgs) ->
%% @doc
%% @end
%%------------------------------------------------------------------------------
-spec init(kz_tasks:id(), kz_json:object(), kz_tasks:extra_args()) -> {ok, state()} |
{error, any()}.
-spec init(kz_tasks:id(), kz_json:object(), kz_tasks:extra_args()) -> {'ok', state()} |
{'error', any()}.
init(TaskId, API, ExtraArgs) ->
Header = kz_tasks_scheduler:get_output_header(API),
case write_output_csv_header(TaskId, Header) of
Expand Down Expand Up @@ -113,34 +112,49 @@ new_state_after_writing(NewColumns, WrittenSucceeded, WrittenFailed
_ = kz_tasks_scheduler:worker_pause(),
S.

%% Upload a file that the task produced out of band to the task document.
%% Returns 'true' when the upload succeeded. Returns 'false' (after logging)
%% when the file cannot be read or the upload is rejected, so the caller can
%% report the task as failed instead of the worker dying on a badmatch.
-spec store_file(state(), file:filename_all()) -> boolean().
store_file(#state{task_id = TaskId}, File) ->
    lager:info("storing ~s to task doc ~s", [File, TaskId]),
    case file:read_file(File) of
        {'ok', CSV} ->
            case kz_tasks_scheduler:attempt_upload(TaskId, filename:basename(File), CSV, File) of
                'ok' -> 'true';
                {'error', _E} ->
                    lager:error("failed to upload ~s: ~p", [File, _E]),
                    'false'
            end;
        {'error', _Reason} ->
            lager:error("failed to read ~s: ~p", [File, _Reason]),
            'false'
    end.

-spec is_task_successful(kz_tasks:iterator(), state()) ->
{boolean(), kz_tasks:columns(), non_neg_integer(), kz_tasks:iterator()} |
stop.
'stop'.
is_task_successful(IterValue
,State=#state{api = API
,extra_args = ExtraArgs
}) ->
}
) ->
case tasks_bindings:apply(API, [ExtraArgs, IterValue]) of
['stop'] -> 'stop';
[{'EXIT', {_Error, _ST=[_|_]}}] ->
lager:error("error: ~p", [_Error]),
kz_util:log_stacktrace(_ST),
{Columns,Written} = store_return(State, ?WORKER_TASK_FAILED),
{Columns, Written} = store_return(State, ?WORKER_TASK_FAILED),
{'false', Columns, Written, 'stop'};
[{'ok', NewIterValue}] ->
%% For initialisation steps. Skips writing a CSV output row.
{'true', State#state.columns, 0, NewIterValue};
[{'file', FileForUpload}] ->
%% Upload file to task attachments
IsSuccessful = store_file(State, FileForUpload),
{IsSuccessful, State#state.columns, 0, 'stop'};
[{[_|_]=NewRowOrRows, NewIterValue}] ->
{Columns,Written} = store_return(State, NewRowOrRows),
{Columns, Written} = store_return(State, NewRowOrRows),
{'true', Columns, Written, NewIterValue};
[{#{}=NewMappedRow, NewIterValue}] ->
{Columns,Written} = store_return(State, NewMappedRow),
{Columns, Written} = store_return(State, NewMappedRow),
{'true', Columns, Written, NewIterValue};
[{?NE_BINARY=NewRow, NewIterValue}] ->
{Columns,Written} = store_return(State, NewRow),
{Columns, Written} = store_return(State, NewRow),
{'true', Columns, Written, NewIterValue};
[{Error, NewIterValue}] ->
{Columns,Written} = store_return(State, Error),
{Columns, Written} = store_return(State, Error),
{'false', Columns, Written, NewIterValue}
end.

Expand All @@ -155,7 +169,7 @@ store_return(#state{task_id = TaskId
}
,Reason
) ->
Data = [reason(OutputHeader, Reason), $\n],
Data = [reason(OutputHeader, Reason)],
kz_util:write_file(?OUT(TaskId), Data, ['append']),
NewColumns = columns(OutputHeader, Reason),
{NewColumns, 1}.
Expand All @@ -172,20 +186,21 @@ reason(_, _) -> <<>>.
-spec columns(kz_tasks:output_header(), kz_tasks:return()) -> kz_tasks:columns().
columns(_, MappedRow) when is_map(MappedRow) ->
Found = [K || {K,V} <- maps:to_list(MappedRow),
V =/= undefined
V =/= 'undefined'
],
sets:from_list(Found);
columns(Header, [_|_]=Row) ->
Found = [K || {K,V} <- lists:zip(Header, Row),
V =/= undefined
Found = [K || {K, V} <- lists:zip(Header, Row),
V =/= 'undefined'
],
sets:from_list(Found);
columns(_, _) ->
sets:new().

-spec write_output_csv_header(kz_tasks:id(), kz_csv:row()) -> ok | {error, any()}.
-spec write_output_csv_header(kz_tasks:id(), kz_csv:row()) ->
'ok' | {'error', file:posix() | 'badarg' | 'terminated' | 'system_limit'}.
write_output_csv_header(TaskId, Header) ->
Data = [kz_csv:row_to_iolist(Header), $\n],
Data = [kz_csv:row_to_iolist(Header)],
file:write_file(?OUT(TaskId), Data).

%%% End of Module.
8 changes: 4 additions & 4 deletions applications/tasks/src/kz_tasks_help.erl
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ get_help(JObj) ->
{Category, Action} -> lookup_result(Category, Action, help(Category, Action))
end.

lookup_result(_, {error, _}=E) -> E;
lookup_result(Category, {ok, JObj}) ->
lookup_result(_, {'error', _}=E) -> E;
lookup_result(Category, {'ok', JObj}) ->
kz_json:from_list([{Category, JObj}]).

lookup_result(_, _, {error, _}=E) -> E;
lookup_result(Category, Action, {ok, JObj}) ->
lookup_result(_, _, {'error', _}=E) -> E;
lookup_result(Category, Action, {'ok', JObj}) ->
kz_json:set_value([Category, Action], JObj, kz_json:new()).

-spec parse_apis(kz_json:object()) -> kz_json:object().
Expand Down
Loading

0 comments on commit d168b49

Please sign in to comment.