Skip to content

Commit

Permalink
feat: trace topic
Browse files Browse the repository at this point in the history
  • Loading branch information
AvantLiu committed Mar 10, 2022
1 parent db709fa commit 7e3a074
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 9 deletions.
16 changes: 9 additions & 7 deletions apps/dgiot/src/utils/dgiot_tracer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -101,14 +101,16 @@ check_trace(From, Topic, Payload) ->
case get_trace({clientid, From}) of
true ->
BinClientId = dgiot_utils:to_binary(From),
dgiot_mqtt:publish(From, <<"logger_trace/trace/", BinClientId/binary, "/", Topic/binary>>, Payload);
dgiot_mqtt:publish(From, <<"$dg/trace/", BinClientId/binary, "/", Topic/binary>>, Payload);
false ->
case get_trace({topic, Topic}) of
true ->
dgiot_mqtt:publish(self(), <<"logger_trace/trace/", Topic/binary>>, Payload);
false ->
false
end
%% todo Topic安全校验不好做
%% case get_trace({topic, Topic}) of
%% true ->
%% dgiot_mqtt:publish(self(), <<"$dg/trace/", Topic/binary>>, Payload);
%% false ->
%% false
%% end
ok
end.

%%--------------------------------------------------------------------
Expand Down
13 changes: 12 additions & 1 deletion apps/dgiot_dlink/src/dgiot_mqtt_acl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ do_check(#{clientid := ClientID} = _ClientInfo, subscribe, <<"$dg/channel/", Dev
deny
end;

%% "$dg/dashboard/{dashboardId}/{productId}/{deviceId}"
%% "$dg/dashboard/{dashboardId}/report"
do_check(#{clientid := ClientID} = _ClientInfo, subscribe, <<"$dg/dashboard/", DeviceInfo/binary>> = Topic) ->
io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, Topic]),
[DashboardId | _] = binary:split(DeviceInfo, <<"/">>, [global]),
Expand All @@ -110,6 +110,17 @@ do_check(#{clientid := ClientID} = _ClientInfo, subscribe, <<"$dg/dashboard/", D
deny
end;

%% "$dg/trace/{DeviceId}/{Topic}"
do_check(#{clientid := ClientID, username := Username} = _ClientInfo, subscribe, <<"$dg/trace/", DeviceInfo/binary>> = Topic) ->
io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, Topic]),
[DeviceId | _] = binary:split(DeviceInfo, <<"/">>, [global]),
case check_device_acl(ClientID, DeviceId, Username) of
ok ->
allow;
_ ->
deny
end;

do_check(_ClientInfo, _PubSub, Topic) ->
io:format("~s ~p Topic: ~p~n", [?FILE, ?LINE, Topic]),
deny.
Expand Down
2 changes: 1 addition & 1 deletion apps/dgiot_meter/src/dgiot_meter_tcp.erl
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ handle_info({deliver, _Topic, Msg}, #tcp{state = #state{id = ChannelId, protocol
DA = dgiot_utils:binary_to_hex(dlt376_decoder:pn_to_da(dgiot_utils:to_int(Da))),
[#{<<"thingdata">> := #{<<"dataSource">> := DataSource} = ThingData} | _] = jsx:decode(dgiot_mqtt:get_payload(Msg), [{labels, binary}, return_maps]),
Payload1 = dgiot_meter:to_frame(ThingData#{<<"devaddr">> => Dtuaddr, <<"dataSource">> => DataSource#{<<"da">> => DA}}),
dgiot_bridge:send_log(ChannelId, ChannelId, ProductId, DevAddr, " ~s ~p DLT376 send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
dgiot_bridge:send_log(ChannelId, ProductId, DevAddr, " ~s ~p DLT376 send to DevAddr ~p => ~p", [?FILE, ?LINE, DevAddr, dgiot_utils:binary_to_hex(Payload1)]),
dgiot_tcp_server:send(TCPState, Payload1)
end;
?DLT645 ->
Expand Down
3 changes: 3 additions & 0 deletions src/emqx_tracer.erl
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,9 @@ trace(publish, #message{topic = <<"$SYS/", _/binary>>}) ->
trace(publish, #message{topic = <<"logger_trace", _/binary>>}) ->
%% Do not trace '$SYS' publish
ignore;
trace(publish, #message{topic = <<"$dg/trace", _/binary>>}) ->
%% Do not trace '$SYS' publish
ignore;
trace(publish, #message{from = From, topic = Topic, payload = Payload})
when is_binary(From); is_atom(From) ->
emqx_hooks:run('mqtt_publish.trace',[From, Topic, Payload]).
Expand Down

0 comments on commit 7e3a074

Please sign in to comment.