Skip to content

Commit

Permalink
Update CHANGELOG.md
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxredrain committed Dec 23, 2021
1 parent 84ba3f2 commit 2a3ebbe
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 180 deletions.
165 changes: 23 additions & 142 deletions apps/dgiot_meter/src/dgiot_meter_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,14 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
DTUIP = dgiot_utils:get_ip(Socket),
NewBuff =
case is_binary(Buff) of
true -> dgiot_utils:binary_to_hex(Buff);
false -> Buff
end,
true -> dgiot_utils:binary_to_hex(Buff);
false -> Buff
end,
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
dgiot_bridge:send_log(ChannelId, DtuProductId, NewBuff, "Login ~p ", [NewBuff]),
dgiot_bridge:send_log(ChannelId, DtuProductId, NewBuff, "(登录) ~p ", [NewBuff]),
{Protocol, DtuAddr} =
case Buff of
<<16#68, _:4/bytes, 16#68,_A1:8/bytes,_Rest/binary>> ->
<<16#68, _:4/bytes, 16#68, _A1:8/bytes, _Rest/binary>> ->
{_, [Acc | _]} = dlt376_decoder:parse_frame(NewBuff, []),
#{<<"msgtype">> := Protocol1, <<"addr">> := MeterAddr} = Acc,
dgiot_meter:create_meter4G(MeterAddr, ChannelId, DTUIP),
Expand All @@ -59,22 +59,29 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, dt
{ProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
Topic = <<"thing/", ProductId/binary, "/", DtuAddr/binary>>,
dgiot_mqtt:subscribe(Topic), %为这个设备订阅一个mqtt
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "from dev ~p login", [dgiot_utils:binary_to_hex(Buff)]),
dgiot_bridge:send_log(ChannelId, ProductId, DtuAddr, "from dev ~p (登录)", [dgiot_utils:binary_to_hex(Buff)]),
{NewRef, NewStep} = {undefined, read_meter},
DtuId = dgiot_parse:get_deviceid(DtuProductId, DtuAddr),
dgiot_metrics:inc(dgiot_meter, <<"dtu_online">>, 1),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DtuId, state = State#state{dtuaddr = DtuAddr, protocol = ?DLT376, ref = NewRef, step = NewStep}}};
?DLT645 ->
dgiot_meter:create_dtu(DtuAddr, ChannelId, DTUIP),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
Topic = <<"profile/", DtuProductId/binary,"/", DtuAddr/binary>>,
Topic = <<"profile/", DtuProductId/binary, "/", DtuAddr/binary>>,
dgiot_mqtt:subscribe(Topic),
{NewRef, NewStep} =
case Search of
<<"nosearch">> ->
[dgiot_task:save_pnque(DtuProductId, DtuAddr, Meterproductid, Meteraddr) || #{<<"product">> := #{<<"objectId">> := Meterproductid}, <<"devaddr">> := Meteraddr}
<- dgiot_meter:get_sub_device(DtuAddr)],
{undefined, read_meter};
lists:map(fun(X) ->
case X of
#{<<"product">> := #{<<"objectId">> := MeterProductid}, <<"devaddr">> := Meteraddr} ->
dgiot_bridge:send_log(ChannelId, MeterProductid, Meteraddr, "save taskque Meteraddr ~p", [Meteraddr]),
dgiot_task:save_pnque(DtuProductId, DtuAddr, MeterProductid, Meteraddr);
_ ->
pass
end
end, dgiot_meter:get_sub_device(DtuAddr)),
{undefined, read_meter};
<<"quick">> ->
dgiot_meter:search_meter(tcp, undefined, TCPState, 0),
{undefined, search_meter};
Expand Down Expand Up @@ -127,115 +134,12 @@ handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, protocol = Protocol
?DLT376 ->
dgiot_bridge:send_log(ChannelId, "from_dev: ~p ", [dgiot_utils:binary_to_hex(Buff)]),
{Rest, Frames} = dgiot_meter:parse_frame(?DLT376, Buff, []),
case Frames of
% 返回抄表数据
[#{<<"di">> := <<16#01, 16#01, 16#01, 16#10>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/post">>, % 发送给mqtt进行数据存储
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
% 返回读取上次合闸时间
[#{<<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
% 返回读取上次拉闸时间
[#{<<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
% 拉闸,合闸成功
[#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FE>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
% 拉闸,合闸失败
[#{<<"di">> := <<16#FE, 16#FE, 16#FE, 16#FD>>, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", Addr/binary, "/status">>,
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
_ -> pass
end,
dlt376_decoder:process_message(Frames, ChannelId),
{noreply, TCPState#tcp{buff = Rest}};
?DLT645 ->
{Rest, Frames} = dgiot_meter:parse_frame(?DLT645, Buff, []),
dgiot_bridge:send_log(ChannelId, "from_dev: Rest ~p Frames ~p ", [Rest, Frames]),
% ?LOG(info, "GGM 245 Frames ~p", [Frames]),
case Frames of
% 查询上一次合闸时间返回
[#{<<"command">> := 16#91, <<"di">> := <<16#1E, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
Di = <<16#1E, 16#00, 16#01, 16#01>>,
DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
_ -> pass
end;
% 查询上一次拉闸时间返回
[#{<<"command">> := 16#91, <<"di">> := <<16#1D, 16#00, 16#01, 16#01>>, <<"addr">> := Addr, <<"data">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
Di = <<16#1D, 16#00, 16#01, 16#01>>,
DValue = #{dgiot_utils:to_hex(Di) => dlt645_decoder:binary_to_dtime_dlt645_bcd(Value)},
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
_ -> pass
end;
% 拉闸,合闸成功
[#{<<"command">> := 16#9C, <<"addr">> := Addr} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
Di = <<16#FE, 16#FE, 16#FE, 16#FE>>,
DValue = #{dgiot_utils:to_hex(Di) => 0},
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
_ -> pass
end;
% 拉闸,合闸失败
[#{<<"command">> := 16#DC, <<"addr">> := Addr, <<"data">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/status">>,
Di = <<16#FE, 16#FE, 16#FE, 16#FD>>,
DValue = #{dgiot_utils:to_hex(Di) => dgiot_utils:to_hex(Value)},
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(DValue));
_ -> pass
end;
% 抄表数据返回
[#{<<"command">> := 16#91, <<"di">> := _Di, <<"addr">> := Addr, <<"value">> := Value} | _] ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, _ACL, _Properties} -> DevAddr = dgiot_utils:binary_to_hex(Addr),
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
dgiot_mqtt:publish(DeviceId, Topic, jsx:encode(Value));
_ -> pass
end;
_ -> pass
end,
dlt645_decoder:process_message(Frames, ChannelId),
{noreply, TCPState#tcp{buff = Rest}};
_ ->
{noreply, TCPState#tcp{buff = <<0, 0, 0, 0, 0, 0, 0, 0>>}}
Expand Down Expand Up @@ -278,28 +182,16 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = _ChannelId}} = TCPS
case ThingData of
% 远程控制电表拉闸、合闸
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
ThingData1 = #{
<<"devaddr">> => DevAddr,
<<"ctrlflag">> => CtrlFlag,
<<"devpass">> => DevPass,
<<"protocol">> => Protocol,
<<"apiname">> => get_meter_ctrl
},
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
% 上次合闸时间
ThingData1 = #{
<<"devaddr">> => DevAddr,
<<"ctrlflag">> => CtrlFlag,
<<"protocol">> => Protocol,
<<"apiname">> => get_meter_ctrl_status
},
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
_ ->
?LOG(info, "GGM 236 dgiot_meter_tcp, handle_info9 ~p", [ThingData]),
pass
end;
?DLT645 ->
Expand All @@ -308,23 +200,12 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = _ChannelId}} = TCPS
% 远程控制电表拉闸、合闸
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"devpass">> := DevPass, <<"apiname">> := <<"get_meter_ctrl">>} ->
% ?LOG(info, "GGM 212 dgiot_meter_tcp, handle_info9 ~p,~p,~p,~p",[DevAddr,CtrlFlag,DevPass,Protocol]),
ThingData1 = #{
<<"devaddr">> => DevAddr,
<<"ctrlflag">> => CtrlFlag,
<<"devpass">> => DevPass,
<<"protocol">> => Protocol,
<<"apiname">> => get_meter_ctrl
},
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"devpass">> => DevPass, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
% 获取上次拉闸、合闸的时间(一次发送两条查询指令)
#{<<"devaddr">> := DevAddr, <<"ctrlflag">> := CtrlFlag, <<"apiname">> := <<"get_meter_ctrl_status">>} ->
ThingData1 = #{
<<"devaddr">> => DevAddr,
<<"ctrlflag">> => CtrlFlag,
<<"protocol">> => Protocol,
<<"apiname">> => get_meter_ctrl_status
},
ThingData1 = #{<<"devaddr">> => DevAddr, <<"ctrlflag">> => CtrlFlag, <<"protocol">> => Protocol, <<"apiname">> => get_meter_ctrl_status},
Payload1 = dgiot_meter:to_frame(ThingData1),
dgiot_tcp_server:send(TCPState, Payload1);
_ ->
Expand Down
Loading

0 comments on commit 2a3ebbe

Please sign in to comment.