From a3499ba113856571dfbe5e19cc82c624c77fdee0 Mon Sep 17 00:00:00 2001 From: dawnwinterLiu <1737801684@qq.com> Date: Mon, 30 May 2022 14:48:38 +0800 Subject: [PATCH] fix: fgiot_tcp_client; make run error --- apps/dgiot/include/dgiot_client.hrl | 28 +- apps/dgiot/src/channel/dgiot_client.erl | 94 +- apps/dgiot/src/crypto/aes.erl | 16 +- apps/dgiot/src/transport/dgiot_tcp_client.erl | 126 +- .../dgiot_bacnet/src/dgiot_bacnet_channel.erl | 2 +- .../src/channel/dgiot_common_channel.erl | 95 + .../src/channel/dgiot_httpc_channel.erl | 2 +- .../src/channel/dgiot_tcp_channel.erl | 8 +- .../src/channel/dgiot_tcp_worker.erl | 21 +- .../src/channel/dgiot_tcpc_channel.erl | 17 +- .../src/channel/dgiot_tcpc_worker.erl | 61 +- apps/dgiot_bridge/src/dgiot_bridge.erl | 6 +- .../dgiot_device/src/dgiot_device_channel.erl | 5 + .../src/dgiot_notification_channel.erl | 2 +- .../src/dgiot_profile_channel.erl | 4 +- apps/dgiot_dlink/README.md | 16 +- apps/dgiot_dlink/priv/json/topo.json | 67 + .../priv/swagger/swagger_dlink.json | 43 + .../src/handler/dgiot_dlink_handler.erl | 17 + .../src/proctol/dgiot_mqtt_acl.erl | 4 +- .../src/proctol/dgiot_mqtt_auth.erl | 1 + .../src/dgiot_gb26875_channel.erl | 4 +- .../dgiot_hjt212/src/dgiot_hjt212_channel.erl | 4 +- apps/dgiot_http/etc/dgiot_http.conf | 2 +- apps/dgiot_http/src/umeng/dgiot_umeng.erl | 4 +- apps/dgiot_meter/src/dgiot_meter.erl | 10 +- apps/dgiot_meter/src/dgiot_meter_tcp.erl | 13 +- .../src/proctol/dlt645_decoder.erl | 19 +- apps/dgiot_modbus/src/dgiot_modbus_tcp.erl | 29 +- .../src/dgiot_modbusc_channel.erl | 2 +- apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl | 27 +- apps/dgiot_parse/priv/json/schemas.json | 1778 +++++++++++++++++ .../priv/swagger/swagger_parse.json | 28 +- apps/dgiot_parse/src/dgiot_parse.erl | 52 +- .../src/handler/dgiot_parse_handler.erl | 19 +- apps/dgiot_task/src/dgiot_task.erl | 17 +- apps/dgiot_task/src/dgiot_task_channel.erl | 3 +- apps/dgiot_task/src/dgiot_task_data.erl | 47 +- apps/dgiot_task/src/dgiot_task_worker.erl | 41 +- dgiot_install.sh | 4 +- 40 files changed, 2451 insertions(+), 287 deletions(-) create mode 100644 apps/dgiot_bridge/src/channel/dgiot_common_channel.erl create mode 100644 apps/dgiot_dlink/priv/json/topo.json create mode 100644 apps/dgiot_parse/priv/json/schemas.json diff --git a/apps/dgiot/include/dgiot_client.hrl b/apps/dgiot/include/dgiot_client.hrl index 9b4f0291bc..aa48af3ebd 100644 --- a/apps/dgiot/include/dgiot_client.hrl +++ b/apps/dgiot/include/dgiot_client.hrl @@ -14,21 +14,21 @@ %% limitations under the License. %%-------------------------------------------------------------------- -author("johnliu"). - +-define(DCLINET_QUE(Name), dgiot_utils:to_atom(lists:concat([dgiot_utils:to_atom(Name), "_que"]))). +-define(DCLINET_PNQUE(Name), dgiot_utils:to_atom(lists:concat([dgiot_utils:to_atom(Name), "_pnque"]))). -record(dclock, { - nexttime :: non_neg_integer(), %% 下一次闹铃执行时间 - freq :: non_neg_integer(), %% 周期闹铃提醒频率单位为秒 - count :: non_neg_integer(), %% 闹铃总计执行多少次 - round :: non_neg_integer(), %% 闹铃总计已执行轮次 - rand = true :: boolean() %% 闹铃任务启动是否随机错峰处理, 防止所有客户端在同一个时刻启动任务 + nexttime :: non_neg_integer(), %% 下一次闹铃执行时间 + freq :: non_neg_integer(), %% 周期闹铃提醒频率单位为秒 + count :: non_neg_integer(), %% 闹铃总计执行多少次 + round :: non_neg_integer() %% 闹铃总计已执行轮次 }). -record(dclient, { - channel :: atom(), %% 客户端的用户管理通道 - client :: binary(), %% 客户端地址 - status :: integer(), %% client的状态值 - clock :: #dclock{}, %% client的闹铃 - userdata :: any(), %% 用户自定义参数 + channel :: atom(), %% 客户端的用户管理通道 + client :: binary(), %% 客户端地址 + status :: integer(), %% client的状态值 + clock :: #dclock{}, %% client的闹铃 + userdata :: any(), %% 用户自定义参数 child :: any() %% 子模块参数 }). @@ -36,4 +36,8 @@ %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% dclient的状态字 -define(DCLIENT_SUCCESS, 0). % CLIENT运行正常 --define(DCLIENT_INTIALIZED, 1). % CLIENT初始化状态 \ No newline at end of file +-define(DCLIENT_INTIALIZED, 1). % CLIENT初始化状态 +-define(DCLIENT_START_LOGININ, 2). % CLIENT开始登录 +-define(DCLIENT_LOGIN_SUCCESS, 3). % CLIENT登录成功 +-define(DCLIENT_START_REGISTER, 4). % CLIENT开始注册 +-define(DCLIENT_REGISTER_SUCCESS, 5). % CLIENT注册成功 \ No newline at end of file diff --git a/apps/dgiot/src/channel/dgiot_client.erl b/apps/dgiot/src/channel/dgiot_client.erl index f01a598c12..2056bfb705 100644 --- a/apps/dgiot/src/channel/dgiot_client.erl +++ b/apps/dgiot/src/channel/dgiot_client.erl @@ -17,13 +17,14 @@ -module(dgiot_client). -author("johnliu"). -include("dgiot.hrl"). +-include("dgiot_client.hrl"). -include_lib("dgiot/include/logger.hrl"). %% API -export([register/3, unregister/1, start_link/2, add_clock/3, notify/3, add/2, set_consumer/2, get_consumer/1]). -export([start/2, start/3, stop/1, stop/2, stop/3, restart/2, get/2, send/4]). --export([get_nexttime/2, send_after/4, get_count/3]). - +-export([get_time/1, get_nexttime/2, get_count/3, get_rand/1]). +-export([get_que/2, save_que/3, start_que/1, get_pnque_len/2, save_pnque/5, get_pnque/2, del_pnque/2, start_pnque/3]). -type(result() :: any()). %% todo 目前只做参数检查,不做结果检查 %% @doc 注册client的通道管理池子 @@ -39,6 +40,8 @@ register(ChannelId, Sup, State) -> end, set_consumer(ChannelId, 100), dgiot_data:init(ChannelId), + dgiot_data:init(?DCLINET_QUE(ChannelId)), + dgiot_data:init(?DCLINET_PNQUE(ChannelId)), dgiot_data:delete({start_client, ChannelId}), dgiot_data:delete({stop_client, ChannelId}), ChildSpec = dgiot:child_spec(Sup, supervisor, [ChannelId]), @@ -53,6 +56,76 @@ unregister(ChannelId) -> dgiot_data:delete({start_client, ChannelId}), dgiot_data:delete({stop_client, ChannelId}). +save_pnque(ChannelId, DtuProductId, DtuAddr, ProductId, DevAddr) -> + DtuId = dgiot_parse_id:get_deviceid(DtuProductId, DtuAddr), + case dgiot_data:get(?DCLINET_PNQUE(ChannelId), DtuId) of + not_find -> + dgiot_data:insert(?DCLINET_PNQUE(ChannelId), DtuId, [{ProductId, DevAddr}]); + Pn_que -> + New_Pn_que = dgiot_utils:unique_2(Pn_que ++ [{ProductId, DevAddr}]), + dgiot_data:insert(?DCLINET_PNQUE(ChannelId), DtuId, New_Pn_que) + end. + +get_pnque_len(ChannelId, DtuId) -> + case dgiot_data:get(?DCLINET_PNQUE(ChannelId), DtuId) of + not_find -> + 0; + PnQue -> + length(PnQue) + end. + +get_pnque(ChannelId, DtuId) -> + case dgiot_data:get(?DCLINET_PNQUE(ChannelId), DtuId) of + not_find -> + not_find; + PnQue when length(PnQue) > 0 -> + Head = lists:nth(1, PnQue), + dgiot_data:insert(?DCLINET_PNQUE(ChannelId), DtuId, lists:nthtail(1, PnQue) ++ [Head]), + Head; + _ -> + not_find + end. + +del_pnque(ChannelId, DtuId) -> + case dgiot_data:get(?DCLINET_PNQUE(ChannelId), DtuId) of + not_find -> + pass; + PnQue when length(PnQue) > 0 -> + dgiot_data:delete(?DCLINET_PNQUE(ChannelId), DtuId); + _ -> + pass + end. +start_pnque(ChannelId, Type, ClinetId) -> + case dgiot_data:get(?DCLINET_PNQUE(ChannelId), ClinetId) of + not_find -> + not_find; + PnQue -> + lists:map(fun({ProductId, DevAddr}) -> + DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), + BinChannelId = dgiot_utils:to_binary(ChannelId), + io:format("~s ~p ChannelId ~p, Type ~p , ClinetId ~p ",[?FILE, ?LINE, ChannelId, Type, ClinetId]), + dgiot_client:start(<>, DeviceId, #{<<"productid">> => ProductId, <<"devaddr">> => DevAddr, <<"dtuid">> => ClinetId}) + end, PnQue) + end. + +save_que(ChannelId, ProductId, DevAddr) -> + DtuId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), + dgiot_data:insert(?DCLINET_QUE(ChannelId), DtuId, {ProductId, DevAddr}). + +get_que(ChannelId, DeviceId) when is_atom(ChannelId) -> + get_que(atom_to_binary(ChannelId), DeviceId); +get_que(ChannelId, DeviceId) -> + dgiot_data:get(?DCLINET_QUE(ChannelId), DeviceId). + +start_que(ChannelId) -> + Fun = fun + ({Key, _Value}) -> + dgiot_client:start(ChannelId, Key); + (_) -> + pass + end, + dgiot_data:loop(?DCLINET_QUE(ChannelId), Fun). + %% @doc 在通道管理池子中增加client的Pid号 -spec add(atom() | binary(), binary()) -> result(). add(ChannelId, ClientId) when is_binary(ChannelId) -> @@ -199,14 +272,10 @@ start_link(Module, #{<<"channel">> := ChannelId, <<"client">> := Client} = State end. %% @doc 做一下全局的错峰处理 --spec send_after(integer(), integer(), boolean(), any()) -> result(). -send_after(RetryTime, Freq, true, Msg) -> - Seed = Freq * 200, % 默认用采样周期的20%的时间来做随机 - Rand = rand:uniform(Seed), - erlang:send_after(RetryTime + Rand, self(), Msg); - -send_after(RetryTime, _Freq, _, Msg) -> - erlang:send_after(RetryTime, self(), Msg). +-spec get_rand(non_neg_integer()) -> non_neg_integer(). +get_rand(Freq) -> + Seed = Freq * 1000, % 默认用采样周期的20%的时间来做随机 + rand:uniform(Seed) div 1000. %% @doc 获取闹铃执行次数 -spec get_count(integer(), integer(), integer()) -> result(). @@ -217,6 +286,11 @@ get_count(_StartTime, _EndTime, Freq) when Freq =< 0 -> get_count(StartTime, EndTime, Freq) -> (EndTime - StartTime) div Freq. +get_time(Time) when is_integer(Time) -> + Time; +get_time(Time) -> + dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(Time)). + get_nexttime(NextTime, Freq) -> NowTime = dgiot_datetime:nowstamp(), get_nexttime(NowTime, Freq, NextTime). diff --git a/apps/dgiot/src/crypto/aes.erl b/apps/dgiot/src/crypto/aes.erl index dd5186cae9..a25c63a89b 100644 --- a/apps/dgiot/src/crypto/aes.erl +++ b/apps/dgiot/src/crypto/aes.erl @@ -19,6 +19,7 @@ %% API -export([encode/3, encode/4, decode/4]). +-compile({nowarn_deprecated_function, [{crypto, block_encrypt, 4},{crypto, block_decrypt, 4}]}). -define(AES_IV, <<"00000000000000000000000000000000">>). encode(Type, AES_KEY, Bin) -> @@ -29,21 +30,24 @@ encode(aes_cbc128, AES_KEY, AES_IV0, Bin) -> Len = erlang:size(Bin), Value = 16 - (Len rem 16), PadBin = binary:copy(<<0:8>>, Value), -%% EncodeB = crypto:block_encrypt(aes_cfb8, AES_KEY, AES_IV, <>), - EncodeB = crypto:crypto_one_time(aes_cbc128, AES_KEY, AES_IV, [<>], true), + EncodeB = crypto:block_encrypt(aes_cbc128, AES_KEY, AES_IV, <>), +%% EncodeB = crypto:crypto_one_time(aes_cbc128, AES_KEY, AES_IV, [<>], true), +%% io:format("~s ~p EncodeB ~p ~n", [?FILE, ?LINE, EncodeB]), dgiot_utils:binary_to_hex(EncodeB); encode(aes_cfb8, AES_KEY, AES_IV, Bin) -> -%% EncodeB = crypto:block_encrypt(aes_cfb8, AES_KEY, AES_IV, Bin), - EncodeB = crypto:crypto_one_time(aes_cfb8, AES_KEY, AES_IV, [Bin], true), + EncodeB = crypto:block_encrypt(aes_cfb8, AES_KEY, AES_IV, Bin), +%% EncodeB = crypto:crypto_one_time(aes_cfb8, AES_KEY, AES_IV, [Bin], true), +%% io:format("~s ~p EncodeB ~p ~n", [?FILE, ?LINE, EncodeB]), dgiot_utils:binary_to_hex(EncodeB). decode(aes_cbc128, AES_KEY, AES_IV, Bin) -> Bin1 = dgiot_utils:hex_to_binary(Bin), case erlang:size(Bin1) rem 16 of 0 -> -%% Bin2 = crypto:block_decrypt(aes_cbc128, AES_KEY, AES_IV, Bin1), - Bin2 = crypto:crypto_one_time(aes_cbc128, AES_KEY, AES_IV, [Bin1], false), + Bin2 = crypto:block_decrypt(aes_cbc128, AES_KEY, AES_IV, Bin1), +%% Bin2 = crypto:crypto_one_time(aes_cbc128, AES_KEY, AES_IV, [Bin1], false), +%% io:format("~s ~p EncodeB ~p ~n", [?FILE, ?LINE, Bin2]), binary:part(Bin2, {0, byte_size(Bin2) - binary:last(Bin2)}); _ -> {error, 1102} diff --git a/apps/dgiot/src/transport/dgiot_tcp_client.erl b/apps/dgiot/src/transport/dgiot_tcp_client.erl index 7e65c973b7..63cf03d6a2 100644 --- a/apps/dgiot/src/transport/dgiot_tcp_client.erl +++ b/apps/dgiot/src/transport/dgiot_tcp_client.erl @@ -21,9 +21,9 @@ -include_lib("dgiot/include/dgiot_client.hrl"). -behaviour(gen_server). %% API --export([start_link/1, send/1, send/3]). +-export([start_link/1, send/1, send/2, send/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(connect_state, {host, port, mod, socket = undefined}). +-record(connect_state, {host, port, mod, socket = undefined, freq = 30, count = 10}). -define(TIMEOUT, 10000). -define(TCP_OPTIONS, [binary, {active, once}, {packet, raw}, {reuseaddr, false}, {send_timeout, ?TIMEOUT}]). @@ -33,79 +33,88 @@ start_link(Args) -> init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"ip">> := Host, <<"port">> := Port, <<"mod">> := Mod} = Args]) -> Ip = dgiot_utils:to_list(Host), Port1 = dgiot_utils:to_int(Port), - Count = maps:get(<<"count">>, Args, max), + UserData = #connect_state{mod = Mod, host = Ip, port = Port1, freq = 30, count = 300}, + ChildState = maps:get(<<"child">>, Args, #{}), + StartTime = dgiot_client:get_time(maps:get(<<"starttime">>, Args, dgiot_datetime:now_secs())), + EndTime = dgiot_client:get_time(maps:get(<<"endtime">>, Args, dgiot_datetime:now_secs() + 1000000000)), Freq = maps:get(<<"freq">>, Args, 30), - UserData = #connect_state{mod = Mod, host = Ip, port = Port1}, - ChildState = maps:get(<<"child">>, Args, 30), - Clock = #dclock{freq = Freq, count = Count, round = 0}, + NextTime = dgiot_client:get_nexttime(StartTime, Freq), + Count = dgiot_client:get_count(StartTime, EndTime, Freq), + Rand = + case maps:get(<<"rand">>, Args, true) of + true -> 0; + _ -> dgiot_client:get_rand(Freq) + end, + Clock = #dclock{freq = Freq, nexttime = NextTime + Rand, count = Count, round = 0}, Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, clock = Clock, userdata = UserData, child = ChildState}, dgiot_client:add(ChannelId, ClientId), case Mod:init(Dclient) of - {ok, NewChildState} -> - do_connect(Dclient#dclient{child = NewChildState}), - {ok, Dclient#dclient{child = NewChildState}, hibernate}; + {ok, NewDclient} -> + do_connect(NewDclient), + {ok, NewDclient, hibernate}; {stop, Reason} -> {stop, Reason} end. -handle_call({connection_ready, Socket}, _From, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod} = UserData} = Dclient) -> +handle_call({connection_ready, Socket}, _From, #dclient{ channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod} = UserData} = Dclient) -> NewUserData = UserData#connect_state{socket = Socket}, - case Mod:handle_info(connection_ready, Dclient) of - {noreply, NewChildState} -> - {reply, ok, Dclient#dclient{child = NewChildState, userdata = NewUserData}, hibernate}; - {stop, _Reason, NewChildState} -> + case Mod:handle_info(connection_ready, Dclient#dclient{userdata = NewUserData}) of + {noreply, NewDclient} -> + {reply, ok, NewDclient, hibernate}; + {stop, _Reason, NewDclient} -> dgiot_client:stop(ChannelId, ClientId), - {reply, _Reason, Dclient#dclient{child = NewChildState, userdata = NewUserData}} + {reply, _Reason, NewDclient} end; handle_call(Request, From, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod}} = Dclient) -> case Mod:handle_call(Request, From, Dclient) of - {reply, Reply, NewChildState} -> - {reply, Reply, Dclient#dclient{child = NewChildState}, hibernate}; - {stop, Reason, NewChildState} -> + {reply, Reply, NewDclient} -> + {reply, Reply, NewDclient, hibernate}; + {stop, Reason, NewDclient} -> dgiot_client:stop(ChannelId, ClientId), - {reply, Reason, Dclient#dclient{child = NewChildState}} + {reply, Reason, NewDclient} end. handle_cast(Msg, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod}} = Dclient) -> case Mod:handle_cast(Msg, Dclient) of - {noreply, NewChildState} -> - {noreply, Dclient#dclient{child = NewChildState}, hibernate}; - {stop, Reason, NewChildState} -> + {noreply, NewDclient} -> + {noreply, NewDclient, hibernate}; + {stop, Reason, NewDclient} -> dgiot_client:stop(ChannelId, ClientId), - {reply, Reason, Dclient#dclient{child = NewChildState}} + {reply, Reason, NewDclient} end. %% 连接次数为0了 -handle_info(do_connect, #dclient{channel = ChannelId, client = ClientId, clock = #dclock{count = Count}} = State) when Count =< 0 -> +handle_info(do_connect, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{count = Count}} = Dclient) when Count =< 0 -> dgiot_client:stop(ChannelId, ClientId), - {noreply, State, hibernate}; -handle_info(do_connect, #dclient{clock = #dclock{count = Count, round = Round, freq = Freq} = Clock} = State) -> + {noreply, Dclient, hibernate}; +handle_info(do_connect, #dclient{userdata = #connect_state{count = Count, freq = Freq} = UserData} = Dclient) -> timer:sleep(Freq * 1000), - NewState = State#dclient{clock = Clock#dclock{count = Count - 1, round = Round + 1}}, - do_connect(NewState), -%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, Count]), - {noreply, NewState, hibernate}; + NewDclient = Dclient#dclient{userdata = UserData#connect_state{count = Count - 1}}, + do_connect(NewDclient), + {noreply, NewDclient, hibernate}; -handle_info({connection_ready, Socket}, #dclient{userdata = #connect_state{mod = Mod}} = Dclient) -> - case Mod:handle_info(connection_ready, Dclient) of - {noreply, ChildState} -> +handle_info({connection_ready, Socket}, #dclient{userdata = #connect_state{mod = Mod} = UserData} = Dclient) -> + case Mod:handle_info(connection_ready, Dclient#dclient{userdata = UserData#connect_state{socket = Socket}}) of + {noreply, NewDclient} -> inet:setopts(Socket, [{active, once}]), - {noreply, Dclient#dclient{child = ChildState}, hibernate}; - {stop, Reason, ChildState} -> - {stop, Reason, Dclient#dclient{child = ChildState}} + {noreply, NewDclient, hibernate}; + {stop, Reason, NewDclient} -> + {stop, Reason, NewDclient} end; %% 往tcp server 发送报文 -handle_info({send, _PayLoad}, #dclient{userdata = #connect_state{socket = undefined}} = State) -> - {noreply, State, hibernate}; -handle_info({send, PayLoad}, #dclient{userdata = #connect_state{socket = Socket}} = State) -> +handle_info({send, _PayLoad}, #dclient{userdata = #connect_state{socket = undefined}} = Dclient) -> + {noreply, Dclient, hibernate}; +handle_info({send, PayLoad}, #dclient{userdata = #connect_state{host = _Ip, port = _Port, socket = Socket}} = Dclient) -> +%% io:format("~s ~p send to from ~p:~p : ~p ~n", [?FILE, ?LINE, _Ip, _Port, dgiot_utils:to_hex(PayLoad)]), gen_tcp:send(Socket, PayLoad), - {noreply, State, hibernate}; + {noreply, Dclient, hibernate}; %% 接收tcp server发送过来的报文 -handle_info({tcp, Socket, Binary}, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod, socket = Socket}} = Dclient) -> +handle_info({tcp, Socket, Binary}, #dclient{userdata = #connect_state{host = _Ip, port = _Port, mod = Mod}} = Dclient) -> +%% io:format("~s ~p recv from ~p:~p ~p ~n", [?FILE, ?LINE, _Ip, _Port, dgiot_utils:to_hex(Binary)]), NewBin = case binary:referenced_byte_size(Binary) of Large when Large > 2 * byte_size(Binary) -> @@ -114,23 +123,23 @@ handle_info({tcp, Socket, Binary}, #dclient{channel = ChannelId, client = Client Binary end, case Mod:handle_info({tcp, NewBin}, Dclient) of - {noreply, ChildState} -> + {noreply, NewDclient} -> inet:setopts(Socket, [{active, once}]), - {noreply, Dclient#dclient{child = ChildState}, hibernate}; - {stop, Reason, ChildState} -> + {noreply, NewDclient, hibernate}; + {stop, Reason, #dclient{channel = ChannelId, client = ClientId} = NewDclient} -> dgiot_client:stop(ChannelId, ClientId), - {noreply, Reason, Dclient#dclient{child = ChildState}, hibernate} + {noreply, Reason, NewDclient, hibernate} end; -handle_info({tcp_error, _Socket, _Reason}, State) -> - {noreply, State, hibernate}; +handle_info({tcp_error, _Socket, _Reason}, Dclient) -> + {noreply, Dclient, hibernate}; -handle_info({tcp_closed, _Sock}, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod, socket = Socket}} = Dclient) -> +handle_info({tcp_closed, Socket}, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod}} = Dclient) -> gen_tcp:close(Socket), case Mod:handle_info(tcp_closed, Dclient) of {noreply, ChildState} -> self() ! do_connect, - {noreply, Dclient#dclient{child = ChildState}, hibernate}; + {noreply, Dclient#dclient{child = ChildState}, hibernate}; {stop, _Reason, ChildState} -> dgiot_client:stop(ChannelId, ClientId), {noreply, Dclient#dclient{child = ChildState}, hibernate} @@ -138,26 +147,28 @@ handle_info({tcp_closed, _Sock}, #dclient{channel = ChannelId, client = ClientId handle_info(Info, #dclient{channel = ChannelId, client = ClientId, userdata = #connect_state{mod = Mod, socket = Socket}} = Dclient) -> case Mod:handle_info(Info, Dclient) of - {noreply, ChildState} -> - {noreply, Dclient#dclient{child = ChildState}, hibernate}; - {stop, _Reason, ChildState} -> + {noreply, NewDclient} -> + {noreply, NewDclient, hibernate}; + {stop, _Reason, NewDclient} -> gen_tcp:close(Socket), + timer:sleep(10), dgiot_client:stop(ChannelId, ClientId), - {noreply, Dclient#dclient{child = ChildState}, hibernate} + {noreply, NewDclient, hibernate} end. terminate(Reason, #dclient{userdata = #connect_state{mod = Mod}} = Dclient) -> Mod:terminate(Reason, Dclient). -code_change(OldVsn, #dclient{userdata = #connect_state{mod = Mod}, child = ChildState} = Dclient, Extra) -> - {ok, ChildState} = Mod:code_change(OldVsn, Dclient, Extra), - {ok, Dclient#dclient{child = ChildState}}. - +code_change(OldVsn, #dclient{userdata = #connect_state{mod = Mod}} = Dclient, Extra) -> + Mod:code_change(OldVsn, Dclient, Extra). %%%=================================================================== %%% Internal functions %%%=================================================================== +send(Socket, Payload) -> + gen_tcp:send(Socket, Payload). + send(Payload) -> self() ! {send, Payload}. @@ -185,4 +196,3 @@ do_connect(#dclient{userdata = #connect_state{host = Host, port = Port}}) -> do_connect(_) -> self() ! do_connect. - diff --git a/apps/dgiot_bacnet/src/dgiot_bacnet_channel.erl b/apps/dgiot_bacnet/src/dgiot_bacnet_channel.erl index 29417c6907..c3e7f9de60 100644 --- a/apps/dgiot_bacnet/src/dgiot_bacnet_channel.erl +++ b/apps/dgiot_bacnet/src/dgiot_bacnet_channel.erl @@ -56,7 +56,7 @@ order => 102, type => string, required => false, - default => <<"http://dgiot-1253666439.cos.ap-shanghai-fsi.myqcloud.com/shuwa_tech/zh/product/dgiot/channel/BACnet.jpg">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/BACnet.jpg">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_bridge/src/channel/dgiot_common_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_common_channel.erl new file mode 100644 index 0000000000..6cfb145e30 --- /dev/null +++ b/apps/dgiot_bridge/src/channel/dgiot_common_channel.erl @@ -0,0 +1,95 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- +-module(dgiot_common_channel). +-behavior(dgiot_channelx). +-include("dgiot_bridge.hrl"). +-include_lib("dgiot/include/logger.hrl"). +-define(TYPE, <<"COMMON">>). +-record(state, {id, env}). +%% API +-export([ + start/2 +]). + +%% Channel callback +-export([init/3, handle_init/1, handle_event/3, handle_message/2, stop/3]). + +-channel(?TYPE). +-channel_type(#{ + cType => ?TYPE, + type => ?BRIDGE_CHL, + title => #{ + zh => <<"TCPC资源通道"/utf8>> + }, + description => #{ + zh => <<"TCPC资源通道"/utf8>> + } +}). +%% 注册通道参数 +-params(#{ + <<"ico">> => #{ + order => 102, + type => string, + required => false, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/TcpIcon.jpeg">>, + title => #{ + en => <<"channel ICO">>, + zh => <<"通道ICO"/utf8>> + }, + description => #{ + en => <<"channel ICO">>, + zh => <<"通道ICO"/utf8>> + } + } +}). + +start(ChannelId, ChannelArgs) -> + dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs). + +%% 通道初始化 +init(?TYPE, ChannelId, Args) -> + State = #state{id = ChannelId, env = Args}, + StartTime = maps:get(<<"startTime">>, Args, dgiot_datetime:now_secs() + 5), + EndTime = maps:get(<<"endTime">>, Args, dgiot_datetime:now_secs() + 1000000), + dgiot_client:add_clock(ChannelId, StartTime, EndTime), + ChildSpecs = maps:get(<<"childspecs">>, Args, []), + {ok, State, ChildSpecs}. + +handle_init(State) -> + {ok, State}. + +%% 通道消息处理,注意:进程池调用 +handle_event(_EventId, Event, State) -> + ?LOG(info, "channel ~p", [Event]), + {ok, State}. + +handle_message(start_client, #state{id = ChannelId} = State) -> + io:format("~s ~p ChannelId = ~p.~n", [?FILE, ?LINE, ChannelId]), + case dgiot_data:get({start_client, ChannelId}) of + not_find -> + dgiot_client:start_que(ChannelId); + _ -> + pass + end, + {ok, State}; + +handle_message(_Message, State) -> + {ok, State}. + +stop(ChannelType, ChannelId, _State) -> + dgiot_client:stop(ChannelId), + ?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]), + ok. diff --git a/apps/dgiot_bridge/src/channel/dgiot_httpc_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_httpc_channel.erl index e6325cbef6..86725aa38a 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_httpc_channel.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_httpc_channel.erl @@ -242,7 +242,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/HTTP-collection.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/HTTPC.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl index 1f6844410b..e608cd2956 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl @@ -26,15 +26,13 @@ -record(state, { id, buff_size = 1024000, - devaddr = <<>>, heartcount = 0, head = "xxxxxx0eee", len = 0, - app = <<>>, - product = <<>>, - deviceId = <<>>, env = #{}, - dtutype = <<>> + dtutype = <<>>, + productIds = <<>>, + productId = <<>> }). %% API diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl index b2ef07538d..d3d7bf7fca 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcp_worker.erl @@ -24,12 +24,18 @@ -define(MAX_BUFF_SIZE, 1024). -record(state, { id, - productIds = [], - devices = #{}, - buff_size = 1024000 + buff_size = 1024000, + heartcount = 0, + head = "xxxxxx0eee", + len = 0, + env = #{}, + dtutype = <<>>, + productIds = <<>>, + productId = <<>> }). + %% TCP callback -export([child_spec/2, init/1, handle_info/2, handle_cast/2, handle_call/3, terminate/2, code_change/3]). @@ -39,21 +45,18 @@ child_spec(Port, State) -> %% ======================= %% {ok, State} | {stop, Reason} init(#tcp{state = #state{id = ChannelId} = State} = TCPState) -> - Time = rand:seed(exs1024), - _ = erlang:round(rand:uniform() * 60 + 1) * 1000, - erlang:send_after(Time, self(), login), case dgiot_bridge:get_products(ChannelId) of {ok, ?TYPE, ProductIds} -> lists:map(fun(ProductId) -> do_cmd(ProductId, connection_ready, <<>>, TCPState) end, ProductIds), NewState = State#state{productIds = ProductIds}, - TCPState#tcp{log = log_fun(ChannelId), state = NewState}; + {ok, TCPState#tcp{log = log_fun(ChannelId), state = NewState}}; {error, not_find} -> {error, not_find_channel} end. -handle_info({deliver, _, Msg}, TCPState) -> +handle_info({deliver, _, Msg}, TCPState) -> Payload = dgiot_mqtt:get_payload(Msg), Topic = dgiot_mqtt:get_topic(Msg), case binary:split(Topic, <<$/>>, [global, trim]) of @@ -96,7 +99,7 @@ handle_call(_Msg, _From, TCPState) -> handle_cast(_Msg, TCPState) -> {noreply, TCPState}. -terminate(_Reason, #tcp{state = #state{productIds = ProductIds}} = TCPState) -> +terminate(_Reason, #tcp{state = #state{productIds = ProductIds}} = TCPState) -> lists:map(fun(ProductId) -> do_cmd(ProductId, terminate, _Reason, TCPState) end, ProductIds), diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcpc_channel.erl b/apps/dgiot_bridge/src/channel/dgiot_tcpc_channel.erl index 14b1a72ac9..4d929740c8 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcpc_channel.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcpc_channel.erl @@ -68,7 +68,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/TcpIcon.jpeg">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/Tcpc.jpg">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> @@ -84,11 +84,14 @@ start(ChannelId, ChannelArgs) -> dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs). %% 通道初始化 -init(?TYPE, ChannelId, - #{<<"ip">> := Ip, <<"port">> := Port} = Args) -> +init(?TYPE, ChannelId, Args) -> State = #state{id = ChannelId, env = Args}, - dgiot_client:add_clock(ChannelId, dgiot_datetime:now_secs() + 5, dgiot_datetime:now_secs() + 30), - NewArgs = #{ <<"channel">> => ChannelId, <<"ip">> => Ip, <<"port">> => Port, <<"mod">> => dgiot_tcpc_worker, <<"count">> => 3, <<"freq">> => 10}, + StartTime = maps:get(<<"startTime">>, Args, dgiot_datetime:now_secs() + 5), + EndTime = maps:get(<<"endTime">>, Args, dgiot_datetime:now_secs() + 1000000), + Mod = maps:get(<<"mod">>, Args, dgiot_tcpc_worker), + io:format("~s ~p StartTime = ~p. EndTime = ~p~n", [?FILE, ?LINE, StartTime, EndTime]), + dgiot_client:add_clock(ChannelId, StartTime, EndTime), + NewArgs = Args#{<<"channel">> => ChannelId, <<"mod">> => Mod, <<"count">> => 3, <<"freq">> => 10}, {ok, State, dgiot_client:register(ChannelId, tcp_client_sup, NewArgs)}. handle_init(State) -> @@ -103,16 +106,16 @@ handle_message(start_client, #state{id = ChannelId} = State) -> io:format("~s ~p ChannelId = ~p.~n", [?FILE, ?LINE, ChannelId]), case dgiot_data:get({start_client, ChannelId}) of not_find -> - [dgiot_client:start(ChannelId, dgiot_utils:to_binary(I)) || I <- lists:seq(1, 10)]; + dgiot_client:start(ChannelId, dgiot_utils:to_binary(<<"AF000001">>)); _ -> pass end, - {ok, State}; handle_message(_Message, State) -> {ok, State}. stop(ChannelType, ChannelId, _State) -> + dgiot_client:stop(ChannelId), ?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]), ok. diff --git a/apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl b/apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl index 65d757da18..68c9679bfd 100644 --- a/apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl +++ b/apps/dgiot_bridge/src/channel/dgiot_tcpc_worker.erl @@ -25,49 +25,54 @@ -export([init/1, handle_info/2, terminate/2]). %% tcp client callback -init(#dclient{channel = ChannelId, child = #child_state{} = ChildState}) -> - case dgiot_product:get_channel(ChannelId) of - not_find -> - {stop, <<"not find product">>}; - ProductId -> - io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, ChannelId, ProductId]), - {ok, ChildState#child_state{product = ProductId}} +init(#dclient{child = ChildState} = Dclient) when is_map(ChildState) -> + case maps:find(<<"productid">>, ChildState) of + {ok, ProductId} -> + case do_cmd(ProductId, init, ChildState, Dclient) of + default -> + {noreply, #child_state{product = ProductId}}; + Result -> + Result + end; + _ -> + {stop, <<"not find product">>} end; + init(#dclient{channel = ChannelId}) -> case dgiot_product:get_channel(ChannelId) of not_find -> {stop, <<"not find product">>}; ProductId -> io:format("~s ~p ~p ~p ~n", [?FILE, ?LINE, ChannelId, ProductId]), - {ok, #child_state{product = ProductId}} + {noreply, #child_state{product = ProductId}} end. handle_info(connection_ready, #dclient{child = #child_state{product = ProductId} = ChildState} = Dclient) -> rand:seed(exs1024), Time = erlang:round(rand:uniform() * 1 + 1) * 1000, - io:format("~s ~p ~p ~n", [?FILE, ?LINE, ProductId]), +%% io:format("~s ~p ~p ~n", [?FILE, ?LINE, ProductId]), case do_cmd(ProductId, connection_ready, <<>>, Dclient) of default -> erlang:send_after(Time, self(), login), {noreply, ChildState}; - Device -> - {noreply, ChildState#child_state{device = Device}} + Result -> + Result end; handle_info(#{<<"cmd">> := Cmd, <<"data">> := Data, <<"productId">> := ProductId}, #dclient{child = #child_state{} = ChildState} = Dclient) -> case do_cmd(ProductId, Cmd, Data, Dclient) of default -> {noreply, ChildState}; - {Result, Device} -> - {Result, ChildState#child_state{device = Device}} + Result -> + Result end; handle_info(tcp_closed, #dclient{child = #child_state{product = ProductId} = ChildState} = Dclient) -> case do_cmd(ProductId, tcp_closed, <<>>, Dclient) of default -> {noreply, ChildState}; - {Result, Device} -> - {Result, ChildState#child_state{device = Device}} + Result -> + Result end; handle_info({tcp, Buff}, #dclient{child = #child_state{buff = Old, product = ProductId} = ChildState} = Dclient) -> @@ -75,10 +80,6 @@ handle_info({tcp, Buff}, #dclient{child = #child_state{buff = Old, product = Pro case do_cmd(ProductId, tcp, Data, Dclient) of default -> {noreply, ChildState}; - {noreply, Bin, Device} -> - {noreply, ChildState#child_state{buff = Bin, device = Device}}; - {stop, Reason, Device} -> - {stop, Reason, ChildState#child_state{device = Device}}; Result -> Result end; @@ -102,19 +103,19 @@ handle_info(_Info, Dclient) -> terminate(_Reason, _Dclient) -> ok. -do_cmd(ProductId, Cmd, Data, Dclient) -> - io:format("~s ~p ~p ~n", [?FILE, ?LINE, ProductId]), +do_cmd(ProductId, Cmd, Data, #dclient{child = ChildState} = Dclient) -> + io:format("~s ~p ~p Cmd ~p ~n", [?FILE, ?LINE, ProductId, Cmd]), case dgiot_hook:run_hook({tcp, ProductId}, [Cmd, Data, Dclient]) of {ok, Device} -> - {noreply, Device}; - {reply, ProductId, Payload, Device} -> - case dgiot_tcp_client:send(Payload) of - ok -> - ok; - {error, _Reason} -> - pass - end, - {noreply, Device}; + {noreply, ChildState#child_state{device = Device}}; + {noreply, Device} -> + {noreply, ChildState#child_state{device = Device}}; + {noreply, Bin, Device} -> + {noreply, ChildState#child_state{buff = Bin, device = Device}}; + {error, Reason, Device} -> + {stop, Reason, ChildState#child_state{device = Device}}; + {stop, Reason, Device} -> + {stop, Reason, ChildState#child_state{device = Device}}; _ -> default end. \ No newline at end of file diff --git a/apps/dgiot_bridge/src/dgiot_bridge.erl b/apps/dgiot_bridge/src/dgiot_bridge.erl index 93d0f50916..eca3d745ac 100644 --- a/apps/dgiot_bridge/src/dgiot_bridge.erl +++ b/apps/dgiot_bridge/src/dgiot_bridge.erl @@ -163,7 +163,7 @@ get_acl(ChannelId) -> send_log(ChannelId, ProductId, DevAddr, Fmt, Args) -> is_send_log(ChannelId, ProductId, DevAddr, fun() -> - Topic = <<"$dg/channel/", ChannelId/binary, "/", ProductId/binary, "/", DevAddr/binary>>, + Topic = <<"$dg/user/channel/", ChannelId/binary, "/", ProductId/binary, "/", DevAddr/binary>>, Payload = io_lib:format("[~s]~p " ++ Fmt, [node(), self() | Args]), dgiot_mqtt:publish(ChannelId, Topic, unicode:characters_to_binary(Payload)) end). @@ -171,7 +171,7 @@ send_log(ChannelId, ProductId, DevAddr, Fmt, Args) -> send_log(ChannelId, ProductId, Fmt, Args) -> is_send_log(ChannelId, ProductId, undefined, fun() -> - Topic = <<"$dg/channel/", ChannelId/binary, "/", ProductId/binary>>, + Topic = <<"$dg/user/channel/", ChannelId/binary, "/", ProductId/binary>>, Payload = io_lib:format("[~s]~p " ++ Fmt, [node(), self() | Args]), dgiot_mqtt:publish(ChannelId, Topic, unicode:characters_to_binary(Payload)) end). @@ -179,7 +179,7 @@ send_log(ChannelId, ProductId, Fmt, Args) -> send_log(ChannelId, Fmt, Args) -> is_send_log(ChannelId, undefined, undefined, fun() -> - Topic = <<"$dg/channel/", ChannelId/binary, "/channelid">>, + Topic = <<"$dg/user/channel/", ChannelId/binary, "/channelid">>, Payload = io_lib:format("[~s]~p " ++ Fmt, [node(), self() | Args]), dgiot_mqtt:publish(ChannelId, Topic, unicode:characters_to_binary(Payload)) end). diff --git a/apps/dgiot_device/src/dgiot_device_channel.erl b/apps/dgiot_device/src/dgiot_device_channel.erl index 0f65572dd4..c1604f210e 100644 --- a/apps/dgiot_device/src/dgiot_device_channel.erl +++ b/apps/dgiot_device/src/dgiot_device_channel.erl @@ -210,6 +210,11 @@ handle_message({sync_parse, _Pid, 'after', delete, _Token, <<"Product">>, Object dgiot_product:delete(ObjectId), {ok, State}; +handle_message({update_schemas_json}, State) -> +%% 更新表字段 + dgiot_parse:update_schemas_json(), + {ok, State}; + handle_message(Message, State) -> ?LOG(info, "channel ~p", [Message]), {ok, State}. diff --git a/apps/dgiot_device/src/dgiot_notification_channel.erl b/apps/dgiot_device/src/dgiot_notification_channel.erl index a6d8c50db7..1902e50830 100644 --- a/apps/dgiot_device/src/dgiot_notification_channel.erl +++ b/apps/dgiot_device/src/dgiot_notification_channel.erl @@ -72,7 +72,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/modbus.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/NOTIFICATION.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_device/src/dgiot_profile_channel.erl b/apps/dgiot_device/src/dgiot_profile_channel.erl index 9c16bbaadc..76c834ec09 100644 --- a/apps/dgiot_device/src/dgiot_profile_channel.erl +++ b/apps/dgiot_device/src/dgiot_profile_channel.erl @@ -90,7 +90,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/TaskIcon.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/profile.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> @@ -167,4 +167,4 @@ handle_message(_Message, State) -> {ok, State}. stop(_ChannelType, _ChannelId, _State) -> - ok. \ No newline at end of file + ok. diff --git a/apps/dgiot_dlink/README.md b/apps/dgiot_dlink/README.md index 562b950747..47dcdb5887 100644 --- a/apps/dgiot_dlink/README.md +++ b/apps/dgiot_dlink/README.md @@ -26,11 +26,11 @@ Dlink协议是针对物联网开发领域设计的一种数据交换规范,数 | 分类 | Topic | 发布者 | 订阅者 | | -------- | -------- | ------- | -------- | | 属性上报 |$dg/thing/{productId}/{deviceAddr}/properties/report|设备|平台| -| 告警上报 |$dg/alarm/{productId}/{deviceId}/properties/report|平台|用户| -| 组态消息 |$dg/user/{DeviceId}/konva/report|平台|用户| -| 卡片消息 | $dg/user/{DeviceId}/realtimecard/report|平台|用户| +| 告警上报 |$dg/user/alarm/{productId}/{deviceId}/properties/report|平台|用户| +| 组态消息 |$dg/user/konva/{DeviceId}/report|平台|用户| +| 卡片消息 | $dg/user/realtimecard/{DeviceId}/report|平台|用户| | 子属性上报 |$dg/thing/{productId}/{deviceAddr}/gateway/sub_devices/properties/report|设备|平台| -| 子属性上报 |$dg/user/{deviceId}/gateway/sub_devices/properties/report|平台|用户| +| 子属性上报 |$dg/user/gateway/{deviceId}/sub_devices/properties/report|平台|用户| | 属性设置 | $dg/thing/{deviceId}/properties/set/request_id={request_id}|用户|平台| | 属性设置 | $dg/device/{productId}/{deviceAddr}/properties/set/request_id={request_id}|平台|设备| | 属性设置 |$dg/thing/{productId}/{deviceAddr}/properties/set/request_id={request_id}|设备|平台| @@ -44,10 +44,10 @@ Dlink协议是针对物联网开发领域设计的一种数据交换规范,数 | 配置下发 |$dg/device/{productId}/{deviceAddr}/profile|平台|设备| | 事件上报 |$dg/thing/{productId}/{deviceAddr}/events|设备|平台| | 事件上报 |$dg/user/{deviceId}/events|平台|用户| -| 通道消息 |$dg/channel/{channelId}/{productId}/{deviceId}|平台|用户| -| 通道设置 |$dg/channel/{channelId}/commands/request_id={request_id}|用户|平台| -| 通道设置 |$dg/channel/{channelId}/commands/response/request_id={request_id}|平台|用户| -| 大屏消息 |$dg/dashboard/{dashboardId}/{productId}/{deviceId}|平台|用户| +| 通道消息 |$dg/user/channel/{channelId}/{productId}/{deviceId}|平台|用户| +| 通道设置 |$dg/user/channel/{channelId}/commands/request_id={request_id}|用户|平台| +| 通道设置 |$dg/user/channel/{channelId}/commands/response/request_id={request_id}|平台|用户| +| 大屏消息 |$dg/user/dashboard/{dashboardId}/{productId}/{deviceId}|平台|用户| | 云云对接 |$dg/bridge/{bridgetopic}|平台|平台| diff --git a/apps/dgiot_dlink/priv/json/topo.json b/apps/dgiot_dlink/priv/json/topo.json new file mode 100644 index 0000000000..3b7be313bb --- /dev/null +++ b/apps/dgiot_dlink/priv/json/topo.json @@ -0,0 +1,67 @@ +{ + "konva": { + "Stage": { + "attrs": { + "width": 1200, + "height": 700 + }, + "children": [ + { + "attrs": { + "id": "Layer_Thing" + }, + "children": [ + { + "attrs": { + "id": "bg", + "src": "/dgiot_file/knova/knova_bg.png?timestamp=1635422987361", + "type": "bg-image", + "width": "1200", + "height": "700" + }, + "className": "Image" + }, + { + "attrs": { + "x": 323, + "y": 84.95900920567532, + "id": "f9dddd0eeb_amisBMK8t", + "name": "amis", + "opacity": 0.75, + "draggable": true + }, + "children": [], + "className": "Label" + }, + { + "attrs": { + "x": 547, + "y": 264, + "id": "f9dddd0eeb_texthsl01", + "name": "thing", + "opacity": 0.75, + "draggable": true + }, + "children": [], + "className": "Label" + }, + { + "attrs": { + "x": 482, + "y": 157, + "id": "f9dddd0eeb_textZGRWA", + "name": "thing", + "opacity": 0.75, + "draggable": true + }, + "children": [], + "className": "Label" + } + ], + "className": "Layer" + } + ], + "className": "Stage" + } + } +} diff --git a/apps/dgiot_dlink/priv/swagger/swagger_dlink.json b/apps/dgiot_dlink/priv/swagger/swagger_dlink.json index 690355f44f..4e28e26fb4 100644 --- a/apps/dgiot_dlink/priv/swagger/swagger_dlink.json +++ b/apps/dgiot_dlink/priv/swagger/swagger_dlink.json @@ -90,6 +90,49 @@ "Dlink" ] } + }, + "/topic":{ + "post":{ + "summary": "订阅topic", + "description":"订阅topic", + "parameters": [ + { + "in": "body", + "name": "data", + "required": true, + "schema": { + "type": "object", + "properties": { + "topic": { + "required": true, + "type": "string", + "example": "$dg/user/router/" + } + } + } + } + ], + "responses":{ + "200":{ + "description":"Returns operation status" + }, + "400":{ + "description":"Bad Request" + }, + "401":{ + "description":"Unauthorized" + }, + "403":{ + "description":"Forbidden" + }, + "500":{ + "description":"Server Internal error" + } + }, + "tags":[ + "Dlink" + ] + } } } } diff --git a/apps/dgiot_dlink/src/handler/dgiot_dlink_handler.erl b/apps/dgiot_dlink/src/handler/dgiot_dlink_handler.erl index fa7dd83afc..51b9228df1 100644 --- a/apps/dgiot_dlink/src/handler/dgiot_dlink_handler.erl +++ b/apps/dgiot_dlink/src/handler/dgiot_dlink_handler.erl @@ -111,5 +111,22 @@ do_request(get_dlinkjson, #{<<"type">> := Type}, _Context, _Req) -> DlinkJson = dgiot_dlink:get_json(Type), {200, DlinkJson}; +do_request(post_topic, #{<<"topic">> := Topic} = _Args, #{<<"sessionToken">> := SessionToken} = _Context, _Req) -> + io:format("~s ~p Topic ~p SessionToken ~p ~n", [?FILE, ?LINE, Topic, SessionToken]), +%% 订阅topic + [_Head, _User, Key | _] = re:split(Topic, "/"), + TopicKey = <<"dg_user_", Key/binary>>, +%% io:format("~s ~p Topic ~p SessionToken ~p TopicKey ~p ~n", [?FILE, ?LINE, Topic, SessionToken,TopicKey]), + case dgiot_data:get({page_router_key, SessionToken, TopicKey}) of + not_find -> + pass; + OldTopic -> + dgiot_mqtt:unsubscribe(SessionToken, OldTopic) + end, + dgiot_mqtt:subscribe(SessionToken, Topic), + dgiot_data:insert({page_router_key, SessionToken, TopicKey}, Topic), + timer:sleep(10), + {200, #{<<"message">> => <<"订阅成功"/utf8>>, <<"Topic">> => Topic, <<"TopicKey">> => TopicKey}}; + do_request(_OperationId, _Args, _Context, _Req) -> {error, <<"Not Allowed.">>}. diff --git a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_acl.erl b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_acl.erl index 3d20268800..822edfd3e4 100644 --- a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_acl.erl +++ b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_acl.erl @@ -90,7 +90,7 @@ do_check(#{clientid := Token, username := UserId} = _ClientInfo, publish, <<"$dg end; %% "$dg/channel/{channelId}/{productId}/{deviceId}" -do_check(#{clientid := Token} = _ClientInfo, subscribe, <<"$dg/channel/", DeviceInfo/binary>> = _Topic) -> +do_check(#{clientid := Token} = _ClientInfo, subscribe, <<"$dg/user/channel/", DeviceInfo/binary>> = _Topic) -> %% io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, _Topic]), [ChannelId | _] = binary:split(DeviceInfo, <<"/">>, [global]), case dgiot_parse:get_object(<<"Channel">>, ChannelId, [{"X-Parse-Session-Token", Token}], [{from, rest}]) of @@ -101,7 +101,7 @@ do_check(#{clientid := Token} = _ClientInfo, subscribe, <<"$dg/channel/", Device end; %% $dg/dashboard/{dashboardId}/{productId}/{deviceId} -do_check(#{clientid := Token} = _ClientInfo, subscribe, <<"$dg/dashboard/", DashboardId:10/binary, "/", _Rest/binary>> = _Topic) -> +do_check(#{clientid := Token} = _ClientInfo, subscribe, <<"$dg/user/dashboard/", DashboardId:10/binary, "/", _Rest/binary>> = _Topic) -> %% io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, _Topic]), case dgiot_parse:get_object(<<"View">>, DashboardId, [{"X-Parse-Session-Token", Token}], [{from, rest}]) of {ok, _} -> diff --git a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl index 4e0403d667..c77fe8f4a1 100644 --- a/apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl +++ b/apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl @@ -36,6 +36,7 @@ check(#{clientid := Token, username := UserId, password := Token}, AuthResult, # %% io:format("~s ~p UserId: ~p~n", [?FILE, ?LINE, UserId]), case dgiot_auth:get_session(Token) of #{<<"objectId">> := UserId} -> + dgiot_mqtt:subscribe(Token, <<"$dg/user/dashboard/#">>), {stop, AuthResult#{anonymous => false, auth_result => success}}; _ -> {stop, AuthResult#{anonymous => false, auth_result => password_error}} diff --git a/apps/dgiot_gb26875/src/dgiot_gb26875_channel.erl b/apps/dgiot_gb26875/src/dgiot_gb26875_channel.erl index ccb9987c37..e202fa57d7 100644 --- a/apps/dgiot_gb26875/src/dgiot_gb26875_channel.erl +++ b/apps/dgiot_gb26875/src/dgiot_gb26875_channel.erl @@ -37,7 +37,7 @@ zh => <<"GB26875"/utf8>> }, description => #{ - zh => <<"gb26875"/utf8>> + zh => <<"GB26875"/utf8>> } }). %% 注册通道参数 @@ -70,7 +70,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/modbus.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/GB26875.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_hjt212/src/dgiot_hjt212_channel.erl b/apps/dgiot_hjt212/src/dgiot_hjt212_channel.erl index ccfb1d5c8a..b8932a7f0a 100644 --- a/apps/dgiot_hjt212/src/dgiot_hjt212_channel.erl +++ b/apps/dgiot_hjt212/src/dgiot_hjt212_channel.erl @@ -37,7 +37,7 @@ zh => <<"HJT212"/utf8>> }, description => #{ - zh => <<"hjt212"/utf8>> + zh => <<"HJT212"/utf8>> } }). %% 注册通道参数 @@ -70,7 +70,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/modbus.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/HJT212.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_http/etc/dgiot_http.conf b/apps/dgiot_http/etc/dgiot_http.conf index 2b95284f03..aaa6a7f495 100644 --- a/apps/dgiot_http/etc/dgiot_http.conf +++ b/apps/dgiot_http/etc/dgiot_http.conf @@ -27,6 +27,6 @@ wechat.secret = 0c8b9e229ebe48c6878727a9d2e0263d jwt.algorithm = rs256 # 是否桥接 -isbridge = true +isbridge = false # 是否桥接 bridge_url = https://prod.iotn2n.com diff --git a/apps/dgiot_http/src/umeng/dgiot_umeng.erl b/apps/dgiot_http/src/umeng/dgiot_umeng.erl index 07abec36dd..7b26bb7136 100644 --- a/apps/dgiot_http/src/umeng/dgiot_umeng.erl +++ b/apps/dgiot_http/src/umeng/dgiot_umeng.erl @@ -559,7 +559,7 @@ create_maintenance(Info) -> <> = dgiot_utils:random(), Timestamp = dgiot_datetime:format(dgiot_datetime:to_localtime(dgiot_datetime:now_secs()), <<"YY-MM-DD HH:NN:SS">>), {Username, UserId, UserPhone, SessionToken} = - case dgiot_parse_auth:login(<<"dgiot_admin">>, <<"dgiot_admin">>) of + case dgiot_parse_auth:login(<<"dgiot_dev">>, <<"dgiot_dev">>) of {ok, #{<<"nick">> := Nick, <<"objectId">> := UserId1, <<"phone">> := UserPhone1, <<"sessionToken">> := SessionToken1}} -> {Nick, UserId1, UserPhone1, SessionToken1}; _ -> @@ -575,7 +575,7 @@ create_maintenance(Info) -> end, #{}, Acl1), NewAcl1; _ -> - #{<<"role:admin">> => #{<<"read">> => true, <<"write">> => true}} + #{<<"role:开发者"/utf8>> => #{<<"read">> => true, <<"write">> => true}} end, Body = #{ <<"number">> => maps:get(<<"id">>, Info, Number), diff --git a/apps/dgiot_meter/src/dgiot_meter.erl b/apps/dgiot_meter/src/dgiot_meter.erl index 00a008e5dc..6f1c1aaf54 100644 --- a/apps/dgiot_meter/src/dgiot_meter.erl +++ b/apps/dgiot_meter/src/dgiot_meter.erl @@ -94,8 +94,8 @@ create_meter(MeterAddr, ChannelId, DTUIP, DtuId, DtuAddr) -> dgiot_device:create_device(Requests), Topic = <<"$dg/device/", ProductId/binary, "/", MeterAddr/binary, "/profile">>, dgiot_mqtt:subscribe(Topic), - {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), - dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr); +%% {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), + dgiot_task:save_pnque(ProductId, MeterAddr, ProductId, MeterAddr); _ -> pass end. @@ -125,8 +125,8 @@ create_meter4G(MeterAddr, MDa, ChannelId, DTUIP, DtuId, DtuAddr) -> dgiot_data:insert({metertda, DeviceId}, {dgiot_utils:to_binary(MDa), DtuAddr}), Topic = <<"$dg/device/", ProductId/binary, "/", MeterAddr/binary, "/properties/report">>, dgiot_mqtt:subscribe(Topic), - {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), - dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr); +%% {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), + dgiot_task:save_pnque(ProductId, MeterAddr, ProductId, MeterAddr); _ -> pass end. @@ -356,7 +356,7 @@ search_meter(tcp, _Ref, TCPState, 0) -> <<"command">> => ?DLT645_MS_READ_DATA, <<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>) %%组合有功 }), - ?LOG(info, "Payload ~p", [dgiot_utils:binary_to_hex(Payload)]), +%% ?LOG(info, "Payload ~p", [dgiot_utils:binary_to_hex(Payload)]), dgiot_tcp_server:send(TCPState, Payload), read_meter; diff --git a/apps/dgiot_meter/src/dgiot_meter_tcp.erl b/apps/dgiot_meter/src/dgiot_meter_tcp.erl index 9c27ebe0fc..1f7373f56f 100644 --- a/apps/dgiot_meter/src/dgiot_meter_tcp.erl +++ b/apps/dgiot_meter/src/dgiot_meter_tcp.erl @@ -69,8 +69,8 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt dgiot_tcp_server:send(TCPState, Frame2), {Protocol1, MeterAddr}; _ -> -%% dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 login ~p", [?FILE, ?LINE, NewBuff]), - {?DLT645, Buff} + dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 dtu login ~p", [?FILE, ?LINE, NewBuff]), + {?DLT645, NewBuff} end, case Protocol of ?DLT376 -> @@ -138,16 +138,14 @@ handle_info(search_meter, #tcp{state = #state{ref = Ref, protocol = ?DLT645} = S %%ACK报文触发搜表 handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{id = ChannelId, dtuaddr = DtuAddr, protocol = ?DLT645, ref = Ref, step = search_meter, search = Search} = State} = TCPState) -> %% io:format("~s ~p tcp Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]), - dgiot_bridge:send_log(ChannelId, "~s ~p from_dev=> ~p", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]), dgiot_metrics:inc(dgiot_meter, <<"search_meter">>, 1), {DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}), - ?LOG(info, "Buff ~p", [Buff]), - dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (搜表成功)", [dgiot_utils:binary_to_hex(Buff)]), {Rest, Frames} = dgiot_meter:parse_frame(<<"DLT645">>, Buff, []), lists:map(fun(X) -> case X of #{<<"addr">> := Addr} -> DTUIP = dgiot_utils:get_ip(Socket), + dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "~s ~p from dtu:~p (搜表成功) Buff:~p => MeterAddr:~p", [?FILE, ?LINE, DtuAddr, dgiot_utils:binary_to_hex(Buff), dgiot_utils:binary_to_hex(Addr)]), dgiot_meter:create_meter(dgiot_utils:binary_to_hex(Addr), ChannelId, DTUIP, DtuId, DtuAddr); Other -> ?LOG(info, "Other ~p", [Other]), @@ -186,7 +184,8 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, clientid = DtuId, state = #state{ end, {noreply, TCPState#tcp{buff = Rest}}; ?DLT645 -> - dgiot_bridge:send_log(ChannelId, "DLT645 from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]), + dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 dev recv ~p ", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]), +%% io:format("~s ~p Buff = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Buff)]), {Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []), dlt645_decoder:process_message(Frames, ChannelId), {noreply, TCPState#tcp{buff = Rest}}; @@ -233,7 +232,7 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol <<"dataSource">> => DataSource }, Payload1 = dgiot_meter:to_frame(ThingData), - dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 send=> ~p", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Payload1)]), + dgiot_bridge:send_log(ChannelId, "~s ~p DLT645 to dev => ~p", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Payload1)]), dgiot_tcp_server:send(TCPState, Payload1) end, {noreply, TCPState}; diff --git a/apps/dgiot_meter/src/proctol/dlt645_decoder.erl b/apps/dgiot_meter/src/proctol/dlt645_decoder.erl index 5647490d3d..1c79ad1861 100644 --- a/apps/dgiot_meter/src/proctol/dlt645_decoder.erl +++ b/apps/dgiot_meter/src/proctol/dlt645_decoder.erl @@ -302,7 +302,8 @@ process_message(Frames, ChannelId) -> % 查询上一次合闸时间返回 [#{<<"command">> := 16#91, <<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + {ProductId, _ACL, _Properties} -> + DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, Di = <<16#1E, 16#00, 16#01, 16#01>>, DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, @@ -313,7 +314,8 @@ process_message(Frames, ChannelId) -> % 查询上一次拉闸时间返回 [#{<<"command">> := 16#91, <<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + {ProductId, _ACL, _Properties} -> + DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, Di = <<16#1D, 16#00, 16#01, 16#01>>, DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)}, @@ -324,7 +326,8 @@ process_message(Frames, ChannelId) -> % 拉闸,合闸成功 [#{<<"command">> := 16#9C, <<"addr">> := Addr} | _] -> case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + {ProductId, _ACL, _Properties} -> + DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, Di = <<16#FE, 16#FE, 16#FE, 16#FE>>, DValue = #{dgiot_utils:to_hex(Di) => 0}, @@ -335,7 +338,8 @@ process_message(Frames, ChannelId) -> % 拉闸,合闸失败 [#{<<"command">> := 16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + {ProductId, _ACL, _Properties} -> + DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>, Di = <<16#FE, 16#FE, 16#FE, 16#FD>>, DValue = #{dgiot_utils:to_hex(Di) => dgiot_utils:to_hex(Value)}, @@ -346,10 +350,13 @@ process_message(Frames, ChannelId) -> % 抄表数据返回 [#{<<"command">> := 16#91, <<"di">> := _Di, <<"addr">> := Addr, <<"value">> := Value} | _] -> case dgiot_data:get({meter, ChannelId}) of - {ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr), + {ProductId, _ACL, _Properties} -> + DevAddr = dgiot_utils:binary_to_hex(Addr), Topic = <<"$dg/thing/", ProductId/binary, "/", DevAddr/binary, "/properties/report">>, % 发送给task进行数据存储 DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr), - dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value)); + Taskchannel = dgiot_product:get_taskchannel(ProductId), + dgiot_client:send(Taskchannel, DeviceId, Topic, Value), + dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "~s ~p to task ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(jsx:encode(Value))]); _ -> pass end; _ -> pass diff --git a/apps/dgiot_modbus/src/dgiot_modbus_tcp.erl b/apps/dgiot_modbus/src/dgiot_modbus_tcp.erl index e551772d92..85bfb347a8 100644 --- a/apps/dgiot_modbus/src/dgiot_modbus_tcp.erl +++ b/apps/dgiot_modbus/src/dgiot_modbus_tcp.erl @@ -51,32 +51,25 @@ init(#tcp{state = #state{id = ChannelId}} = TCPState) -> handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, devaddr = <<>>, head = Head, len = Len, product = ProductId, dtutype = Dtutype} = State} = TCPState) -> DTUIP = dgiot_utils:get_ip(Socket), DtuAddr = dgiot_utils:binary_to_hex(Buff), - dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "DTU login:[~p] DtuAddr:[~p]", [Buff, DtuAddr]), List = dgiot_utils:to_list(DtuAddr), List1 = dgiot_utils:to_list(Buff), - #{<<"objectId">> := DeviceId} = - dgiot_parse_id:get_objectid(<<"Device">>, #{<<"product">> => ProductId, <<"devaddr">> => DtuAddr}), case re:run(DtuAddr, Head, [{capture, first, list}]) of {match, [Head]} when length(List) == Len -> - {DevId, Devaddr} = - case create_device(DeviceId, ProductId, DtuAddr, DTUIP, Dtutype) of - {<<>>, <<>>} -> - {<<>>, <<>>}; - {DevId1, Devaddr1} -> - {DevId1, Devaddr1} - end, - Topic = <<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/profile">>, + DeviceId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr), + create_device(DeviceId, ProductId, DtuAddr, DTUIP, Dtutype), + dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "~s ~p DTU login DtuAddr:~p", [?FILE, ?LINE, DtuAddr]), + Topic = <<"$dg/device/", ProductId/binary, "/", DtuAddr/binary, "/profile">>, dgiot_mqtt:subscribe(Topic), - DtuId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr), - {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{devaddr = Devaddr, deviceId = DevId}}}; + {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId, state = State#state{devaddr = DtuAddr, deviceId = DeviceId}}}; _Error -> case re:run(Buff, Head, [{capture, first, list}]) of {match, [Head]} when length(List1) == Len -> + DeviceId = dgiot_parse_id:get_deviceid(ProductId, Buff), create_device(DeviceId, ProductId, Buff, DTUIP, Dtutype), Topic = <<"$dg/device/", ProductId/binary, "/", Buff/binary, "/profile">>, + dgiot_bridge:send_log(ChannelId, ProductId, Buff, "~s ~p DTU login DtuAddr:~p", [?FILE, ?LINE, Buff]), dgiot_mqtt:subscribe(Topic), - DtuId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr), - {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{devaddr = Buff}}}; + {noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId, state = State#state{devaddr = Buff}}}; Error1 -> ?LOG(info, "Error1 ~p Buff ~p ", [Error1, dgiot_utils:to_list(Buff)]), {noreply, TCPState#tcp{buff = <<>>}} @@ -84,7 +77,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de end; handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr, env = #{product := ProductId, pn := Pn, di := Di}, product = DtuProductId} = State} = TCPState) -> - dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "[DtuAddr:~p] returns [~p] to Channel", [DtuAddr, dgiot_utils:binary_to_hex(Buff)]), + dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "~s ~p DTU ~p recv ~p", [?FILE, ?LINE, DtuAddr, dgiot_utils:binary_to_hex(Buff)]), <> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Di)), <> = dgiot_utils:hex_to_binary(modbus_rtu:is16(Pn)), case modbus_rtu:parse_frame(Buff, #{}, #{ @@ -95,7 +88,7 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, devaddr = DtuAddr, <<"address">> => H * 256 + L}) of {_, Things} -> NewTopic = <<"$dg/thing/", DtuProductId/binary, "/", DtuAddr/binary, "/properties/report">>, - dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "Channel sends [~p] to [task:~p]", [jsx:encode(Things), NewTopic]), + dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "~s ~p to task ~p ~ts ", [?FILE, ?LINE, NewTopic, unicode:characters_to_list(jsx:encode(Things))]), DeviceId = dgiot_parse_id:get_deviceid(ProductId, DtuAddr), Taskchannel = dgiot_product:get_taskchannel(ProductId), dgiot_client:send(Taskchannel, DeviceId, NewTopic, Things); @@ -123,7 +116,7 @@ handle_info({deliver, _, Msg}, #tcp{state = #state{id = ChannelId} = State} = TC #{<<"slaveid">> := SlaveId, <<"address">> := Address} = DataSource -> Data = modbus_rtu:to_frame(DataSource), %% io:format("~s ~p Data = ~p.~n", [?FILE, ?LINE, dgiot_utils:to_hex(Data)]), - dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "Channel sends [~p] to [DtuAddr:~p]", [dgiot_utils:binary_to_hex(Data), DevAddr]), + dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, "Channel sends ~p to DTU ~p", [dgiot_utils:binary_to_hex(Data), DevAddr]), dgiot_tcp_server:send(TCPState, Data), {noreply, TCPState#tcp{state = State#state{env = #{product => ProductId, pn => SlaveId, di => Address}}}}; _ -> diff --git a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl index 11c3c2363e..974dcbc104 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusc_channel.erl @@ -80,7 +80,7 @@ order => 102, type => string, required => false, - default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/modbus.png">>, + default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/modbusc.png">>, title => #{ en => <<"channel ICO">>, zh => <<"通道ICO"/utf8>> diff --git a/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl b/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl index 8a1e613dd4..0c8cfb0346 100644 --- a/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl +++ b/apps/dgiot_modbus/src/dgiot_modbusc_tcp.erl @@ -26,23 +26,23 @@ %% tcp client callback -init(#dclient{child = ChildState}) when is_map(ChildState) -> - {ok, ChildState}; +init(#dclient{child = ChildState} = Dclient) when is_map(ChildState) -> + {ok, Dclient}; init(_) -> {ok, #{}}. -handle_info(connection_ready, #dclient{child = ChildState}) -> +handle_info(connection_ready, #dclient{child = ChildState} = Dclient) -> io:format("~s ~p ChildState = ~p.~n", [?FILE, ?LINE, ChildState]), rand:seed(exs1024), Time = erlang:round(rand:uniform() * 1 + 1) * 1000, erlang:send_after(Time, self(), read), - {noreply, ChildState}; + {noreply, Dclient#dclient{child = ChildState}}; -handle_info(tcp_closed, #dclient{child = ChildState}) -> - {noreply, ChildState}; +handle_info(tcp_closed, #dclient{child = ChildState} = Dclient) -> + {noreply, Dclient#dclient{child = ChildState}}; -handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{minaddr := MinAddr, maxaddr := Maxaddr} = ChildState}) -> +handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{minaddr := MinAddr, maxaddr := Maxaddr} = ChildState} = Dclient) -> %% _Address1 = modbus_tcp:get_addr(ChannelId, MinAddr, Maxaddr, 124), Address = maps:get(di, ChildState, MinAddr), Step = maps:get(step, ChildState, 100), @@ -63,9 +63,10 @@ handle_info(read, #dclient{channel = ChannelId, client = ClientId, child = #{min Data = modbus_tcp:to_frame(DataSource), dgiot_tcp_client:send(ChannelId, ClientId, Data), %% io:format("~s ~p Send = ~p.~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(Data)]), - {noreply, ChildState#{minaddr => MinAddr, maxaddr => Maxaddr, di => Address, data => <<>>, step => Step}}; + {noreply, Dclient#dclient{child = ChildState#{minaddr => MinAddr, maxaddr => Maxaddr, di => Address, data => <<>>, step => Step}}}; -handle_info({tcp, Buff}, #dclient{channel = ChannelId, child = #{minaddr := MinAddr, maxaddr := Maxaddr, di := Address, filename := FileName, data := OldData, step := Step} = ChildState}) -> +handle_info({tcp, Buff}, #dclient{channel = ChannelId, + child = #{minaddr := MinAddr, maxaddr := Maxaddr, di := Address, filename := FileName, data := OldData, step := Step} = ChildState} = Dclient) -> dgiot_bridge:send_log(ChannelId, "returns [~p] to Channel", [dgiot_utils:binary_to_hex(Buff)]), %% io:format("~s ~p Address = ~p.~n", [?FILE, ?LINE, Address]), %% io:format("~s ~p Buff = ~p.~n", [?FILE, ?LINE, Buff]), @@ -76,18 +77,18 @@ handle_info({tcp, Buff}, #dclient{channel = ChannelId, child = #{minaddr := MinA %% io:format("~s ~p EndData = ~p.~n", [?FILE, ?LINE, EndData]), modbus_tcp:parse_frame(FileName, EndData), erlang:send_after(10 * 1000, self(), read), - {noreply, ChildState#{di => MinAddr, data => <<>>}}; + {noreply, Dclient#dclient{child = ChildState#{di => MinAddr, data => <<>>}}}; _ -> erlang:send_after(3 * 1000, self(), read), - {noreply, ChildState#{di => Address + Step, data => <>}} + {noreply, Dclient#dclient{child = ChildState#{di => Address + Step, data => <>}}} end; handle_info(_Info, #dclient{child = ChildState} = Dclient) -> io:format("~s ~p _Info = ~p.~n", [?FILE, ?LINE, _Info]), io:format("~s ~p Dclient = ~p.~n", [?FILE, ?LINE, Dclient]), io:format("~s ~p ChildState = ~p.~n", [?FILE, ?LINE, ChildState]), - {noreply, ChildState}. + {noreply, Dclient}. -terminate(_Reason, _TCPState) -> +terminate(_Reason, _Dclient) -> ok. diff --git a/apps/dgiot_parse/priv/json/schemas.json b/apps/dgiot_parse/priv/json/schemas.json new file mode 100644 index 0000000000..656b48f42a --- /dev/null +++ b/apps/dgiot_parse/priv/json/schemas.json @@ -0,0 +1,1778 @@ +[ + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Permission", + "fields": { + "ACL": { + "type": "ACL" + }, + "alias": { + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "description": { + "type": "String" + }, + "name": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "parent": { + "targetClass": "Permission", + "type": "Pointer" + }, + "tags": { + "type": "Array" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "App", + "fields": { + "ACL": { + "type": "ACL" + }, + "config": { + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "desc": { + "type": "String" + }, + "enable": { + "type": "Boolean" + }, + "name": { + "required": true, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "secret": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + }, + "user": { + "targetClass": "_User", + "type": "Pointer" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "_Session", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "createdWith": { + "type": "Object" + }, + "expiresAt": { + "type": "Date" + }, + "installationId": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "restricted": { + "type": "Boolean" + }, + "sessionToken": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + }, + "user": { + "targetClass": "_User", + "type": "Pointer" + }, + "users": { + "targetClass": "_User", + "type": "Relation" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Timescale", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "device": { + "targetClass": "Device", + "type": "Pointer" + }, + "objectId": { + "type": "String" + }, + "product": { + "required": false, + "targetClass": "Product", + "type": "Pointer" + }, + "tags": { + "required": false, + "type": "Object" + }, + "updatedAt": { + "type": "Date" + }, + "values": { + "required": true, + "type": "Object" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "_Installation", + "fields": { + "ACL": { + "type": "ACL" + }, + "GCMSenderId": { + "type": "String" + }, + "appIdentifier": { + "type": "String" + }, + "appName": { + "type": "String" + }, + "appVersion": { + "type": "String" + }, + "badge": { + "type": "Number" + }, + "channels": { + "type": "Array" + }, + "createdAt": { + "type": "Date" + }, + "deviceToken": { + "type": "String" + }, + "deviceType": { + "type": "String" + }, + "installationId": { + "type": "String" + }, + "localeIdentifier": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "parseVersion": { + "type": "String" + }, + "pushType": { + "type": "String" + }, + "timeZone": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Instruct", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "device": { + "targetClass": "Device", + "type": "Pointer" + }, + "di": { + "type": "String" + }, + "duration": { + "type": "Number" + }, + "enable": { + "type": "Boolean" + }, + "interval": { + "type": "Number" + }, + "name": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "op": { + "type": "String" + }, + "order": { + "type": "String" + }, + "other": { + "type": "Object" + }, + "pn": { + "type": "String" + }, + "product": { + "targetClass": "Product", + "type": "Pointer" + }, + "rotation": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Menu", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "icon": { + "type": "String" + }, + "meta": { + "defaultValue": {}, + "required": false, + "type": "Object" + }, + "name": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "orderBy": { + "type": "Number" + }, + "parent": { + "targetClass": "Menu", + "type": "Pointer" + }, + "updatedAt": { + "type": "Date" + }, + "url": { + "type": "String" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Evidence", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "md5": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "objetcId": { + "type": "String" + }, + "original": { + "type": "Object" + }, + "reportId": { + "type": "String" + }, + "scene": { + "type": "String" + }, + "signData": { + "type": "String" + }, + "timestamp": { + "type": "Number" + }, + "ukey": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Maintenance", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "device": { + "required": false, + "targetClass": "Device", + "type": "Pointer" + }, + "info": { + "defaultValue": { + "description": "", + "photo": [], + "step1": {}, + "step2": {}, + "step3": {}, + "step4": {}, + "timeline": [] + }, + "required": false, + "type": "Object" + }, + "name": { + "required": false, + "type": "String" + }, + "number": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "product": { + "required": false, + "targetClass": "Product", + "type": "Pointer" + }, + "status": { + "required": false, + "type": "Number" + }, + "type": { + "required": false, + "type": "String" + }, + "updatedAt": { + "type": "Date" + }, + "user": { + "required": false, + "targetClass": "_User", + "type": "Pointer" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Devicelog", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "createtime": { + "defaultValue": "", + "required": false, + "type": "String" + }, + "data": { + "required": false, + "type": "Object" + }, + "devaddr": { + "required": false, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "product": { + "required": false, + "targetClass": "Product", + "type": "Pointer" + }, + "status": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "LogLevel", + "fields": { + "ACL": { + "type": "ACL" + }, + "Subscribed": { + "required": false, + "type": "Boolean" + }, + "createdAt": { + "type": "Date" + }, + "deviceid": { + "type": "String" + }, + "level": { + "required": false, + "type": "String" + }, + "name": { + "required": false, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "order": { + "required": false, + "type": "Number" + }, + "parent": { + "required": false, + "targetClass": "LogLevel", + "type": "Pointer" + }, + "path": { + "type": "String" + }, + "topic": { + "required": false, + "type": "String" + }, + "type": { + "required": false, + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Category", + "fields": { + "ACL": { + "type": "ACL" + }, + "createdAt": { + "type": "Date" + }, + "data": { + "required": false, + "type": "Object" + }, + "level": { + "required": false, + "type": "Number" + }, + "name": { + "required": false, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "order": { + "required": false, + "type": "Number" + }, + "parent": { + "required": false, + "targetClass": "Category", + "type": "Pointer" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Log", + "fields": { + "ACL": { + "type": "ACL" + }, + "clientid": { + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "devaddr": { + "required": false, + "type": "String" + }, + "deviceid": { + "type": "String" + }, + "domain": { + "type": "Array" + }, + "level": { + "type": "String" + }, + "line": { + "type": "Number" + }, + "mfa": { + "type": "String" + }, + "msg": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "peername": { + "type": "String" + }, + "pid": { + "type": "String" + }, + "productid": { + "required": false, + "type": "String" + }, + "time": { + "type": "Number" + }, + "topic": { + "type": "String" + }, + "type": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "_User", + "fields": { + "ACL": { + "type": "ACL" + }, + "authData": { + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "department": { + "type": "String" + }, + "email": { + "type": "String" + }, + "emailVerified": { + "type": "Boolean" + }, + "nick": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "password": { + "type": "String" + }, + "phone": { + "type": "String" + }, + "role": { + "required": false, + "targetClass": "_Role", + "type": "Relation" + }, + "roles": { + "required": false, + "targetClass": "_Role", + "type": "Relation" + }, + "tag": { + "type": "Object" + }, + "updatedAt": { + "type": "Date" + }, + "username": { + "type": "String" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Channel", + "fields": { + "ACL": { + "type": "ACL" + }, + "cType": { + "required": true, + "type": "String" + }, + "config": { + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "data": { + "required": false, + "type": "Object" + }, + "desc": { + "type": "String" + }, + "isEnable": { + "type": "Boolean" + }, + "name": { + "required": true, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "product": { + "targetClass": "Product", + "type": "Relation" + }, + "status": { + "type": "String" + }, + "type": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Dict", + "fields": { + "ACL": { + "type": "ACL" + }, + "class": { + "required": false, + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "data": { + "type": "Object" + }, + "dict": { + "required": false, + "targetClass": "Dict", + "type": "Pointer" + }, + "key": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "title": { + "required": false, + "type": "String" + }, + "type": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "Notification", + "fields": { + "ACL": { + "type": "ACL" + }, + "content": { + "required": false, + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "deletedBy": { + "targetClass": "_User", + "type": "Relation" + }, + "objectId": { + "type": "String" + }, + "process": { + "required": false, + "type": "String" + }, + "public": { + "type": "Boolean" + }, + "readBy": { + "targetClass": "_User", + "type": "Relation" + }, + "sender": { + "targetClass": "_User", + "type": "Pointer" + }, + "status": { + "type": "Number" + }, + "type": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + }, + "user": { + "targetClass": "_User", + "type": "Pointer" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "View", + "fields": { + "ACL": { + "type": "ACL" + }, + "class": { + "required": false, + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "data": { + "required": false, + "type": "Object" + }, + "key": { + "required": false, + "type": "String" + }, + "language": { + "required": false, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "title": { + "required": false, + "type": "String" + }, + "type": { + "required": false, + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Device", + "fields": { + "ACL": { + "type": "ACL" + }, + "address": { + "required": false, + "type": "String" + }, + "basedata": { + "type": "Object" + }, + "content": { + "defaultValue": { + "alertstatus": false, + "battery_voltage": 12, + "charge_current": 0, + "core_temperature": 38, + "day_electricity": 0.1, + "dump_energy": 83, + "i_out": 0, + "outside_temperature": 27, + "runtime": 1, + "system_state": 0, + "total_power": 2.1, + "v_out": 12, + "v_solarpanel": 0.3 + }, + "required": false, + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "detail": { + "type": "Object" + }, + "devaddr": { + "required": true, + "type": "String" + }, + "deviceSecret": { + "required": false, + "type": "String" + }, + "ip": { + "type": "String" + }, + "isEnable": { + "type": "Boolean" + }, + "lastOnlineTime": { + "type": "Number" + }, + "location": { + "required": false, + "type": "GeoPoint" + }, + "name": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "parentId": { + "targetClass": "Device", + "type": "Pointer" + }, + "product": { + "required": true, + "targetClass": "Product", + "type": "Pointer" + }, + "profile": { + "required": false, + "type": "Object" + }, + "route": { + "type": "Object" + }, + "status": { + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true, + "role:开发者": true + }, + "count": { + "*": true, + "role:开发者": true + }, + "create": { + "*": true, + "role:开发者": true + }, + "delete": { + "*": true, + "role:开发者": true + }, + "find": { + "*": true, + "role:开发者": true + }, + "get": { + "*": true, + "role:开发者": true + }, + "protectedFields": {}, + "update": { + "*": true, + "role:开发者": true + } + }, + "className": "Article", + "fields": { + "ACL": { + "type": "ACL" + }, + "Icon": { + "required": false, + "type": "String" + }, + "category": { + "required": false, + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "data": { + "required": false, + "type": "String" + }, + "desc": { + "required": false, + "type": "String" + }, + "ico": { + "required": false, + "type": "String" + }, + "name": { + "required": false, + "type": "String" + }, + "objectId": { + "type": "String" + }, + "order": { + "required": false, + "type": "Number" + }, + "parent": { + "required": false, + "targetClass": "Article", + "type": "Pointer" + }, + "projectId": { + "required": false, + "type": "String" + }, + "timestamp": { + "required": false, + "type": "Number" + }, + "type": { + "required": false, + "type": "String" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "_Role", + "fields": { + "ACL": { + "type": "ACL" + }, + "alias": { + "type": "String" + }, + "createdAt": { + "type": "Date" + }, + "depname": { + "type": "String" + }, + "desc": { + "type": "String" + }, + "leafnode": { + "type": "Boolean" + }, + "level": { + "type": "Number" + }, + "menus": { + "targetClass": "Menu", + "type": "Relation" + }, + "name": { + "type": "String" + }, + "objectId": { + "type": "String" + }, + "order": { + "type": "Number" + }, + "org_type": { + "type": "String" + }, + "parent": { + "targetClass": "_Role", + "type": "Pointer" + }, + "roles": { + "targetClass": "_Role", + "type": "Relation" + }, + "rules": { + "targetClass": "Permission", + "type": "Relation" + }, + "tag": { + "type": "Object" + }, + "updatedAt": { + "type": "Date" + }, + "users": { + "targetClass": "_User", + "type": "Relation" + }, + "views": { + "required": false, + "targetClass": "View", + "type": "Relation" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "*": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": { + "*": [] + }, + "update": { + "*": true + } + }, + "className": "ProductTemplet", + "fields": { + "ACL": { + "type": "ACL" + }, + "category": { + "required": false, + "targetClass": "Category", + "type": "Pointer" + }, + "config": { + "required": false, + "type": "Object" + }, + "content": { + "required": false, + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "decoder": { + "required": false, + "type": "Object" + }, + "desc": { + "required": false, + "type": "String" + }, + "icon": { + "required": false, + "type": "String" + }, + "id": { + "type": "String" + }, + "name": { + "required": false, + "type": "String" + }, + "netType": { + "type": "String" + }, + "nodeType": { + "type": "Number" + }, + "objectId": { + "type": "String" + }, + "profile": { + "required": false, + "type": "Object" + }, + "thing": { + "required": false, + "type": "Object" + }, + "updatedAt": { + "type": "Date" + } + } + }, + { + "classLevelPermissions": { + "addField": { + "role:root": true + }, + "count": { + "*": true + }, + "create": { + "*": true + }, + "delete": { + "*": true + }, + "find": { + "*": true + }, + "get": { + "*": true + }, + "protectedFields": {}, + "update": { + "*": true + } + }, + "className": "Product", + "fields": { + "ACL": { + "type": "ACL" + }, + "category": { + "required": false, + "targetClass": "Category", + "type": "Pointer" + }, + "channel": { + "required": false, + "type": "Object" + }, + "children": { + "targetClass": "Product", + "type": "Relation" + }, + "config": { + "type": "Object" + }, + "content": { + "required": false, + "type": "Object" + }, + "createdAt": { + "type": "Date" + }, + "decoder": { + "type": "Object" + }, + "desc": { + "type": "String" + }, + "devType": { + "required": true, + "type": "String" + }, + "dynamicReg": { + "type": "Boolean" + }, + "icon": { + "type": "String" + }, + "name": { + "required": true, + "type": "String" + }, + "netType": { + "type": "String" + }, + "nodeType": { + "type": "Number" + }, + "objectId": { + "type": "String" + }, + "productIdentifier": { + "type": "String" + }, + "productSecret": { + "type": "String" + }, + "producttemplet": { + "required": false, + "targetClass": "ProductTemplet", + "type": "Pointer" + }, + "profile": { + "required": false, + "type": "Object" + }, + "serverCallbackModel": { + "type": "Array" + }, + "thing": { + "type": "Object" + }, + "topics": { + "defaultValue": {}, + "required": false, + "type": "Object" + }, + "updatedAt": { + "type": "Date" + } + } + } +] diff --git a/apps/dgiot_parse/priv/swagger/swagger_parse.json b/apps/dgiot_parse/priv/swagger/swagger_parse.json index dc340a510a..986e5aff8d 100644 --- a/apps/dgiot_parse/priv/swagger/swagger_parse.json +++ b/apps/dgiot_parse/priv/swagger/swagger_parse.json @@ -304,14 +304,9 @@ }, "/health": { "get": { - "security": [], "summary": "health", "description": "检测是启动", - "consumes": [ - "*" - ], - "parameters": [ - ], + "parameters": [], "responses": { "200": { "description": "Returns operation status" @@ -334,22 +329,25 @@ ] } }, - "/update": { + "/upgrade": { "get": { - "description": "Parse表升级", + "summary": "数据库升级", + "description": "数据库升级", "parameters": [], "responses": { "200": { - "description": "Returns instance data" + "description": "Returns operation status" }, - "404": { - "description": "object not found", - "schema": { - "$ref": "#/definitions/Error" - } + "400": { + "description": "Bad Request" + }, + "403": { + "description": "Forbidden" + }, + "500": { + "description": "Server Internal error" } }, - "summary": "Parse表升级", "tags": [ "Basic" ] diff --git a/apps/dgiot_parse/src/dgiot_parse.erl b/apps/dgiot_parse/src/dgiot_parse.erl index 8218807615..a787687e39 100644 --- a/apps/dgiot_parse/src/dgiot_parse.erl +++ b/apps/dgiot_parse/src/dgiot_parse.erl @@ -19,6 +19,7 @@ -include("dgiot_parse.hrl"). -include_lib("dgiot/include/logger.hrl"). -define(DEFField, re:split(application:get_env(?MODULE, delete_field, ""), ",")). +-include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). %% API -export([ @@ -43,6 +44,7 @@ create_schemas/1, create_schemas/2, update_schemas/1, + update_schemas/2, del_schemas/1, del_schemas/2, set_class_level/2, @@ -70,20 +72,48 @@ ]). -export([ - request_rest/6 + request_rest/6, + create_schemas_json/0, + get_schemas_json/0, + update_schemas_json/0 ]). -update() -> -%% 物模型更新 +create_schemas_json() -> + {file, Here} = code:is_loaded(?MODULE), + SchemasFile = dgiot_httpc:url_join([filename:dirname(filename:dirname(Here)), "/priv/json/schemas.json"]), + case dgiot_parse:get_schemas() of + {ok, #{<<"results">> := Schemas}} -> + file:write_file(SchemasFile, jsx:encode(Schemas)); + _ -> + pass + end. + +get_schemas_json() -> + dgiot_utils:get_JsonFile(?MODULE, <<"schemas.json">>). + +%% dgiot_parse:update_schemas(Fields). +update_schemas_json() -> + io:format("~s ~p ~p~n", [?FILE, ?LINE, <<"update_schemas_json start">>]), + %% API更新 + os:cmd("curl 127.0.0.1:5080/install/rule"), + %% 物模型更新 dgiot_product:update_properties(), -%% 表及其字段更新 -%% topics更新 - dgiot_product:update_topics(), -%% 产品字段新增 -%% 适配https://gitee.com/dgiiot/dgiot/issues/I583AD - dgiot_product:update_product_filed(#{<<"profile">> => #{}, <<"content">> => #{}}), -%% 菜单更新 -%% API更新 + %% 表字段更新 + Schemas = dgiot_parse:get_schemas_json(), + dgiot_parse:del_filed_schemas(<<"Product">>, [<<"topics">>]), + timer:sleep(1000), + lists:foldl(fun(#{<<"className">> := ClassName, <<"fields">> := Fields}, _Acc) -> + maps:fold(fun(Key, Value, _Acc1) -> +%% io:format("Fields = #{~p => ~p, ~n ~p => #{~p => ~p}}.~n", [<<"className">>, ClassName, <<"fields">>, Key, Value]), + dgiot_parse:update_schemas(#{<<"className">> => ClassName, <<"fields">> => #{Key => Value}}), + timer:sleep(100) + end, #{}, Fields) + end, #{}, Schemas). + +update() -> + %% 发通知异步调用更新 + ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BACKEND_CHL), <<"DEVICE">>, <<"Device缓存通道"/utf8>>), + dgiot_channelx:do_message(ChannelId, {update_schemas_json}), ok. health() -> diff --git a/apps/dgiot_parse/src/handler/dgiot_parse_handler.erl b/apps/dgiot_parse/src/handler/dgiot_parse_handler.erl index bdb06f3d22..8f5a7c96b7 100644 --- a/apps/dgiot_parse/src/handler/dgiot_parse_handler.erl +++ b/apps/dgiot_parse/src/handler/dgiot_parse_handler.erl @@ -54,7 +54,7 @@ handle(OperationID, Args, Context, Req) -> ?LOG(debug, "do request: ~p, ~p, ~p~n", [OperationID, Args, Reason]), Err = case is_binary(Reason) of true -> Reason; - false -> dgiot_utils:format("~p", [Reason]) + false -> list_to_binary(io_lib:format("~p", [Reason])) end, {500, Headers, #{<<"error">> => Err}}; ok -> @@ -64,10 +64,14 @@ handle(OperationID, Args, Context, Req) -> ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), {200, Headers, Res, Req}; {Status, Res} -> + ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), {Status, Headers, Res, Req}; {Status, NewHeaders, Res} -> ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), - {Status, maps:merge(Headers, NewHeaders), Res, Req} + {Status, maps:merge(Headers, NewHeaders), Res, Req}; + {Status, NewHeaders, Res, NewReq} -> + ?LOG(debug, "do request: ~p, ~p ->~p~n", [OperationID, Args, Res]), + {Status, maps:merge(Headers, NewHeaders), Res, NewReq} end. @@ -103,14 +107,15 @@ do_request(post_batch, Body, #{base_path := BasePath} = _Context, Req) -> {error, Reason} -> {400, Reason} end; -do_request(get_health, _Body, _Context, Req) -> - {200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req}; +do_request(get_health, _Body, _Context, _Req) -> + {ok, #{<<"msg">> => <<"success">>}}; -do_request(get_update, _Body, _Context, Req) -> +%% 数据库升级 +do_request(get_upgrade, _Body, _Context, _Req) -> dgiot_parse:update(), - {200, #{<<"content-type">> => <<"text/plain">>}, <<"ok">>, Req}; + {ok, #{<<"msg">> => <<"success">>}}; %% 服务器不支持的API接口 do_request(_OperationId, _Args, _Context, _Req) -> - ?LOG(debug, "_Args ~p", [_Args]), + io:format("~s ~p _OperationId = ~p.~n", [?FILE, ?LINE, _OperationId]), {error, <<"Not Allowed.">>}. diff --git a/apps/dgiot_task/src/dgiot_task.erl b/apps/dgiot_task/src/dgiot_task.erl index 1c8f5b9673..b0cee8b9ca 100644 --- a/apps/dgiot_task/src/dgiot_task.erl +++ b/apps/dgiot_task/src/dgiot_task.erl @@ -88,8 +88,9 @@ get_collection(ProductId, [], Payload, Ack) -> case X of #{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, <<"dataType">> := DataType, + <<"dataSource">> := DataSource, <<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> -> - dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2); + dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, DataSource, Payload, Acc2); _ -> Acc2 end @@ -112,8 +113,9 @@ get_collection(ProductId, Dis, Payload, Ack) -> case X of #{<<"dataForm">> := #{<<"strategy">> := Strategy} = DataForm, <<"dataType">> := DataType, + <<"dataSource">> := DataSource, <<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> -> - dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, Payload, Acc2); + dgiot_task_data:get_userdata(ProductId, Identifier, DataForm, DataType, DataSource, Payload, Acc2); _ -> Acc2 end @@ -216,7 +218,14 @@ string2value(Str, Type, Specs) -> round(Value); Type2 when Type2 == <<"FLOAT">>; Type2 == <<"DOUBLE">> -> Precision = maps:get(<<"precision">>, Specs, 3), - dgiot_utils:to_float(Value, Precision); + case binary:split(dgiot_utils:to_binary(string:to_lower(dgiot_utils:to_list(Value))), <<$e>>, [global, trim]) of + [Value1, Pow] -> + Valuefloat = dgiot_utils:to_float(Value1), + PowInt = dgiot_utils:to_int(Pow), + dgiot_utils:to_float(Valuefloat * math:pow(10, PowInt), Precision); + [Value2] -> + dgiot_utils:to_float(Value2, Precision) + end; _ -> Value end @@ -289,6 +298,8 @@ save_td(ProductId, DevAddr, Ack, AppData) -> ChannelId = dgiot_parse_id:get_channelid(dgiot_utils:to_binary(?BRIDGE_CHL), <<"DGIOTTOPO">>, <<"TOPO组态通道"/utf8>>), dgiot_channelx:do_message(ChannelId, {topo_thing, ProductId, DeviceId, AllData}), dgiot_tdengine_adapter:save(ProductId, DevAddr, AllData), + Channel = dgiot_product:get_taskchannel(ProductId), + dgiot_bridge:send_log(Channel, ProductId, DevAddr, "~s ~p save td => ProductId ~p DevAddr ~p ~ts ", [?FILE, ?LINE, ProductId, DevAddr, unicode:characters_to_list(jsx:encode(AllData))]), dgiot_metrics:inc(dgiot_task, <<"task_save">>, 1), NotificationTopic = <<"$dg/alarm/", ProductId/binary, "/", DeviceId/binary, "/properties/report">>, dgiot_mqtt:publish(DeviceId, NotificationTopic, jsx:encode(AllData)), diff --git a/apps/dgiot_task/src/dgiot_task_channel.erl b/apps/dgiot_task/src/dgiot_task_channel.erl index 95b7252700..2884534491 100644 --- a/apps/dgiot_task/src/dgiot_task_channel.erl +++ b/apps/dgiot_task/src/dgiot_task_channel.erl @@ -117,7 +117,8 @@ start(ChannelId, ChannelArgs) -> %% 通道初始化 init(?TYPE, ChannelId, Args) -> - #{<<"freq">> := Freq, <<"start_time">> := Start_time, <<"end_time">> := End_time, <<"rand">> := Rand} = Args, + #{<<"freq">> := Freq, <<"start_time">> := Start_time, <<"end_time">> := End_time} = Args, + Rand = maps:get(<<"rand">>, Args, true), dgiot_client:add_clock(ChannelId, Start_time, End_time), State = #state{id = ChannelId}, {ok, State, dgiot_client:register(ChannelId, task_sup, #{ diff --git a/apps/dgiot_task/src/dgiot_task_data.erl b/apps/dgiot_task/src/dgiot_task_data.erl index 7d3752f1cd..84b7c7a463 100644 --- a/apps/dgiot_task/src/dgiot_task_data.erl +++ b/apps/dgiot_task/src/dgiot_task_data.erl @@ -18,9 +18,9 @@ -include("dgiot_task.hrl"). -include_lib("dgiot/include/logger.hrl"). -include_lib("dgiot_bridge/include/dgiot_bridge.hrl"). --export([get_userdata/6, get_datasource/2, get_ack/4]). +-export([get_userdata/7, get_datasource/2, get_ack/4]). -get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Payload, Acc) -> +get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, _DataSource, Payload, Acc) -> case maps:find(Identifier, Payload) of {ok, Value} -> Addr = dgiot_gps:get_gpsaddr(Value), @@ -31,33 +31,38 @@ get_userdata(ProductId, Identifier, _DataForm, #{<<"type">> := <<"geopoint">>}, Acc end; -get_userdata(_ProductId, Identifier, #{<<"collection">> := Collection} = DataForm, #{<<"type">> := Type, <<"specs">> := Specs}, Payload, Acc) -> +get_userdata(_ProductId, Identifier, #{<<"collection">> := Collection}, DataType, DataSource, Payload, Acc) -> case maps:find(Identifier, Payload) of {ok, Value} -> - Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), - Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), - case dgiot_task:string2value(Str1, Type, Specs) of - error -> - maps:without([Identifier], Acc); - Value1 -> - Acc#{Identifier => Value1} - end; + calculate_value(Value, Collection, Identifier, DataType, Acc); _ -> - Address = maps:get(<<"address">>, DataForm, <<"">>), + Address = maps:get(<<"address">>, DataSource, <<"">>), case maps:find(Address, Payload) of {ok, Value} -> - Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), - Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), - case dgiot_task:string2value(Str1, Type, Specs) of - error -> - maps:without([Identifier], Acc); - Value1 -> - Acc#{Identifier => Value1} - end; - _ -> Acc + NewAcc = maps:without([Address], Acc), + calculate_value(Value, Collection, Identifier, DataType, NewAcc); + _ -> +%% 电表 + Di = maps:get(<<"di">>, DataSource, <<"">>), + case maps:find(Di, Payload) of + {ok, Value} -> + NewAcc = maps:without([Di], Acc), + calculate_value(Value, Collection, Identifier, DataType, NewAcc); + _ -> + Acc + end end end. +calculate_value(Value, Collection, Identifier, #{<<"type">> := Type, <<"specs">> := Specs}, Acc) -> + Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), + Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]), + case dgiot_task:string2value(Str1, Type, Specs) of + error -> + maps:without([Identifier], Acc); + Value1 -> + Acc#{Identifier => Value1} + end. get_ack(ProductId, Payload, Dis, Ack) -> NewPayload = diff --git a/apps/dgiot_task/src/dgiot_task_worker.erl b/apps/dgiot_task/src/dgiot_task_worker.erl index f4ee224dfb..dcfa0bd649 100644 --- a/apps/dgiot_task/src/dgiot_task_worker.erl +++ b/apps/dgiot_task/src/dgiot_task_worker.erl @@ -42,14 +42,21 @@ start_link(Args) -> %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"starttime">> := StartTime, <<"endtime">> := EndTime, <<"freq">> := Freq, <<"rand">> := Rand}]) -> +init([#{<<"channel">> := ChannelId, <<"client">> := ClientId, <<"starttime">> := StartTime, <<"endtime">> := EndTime, <<"freq">> := Freq} = Args]) -> dgiot_client:add(ChannelId, ClientId), dgiot_metrics:inc(dgiot_task, <<"task">>, 1), NextTime = dgiot_client:get_nexttime(StartTime, Freq), Count = dgiot_client:get_count(StartTime, EndTime, Freq), + Rand = + case maps:get(<<"rand">>,Args, true) of + true -> + dgiot_client:get_rand(Freq); + _ -> + 0 + end, io:format("~s ~p ChannelId ~p ClientId ~p NextTime = ~p Freq ~p Count = ~p.~n", [?FILE, ?LINE, ChannelId, ClientId, NextTime, Freq, Count]), Dclient = #dclient{channel = ChannelId, client = ClientId, status = ?DCLIENT_INTIALIZED, userdata = #device_task{}, - clock = #dclock{nexttime = NextTime, freq = Freq, count = Count, round = 0, rand = Rand}}, + clock = #dclock{nexttime = NextTime + Rand, freq = Freq, count = Count, round = 0}}, {ok, Dclient}; init(A) -> @@ -79,7 +86,7 @@ handle_info({change_clock, NextTime, EndTime, Freq}, #dclient{clock = Clock} = D %% 定时触发网关及网关任务, 在单个任务轮次中,要将任务在全局上做一下错峰操作 handle_info(next_time, #dclient{channel = Channel, client = Client, userdata = UserData, - clock = #dclock{round = Round, nexttime = NextTime, count = Count, freq = Freq, rand = Rand} = Clock} = Dclient) -> + clock = #dclock{round = Round, nexttime = NextTime, count = Count, freq = Freq} = Clock} = Dclient) -> dgiot_client:stop(Channel, Client, Count), %% 检查是否需要停止任务 NewNextTime = dgiot_client:get_nexttime(NextTime, Freq), case dgiot_task:get_pnque(Client) of @@ -90,7 +97,7 @@ handle_info(next_time, #dclient{channel = Channel, client = Client, userdata = U PnQueLen = dgiot_task:get_pnque_len(Client), DiQue = dgiot_task:get_instruct(ProductId, NewRound), %% io:format("~s ~p DiQue = ~p.~n", [?FILE, ?LINE, DiQue]), - dgiot_client:send_after(10, Freq, Rand, read), % 每轮任务开始时,做一下随机开始 + erlang:send_after(100, self(), read), % 每轮任务开始时,做一下随机开始 {noreply, Dclient#dclient{userdata = UserData#device_task{product = ProductId, devaddr = DevAddr, pnque_len = PnQueLen, dique = DiQue}, clock = Clock#dclock{nexttime = NewNextTime, count = Count - 1, round = NewRound}}} end; @@ -100,11 +107,11 @@ handle_info(read, State) -> {noreply, send_msg(State)}; %% ACK消息触发进行新的指令发送 -handle_info({dclient_ack, Topic, Payload}, #dclient{userdata = Usedata} = State) -> +handle_info({dclient_ack, Topic, Payload}, #dclient{channel = ChannelId, userdata = Usedata} = State) -> dgiot_metrics:inc(dgiot_task, <<"task_recv">>, 1), case binary:split(Topic, <<$/>>, [global, trim]) of [<<"$dg">>, <<"thing">>, ProductId, DevAddr, <<"properties">>, <<"report">>] -> -%% io:format("~s ~p Payload = ~p.~n", [?FILE, ?LINE, Payload]), + dgiot_bridge:send_log(dgiot_utils:to_binary(ChannelId), ProductId, DevAddr, "~s ~p recv => ~p ~ts ", [?FILE, ?LINE, Topic, unicode:characters_to_list(jsx:encode(Payload))]), dgiot_task:save_td(ProductId, DevAddr, Payload, #{}), {noreply, send_msg(State#dclient{userdata = Usedata#device_task{product = ProductId, devaddr = DevAddr}})}; _ -> @@ -141,7 +148,7 @@ send_msg(#dclient{channel = Channel, userdata = #device_task{product = Product, %% io:format("~s ~p Payload1 = ~p.~n", [?FILE, ?LINE, Payload1]), Topic = <<"$dg/device/", Product/binary, "/", DevAddr/binary, "/properties">>, dgiot_mqtt:publish(Channel, Topic, jsx:encode(Payload1)), - dgiot_bridge:send_log(Channel, Product, DevAddr, "to_dev=> ~s ~p ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(jsx:encode(Payload1))]), + dgiot_bridge:send_log(Channel, Product, DevAddr, "~s ~p to dev => ~ts: ~ts", [?FILE, ?LINE, unicode:characters_to_list(Topic), unicode:characters_to_list(jsx:encode(Payload1))]), {Count + 1, Acc ++ [Payload1], Acc1 ++ [Identifier1]}; _ -> {Count, Acc, Acc1} @@ -153,14 +160,18 @@ send_msg(#dclient{channel = Channel, userdata = #device_task{product = Product, State#dclient{userdata = UserData#device_task{dique = NewDisQue, interval = Interval}}. get_next_pn(#dclient{client = CLient, clock = #dclock{round = Round}, userdata = UserData} = State) -> - {NextProductId, NextDevAddr} = dgiot_task:get_pnque(CLient), - NextDeviceId = dgiot_parse_id:get_deviceid(NextProductId, NextDevAddr), - case NextDeviceId of - CLient -> + case dgiot_task:get_pnque(CLient) of + not_find -> State; - _ -> - DisQue = dgiot_task:get_instruct(NextProductId, Round), - NewState = State#dclient{client = NextDeviceId, userdata = UserData#device_task{product = NextProductId, devaddr = NextDevAddr, dique = DisQue}}, - send_msg(NewState) + {NextProductId, NextDevAddr} -> + NextDeviceId = dgiot_parse_id:get_deviceid(NextProductId, NextDevAddr), + case NextDeviceId of + CLient -> + State; + _ -> + DisQue = dgiot_task:get_instruct(NextProductId, Round), + NewState = State#dclient{client = NextDeviceId, userdata = UserData#device_task{product = NextProductId, devaddr = NextDevAddr, dique = DisQue}}, + send_msg(NewState) + end end. diff --git a/dgiot_install.sh b/dgiot_install.sh index b1f11de08b..eded26613d 100644 --- a/dgiot_install.sh +++ b/dgiot_install.sh @@ -1515,9 +1515,9 @@ dgiot_shell # set parameters by default value deployType=single # [single | cluster | devops | ci] domain_name="prod.iotn2n.com" # [prod.iotn2n.com | your_domain_name] -software="dgiot_n139" # [dgiot_n139| dgiot_n] +software="dgiot_n165" # [dgiot_n165| dgiot_n] plugin="dgiot" # [dgiot | dgiot_your_plugin] -dgiotmd5="3da7c2e4929259d3c3c023f67f8bb979" # [dgiotmd5] +dgiotmd5="da072c15a460fb8ef7095d142b3fcc10" # [dgiotmd5] pg_eip="changeyourip" # [datanode_eip] pg_auth='changeyourpassword' # [pg_auth]