Skip to content

Commit

Permalink
add device cache
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxredrain committed Jul 20, 2021
1 parent f7d021d commit 73cf1f9
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 22 deletions.
6 changes: 3 additions & 3 deletions apps/dgiot/src/storage/dgiot_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ search() ->
{_, K, V} = X,
#{K => V}
end,
search(Fun,#{<<"start">> => 0, <<"len">> => 5}).
search(Fun,#{<<"start">> => 0, <<"limit">> => 5}).

search(Fun,Page) ->
search(?MNESIA_TAB, Fun, Page).
Expand All @@ -258,9 +258,9 @@ search(Name, Fun, Page) ->
DataSet.

search(_, _, '$end_of_table', Acc, Page) ->
Page#{<<"data">> => lists:reverse(Acc), <<"len">> => length(Acc)};
Page#{<<"data">> => lists:reverse(Acc), <<"limit">> => length(Acc)};

search(Name, Fun, Key, Acc, Page = #{<<"start">> := Start, <<"len">> := Len, <<"count">> := Count}) when Count >= Start, Count < Start + Len ->
search(Name, Fun, Key, Acc, Page = #{<<"start">> := Start, <<"limit">> := Limit, <<"count">> := Count}) when Count >= Start, Count < Start + Limit ->
case ets:lookup(Name, Key) of
[Row | _] ->
case Fun(Row) of
Expand Down
68 changes: 60 additions & 8 deletions apps/dgiot_device/src/dgiot_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,91 @@
-dgiot_data("ets").
-export([init_ets/0]).
-export([create_device/1, create_device/2, get_sub_device/1, get_sub_device/2, get/2]).
-export([save/1, save/2, save/3, lookup/1, lookup/2, delete/1, delete/2, save_prod/2, lookup_prod/1]).
-export([post/1, put/1, save/1, save/2, save/3, lookup/1, lookup/2, delete/1, delete/2, save_prod/2, lookup_prod/1]).
-export([encode/1, decode/3]).

init_ets() ->
dgiot_data:init(?DGIOT_PRODUCT),
ok.

post(Device) ->
DeviceId = maps:get(<<"objectId">>, Device),
#{<<"objectId">> := ProductId} = maps:get(<<"product">>, Device),
#{<<"latitude">> := Latitude, <<"longitude">> := Logitude} = maps:get(<<"location">>, Device, #{<<"latitude">> => 0, <<"longitude">> => 0}),
Product = binary_to_atom(ProductId),
Acl =
lists:foldl(fun(X, Acc) ->
Acc ++ [binary_to_atom(X)]
end, [], maps:keys(maps:get(<<"ACL">>, Device))),
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, Acl], node()}).

put(Device) ->
DeviceId = maps:get(<<"objectId">>, Device),
case lookup(DeviceId) of
{ok, {[_Ts, {OldLatitude, OldLogitude}, OldProduct, OldAcl], Node}} ->
#{<<"objectId">> := ProductId} = maps:get(<<"product">>, Device, #{<<"objectId">> => atom_to_binary(OldProduct)}),
#{<<"latitude">> := Latitude, <<"longitude">> := Logitude} = maps:get(<<"location">>, Device, #{<<"latitude">> => OldLatitude, <<"longitude">> => OldLogitude}),
Product = binary_to_atom(ProductId),
NewAcl =
case maps:find(<<"ACL">>, Device) of
error ->
OldAcl;
{ok, Acl} ->
lists:foldl(fun(X, Acc) ->
Acc ++ [binary_to_atom(X)]
end, [], maps:keys(Acl))
end,
LastAcl =
case length(NewAcl) of
0 ->
OldAcl;
_ ->
NewAcl
end,
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, LastAcl], Node});
_ ->
#{<<"objectId">> := ProductId} = maps:get(<<"product">>, Device),
#{<<"latitude">> := Latitude, <<"longitude">> := Logitude} = maps:get(<<"location">>, Device, #{<<"latitude">> => 0, <<"longitude">> => 0}),
Product = binary_to_atom(ProductId),
Acl =
lists:foldl(fun(X, Acc) ->
Acc ++ [binary_to_atom(X)]
end, [], maps:keys(maps:get(<<"ACL">>, Device))),
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, Acl], node()})
end.

save(Device) ->
DeviceId = maps:get(<<"objectId">>, Device),
#{<<"objectId">> := ProductId} = maps:get(<<"product">>, Device),
#{<<"latitude">> := Latitude, <<"longitude">> := Logitude} = maps:get(<<"location">>, Device,#{<<"latitude">> => 0, <<"longitude">> => 0}),
#{<<"latitude">> := Latitude, <<"longitude">> := Logitude} = maps:get(<<"location">>, Device, #{<<"latitude">> => 0, <<"longitude">> => 0}),
Product = binary_to_atom(ProductId),
Acl =
lists:foldl(fun(X, Acc) ->
Acc ++ [binary_to_atom(X)]
end, [], maps:keys(maps:get(<<"ACL">>, Device))),
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, Acl], node()}).

save(ProductId, DevAddr) ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
save(DeviceId, Data) ->
case lookup(DeviceId) of
{ok, {[_Ts, Location, Product, Acl], Node}} ->
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), Location, Product, Acl], Node});
{ok, {[_Ts, {OldLatitude, OldLogitude}, Product, Acl], Node}} ->
Latitude = maps:get(<<"latitude">>, Data, OldLatitude),
Logitude = maps:get(<<"longitude">>, Data, OldLogitude),
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, Acl], Node});
_ -> pass
end.

save(DeviceId, Latitude, Logitude) ->
save(ProductId, DevAddr, Data) ->
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
case lookup(DeviceId) of
{ok, {[_Ts, _, Product, Acl], Node}} ->
{ok, {[_Ts, {OldLatitude, OldLogitude}, Product, Acl], Node}} ->
Latitude = maps:get(<<"latitude">>, Data, OldLatitude),
Logitude = maps:get(<<"longitude">>, Data, OldLogitude),
dgiot_mnesia:insert(DeviceId, {[dgiot_datetime:now_ms(), {Latitude, Logitude}, Product, Acl], Node});
_ -> pass
end.



lookup(DeviceId) ->
case dgiot_mnesia:lookup(DeviceId) of
{atomic, []} ->
Expand Down
5 changes: 0 additions & 5 deletions apps/dgiot_opc/src/dgiot_opc_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@ init(?TYPE, ChannelId, ChannelArgs) ->
handle_init(State) ->
dgiot_mqtt:subscribe(<<"dgiot_opc_da_ack">>),
dgiot_mqtt:subscribe(<<"dgiot_opc_da_scan">>),
dgiot_parse:subscribe(<<"Device">>, post),
%% erlang:send_after(1000 * 5, self(), scan_opc),
%% erlang:send_after(1000 * 10, self(), send_opc),
{ok, State}.
Expand All @@ -109,10 +108,6 @@ handle_event(EventId, Event, _State) ->
?LOG(info, "channel ~p, ~p", [EventId, Event]),
ok.

handle_message({sync_parse, Args}, State) ->
?LOG(info, "sync_parse ~p", [Args]),
{ok, State};

handle_message(scan_opc, #state{env = Env} = State) ->
dgiot_opc:scan_opc(Env),
{ok, State};
Expand Down
1 change: 0 additions & 1 deletion apps/dgiot_parse/src/dgiot_parse.erl
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,6 @@ request(Name, Method, Header, Path0, Body, Options) ->
{error, Reason}
end.


get_tables(Dirs) -> get_tables(Dirs, []).
get_tables([], Acc) -> Acc;
get_tables([Dir | Other], Acc) ->
Expand Down
49 changes: 44 additions & 5 deletions apps/dgiot_parse/src/dgiot_parse_rest.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ request(Method, Header, Path0, Body, Options) ->
#{<<"count">> := Count} ->
handle_result(Fun(), #{<<"count">> => Count});
_ ->
?LOG(error,"count not find, ~p~n", [CountBody]),
?LOG(error, "count not find, ~p~n", [CountBody]),
handle_result(Fun(), #{})
end;
Other ->
Expand All @@ -94,6 +94,45 @@ request(Method, Header, Path0, Body, Options) ->
%%% Internal functions
%%%===================================================================

save_cache(_, <<"/batch">>, #{<<"requests">> := Requests}) ->
lists:map(fun(X) ->
Method = maps:get(<<"method">>, X, <<"">>),
Path = maps:get(<<"path">>, X, <<"">>),
Body = maps:get(<<"body">>, X, #{}),
save_cache(Method, Path, Body)
end, Requests),
pass;

save_cache('POST', Path, Device) ->
case Path of
<<"/classes/Device">> ->
Map = jsx:decode(Device, [{labels, binary}, return_maps]),
dgiot_device:post(Map);
_ ->
pass
end;

save_cache('PUT', Path, Device) ->
case Path of
<<"/classes/Device/", ObjectId/binary>> ->
Map = jsx:decode(Device, [{labels, binary}, return_maps]),
dgiot_device:put(Map#{<<"objectId">> => ObjectId});
_ ->
pass
end;

save_cache('DELETE', Path, Device) ->
case Path of
<<"/classes/Device/", ObjectId/binary>> ->
dgiot_device:delete(ObjectId);
_ ->
?LOG(error, "Path ~p, Device ~p", [ Path, Device]),
pass
end;

save_cache(_, _Path, _Device) ->
pass.


to_list(V) when is_atom(V) -> atom_to_list(V);
to_list(V) when is_binary(V) -> binary_to_list(V);
Expand Down Expand Up @@ -275,15 +314,16 @@ encode_body(_Path, _Method, Map, _) ->
jsx:encode(Map).

do_request(Method, Path, Header, Data, Options) ->
%% ?LOG(info,"Method ~p, Path ~p, Data ~p, Options ~p",[Method, Path, Data, Options]),
case httpc_request(Method, Path, Header, Data, [], [], Options) of
{error, Reason} ->
{error, Reason};
{ok, StatusCode, Headers, ResBody} ->
case do_request_after(Method, Path, Data, ResBody, Options) of
{ok, NewResBody} ->
save_cache(Method, Path, Data),
{ok, StatusCode, Headers, NewResBody};
ignore ->
save_cache(Method, Path, Data),
{ok, StatusCode, Headers, ResBody};
{error, Reason} ->
{error, Reason}
Expand All @@ -301,7 +341,6 @@ httpc_request(Method, Path, Header, Query, HttpOptions, ReqOptions, Options) whe
#{<<"host">> := Host, <<"path">> := ParsePath} = proplists:get_value(cfg, Options),
Url = dgiot_httpc:url_join([Host, ParsePath] ++ [<<Path/binary, Query/binary>>]),
Request = {Url, Header},

httpc_request(Method, Request, HttpOptions, ReqOptions);

httpc_request(Method, Path, Header, Body, HttpOptions, ReqOptions, Options) when Method == 'POST'; Method == 'PUT' ->
Expand Down Expand Up @@ -387,7 +426,7 @@ handle_result(Result, Map) ->

log(Method, {Url, Header}) ->
IsLog = application:get_env(dgiot_parse, log, false),
IsLog andalso ?LOG(info,"~s ~s ~p", [method(Method), Url, Header]);
IsLog andalso ?LOG(info, "~s ~s ~p", [method(Method), Url, Header]);
log(Method, {Url, Header, _, Body}) ->
IsLog = application:get_env(dgiot_parse, log, false),
IsLog andalso ?LOG(info,"~s ~s Header:~p Body:~p", [method(Method), Url, Header, Body]).
IsLog andalso ?LOG(info, "~s ~s Header:~p Body:~p", [method(Method), Url, Header, Body]).
1 change: 1 addition & 0 deletions apps/dgiot_tdengine/src/dgiot_tdengine_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ do_save([ProductId, DevAddr, Data, _Context], #state{id = ChannelId} = State) ->
?LOG(error, "Save to tdengine error, ~p, ~p", [Data, Reason]);
{ok, #{<<"thing">> := Properties}} ->
Object = format_data(ProductId, DevAddr, Properties, Data),
dgiot_device:save(ProductId,DevAddr,Data),
save_to_cache(ChannelId, Object)
end,
{ok, State}.
Expand Down

0 comments on commit 73cf1f9

Please sign in to comment.