Skip to content

Commit

Permalink
feat: meter to_frame
Browse files Browse the repository at this point in the history
  • Loading branch information
AvantLiu committed Feb 24, 2022
1 parent d7bb382 commit f375b68
Show file tree
Hide file tree
Showing 12 changed files with 217 additions and 93 deletions.
31 changes: 31 additions & 0 deletions apps/dgiot_dlink/src/dgiot_dlink.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,37 @@

-include_lib("dgiot/include/logger.hrl").

-define(TYPE, <<"DLINK">>).

%% 注册协议类型
-protocol_type(#{
cType => ?TYPE,
type => <<"DLINK">>,
colum => 10,
title => #{
zh => <<"DLINK协议"/utf8>>
},
description => #{
zh => <<"DLINK协议"/utf8>>
}
}).
%% 注册协议参数
-params(#{
<<"dis">> => #{
order => 1,
type => string,
allowCreate => true,
required => true,
default => <<"00"/utf8>>,
title => #{
zh => <<"数据标识"/utf8>>
},
description => #{
zh => <<"数据标识"/utf8>>
}
}
}).

start(Server) ->
Services = #{protos => [dgiot_dlink_pb],
services => #{'dgiot.Dlink' => dgiot_dlink_server}
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_http/priv/swagger/swagger_http.json
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@
"data": {
"type": "object",
"required": true,
"example": "{}",
"example": {},
"description": "模板内容"
}
}
Expand Down
25 changes: 17 additions & 8 deletions apps/dgiot_meter/src/dgiot_meter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ create_meter(MeterAddr, ChannelId, DTUIP, DtuAddr) ->
<<"devModel">> => <<"Meter">>
},
dgiot_device:create_device(Requests),
Topic = <<"profile/", ProductId/binary, "/", MeterAddr/binary>>,
dgiot_mqtt:subscribe(Topic),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
dgiot_task:save_pnque(DtuProductId, DtuAddr, ProductId, MeterAddr);
_ ->
Expand Down Expand Up @@ -115,7 +117,7 @@ create_meter4G(DevAddr, ChannelId, DTUIP) ->


get_sub_device(DtuAddr) ->
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>,<<"route">>],
Query = #{<<"keys">> => [<<"devaddr">>, <<"product">>, <<"route">>],
<<"where">> => #{<<"route.", DtuAddr/binary>> => #{<<"$regex">> => <<".+">>}},
<<"order">> => <<"devaddr">>, <<"limit">> => 256},
case dgiot_parse:query_object(<<"Device">>, Query) of
Expand All @@ -139,9 +141,10 @@ parse_frame(?DLT376, Buff, Opts) ->
% DLT376发送抄数指令
to_frame(#{
<<"devaddr">> := Addr,
<<"di">> := Di,
<<"protocol">> := ?DLT376,
<<"data">> := <<"null">>
<<"dataSource">> := #{
<<"di">> := Di
}
} = Frame) ->
dlt376_decoder:to_frame(Frame#{
<<"msgtype">> => ?DLT376,
Expand All @@ -155,9 +158,10 @@ to_frame(#{
% DLT645 组装电表抄表指令
to_frame(#{
<<"devaddr">> := Addr,
<<"di">> := Di,
<<"protocol">> := ?DLT645,
<<"data">> := <<"null">>
<<"dataSource">> := #{
<<"di">> := Di
}
} = Frame) ->
dlt645_decoder:to_frame(Frame#{
<<"msgtype">> => ?DLT645,
Expand Down Expand Up @@ -290,15 +294,20 @@ to_frame(#{

to_frame(#{
<<"devaddr">> := Addr,
<<"di">> := Di,
<<"protocol">> := ?DLT645
<<"protocol">> := ?DLT645,
<<"dataSource">> := #{
<<"di">> := Di
}
} = Frame) ->
dlt645_decoder:to_frame(Frame#{
<<"msgtype">> => ?DLT645,
<<"addr">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Addr)),
<<"di">> => dlt645_proctol:reverse(dgiot_utils:hex_to_binary(Di)),
<<"command">> => ?DLT645_MS_READ_DATA
}).
});

to_frame(Frame) ->
io:format("~s ~p Error Frame = ~p.~n", [?FILE, ?LINE, Frame]).

search_meter(tcp, _Ref, TCPState, 0) ->
Payload = dlt645_decoder:to_frame(#{
Expand Down
11 changes: 10 additions & 1 deletion apps/dgiot_meter/src/dgiot_meter_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,16 @@ init(?TYPE, ChannelId, #{
{ProductId, #{<<"ACL">> := Acl, <<"nodeType">> := 3, <<"thing">> := Thing}} ->
dgiot_data:insert({dtu, ChannelId}, {ProductId, Acl, maps:get(<<"properties">>, Thing, [])});
{ProductId, #{<<"ACL">> := Acl, <<"thing">> := Thing}} ->
dgiot_data:insert({meter, ChannelId}, {ProductId, Acl, maps:get(<<"properties">>, Thing, [])});
Props = maps:get(<<"properties">>, Thing, []),
dgiot_data:insert({meter, ChannelId}, {ProductId, Acl, Props}),
lists:map(fun(Prop) ->
case Prop of
#{<<"identifier">> := Identifier, <<"dataSource">> := #{<<"di">> := Di}} ->
dgiot_data:insert({protocol, Di, ProductId}, Identifier);
_ ->
pass
end
end, Props);
_ ->
pass
end
Expand Down
44 changes: 23 additions & 21 deletions apps/dgiot_meter/src/dgiot_meter_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
<<16#68, _:4/bytes, 16#68, _A1:8/bytes, _Rest/binary>> ->
{_, [Acc | _]} = dlt376_decoder:parse_frame(Buff, []), %% NewBuff
#{<<"msgtype">> := Protocol1, <<"con">> := Con, <<"addr">> := MeterAddr} = Acc,
Concentrator = maps:get(<<"concentrator">>, Acc, <<16#31,16#07,16#5F,16#81,16#00>>),
Concentrator = maps:get(<<"concentrator">>, Acc, <<16#31, 16#07, 16#5F, 16#81, 16#00>>),
case Con of
1 ->
Frame1 = maps:get(<<"frame">>, Acc, <<>>),
Expand Down Expand Up @@ -93,7 +93,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
pass
end,
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT376,ref = NewRef, step = NewStep}}};
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT376, ref = NewRef, step = NewStep}}};
?DLT645 ->
dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
Expand All @@ -119,9 +119,9 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
{Ref, Step, _Payload} = dgiot_meter:search_meter(tcp, undefined, TCPState, 1),
{Ref, Step}
end,
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
dgiot_bridge:send_log(ChannelId, DtuProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(DtuAddr)]),
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT645, ref = NewRef, step = NewStep}}}
end;

Expand Down Expand Up @@ -168,9 +168,9 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, pr
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
?LOG(info, "Frames ~p~n", [Frames]), %[#{<<"addr">> => <<"330100480000">>,<<"di">> => <<0,1,0,0>>,<<"value">> => #{<<"00010000">> => 0}}]
case Frames of
[#{<<"con">> := 1, <<"frame">> := Frame}| _] ->
[#{<<"con">> := 1, <<"frame">> := Frame} | _] ->
dgiot_tcp_server:send(TCPState, Frame); %%回复确认
[#{<<"afn">> := 16#0A, <<"di">> := <<16#00,16#00,16#02,16#01>>}| _] ->
[#{<<"afn">> := 16#0A, <<"di">> := <<16#00, 16#00, 16#02, 16#01>>} | _] ->
dlt376_decoder:process_message(Frames, ChannelId, DTUIP); %%注册或更新电表信
_ ->
dlt376_decoder:process_message(Frames, ChannelId)
Expand All @@ -186,8 +186,8 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, pr
end;

handle_info({retry, Concentrator}, TCPState) ->
Crc1 = dgiot_utils:get_parity(<<16#4B,Concentrator/binary,16#0C,16#60,16#01,16#01,16#01,16#10>>),
Frame1 = <<16#68,16#32,16#00,16#32,16#00,16#68,16#4B,Concentrator/binary,16#0C,16#60,16#01,16#01,16#01,16#10,Crc1:8,16#16>>,
Crc1 = dgiot_utils:get_parity(<<16#4B, Concentrator/binary, 16#0C, 16#60, 16#01, 16#01, 16#01, 16#10>>),
Frame1 = <<16#68, 16#32, 16#00, 16#32, 16#00, 16#68, 16#4B, Concentrator/binary, 16#0C, 16#60, 16#01, 16#01, 16#01, 16#10, Crc1:8, 16#16>>,
dgiot_tcp_server:send(TCPState, Frame1), %16#27,16#03,16#00,16#72,16#05,16#53
%% UserData = <<16#4B,Concentrator/binary,16#10,16#60,16#00,16#00,16#01,16#01,16#02,16#01,16#00,16#00,16#00,16#00,16#00,16#00,16#68,16#27,16#02,16#11,16#07,16#15,16#01,16#00,16#00,16#01,16#00>>,
%% Crc2 = dgiot_utils:get_parity(UserData),
Expand All @@ -213,31 +213,33 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = _ChannelId, protoco
Route = dgiot_data:get({concentrator, ProductId, DevAddr}),
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData), %%DLT645协议,需要透传转发
?LOG(info,"Route ~p DevAddr:~p Payload1:~p ~n~n",[Route, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
?LOG(info, "Route ~p DevAddr:~p Payload1:~p ~n~n", [Route, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
HexPayload = dgiot_utils:binary_to_hex(Payload1),
Payload2 = dlt376_decoder:to_frame(#{<<"afn">> => 16#10,
<<"command">> => 16#4A,
<<"concentrator">> => <<16#01,16#33,16#48,16#00,16#00>>,
<<"concentrator">> => <<16#01, 16#33, 16#48, 16#00, 16#00>>,
<<"di">> => <<"00000100">>,
<<"data">> => <<"026B81801000", HexPayload/binary, "00000000000000000000000000000000">>}),
?LOG(info,"Payload2:~p ~n~n",[dgiot_utils:binary_to_hex(Payload2)]),
?LOG(info, "Payload2:~p ~n~n", [dgiot_utils:binary_to_hex(Payload2)]),
dgiot_tcp_server:send(TCPState, Payload2);
?DLT645 ->
[#{<<"thingdata">> := ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData),
%% io:format("~s ~p DLT645 Payload1 = ~p.~n", [?FILE, ?LINE, Payload1]),
dgiot_tcp_server:send(TCPState, Payload1)
end;
[<<"profile">>, _ProductId, DevAddr] -> %%[<<"profile">>,<<"ecfbf67dd7">>,<<"330100480000">>
%% io:format("~s ~p ProData ~p Protocol ~p~n", [?FILE, ?LINE, Payload, Protocol]),
case Protocol of
?DLT376 ->
Payload2 = dlt376_decoder:frame_write_param(#{<<"concentrator">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
?LOG(info, "Payload2:~p ~n~n", [dgiot_utils:binary_to_hex(Payload2)]),
dgiot_tcp_server:send(TCPState, Payload2);
?DLT645 ->
Payload1 = dgiot_meter:to_frame(Payload),
dgiot_tcp_server:send(TCPState, Payload1)
end;
case Protocol of
?DLT376 ->
Payload2 = dlt376_decoder:frame_write_param(#{<<"concentrator">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
?LOG(info, "Payload2:~p ~n~n", [dgiot_utils:binary_to_hex(Payload2)]),
dgiot_tcp_server:send(TCPState, Payload2);
?DLT645 ->
Payload1 = dlt645_decoder:frame_write_param(#{<<"meter">> => DevAddr, <<"payload">> => jsx:decode(Payload)}),
?LOG(info, "DLT645 Payload1 :~p ~n~n", [dgiot_utils:binary_to_hex(Payload1)]),
dgiot_tcp_server:send(TCPState, Payload1)
end;
[<<"thingctrl">>, _ProductId, _DevAddr] ->
#tcp{state = #state{protocol = Protocol}} = TCPState,
case Protocol of
Expand Down
8 changes: 4 additions & 4 deletions apps/dgiot_meter/src/proctol/dlt376_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@
},
<<"type">> => #{
order => 3,
type => enum,
type => string,
required => true,
default => #{<<"value">> => <<"byte">>, <<"label">> => <<"byte">>},
default => #{<<"value">> => <<"bytes">>, <<"label">> => <<"bytes">>},
enum => [
#{<<"value">> => <<"byte">>, <<"label">> => <<"byte">>},
#{<<"value">> => <<"bytes">>, <<"label">> => <<"bytes">>},
#{<<"value">> => <<"little">>, <<"label">> => <<"little">>},
#{<<"value">> => <<"bit">>, <<"label">> => <<"bit">>}
],
Expand Down Expand Up @@ -532,7 +532,7 @@ frame_write_param(#{<<"concentrator">> := ConAddr, <<"payload">> := Frame}) ->
{BitList, Afn, Da, Fn} =
lists:foldl(fun(Index, {Acc, A, D, F}) ->
case maps:find(dgiot_utils:to_binary(Index), Frame) of
{ok, #{<<"value">> := Value, <<"dataForm">> := #{<<"afn">> := AFN, <<"da">> := Da, <<"di">> := FN, <<"length">> := Len, <<"type">> := Type} = _DataForm}} ->
{ok, #{<<"value">> := Value, <<"dataSource">> := #{<<"afn">> := AFN, <<"da">> := Da, <<"di">> := FN, <<"length">> := Len, <<"type">> := Type} = _DataForm}} ->
io:format("~s ~p Value ~p. Da ~p FN ~p ~n", [?FILE, ?LINE, Value, Da,FN]),
DA = dgiot_utils:binary_to_hex(pn_to_da(Da)),
case Type of
Expand Down
83 changes: 75 additions & 8 deletions apps/dgiot_meter/src/proctol/dlt645_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
test/0,
parse_value/2,
binary_to_dtime_dlt645_bcd/1,
process_message/2
process_message/2,
frame_write_param/1
]).

-define(TYPE, ?DLT645).
Expand All @@ -45,25 +46,37 @@
}).
%% 注册协议参数
-params(#{
<<"di">> => #{
<<"afn">> => #{
order => 1,
type => string,
required => true,
default => <<"00"/utf8>>,
title => #{
zh => <<"功能码"/utf8>>
},
description => #{
zh => <<"功能码"/utf8>>
}
},
<<"di">> => #{
order => 2,
type => string,
required => true,
default => <<"0000"/utf8>>,
title => #{
zh => <<"信息标识"/utf8>>
},
description => #{
zh => <<"信息标识"/utf8>>
zh => <<"信息标识 di"/utf8>>
}
},
<<"type">> => #{
order => 2,
type => enum,
order => 3,
type => string,
required => true,
default => <<"byte"/utf8>>,
default => #{<<"value">> => <<"bytes">>, <<"label">> => <<"bytes">>},
enum => [
#{<<"value">> => <<"byte">>, <<"label">> => <<"byte">>},
#{<<"value">> => <<"bytes">>, <<"label">> => <<"bytes">>},
#{<<"value">> => <<"little">>, <<"label">> => <<"little">>},
#{<<"value">> => <<"bit">>, <<"label">> => <<"bit">>}
],
Expand All @@ -75,7 +88,7 @@
}
},
<<"length">> => #{
order => 3,
order => 4,
type => integer,
required => true,
default => 2,
Expand Down Expand Up @@ -356,6 +369,60 @@ to_frame(#{
16#16
>>.

frame_write_param(#{<<"meter">> := MeterAddr, <<"payload">> := Frame}) ->
Addr = dlt645_proctol:reverse(dgiot_utils:hex_to_binary(MeterAddr)),
Length = length(maps:keys(Frame)),
io:format("~s ~p SortFrame ~p.~n", [?FILE, ?LINE, Length]),
{BitList, Afn} =
lists:foldl(fun(Index, {Acc, A}) ->
case maps:find(dgiot_utils:to_binary(Index), Frame) of
{ok, #{<<"value">> := Value, <<"dataSource">> := #{<<"afn">> := AFN, <<"length">> := Len, <<"type">> := Type}}} ->
io:format("~s ~p Value ~p.", [?FILE, ?LINE, Value]),
case Type of
<<"bytes">> ->
NewValue = dgiot_utils:hex_to_binary(Value),
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
{get_values(Acc, NewValue), dgiot_utils:hex_to_binary(AFN)};
<<"little">> ->
NewValue = dgiot_utils:to_int(Value),
L = dgiot_utils:to_int(Len),
Len1 = L * 8,
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
{get_values(Acc, <<NewValue:Len1/little>>), dgiot_utils:hex_to_binary(AFN)};
<<"bit">> ->
NewValue = dgiot_utils:to_int(Value),
L = dgiot_utils:to_int(Len),
io:format("~s ~p NewValue ~p.~n", [?FILE, ?LINE, NewValue]),
{Acc ++ [{NewValue, L}], dgiot_utils:hex_to_binary(AFN)};
_ ->
{Acc, A}
end;
_ ->
{Acc, A}
end
end, {[], 0}, lists:seq(1, Length)),
io:format("~s ~p BitList ~p.~n", [?FILE, ?LINE, BitList]),
UserZone = <<<<V:BitLen>> || {V, BitLen} <- BitList>>,
UserZone33 = list_to_binary(dgiot_utils:add_33h(UserZone)),
Len = byte_size(UserZone),
io:format("~s ~p UserZone ~p", [?FILE, ?LINE, UserZone]),
io:format("~s ~p Addr ~p. Afn ~p ~n", [?FILE, ?LINE, Addr, Afn]),
Crc = dgiot_utils:get_parity(<<16#68, Addr/binary, 16#68, Afn/binary, Len:8, UserZone33/binary>>),
<<
16#68,
Addr/binary,
16#68,
Afn/binary,
Len:8,
UserZone33/binary,
Crc:8,
16#16
>>.

get_values(Acc, Data) ->
lists:foldl(fun(V, Acc1) ->
Acc1 ++ [{V, 8}]
end, Acc, binary_to_list(Data)).

test() ->
B1 = <<12, 16#68, 16#01, 16#00, 16#00, 16#00, 16#00, 16#00, 16#68, 16#91, 16#08, 16#33, 16#33, 16#3D, 16#33, 16#33, 16#33, 16#33, 16#33, 16#0C, 16#16,
Expand Down
Loading

0 comments on commit f375b68

Please sign in to comment.