Skip to content

Commit

Permalink
feat: add dgiot_mqtt_mock
Browse files Browse the repository at this point in the history
  • Loading branch information
lsxredrain committed Nov 9, 2022
1 parent c555b76 commit 01fb404
Show file tree
Hide file tree
Showing 8 changed files with 1,723 additions and 63 deletions.
37 changes: 36 additions & 1 deletion apps/dgiot_device/src/dgiot_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
-include_lib("dgiot/include/logger.hrl").
-define(TIMEOUT, 60000).

-export([create_device/1, create_device/2, get_sub_device/1, get_sub_device/2, save_subdevice/2, get_subdevice/2, get_subdevices/2, save_subdevice/3]).
-export([create_device/1, create_device/2, create_device/3, get_sub_device/1, get_sub_device/2, save_subdevice/2, get_subdevice/2, get_subdevices/2, save_subdevice/3]).
-export([parse_cache_Device/1, sync_parse/1, get/2, post/1, post/2, put/1, save/1, save/2, lookup/1, lookup/2, delete/1, delete/2]).
-export([save_profile/1, get_profile/1, get_profile/2, get_online/1, online/1, offline/1, offline_child/1, enable/1, disable/1]).
-export([put_location/3, get_location/1, get_address/3]).
Expand Down Expand Up @@ -234,6 +234,41 @@ create_device(#{<<"status">> := Status, <<"brand">> := Brand, <<"devModel">> :=
dgiot_parse:create_object(<<"Device">>, maps:without([<<"brand">>, <<"devModel">>], NewDevice))
end.

create_device(ProductId, DeviceAddr, Ip) ->
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DeviceAddr),
case dgiot_device:lookup(DeviceId) of
{ok, _} ->
Body = #{<<"status">> => <<"ONLINE">>},
dgiot_device:online(DeviceId),
dgiot_parse:update_object(<<"Device">>, DeviceId, Body);
_ ->
case dgiot_product:lookup_prod(ProductId) of
not_find ->
pass;
#{<<"ACL">> := Acl, <<"name">> := Name, <<"devType">> := DevType, <<"dynamicReg">> := true} ->
<<DeviceSecret:10/binary, _/binary>> = dgiot_utils:to_md5(dgiot_utils:random()),
Device = #{
<<"ip">> => Ip,
<<"status">> => <<"ONLINE">>,
<<"deviceSecret">> => DeviceSecret,
<<"isEnable">> => true,
<<"brand">> => Name,
<<"devModel">> => DevType,
<<"name">> => DeviceAddr,
<<"devaddr">> => DeviceAddr,
<<"product">> => #{
<<"__type">> => <<"Pointer">>,
<<"className">> => <<"Product">>,
<<"objectId">> => ProductId
},
<<"ACL">> => Acl
},
dgiot_device:create_device(Device);
_ ->
pass
end
end.

get(ProductId, DevAddr) ->
Keys = [<<"objectId">>, <<"status">>, <<"isEnable">>],
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
Expand Down
13 changes: 12 additions & 1 deletion apps/dgiot_device/src/dgiot_product.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
-export([create_product/1, create_product/2, add_product_relation/2, delete_product_relation/1]).
-export([get_prop/1, get_props/1, get_props/2, get_unit/1, update_properties/2, update_properties/0]).
-export([update_topics/0, update_product_filed/1]).
-export([save_devicetype/1, get_devicetype/1, get_device_thing/2]).
-export([save_devicetype/1, get_devicetype/1, get_device_thing/2, get_productSecret/1]).
-export([save_keys/1, get_keys/1, get_control/1, save_control/1, get_interval/1, save_device_thingtype/1]).

init_ets() ->
Expand Down Expand Up @@ -74,6 +74,7 @@ save(Product) ->
save_keys(ProductId),
save_control(ProductId),
save_devicetype(ProductId),
save_productSecret(ProductId),
dgiot_product_channel:save_channel(ProductId),
dgiot_product_channel:save_tdchannel(ProductId),
dgiot_product_channel:save_taskchannel(ProductId),
Expand Down Expand Up @@ -103,6 +104,16 @@ get(ProductId) ->
{error, Reason}
end.

save_productSecret(ProductId) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"productSecret">> := ProductSecret}} ->
dgiot_data:insert({productSecret,ProductId}, ProductSecret);
_ ->
pass
end.

get_productSecret(ProductId) ->
dgiot_data:get({productSecret, ProductId}).

%% 保存配置下发控制字段
save_control(ProductId) ->
Expand Down
85 changes: 85 additions & 0 deletions apps/dgiot_dlink/src/dgiot_dlink_channel.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2020-2021 DGIOT Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(dgiot_dlink_channel).
-behavior(dgiot_channelx).
-define(TYPE, <<"DLINK">>).
-author("johnliu").
-record(state, {id}).
-include_lib("dgiot_bridge/include/dgiot_bridge.hrl").
-include_lib("dgiot/include/logger.hrl").

-export([start/2]).
-export([init/3, handle_event/3, handle_message/2, handle_init/1, stop/3]).


%% 注册通道类型
-channel_type(#{
cType => ?TYPE,
type => ?PROTOCOL_CHL,
title => #{
zh => <<"Dlink采集通道"/utf8>>
},
description => #{
zh => <<"Dlink采集通道"/utf8>>
}
}).
%% 注册通道参数
-params(#{
<<"ico">> => #{
order => 102,
type => string,
required => false,
default => <<"/dgiot_file/shuwa_tech/zh/product/dgiot/channel/dgiot_dlink_channel.png">>,
title => #{
en => <<"channel ICO">>,
zh => <<"通道ICO"/utf8>>
},
description => #{
en => <<"channel ICO">>,
zh => <<"通道ICO"/utf8>>
}
}
}).

start(ChannelId, ChannelArgs) ->
dgiot_channelx:add(?TYPE, ChannelId, ?MODULE, ChannelArgs).

%% 通道初始化
init(?TYPE, ChannelId, _ChannelArgs) ->
State = #state{id = ChannelId},
{ok, State}.

handle_init(State) ->
{ok, State}.

%% 通道消息处理,注意:进程池调用
handle_event(_EventId, _Event, State) ->
{ok, State}.

handle_message({mqtt_login, do_after, ProductId, DeviceAddr, Ip}, State) ->
io:format("~s ~p DeviceAddr ~p ~n",[?FILE, ?LINE, DeviceAddr]),
dgiot_device:create_device(ProductId, DeviceAddr, Ip),
{ok, State};

handle_message(_Message, State) ->
%% io:format("~s ~p _Message = ~p.~n", [?FILE, ?LINE, _Message]),
{ok, State}.

stop(_ChannelType, _ChannelId, _State) ->
ok.


15 changes: 12 additions & 3 deletions apps/dgiot_dlink/src/mock/dgiot_mock_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,22 @@ handle_init(State) ->
handle_event(_EventId, _Event, State) ->
{ok, State}.

handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, #{<<"profile">> := #{<<"mock">> := #{<<"enable">> := true}}, <<"objectId">> := DeviceId}},
handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, #{<<"profile">> := #{<<"mock">> := #{
<<"enable">> := true, <<"transport">> := <<"mqtt">>} = Mock}, <<"objectId">> := DeviceId}},
#state{id = ChannelId} = State) ->
dgiot_client:start(ChannelId, DeviceId, #{}),
%% io:format("~s ~p DeviceId ~p Mock ~p", [?FILE, ?LINE, DeviceId, Mock]),
dgiot_mock_mqtt:start(ChannelId, DeviceId, Mock),
{ok, State};

handle_message({sync_parse, _Pid, 'after', put, _Token, <<"Device">>, #{<<"profile">> := #{<<"mock">> := #{<<"enable">> := false}}, <<"objectId">> := DeviceId}},
#state{id = ChannelId} = State) ->
dgiot_client:stop(ChannelId, DeviceId),
%% io:format("~s ~p DeviceId ~p", [?FILE, ?LINE, DeviceId]),
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := DevAddr, <<"productid">> := ProductId}} ->
dgiot_client:stop(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, DevAddr);
_ ->
pass
end,
{ok, State};

handle_message(_Message, State) ->
Expand All @@ -89,3 +97,4 @@ handle_message(_Message, State) ->
stop(_ChannelType, _ChannelId, _State) ->
ok.


38 changes: 37 additions & 1 deletion apps/dgiot_dlink/src/mock/dgiot_mock_mqtt.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
-include_lib("dgiot/include/logger.hrl").
-include_lib("dgiot/include/dgiot_client.hrl").

-export([childspec/2]).
-export([childspec/2,start/3]).

%% API
-export([init/1, handle_cast/2, handle_call/3, handle_info/2, terminate/2, code_change/3]).
Expand Down Expand Up @@ -86,3 +86,39 @@ code_change(_OldVsn, Dclient, _Extra) ->

update(ChannelId) ->
dgiot_data:insert({<<"mqtt_online">>, dlink_metrics}, dgiot_client:count(ChannelId)).


start(ChannelId, DeviceId, #{<<"auth">> := <<"ProductSecret">>}) ->
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := DevAddr, <<"productid">> := ProductId}} ->

Options = #{
host => "127.0.0.1",
port => 1883,
ssl => false,
username => binary_to_list(ProductId),
password => binary_to_list(dgiot_product:get_productSecret(ProductId)),
clean_start => false
},
%% io:format("~s ~p DeviceId ~p DevAddr ~p ", [?FILE, ?LINE, DeviceId, DevAddr]),
dgiot_client:start(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, #{<<"options">> => Options});
_ ->
#{}
end;

start(ChannelId, DeviceId, #{<<"auth">> := <<"DeviceSecret">>}) ->
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := DevAddr, <<"productid">> := ProductId, <<"devicesecret">> := DeviceSecret}} ->
Options = #{
host => "127.0.0.1",
port => 1883,
ssl => false,
username => binary_to_list(ProductId),
password => binary_to_list(DeviceSecret),
clean_start => false
},
%% io:format("~s ~p DeviceId ~p DevAddr ~p ", [?FILE, ?LINE, DeviceId, DevAddr]),
dgiot_client:start(ChannelId, <<ProductId/binary, "_", DevAddr/binary>>, #{<<"options">> => Options});
_ ->
#{}
end.
42 changes: 9 additions & 33 deletions apps/dgiot_dlink/src/proctol/dgiot_mqtt_auth.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,9 @@
, description/0
]).

check(#{ peerhost := PeerHost}, AuthResult, _) when PeerHost == {127,0,0,1} ->
check(#{peerhost := PeerHost, username := <<"dgiot">> }, AuthResult, _) when PeerHost == {127, 0, 0, 1} ->
{ok, AuthResult#{anonymous => false, auth_result => success}};

check(#{username := Username}, AuthResult, _)
when Username == <<"anonymous">> orelse Username == undefined orelse Username == <<>> ->
%% io:format("~s ~p Username: ~p~n", [?FILE, ?LINE, Username]),
{ok, AuthResult#{anonymous => true, auth_result => success}};

check(#{username := <<"dgiot">>, password := Password}, AuthResult, _) ->
%% io:format("~s ~p Password: ~p~n", [?FILE, ?LINE, Password]),
SuperPwd = dgiot_utils:to_binary(dgiot:get_env(dgiot_dlink, super_pwd, <<"">>)),
Expand All @@ -44,6 +39,11 @@ check(#{username := <<"dgiot">>, password := Password}, AuthResult, _) ->
{stop, AuthResult#{anonymous => false, auth_result => password_error}}
end;

check(#{username := Username}, AuthResult, _)
when Username == <<"anonymous">> orelse Username == undefined orelse Username == <<>> ->
%% io:format("~s ~p Username: ~p~n", [?FILE, ?LINE, Username]),
{ok, AuthResult#{anonymous => true, auth_result => success}};

%% 当 clientid 和 password 为token 且相等的时候为用户登录
check(#{clientid := <<Token:34/binary, _Type/binary>>, username := UserId, password := Token}, AuthResult, #{hash_type := _HashType}) ->
%% io:format("~s ~p UserId: ~p~n", [?FILE, ?LINE, UserId]),
Expand Down Expand Up @@ -79,31 +79,7 @@ description() -> "Authentication with Mnesia".
do_check(AuthResult, Password, ProductID, DeviceAddr, DeviceId, Ip) ->
Result =
case dgiot_product:lookup_prod(ProductID) of
{ok, #{<<"productSecret">> := Password} = Product} ->
case dgiot_device:lookup(DeviceId) of
{ok, _} ->
Body = #{
<<"status">> => <<"ONLINE">>},
dgiot_device:online(DeviceId),
dgiot_parse:update_object(<<"Device">>, DeviceId, Body);
_ ->
case Product of
#{<<"ACL">> := Acl, <<"name">> := Name, <<"devType">> := DevType, <<"dynamicReg">> := true} ->
Device = #{
<<"ip">> => Ip,
<<"status">> => <<"ONLINE">>,
<<"brand">> => Name,
<<"devModel">> => DevType,
<<"name">> => DeviceAddr,
<<"devaddr">> => DeviceAddr,
<<"product">> => ProductID,
<<"ACL">> => Acl
},
dgiot_device:create_device(Device);
_ ->
pass
end
end,
{ok, #{<<"productSecret">> := Password}} ->
{stop, AuthResult#{anonymous => false, auth_result => success}};
_ ->
case dgiot_device:lookup(DeviceId) of
Expand All @@ -117,8 +93,8 @@ do_check(AuthResult, Password, ProductID, DeviceAddr, DeviceId, Ip) ->
case Result of
{stop, #{auth_result := success}} ->
lists:map(fun
({ChannelId, _Ctype}) ->
dgiot_channelx:do_message(ChannelId, {dlink_login, do_after, ProductID, DeviceAddr, DeviceId, Ip});
({ChannelId, <<"DLINK">>}) ->
dgiot_channelx:do_message(ChannelId, {mqtt_login, do_after, ProductID, DeviceAddr, Ip});
(_) ->
pass
end, dgiot_bridge:get_proctol_channel(ProductID));
Expand Down
27 changes: 3 additions & 24 deletions apps/dgiot_dlink/src/proctol/dgiot_mqtt_message.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, pa
%% 初始化请求 $dg/thing/{productId}/{deviceAddr}/init/request
dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
[ProductId, DevAddr, <<"properties">>, <<"report">>] ->
%% 属性获取 $dg/thing/{productId}/{deviceAddr}/properties/report 设备 平台
%% 属性获取 $dg/thing/{productId}/{deviceAddr}/properties/report 设备 => 平台
DeviceId = dgiot_parse_id:get_deviceid(ProductId, DevAddr),
Ts = dgiot_datetime:now_secs(),
case dgiot_data:get(?DGIOT_DLINK_REQUEST_ID, DeviceId) of
Expand All @@ -52,47 +52,26 @@ on_message_publish(Message = #message{topic = <<"$dg/thing/", Topic/binary>>, pa
[ProductId, DevAddr, <<"firmware">>, <<"report">>] ->
dgiot_dlink_proctol:firmware_report(ProductId, DevAddr, get_payload(Payload));
[DeviceId, <<"properties">>, <<"get">>, <<"request_id=", Request_id/binary>>] ->
%% 属性获取 $dg/thing/{deviceId}/properties/get/request_id={request_id} 用户 平台
%% 属性获取 $dg/thing/{deviceId}/properties/get/request_id={request_id} 用户 => 平台
case dgiot_device:lookup(DeviceId) of
{ok, #{<<"devaddr">> := Devaddr, <<"productid">> := ProductId}} ->
%% 属性获取 $dg/device/{productId}/{deviceAddr}/properties 平台 设备
%% 属性获取 $dg/device/{productId}/{deviceAddr}/properties 平台 => 设备
RequestTopic = <<"$dg/device/", ProductId/binary, "/", Devaddr/binary, "/properties">>,
dgiot_data:insert(?DGIOT_DLINK_REQUEST_ID, DeviceId, {dgiot_datetime:now_secs(), Request_id}),
dgiot_mqtt:publish(DeviceId, RequestTopic, Payload);
_ ->
pass
end;
[<<"uniapp">>, Token, <<"report">>] ->
%% <<"$dg/thing/uniapp/r:e5186aba099ce35105ba811e80bdaefa/report">>
dgiot_hook:run_hook({uniapp, report}, {Token, get_payload(Payload)});
_ ->
%% io:format("~s ~p Payload = ~p.~n", [?FILE, ?LINE, get_payload(Payload)]),
pass
end,
{ok, Message};

on_message_publish(Message = #message{topic = _Topic, payload = _Payload}, _State) ->
%% io:format("~s ~p Topic ~p Payload = ~p.~n", [?FILE, ?LINE,Topic, get_payload(Payload)]),
%% ignore topics starting with $
{ok, Message}.

%%on_message_delivered(#{}, #message{topic = <<$$, _Rest/binary>>}, _State) ->
%% %% ignore topics starting with $
%% ok;
%%on_message_delivered(#{clientid := ClientId, username := Username},
%% Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = Flags = #{retain := Retain}},
%% _State) ->
%% ok.
%%
%%on_message_acked(#{}, #message{topic = <<$$, _Rest/binary>>}, _State) ->
%% %% ignore topics starting with $
%% ok;
%%on_message_acked(#{clientid := ClientId, username := Username},
%% Message = #message{topic = Topic, payload = Payload, qos = QoS, flags = #{retain := Retain}}, _State) ->
%% ?LOG(debug, "Message acked by client(~s): ~s~n",
%% [ClientId, emqx_message:format(Message)]),
%% ok.

get_payload(Payload) ->
%% io:format("~s ~p Payload: ~p~n", [?FILE, ?LINE, Payload]),
case jsx:is_json(Payload) of
Expand Down
Loading

0 comments on commit 01fb404

Please sign in to comment.