Skip to content

Commit

Permalink
HELP-14316: test storage plans in cluster setting (2600hz#6497)
Browse files Browse the repository at this point in the history
* HELP-14316: test storage plans in cluster setting

Report that a saving of storage plans on an API server did not cause
call-handling servers to reflect the new storage plan. Unable to
replicate but added additional helpers to make sure plans are flushed
on storage doc changes

HELP-14308: Add metadata to fetched attachments as some folks use a
static URL (with no field formatters) for storing attachments (like
https://storage.company.com/recordings). Need metadata to determine
what file to return on subsequent GET requests

* abstract if kazoo_bindings is running as a PID vs some other criteria
  • Loading branch information
jamesaimonetti authored May 1, 2020
1 parent ff4b31e commit 8ff64d7
Show file tree
Hide file tree
Showing 22 changed files with 426 additions and 138 deletions.
8 changes: 8 additions & 0 deletions applications/crossbar/doc/storage.http.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,11 @@ If the backend needs to receive the attachment binary as a base64-encoded value,
```

When fetching the attachment back from your server, you can return either the raw binary or the base64-encoded version. KAZOO will decode it if necessary.

### Fetching the binary

Metadata about the file being requested will be included as querystring parameters on the GET request to your server.

For instance, if you store voicemail messages to `http://storage.server.com/storage/` with no URL formatting (`fields:[]` in handler settings), you can expect to receive a GET to `http://storage.server.com/storage/?from=pqc_cb_storage4e3005310add6229778caf6babfe7cfb/202004-03b64407c4f1f83340a9ef39fc0d02d8/uploaded_file_63754892074.mp3?from=pqc_cb_storage&media_id=202004-03b64407c4f1f83340a9ef39fc0d02d8&call_id=abad6555b4031ed6cb1e7dec&caller_id_name=pqc_cb_storage&caller_id_number=pqc_cb_storage&from=pqc_cb_storage%4053cb34.sip.2600hz.com&from_user=pqc_cb_storage&from_realm=53cb34.sip.2600hz.com&length=1&timestamp=63754892074&to=1010%4053cb34.sip.2600hz.com&to_user=1010&to_realm=53cb34.sip.2600hz.com&folder=new`

The querystring will reflect the metadata of the stored media (the JSON you'd get with a GET to the corresponding Crossbar API).
3 changes: 1 addition & 2 deletions applications/crossbar/src/crossbar_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@
]).

-ifdef(TEST).
-export([patch_the_doc/2
]).
-export([patch_the_doc/2]).
-endif.

-include("crossbar.hrl").
Expand Down
2 changes: 1 addition & 1 deletion applications/crossbar/src/crossbar_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -982,7 +982,7 @@ descendants_count(Opts) ->
{'error', 'no_descendants'} ->
handle_no_descendants(ViewOptions);
{'error', _E} ->
io:format("could not load view listing_by_descendants_count: ~p~n", [_E]);
lager:info("could not load view listing_by_descendants_count: ~p~n", [_E]);
{'ok', Counts} ->
handle_descendant_counts(ViewOptions, Counts)
end.
Expand Down
43 changes: 39 additions & 4 deletions applications/crossbar/src/modules/cb_storage.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
,put/1, put/2
,post/1, post/3
,patch/1, patch/3
,delete/1, delete/3
,delete/1, delete/3, delete_account/2
]).

-ifdef(TEST).
Expand Down Expand Up @@ -65,8 +65,9 @@ init() ->
_ = crossbar_bindings:bind(<<"*.execute.put.storage">>, ?MODULE, 'put'),
_ = crossbar_bindings:bind(<<"*.execute.post.storage">>, ?MODULE, 'post'),
_ = crossbar_bindings:bind(<<"*.execute.patch.storage">>, ?MODULE, 'patch'),
_ = crossbar_bindings:bind(<<"*.execute.delete.storage">>, ?MODULE, 'delete').

_ = crossbar_bindings:bind(<<"*.execute.delete.storage">>, ?MODULE, 'delete'),
_ = crossbar_bindings:bind(<<"*.execute.delete.accounts">>, ?MODULE, 'delete_account'),
'ok'.

%%------------------------------------------------------------------------------
%% @doc Authorizes the incoming request, returning true if the requestor is
Expand Down Expand Up @@ -173,7 +174,6 @@ validate(Context, ?PLANS_TOKEN) ->
validate(Context, ?PLANS_TOKEN, PlanId) ->
validate_storage_plan(set_scope(Context), PlanId, cb_context:req_verb(Context)).


-spec validate_storage(cb_context:context(), http_method()) -> cb_context:context().
validate_storage(Context, ?HTTP_GET) ->
read(Context);
Expand Down Expand Up @@ -247,6 +247,41 @@ delete(Context) ->
delete(Context, ?PLANS_TOKEN, _PlanId) ->
crossbar_doc:delete(Context, 'false').

-spec delete_account(cb_context:context(), kz_term:ne_binary()) -> cb_context:context().
delete_account(Context, AccountId) ->
lager:debug("account ~s deleted, removing any storage plans", [AccountId]),
delete_account_storage(AccountId),
Context.

-spec delete_account_storage(kz_term:ne_binary()) -> 'ok'.
delete_account_storage(AccountId) ->
case kz_datamgr:get_result_ids(?KZ_DATA_DB
,<<"storage/storage_by_account">>
,[{'key', AccountId}]
)
of
{'ok', []} -> lager:info("no storage to delete");
{'ok', StorageIDs} ->
lager:info("removing storage plans ~s", [kz_binary:join(StorageIDs)]),
delete_storage_plans(StorageIDs);
{'error', _E} -> lager:info("error deleting storage plans: ~p", [_E])
end,
log_deleted(kz_datamgr:del_doc(?KZ_DATA_DB, AccountId)).

-spec log_deleted({'ok', kz_json:object()} | kz_datamgr:data_error()) -> 'ok'.
log_deleted({'ok', OK}) ->
case kz_json:is_true(<<"ok">>, OK) of
'true' -> lager:info("deleted account storage plan");
'false' -> lager:info("failed to delete account storage plan: ~p", [OK])
end;
log_deleted(_Else) ->
lager:info("failed to delete account storage plan: ~p", [_Else]).

-spec delete_storage_plans(kz_term:ne_binaries()) -> 'ok'.
delete_storage_plans(StorageIDs) ->
_Deleted = kz_datamgr:del_docs(?KZ_DATA_DB, StorageIDs),
lager:info("deleted storage docs ~p", [_Deleted]).

%%------------------------------------------------------------------------------
%% @doc Create a new instance with the data provided, if it is valid
%% @end
Expand Down
5 changes: 4 additions & 1 deletion applications/crossbar/src/modules/cb_vmboxes.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1049,7 +1049,10 @@ load_attachment_from_message(Doc, Context, Timezone) ->
,filename:extension(AttachmentId)
,Timezone
),
case kz_datamgr:fetch_attachment(kz_doc:account_db(Doc), MediaId, AttachmentId) of

Options = ?TYPE_CHECK_OPTION(<<"mailbox_message">>),

case kz_datamgr:fetch_attachment(kz_doc:account_db(Doc), MediaId, AttachmentId, Options) of
{'error', Error} ->
crossbar_doc:handle_datamgr_errors(Error, MediaId, Context);
{'ok', AttachBin} ->
Expand Down
9 changes: 6 additions & 3 deletions applications/jonny5/src/j5_limits.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ resource_consuming_calls(#limits{resource_consuming_calls=Calls}) -> Calls.
-spec inbound_trunks(limits()) -> tristate_integer().
inbound_trunks(#limits{inbound_trunks=-1}) -> -1;
inbound_trunks(#limits{bundled_inbound_trunks=BundledTrunks
,inbound_trunks=Trunks}) ->
,inbound_trunks=Trunks
}) ->
BundledTrunks + Trunks.

%%------------------------------------------------------------------------------
Expand All @@ -174,7 +175,8 @@ inbound_trunks(#limits{bundled_inbound_trunks=BundledTrunks
-spec outbound_trunks(limits()) -> tristate_integer().
outbound_trunks(#limits{outbound_trunks=-1}) -> -1;
outbound_trunks(#limits{bundled_outbound_trunks=BundledTrunks
,outbound_trunks=Trunks}) ->
,outbound_trunks=Trunks
}) ->
BundledTrunks + Trunks.

%%------------------------------------------------------------------------------
Expand All @@ -184,7 +186,8 @@ outbound_trunks(#limits{bundled_outbound_trunks=BundledTrunks
-spec twoway_trunks(limits()) -> tristate_integer().
twoway_trunks(#limits{twoway_trunks=-1}) -> -1;
twoway_trunks(#limits{bundled_twoway_trunks=BundledTrunks
,twoway_trunks=Trunks}) ->
,twoway_trunks=Trunks
}) ->
BundledTrunks + Trunks.

%%------------------------------------------------------------------------------
Expand Down
36 changes: 31 additions & 5 deletions core/kazoo_attachments/src/kz_att_http.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ put_attachment(Settings, DbName, DocId, AName, Contents, Options) ->
DefaultContentType = props:get_value('content_type', Options, kz_mime:from_filename(AName)),

{ContentType, Body} = build_req_body(Settings, DbName, DocId, AName, Contents, DefaultContentType),
Headers = [{'content_type', ContentType}],
Headers = [{"content-type", ContentType}],

case send_request(Url, format_verb(Verb), Headers, Body) of
{'ok', NewUrl, _Body, _Debug} ->
Expand Down Expand Up @@ -119,13 +119,39 @@ fields(_Settings) -> kz_att_util:default_format_url_fields().
,gen_attachment:att_name()
) -> gen_attachment:fetch_response().
fetch_attachment(HandlerProps, DbName, DocId, AName) ->
BaseUrlParam = kz_json:get_ne_binary_value(<<"url">>, HandlerProps),
HProps = handler_props_map(HandlerProps),

Routines = kz_att_error:fetch_routines(HandlerProps, DbName, DocId, AName),
case kz_json:get_value(<<"url">>, HandlerProps) of
'undefined' -> kz_att_error:new('invalid_data', Routines);
Url ->
handle_fetch_attachment_resp(fetch_attachment(Url), Routines)

BaseUrl = kz_binary:strip_right(BaseUrlParam, $/),
ClientSegment = kz_att_util:format_url(HProps, {DbName, DocId, AName}, fields(HProps)),
Separator = base_separator(BaseUrl),

URL = list_to_binary([BaseUrl, Separator, ClientSegment]),

{'ok', Doc} = kz_datamgr:open_cache_doc(DbName, DocId),
Metadata = kz_json:get_json_value(<<"metadata">>, Doc, kz_doc:public_fields(Doc)),
QS = kz_http_util:json_to_querystring(Metadata),

FetchURL = join_url_and_querystring(URL, QS),

handle_fetch_attachment_resp(fetch_attachment(FetchURL), Routines).

-spec handler_props_map(gen_attachment:handler_props()) -> gen_attachment:settings().
handler_props_map(HandlerProps) ->
case kz_json:get_value(<<"handler_props">>, HandlerProps) of
HP when is_map(HP) -> HP;
_ -> #{}
end.

join_url_and_querystring(<<URL/binary>>, QS) ->
join_url_and_querystring(kz_http_util:urlsplit(URL), QS);
join_url_and_querystring({Scheme, Location, Path, <<>>, Frag}, QS) ->
kz_http_util:urlunsplit({Scheme, Location, Path, QS, Frag});
join_url_and_querystring({Scheme, Location, Path, QueryString, Frag}, QS) ->
kz_http_util:urlunsplit({Scheme, Location, Path, kz_binary:join([QueryString, QS], <<"&">>), Frag}).

-spec handle_fetch_attachment_resp(gen_attachment:fetch_response(), kz_att_error:update_routines()) ->
gen_attachment:fetch_response().
handle_fetch_attachment_resp({'error', Url, Resp}, Routines) ->
Expand Down
13 changes: 9 additions & 4 deletions core/kazoo_bindings/src/kazoo_bindings.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
-behaviour(gen_server).

%% API
-export([start_link/0
-export([start_link/0, is_running/0
,bind/2, bind/3, bind/4
,unbind/2, unbind/3, unbind/4
,map/2, map/3, pmap/2, pmap/3
Expand Down Expand Up @@ -300,6 +300,10 @@ matches(_, _) -> 'false'.
start_link() ->
gen_server:start_link({'local', ?SERVER}, ?MODULE, [], []).

-spec is_running() -> boolean().
is_running() ->
is_pid(whereis(?SERVER)).

-spec stop() -> 'ok'.
stop() -> gen_server:cast(?SERVER, 'stop').

Expand All @@ -323,7 +327,8 @@ bind(Binding, Module, Fun) when is_binary(Binding) ->
bind_result() | bind_results().
bind([_|_]=Bindings, Module, Fun, Payload) ->
[bind(Binding, Module, Fun, Payload) || Binding <- Bindings];
bind(Binding, 'undefined' = Module, Fun, Payload) ->
bind(Binding, Module, Fun, Payload)
when is_function(Fun, 1) ->
lager:debug("adding binding ~s for ~p (~p)", [Binding, Fun, Payload]),
gen_server:call(?SERVER, {'bind', Binding, Module, Fun, Payload}, 'infinity');
bind(Binding, Module, Fun, Payload) ->
Expand Down Expand Up @@ -859,8 +864,8 @@ log_apply(Format, Args, _Silent) ->
lager:debug(Format, Args).

-spec apply_map_responder(module() | 'undefined', responder_fun(), payload()) -> payload().
apply_map_responder('undefined', Fun, Payload) ->
log_apply("applying fun ~p/1", [Fun]),
apply_map_responder(_M, Fun, Payload)
when is_function(Fun, 1) ->
Fun(Payload);
apply_map_responder(M, F, Payload) ->
log_apply("applying ~s:~p/~p", [M, F, length(Payload)]),
Expand Down
8 changes: 4 additions & 4 deletions core/kazoo_caches/src/kz_cache_callbacks.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,17 @@ exec_erase_callbacks(Name, Key) ->
kz_cache_processes:unmonitor_key(Name, Key),
try ets:lookup_element(Name, Key, #cache_obj.callback) of
Fun when is_function(Fun, 3) ->
exec_erase_callbacks(Name, Key, Fun),
'ok';
exec_erase_callbacks(Name, Key, Fun);
_Else -> 'ok'
catch
'error':'badarg' -> 'ok'
end.

-spec exec_erase_callbacks(atom(), any(), callback_fun()) -> pid().
-spec exec_erase_callbacks(atom(), any(), callback_fun()) -> 'ok'.
exec_erase_callbacks(Name, Key, Fun) ->
Value = ets:lookup_element(Name, Key, #cache_obj.value),
exec_callback(Fun, [Key, Value, 'erase']).
_Callback = exec_callback(Fun, [Key, Value, 'erase']),
lager:debug("~s:~p exec erase callback in ~p", [Name, Key, _Callback]).

-spec timed_out(atom(), reference()) -> 'ok'.
timed_out(MonitorName, MonitorRef) ->
Expand Down
9 changes: 4 additions & 5 deletions core/kazoo_data/src/kazoo_data.app.src
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{application,kazoo_data,
[{applications,[kazoo,kazoo_attachments,kazoo_bindings,kazoo_caches,
kazoo_config,kazoo_stdlib,kernel,
lager,stdlib]},
[{applications,[kazoo,kazoo_amqp,kazoo_attachments,
kazoo_bindings,kazoo_caches,kazoo_config,
kazoo_stdlib,kernel,lager,stdlib]},
{description,"Kazoo Data Access abstraction layer"},
{id,"108013a5-95e7-4e18-5757-344d22bebce8"},
{vsn,"4.0.0"},
Expand All @@ -12,5 +12,4 @@
{env,[{default_archive_folder,<<"/tmp">>},
{max_bulk_insert,2000},
{max_bulk_read,2000},
{cache_strategy, none}
]}]}.
{cache_strategy,none}]}]}.
6 changes: 5 additions & 1 deletion core/kazoo_data/src/kz_datamgr.erl
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,11 @@ fetch_attachment(DbName, {DocType, DocId}, AName, Options) ->
fetch_attachment(DbName, DocId, AName, maybe_add_doc_type(DocType, Options));
fetch_attachment(DbName, DocId, AName, Options) ->
Database = kzs_util:to_database(DbName),
kzs_attachments:fetch_attachment(kzs_plan:plan(Database, Options), Database, DocId, AName, Options).
case attachment_options(Database, DocId, Options) of
{'error', _E}=Error -> Error;
{'ok', NewOpts} ->
kzs_attachments:fetch_attachment(kzs_plan:plan(Database, NewOpts), DbName, DocId, AName, Options)
end.

-spec stream_attachment(database_name(), docid(), kz_term:ne_binary()) ->
{'ok', reference()} |
Expand Down
4 changes: 2 additions & 2 deletions core/kazoo_data/src/kzs_attachments.erl
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ do_stream_attachment_from_handler([{Handler, HandlerProps}], {Module, ModuleProp
relay_stream_attachment(Caller, Ref, Module, Props, DbName, DocId, AName) ->
case Module:fetch_attachment(Props, DbName, DocId, AName) of
{'ok', Bin} -> relay_stream_attachment(Caller, Ref, Bin);
{'error', _} = Error -> Caller ! {Ref, Error}
{'error', _} = Error -> Caller ! {Ref, Error};
{'error', Reason, _Extended} -> Caller ! {Ref, {'error', Reason}}
end.

-define(CHUNK_SIZE, 8192).
Expand All @@ -128,7 +129,6 @@ relay_stream_attachment(Caller, Ref, Bin) ->
Caller ! {Ref, {'ok', Bin}},
relay_stream_attachment(Caller, Ref, <<>>).


-type att_map() :: #{'att_handler':={_,_}
,'att_post_handler':='external', _=>_
,'server' := {module(), db()}
Expand Down
4 changes: 1 addition & 3 deletions core/kazoo_data/src/kzs_doc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,7 @@ del_doc(#{server := {App, Conn}}=Server, DbName, Doc, Options) ->
del_docs(Server, DbName, Docs, Options) ->
do_delete_docs(Server, DbName, prepare_docs_for_deletion(Server, DbName, Docs), Options).

do_delete_docs(_Server, _DbName, [], _Options) ->
lager:debug("no docs to delete"),
{'ok', []};
do_delete_docs(_Server, _DbName, [], _Options) -> {'ok', []};
do_delete_docs(#{server := {App, Conn}}, DbName, DelDocs, Options) ->
{PreparedDocs, PublishDocs} = lists:unzip([prepare_doc_for_save(DbName, D) || D <- DelDocs]),
try App:del_docs(Conn, DbName, PreparedDocs, Options) of
Expand Down
Loading

0 comments on commit 8ff64d7

Please sign in to comment.