Skip to content

Commit

Permalink
sync meter
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxredrain committed Jul 8, 2021
1 parent 74ab67c commit 9de476d
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 41 deletions.
3 changes: 2 additions & 1 deletion apps/dgiot_meter/include/dgiot_meter.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
env = #{},
dtuaddr = <<>>,
step = login,
ref = undefined
ref = undefined,
search = <<"quick">>
}).

%% Internal Header File
Expand Down
39 changes: 25 additions & 14 deletions apps/dgiot_meter/src/dgiot_meter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@
-export([search_meter/1, search_meter/4]).
-export([
create_dtu/3,
create_meter/4
create_meter/4,
get_sub_device/1
]).

-define(APP, ?MODULE).

%%新设备
create_dtu(DtuAddr, ChannelId, DTUIP) ->
?LOG(info,"~p", [dgiot_data:get({dtu, ChannelId})]),
?LOG(info, "~p", [dgiot_data:get({dtu, ChannelId})]),
{ProductId, Acl, _Properties} = dgiot_data:get({dtu, ChannelId}),
Requests = #{
<<"devaddr">> => DtuAddr,
Expand Down Expand Up @@ -62,6 +63,16 @@ create_meter(MeterAddr, ChannelId, DTUIP, DtuAddr) ->
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr).

get_sub_device(DtuAddr) ->
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>],
<<"where">> => #{<<"route.", DtuAddr/binary>> => #{<<"$regex">> => <<".+">>}},
<<"order">> => <<"devaddr">>, <<"limit">> => 256},
case dgiot_parse:query_object(<<"Device">>, Query) of
{ok, #{<<"results">> := []}} -> [];
{ok, #{<<"results">> := List}} -> List;
_ -> []
end.

parse_frame(dlt645, Buff, Opts) ->
{Rest, Frames} = dlt645_decoder:parse_frame(Buff, Opts),
{Rest, lists:foldl(fun(X, Acc) ->
Expand Down Expand Up @@ -96,6 +107,17 @@ to_frame(#{
<<"command">> => ?DLT645_MS_READ_DATA
}).

search_meter(tcp, _Ref, TCPState, 0) ->
Payload = dlt645_decoder:to_frame(#{
<<"msgtype">> => ?DLT645,
<<"addr">> => dlt645_proctol:reverse(<<16#AA, 16#AA, 16#AA, 16#AA, 16#AA, 16#AA>>),
<<"command">> => ?DLT645_MS_READ_DATA,
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>)
}),
?LOG(info, "Payload ~p", [dgiot_utils:binary_to_hex(Payload)]),
dgiot_tcp_server:send(TCPState, Payload),
read_meter;

search_meter(tcp, Ref, TCPState, 1) ->
case Ref of
undefined ->
Expand All @@ -111,18 +133,7 @@ search_meter(tcp, Ref, TCPState, 1) ->
Payload ->
dgiot_tcp_server:send(TCPState, Payload),
{erlang:send_after(1500, self(), search_meter), search_meter, Payload}
end;

search_meter(tcp, _Ref, TCPState, 0) ->
Payload = dlt645_decoder:to_frame(#{
<<"msgtype">> => ?DLT645,
<<"addr">> => dlt645_proctol:reverse(<<16#AA, 16#AA, 16#AA, 16#AA, 16#AA, 16#AA>>),
<<"command">> => ?DLT645_MS_READ_DATA,
<<"di">> => dlt645_proctol:reverse(<<0, 0, 0, 0>>)
}),
?LOG(info,"Payload ~p", [dgiot_utils:binary_to_hex(Payload)]),
dgiot_tcp_server:send(TCPState, Payload),
read_meter.
end.

search_meter(1) ->
Flag =
Expand Down
30 changes: 25 additions & 5 deletions apps/dgiot_meter/src/dgiot_meter_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,19 @@
zh => <<"侦听端口"/utf8>>
}
},
<<"search">> => #{
order => 2,
type => enum,
required => false,
default => <<"quick"/utf8>>,
enum => [<<"nosearch">>, <<"quick">>, <<"normal">>],
title => #{
zh => <<"搜表模式"/utf8>>
},
description => #{
zh => <<"搜表模式:nosearch|quick|normal"/utf8>>
}
},
<<"ico">> => #{
order => 102,
type => string,
Expand All @@ -132,19 +145,26 @@ start(ChannelId, ChannelArgs) ->
%% 通道初始化
init(?TYPE, ChannelId, #{
<<"port">> := Port,
<<"product">> := Products}) ->
<<"product">> := Products,
<<"search">> := Search}) ->
lists:map(fun(X) ->
case X of
{ProductId, #{<<"ACL">> := Acl, <<"nodeType">> := 1, <<"thing">> := #{<<"properties">> := Properties}}} ->
{ProductId, #{<<"ACL">> := Acl, <<"nodeType">> := 1}} ->
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} = dgiot_device:lookup_prod(ProductId),
dgiot_data:insert({dtu, ChannelId}, {ProductId, Acl, Properties});
{ProductId, #{<<"ACL">> := Acl, <<"thing">> := #{<<"properties">> := Properties}}} ->
dgiot_data:insert({meter, ChannelId}, {ProductId, Acl, Properties})
{ProductId, #{<<"ACL">> := Acl}} ->
{ok, #{<<"thing">> := #{<<"properties">> := Properties}}} = dgiot_device:lookup_prod(ProductId),
dgiot_data:insert({meter, ChannelId}, {ProductId, Acl, Properties});
_ ->
?LOG(info,"X ~p", [X]),
pass
end
end, Products),
dgiot_metrics:start(dgiot_meter),
dgiot_data:set_consumer(ChannelId, 20),
State = #state{
id = ChannelId
id = ChannelId,
search = Search
},
{ok, State, dgiot_meter_tcp:start(Port, State)};

Expand Down
65 changes: 45 additions & 20 deletions apps/dgiot_meter/src/dgiot_meter_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,45 +35,67 @@ init(TCPState) ->
{ok, TCPState}.

%%设备登录报文,登陆成功后,开始搜表
handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>} = State} = TCPState) when byte_size(DtuAddr) == 15 ->
?LOG(info,"DevAddr ~p ChannelId ~p", [DtuAddr, ChannelId]),
handle_info({tcp, DtuAddr}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = <<>>, search = Search} = State} = TCPState) ->
DTUIP = dgiot_utils:get_ip(Socket),
dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
{Ref, Step} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
{noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = DtuAddr, ref = Ref, step = Step}}};

%%设备登录异常报文丢弃
handle_info({tcp, ErrorBuff}, #tcp{state = #state{dtuaddr = <<>>}} = TCPState) ->
?LOG(info,"ErrorBuff ~p ", [ErrorBuff]),
{noreply, TCPState#tcp{buff = <<>>}};
HexDtuAddr = dgiot_utils:binary_to_hex(DtuAddr),
dgiot_meter:create_dtu(HexDtuAddr, ChannelId, DTUIP),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
{NewRef, NewStep} =
case Search of
<<"nosearch">> ->
[dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := Meterproductid, <<"devaddr">> := Meteraddr}
<- dgiot_meter:get_sub_device(DtuAddr)],
{undefined, read_meter};
<<"quick">> ->
dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
{undefined, search_meter};
_ ->
{Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
{Ref, Step}
end,
{noreply, TCPState#tcp{buff = <<>>, state = State#state{dtuaddr = HexDtuAddr, ref = NewRef, step = NewStep}}};


%%定时器触发搜表
handle_info(search_meter, #tcp{state = #state{ref = Ref} = State} = TCPState) ->
{NewRef, Step} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
handle_info(search_meter, #tcp{state = #state{ref = Ref} = State} = TCPState) ->
{NewRef, Step, _Payload} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
{noreply, TCPState#tcp{buff = <<>>, state = State#state{ref = NewRef, step = Step}}};

%%ACK报文触发搜表
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, ref = Ref, step = search_meter} = State} = TCPState) ->
handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dtuaddr = DtuAddr, ref = Ref, step = search_meter, search = Search} = State} = TCPState) ->
?LOG(info,"from_dev: search_meter Buff ~p", [dgiot_utils:binary_to_hex(Buff)]),
?LOG(info,"from_dev: parse_frame Buff ~p", [dgiot_meter:parse_frame(dlt645, Buff, [])]),
{Rest, Frames} = dgiot_meter:parse_frame(dlt645, Buff, []),
lists:map(fun(X) ->
case X of
#{<<"addr">> := Addr} ->
?LOG(info,"from_dev: search_meter Addr ~p", [Addr]),
DTUIP = dgiot_utils:get_ip(Socket),
dgiot_meter:create_meter(dgiot_utils:binary_to_hex(Addr), ChannelId, DTUIP, DtuAddr);
_ ->
Other ->
?LOG(info,"Other ~p", [Other]),
pass %%异常报文丢弃
end
end, dgiot_meter:parse_frame(dlt645, Buff, [])),
{NewRef, Step} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
{noreply, TCPState#tcp{buff = <<>>, state = State#state{ref = NewRef, step = Step}}};
end, Frames),
case Search of
<<"normal">> ->
{NewRef, Step, _Payload} = dgiot_meter:search_meter(tcp, Ref, TCPState, 1),
{noreply, TCPState#tcp{buff = Rest, state = State#state{ref = NewRef, step = Step}}};
_ ->
case length(Frames) > 0 of
true ->
{noreply, TCPState#tcp{buff = Rest, state = State#state{ref = undefined, step = read_meter}}};
false ->
{noreply, TCPState#tcp{buff = Rest, state = State#state{ref = undefined, step = search_meter}}}
end
end;

%%接受抄表任务命令抄表
handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, step = read_meter}} = TCPState) ->
dgiot_bridge:send_log(ChannelId, "Topic ~p Msg ~p", [dgiot_mqtt:get_topic(Msg), dgiot_mqtt:get_payload(Msg)]),
case binary:split(dgiot_mqtt:get_topic(Msg), <<$/>>, [global, trim]) of
[<<"thing">>, _ProductId, _DevAddr] ->
#{<<"thingdata">> := ThingData} = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload = dgiot_meter:to_frame(ThingData),
dgiot_bridge:send_log(ChannelId, "from_task: ~ts: ~ts ", [_Topic, unicode:characters_to_list(dgiot_mqtt:get_payload(Msg))]),
?LOG(info,"task->dev: Payload ~p", [dgiot_utils:binary_to_hex(Payload)]),
Expand All @@ -85,7 +107,10 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, step = r

%% 接收抄表任务的ACK报文
handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, step = read_meter}} = TCPState) ->
case dgiot_meter:parse_frame(dlt645, Buff, []) of
dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
?LOG(info,"Buff ~p", [dgiot_utils:binary_to_hex(Buff)]),
{Rest, Frames} = dgiot_meter:parse_frame(dlt645, Buff, []),
case Frames of
[#{<<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Expand All @@ -95,7 +120,7 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, step = read_meter}}
end;
_ -> pass
end,
{noreply, TCPState#tcp{buff = <<>>}};
{noreply, TCPState#tcp{buff = Rest}};

%% 异常报文丢弃
%% {stop, TCPState} | {stop, Reason} | {ok, TCPState} | ok | stop
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_meter/src/proctol/dlt645_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
%% limitations under the License.
%%--------------------------------------------------------------------
-module(dlt645_decoder).
-author("weixingzheng").
-author("johnliu").
-include("dgiot_meter.hrl").
-include_lib("dgiot/include/logger.hrl").
-protocol([?DLT645]).
Expand Down
1 change: 1 addition & 0 deletions apps/dgiot_meter/src/proctol/dlt645_proctol.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

%% @doc dlt645 Protocol Processor.
-module(dlt645_proctol).
-author("johnliu").

-include_lib("dgiot_meter.hrl").
-include_lib("dgiot/include/logger.hrl").
Expand Down

0 comments on commit 9de476d

Please sign in to comment.