Skip to content

Commit

Permalink
feat: add tcp save log
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxredrain committed Oct 10, 2021
1 parent b432ce6 commit 868c8d0
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
* add tcp transparent ([b1f091c](https://github.com/dgiot/dgiot/commit/b1f091cba9fa143fcc8040a6fa8848ac89412e96))
* add tdengie metrics ([c38b046](https://github.com/dgiot/dgiot/commit/c38b046f1939bf6d59a106a62b2b1f3c86a89127))
* add trace log, Modify meter channel ([d4b9fd7](https://github.com/dgiot/dgiot/commit/d4b9fd758e2ca2643bdf5c560aa32ff441fac22f))
* add transparent log ([b432ce6](https://github.com/dgiot/dgiot/commit/b432ce6c13e3b1664685d74a6e5c08b73be987de))
* trace ([1eadb3a](https://github.com/dgiot/dgiot/commit/1eadb3acd12b3b6c7c818771999f2b5c3fc8b9a7))


Expand Down
11 changes: 4 additions & 7 deletions apps/dgiot/src/transport/dgiot_tcp_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ handle_info({tcp, Sock, Data}, #state{mod = Mod, child = #tcp{buff = Buff, socke
NewChildState = ChildState#tcp{buff = <<>>},
case NewChildState of
#tcp{clientid = CliendId, register = true} ->
dgiot_tracer:check_trace(CliendId, CliendId, Binary, ?MODULE, ?LINE);
dgiot_tracer:check_trace(CliendId, CliendId, dgiot_utils:binary_to_hex(Binary), ?MODULE, ?LINE);
_ ->
pass
end,
Expand Down Expand Up @@ -202,28 +202,25 @@ code_change(OldVsn, #state{mod = Mod, child = ChildState} = State, Extra) ->
%%% Internal functions
%%%===================================================================

send(#tcp{clientid = CliendId, register = true, transport = Transport, socket = Socket, log = Log}, Payload) ->
dgiot_tracer:check_trace(CliendId, CliendId, Payload, ?MODULE, ?LINE),
send(#tcp{clientid = CliendId, register = true, transport = Transport, socket = Socket}, Payload) ->
dgiot_tracer:check_trace(CliendId, CliendId, dgiot_utils:binary_to_hex(Payload), ?MODULE, ?LINE),
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
write_log(Log, <<"SEND">>, Payload),
case Socket == undefined of
true ->
{error, disconnected};
false ->
Transport:send(Socket, Payload)
end;

send(#tcp{transport = Transport, socket = Socket, log = Log}, Payload) ->
send(#tcp{transport = Transport, socket = Socket}, Payload) ->
dgiot_metrics:inc(dgiot_bridge, <<"tcp_server_send">>, 1),
write_log(Log, <<"SEND">>, Payload),
case Socket == undefined of
true ->
{error, disconnected};
false ->
Transport:send(Socket, Payload)
end.


rate_limit({Rate, Burst}) ->
esockd_rate_limit:new(Rate, Burst).

Expand Down
36 changes: 7 additions & 29 deletions apps/dgiot_bridge/src/channel/dgiot_tcp_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ handle_info({deliver, _, Msg}, TCPState) ->
Payload = dgiot_mqtt:get_payload(Msg),
Topic = dgiot_mqtt:get_topic(Msg),
case binary:split(Topic, <<$/>>, [global, trim]) of
[<<"transparent">>, DeviceId, <<"hex">>] ->
save_log(DeviceId,Payload),
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(Payload)),
[<<"thing">>, ProductId,DevAddr,<<"tcp">>, <<"hex">>] ->
DeviceId = dgiot_parse:get_deviceid(ProductId,DevAddr),
dgiot_device:save_log(DeviceId, Payload, ['tcp_send']),
dgiot_tcp_server:send(TCPState, dgiot_utils:hex_to_binary(dgiot_utils:trim_string(Payload))),
{noreply, TCPState};
_ ->
case jsx:is_json(Payload) of
Expand Down Expand Up @@ -213,7 +214,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
{<<>>, <<>>} ->
{noreply, TCPState#tcp{buff = <<>>}};
{_, _} ->
sub_topic(DeviceId),
dgiot_device:sub_topic(DeviceId,<<"tcp/hex">>),
NewProducts = dgiot_utils:unique_1(Products ++ [NewProductId]),
dgiot_bridge:send_log(ChannelId, NewProductId, DtuAddr, "DeviceId ~p DTU revice from ~p", [DeviceId,DtuAddr]),
{noreply, TCPState#tcp{buff = <<>>, register = true, clientid = DeviceId,
Expand All @@ -222,7 +223,7 @@ handle_info({tcp, Buff}, #tcp{socket = Socket, state = #state{id = ChannelId, de
end;

handle_info({tcp, Buff}, #tcp{state = #state{id = ChannelId, product = Products, deviceId = DeviceId}} = TCPState) ->
pub_topic(DeviceId, Buff),
dgiot_device:save_log(DeviceId,dgiot_utils:binary_to_hex(dgiot_utils:to_binary(Buff)),['tcp_receive']),
case decode(Buff, Products, TCPState) of
{ok, [], NewTCPState} ->
{noreply, NewTCPState#tcp{buff = <<>>}};
Expand Down Expand Up @@ -360,7 +361,7 @@ create_device(DeviceId, ProductId, DTUMAC, DTUIP, Dtutype) ->
_ ->
<<"">>
end,
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <<Dtutype/binary, DTUMAC/binary>>, <<"status">> => <<"上线"/utf8>>}, ['device_statuslog']),
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => DTUMAC, <<"productid">> => ProductId, <<"productname">> => Productname, <<"devicename">> => <<Dtutype/binary, DTUMAC/binary>>}, ['online']),
{DeviceId, DTUMAC};
Error2 ->
?LOG(info, "Error2 ~p ", [Error2]),
Expand Down Expand Up @@ -414,7 +415,6 @@ get_productid(ChannelId, Products, Head, Dtutype) ->
<<"thing">> => #{},
<<"productSecret">> => dgiot_utils:random()
},
?LOG(info, "Product ~p", [Product]),
dgiot_parse:create_object(<<"Product">>, Product),
pass
end;
Expand All @@ -438,25 +438,3 @@ check_login(Head, Len, Addr) ->
<<>>
end
end.


sub_topic(DeviceId) ->
Topic = <<"profile/", DeviceId/binary>>,
dgiot_mqtt:subscribe(Topic),
Topic1 = <<"transparent/", DeviceId/binary>>,
dgiot_mqtt:subscribe(Topic1),
Topic2 = <<"transparent/", DeviceId/binary, "/hex">>,
dgiot_mqtt:subscribe(Topic2).

pub_topic(DeviceId, Payload) ->
Topic2 = <<"transparent/", DeviceId/binary, "/post/hex">>,
save_log(DeviceId, dgiot_utils:binary_to_hex(Payload)),
dgiot_mqtt:publish(DeviceId, Topic2, dgiot_utils:binary_to_hex(Payload)).


save_log(DeviceId,Payload) ->
case dgiot_device:lookup(DeviceId) of
{ok,{[true,_,_,DeviceName, Devaddr, ProductId],_}} ->
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => Devaddr, <<"productid">> => ProductId, <<"devicename">> => DeviceName, <<"msg">> => Payload}, ['transparent']);
_ -> pass
end.
22 changes: 21 additions & 1 deletion apps/dgiot_device/src/dgiot_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-export([init_ets/0]).
-export([create_device/1, create_device/2, get_sub_device/1, get_sub_device/2, get/2]).
-export([load_device/1, sync_parse/1, post/1, put/1, save/1, save/2, save/3, lookup/1, lookup/2, delete/1, delete/2, save_prod/2, lookup_prod/1, get_online/1]).
-export([encode/1, decode/3, save_subdevice/2, get_subdevice/2, get_file/4, get_acl/1]).
-export([encode/1, decode/3, save_subdevice/2, get_subdevice/2, get_file/4, get_acl/1, save_log/3,sub_topic/2]).

init_ets() ->
dgiot_data:init(?DGIOT_PRODUCT),
Expand Down Expand Up @@ -466,3 +466,23 @@ get_appname(ProductId, DevAddr) ->
_ ->
<<"admin">>
end.

save_log(DeviceId,Payload,Domain) ->
case dgiot_device:lookup(DeviceId) of
{ok,{[true,_,_,DeviceName, Devaddr, ProductId],_}} ->
?MLOG(info, #{
<<"deviceid">> => DeviceId,
<<"devaddr">> => Devaddr,
<<"productid">> => ProductId,
<<"devicename">> => DeviceName,
<<"msg">> => Payload}, Domain);
_ -> pass
end.

sub_topic(DeviceId,Type) ->
case dgiot_device:lookup(DeviceId) of
{ok,{[true,_,_,_DeviceName, Devaddr, ProductId],_}} ->
Topic2 = <<"thing/", ProductId/binary,"/",Devaddr/binary, "/", Type/binary>>,
dgiot_mqtt:subscribe(Topic2);
_ -> pass
end.

0 comments on commit 868c8d0

Please sign in to comment.