Skip to content

Commit

Permalink
feat: taos_mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
dawnwinterLiu committed Nov 14, 2022
1 parent ec17b48 commit de3874e
Show file tree
Hide file tree
Showing 8 changed files with 70 additions and 12 deletions.
5 changes: 3 additions & 2 deletions apps/dgiot_device/src/utils/dgiot_device_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ parse_cache_Device(_ClassName) ->
%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ClassName]),
dgiot_product:load_cache(),
Success = fun(Page) ->
lists:map(fun(#{<<"devaddr">> := Devaddr} = Device) ->
lists:map(fun(#{<<"devaddr">> := _Devaddr} = Device) ->
%% save_profile(Device),
io:format("Devaddr ~p ~n",[Devaddr]),
timer:sleep(10),
%% io:format("Devaddr ~p ~n",[Devaddr]),
dgiot_device:save(Device)
end, Page)
end,
Expand Down
5 changes: 5 additions & 0 deletions apps/dgiot_dlink/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ Dlink协议是针对物联网开发领域设计的一种数据交换规范,数
| app对接 |$dg/thing/uniapp/{SessionToken}/report|app|平台|
| 设备初始化请求 |$dg/thing/{productId}/{deviceAddr}/init/request|设备|平台|
| 设备初始化响应 |$dg/device/{productId}/{deviceAddr}/init/response|平台|设备|
| taos 默认topic |$dg/taos/tdpool/{clientId}/# |平台|taos|
| taos sql |$dg/taos/tdpool/{clientId}/sql |平台|taos|
| taos 调试模式 |$dg/taos/tdpool/{clientId}/debug |平台|taos|
| taos 设置连接参数 |$dg/taos/tdpool/{clientId}/connect |平台|taos|
| taos sql执行结果 |$dg/thing/taos/{clientId} |taos|平台|

## payload设计
### 属性上报 ($dg/thing/{productId}/{deviceAddr}/properties/report)
Expand Down
19 changes: 17 additions & 2 deletions apps/dgiot_dlink/src/proctol/dgiot_mqtt_acl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ description() -> "Acl with Dlink".
%%--------------------------------------------------------------------
%% Internal functions
%%-------------------------------------------------------------------
do_check(#{username := <<"dgiot">>, peerhost := PeerHost}, _PubSub, _Topic) when PeerHost == {127,0,0,1} ->
do_check(#{username := <<"taos">>, peerhost := PeerHost, clientid := ClientId}, publish, <<"$dg/thing/taos/", ClientId/binary>> = _Topic) when PeerHost == {127, 0, 0, 1} ->
io:format("~s ~p ClientId: ~p _Topic ~p ~n", [?FILE, ?LINE, ClientId, _Topic]),
allow;

do_check(#{username := <<"dgiot">>, peerhost := PeerHost}, _PubSub, _Topic) when PeerHost == {127, 0, 0, 1} ->
%% io:format("~s ~p ClientId: ~p _Topic ~p ~n", [?FILE, ?LINE, ClientId, _Topic]),
allow;

Expand All @@ -52,13 +56,24 @@ do_check(#{username := <<"dgiot">>, clientid := ClientId}, _PubSub, _Topic) ->
deny
end;

do_check(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", ProductID:10/binary, "_", Rest/binary>> = _Topic) ->
%% io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
Len = size(DeviceAddr),
case Rest of
<<DeviceAddr:Len/binary, _/binary>> ->
allow;
_ ->
deny
end;


do_check(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binary>>, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
%% io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
check_device_addr(DeviceInfo, DeviceAddr);

%% "$dg/thing/productid/devaddr/#"
do_check(#{clientid := DeviceAddr, username := ProductID} = _ClientInfo, publish, <<"$dg/thing/", ProductID:10/binary, "/", DeviceInfo/binary>> = _Topic) ->
io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
%% io:format("~s ~p Topic: ~p _ClientInfo ~p~n", [?FILE, ?LINE, _Topic, _ClientInfo]),
check_device_addr(DeviceInfo, DeviceAddr);

%% "$dg/thing/deviceid/#"
Expand Down
7 changes: 6 additions & 1 deletion apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,12 @@
, description/0
]).

check(#{peerhost := PeerHost, username := <<"dgiot">> }, AuthResult, _) when PeerHost == {127, 0, 0, 1} ->
check(#{peerhost := PeerHost, username := <<"taos">>, clientid := ClientId}, AuthResult, _) when PeerHost == {127, 0, 0, 1} ->
io:format("~s ~p ClientId = ~p.~n", [?FILE, ?LINE, ClientId]),
dgiot_tdengine:save_tdpools(ClientId),
{ok, AuthResult#{anonymous => false, auth_result => success}};

check(#{peerhost := PeerHost, username := <<"dgiot">>}, AuthResult, _) when PeerHost == {127, 0, 0, 1} ->
{ok, AuthResult#{anonymous => false, auth_result => success}};

check(#{username := <<"dgiot">>, password := Password}, AuthResult, _) ->
Expand Down
4 changes: 2 additions & 2 deletions apps/dgiot_dlink/src/proctol/dgiot_mqtt_offline.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ on_session_terminated(#{clientid := <<ProductID:10/binary, "_", DeviceAddr/binar
io:format("~s ~p ProductID = ~p DeviceAddr ~p DeviceId ~p ~n", [?FILE, ?LINE, ProductID, DeviceAddr, DeviceId]),
ok;

on_session_terminated(ClientInfo, _Reason, _SessInfo, _State) ->
io:format("~s ~p ClientInfo = ~p.~n", [?FILE, ?LINE, ClientInfo]),
on_session_terminated(_ClientInfo, _Reason, _SessInfo, _State) ->
%% io:format("~s ~p ClientInfo = ~p.~n", [?FILE, ?LINE, ClientInfo]),
ok.

description() -> "Disconnected with Dlink".
Expand Down
3 changes: 0 additions & 3 deletions apps/dgiot_factory/src/dgiot_factory_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -256,9 +256,6 @@ do_request(post_duplicate_shift, #{<<"product">> := ProductId, <<"sink_date">> :
do_request(put_sumpick, #{<<"BatchList">> := BatchList} = _Arg, _Context, _Req) ->
Sum = dgiot_factory_utils:get_sum(BatchList),
{ok, #{<<"status">> => 0, msg => <<"操作成功"/utf8>>, <<"data">> => #{<<"SumPick">> => Sum}}};
do_request(put_materialapply_id, #{<<"id">> := DeviceId,<<"basedata">> := BaseData} = _Arg, _Context, _Req) ->
dgiot_material_channel:handle_material_apply(DeviceId,BaseData),
{ok, #{<<"status">> => 0, msg => <<"操作成功"/utf8>>, <<"data">> => #{}}};
%% 服务器不支持的API接口
do_request(_OperationId, _Args, _Context, _Req) ->
io:format("~s ~p _Args = ~p ~n", [?FILE, ?LINE, _Args]),
Expand Down
34 changes: 33 additions & 1 deletion apps/dgiot_tdengine/src/dgiot_tdengine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
-export([create_database/2, create_schemas/1, create_object/2, create_user/2, alter_user/2, delete_user/1, query_object/2, batch/1, parse_batch/1]).
-export([transaction/2, format_data/5]).
-export([get_reportdata/3]).
-export([export/2, import/2]).
-export([export/2, import/2, save_tdpools/1, tdpool_connect/1]).
-export([save_sql/1]).

%% dgiot_tdengine:export().
export(ChannelId, #{<<"deviceid">> := DeviceId} = Body) ->
Expand Down Expand Up @@ -340,3 +341,34 @@ get_fields(Table) ->
_ ->
[]
end.

save_tdpools(ClientId) ->
erlang:spawn(fun() ->
apply(dgiot_mqtt, subscribe_mgmt, [ClientId, <<"$dg/taos/tdpool/", ClientId/binary, "/#">>]) end),
case dgiot_data:get(tdpool, pools) of
not_find ->
dgiot_data:insert(tdpool, pools, [ClientId]),
dgiot_data:insert(tdpool, {ClientId, pid}, self());
Que ->
dgiot_data:insert(tdpool, pools, dgiot_utils:unique_1(Que ++ [ClientId])),
dgiot_data:insert(tdpool, {ClientId, pid}, self())
end.

tdpool_connect(Password) ->
case dgiot_data:get(tdpool, pools) of
not_find ->
pass;
[ClientId | _Que] ->
Topic = <<"$dg/taos/tdpool/", ClientId/binary, "/connect">>,
timer:sleep(1000),
dgiot_mqtt:publish(ClientId, Topic, Password)
end.

save_sql(Sql) ->
case dgiot_data:get(tdpool, pools) of
not_find ->
pass;
[ClientId | _Que] ->
Topic = <<"$dg/taos/tdpool/", ClientId/binary, "/sql">>,
dgiot_mqtt:publish(ClientId, Topic, Sql)
end.
5 changes: 4 additions & 1 deletion apps/dgiot_tdengine/src/dgiot_tdengine_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@
}).

init_ets() ->
dgiot_data:init(tdpool),
dgiot_data:init(?DGIOT_TD_THING_ETS).

start(ChannelId, #{
Expand All @@ -169,7 +170,7 @@ start(ChannelId, #{
}).

%% 通道初始化
init(?TYPE, ChannelId, Config) ->
init(?TYPE, ChannelId, #{<<"password">> := Password} = Config) ->
Opts = [?CACHE(ChannelId), #{
auto_save => application:get_env(dgiot_tdengine, cache_auto_save, 30000),
size => application:get_env(dgiot_tdengine, cache_max_size, 50000),
Expand All @@ -184,6 +185,8 @@ init(?TYPE, ChannelId, Config) ->
Specs = [
{dgiot_dcache, {dgiot_dcache, start_link, Opts}, permanent, 5000, worker, [dgiot_dcache]}
],
erlang:spawn(fun() ->
apply(dgiot_tdengine, tdpool_connect, [Password]) end),
dgiot_metrics:dec(dgiot_tdengine, <<"tdengine">>, 1000),
DbType = maps:get(<<"db">>, Config, <<"ProductId">>),
dgiot_data:insert({tdengine_db, ChannelId}, DbType),
Expand Down

0 comments on commit de3874e

Please sign in to comment.