Skip to content

Commit

Permalink
feat: Read the meter in turn
Browse files Browse the repository at this point in the history
  • Loading branch information
AvantLiu committed Mar 8, 2022
1 parent dba1c6c commit f3450e4
Show file tree
Hide file tree
Showing 8 changed files with 213 additions and 59 deletions.
16 changes: 8 additions & 8 deletions apps/dgiot_device/src/dgiot_device.erl
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,14 @@ sync_parse(OffLine) ->
{[_, Last, Acl, Devaddr, ProductId, DeviceSecret], Node} when (Now - Last) < 0 ->
case dgiot_parse:update_object(<<"Device">>, DeviceId, #{<<"status">> => <<"ONLINE">>}) of
{ok, _R} ->
Productname =
case dgiot_parse:get_object(<<"Product">>, ProductId) of
{ok, #{<<"name">> := Productname1}} ->
Productname1;
_ ->
<<"">>
end,
?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => Devaddr, <<"productid">> => ProductId, <<"productname">> => Productname, <<"status">> => <<"上线"/utf8>>}, ['device_statuslog']),
%% Productname =
%% case dgiot_parse:get_object(<<"Product">>, ProductId) of
%% {ok, #{<<"name">> := Productname1}} ->
%% Productname1;
%% _ ->
%% <<"">>
%% end,
%% ?MLOG(info, #{<<"deviceid">> => DeviceId, <<"devaddr">> => Devaddr, <<"productid">> => ProductId, <<"productname">> => Productname, <<"status">> => <<"上线"/utf8>>}, ['device_statuslog']),
dgiot_mnesia:insert(DeviceId, {[true, Now, Acl, Devaddr, ProductId, DeviceSecret], Node});
_ ->
pass
Expand Down
4 changes: 2 additions & 2 deletions apps/dgiot_meter/src/dgiot_meter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ create_meter(MeterAddr, ChannelId, DTUIP, DtuId, DtuAddr) ->
pass
end.


create_meter4G(MeterAddr, MDa, ChannelId, DTUIP, DtuId, DtuAddr) ->
case dgiot_data:get({meter, ChannelId}) of
{ProductId, ACL, _Properties} ->
Expand All @@ -122,6 +121,8 @@ create_meter4G(MeterAddr, MDa, ChannelId, DTUIP, DtuId, DtuAddr) ->
<<"devModel">> => <<"Meter">>
},
dgiot_device:create_device(Requests),
DeviceId = dgiot_parse:get_deviceid(ProductId, MeterAddr),
dgiot_data:insert({metetda, DeviceId}, {dgiot_utils:to_binary(MDa), DtuAddr}),
Topic = <<"profile/", ProductId/binary, "/", MeterAddr/binary>>,
dgiot_mqtt:subscribe(Topic),
{DtuProductId, _, _} = dgiot_data:get({dtu, ChannelId}),
Expand All @@ -130,7 +131,6 @@ create_meter4G(MeterAddr, MDa, ChannelId, DTUIP, DtuId, DtuAddr) ->
pass
end.


create_meter4G(DevAddr, ChannelId, DTUIP) ->
case dgiot_data:get({dtu, ChannelId}) of
{ProductId, ACL, _Properties} ->
Expand Down
169 changes: 154 additions & 15 deletions apps/dgiot_meter/src/dgiot_meter_tcp.erl

Large diffs are not rendered by default.

10 changes: 6 additions & 4 deletions apps/dgiot_meter/src/proctol/dlt376_decoder.erl
Original file line number Diff line number Diff line change
Expand Up @@ -660,19 +660,18 @@ process_message(Frames, ChannelId, DTUIP, DtuId) ->
[#{<<"afn">> := 16#0A, <<"di">> := <<16#00, 16#00, 16#02, 16#01>>, <<"addr">> := DevAddr, <<"value">> := Value} | _] = Frames,
lists:map(fun(#{<<"addr">> := MeterAddr, <<"da">> := Da}) ->
MAddr = dgiot_utils:binary_to_hex(MeterAddr),
dgiot_data:insert({metetda, MAddr}, dgiot_utils:to_binary(Da)),
dgiot_meter:create_meter4G(MAddr, dgiot_utils:to_binary(Da), ChannelId, DTUIP, DtuId, DevAddr),
timer:sleep(1 * 1000)
end, Value).

send_childvalue(DeviceId, ChildValue) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"where">> => #{<<"parentId">> => DeviceId}}) of
{ok, #{<<"results">> := ChildDevices}} ->
lists:foldl(fun(#{<<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}, _Acc) ->
case dgiot_data:get({metetda, Devaddr}) of
lists:foldl(fun(#{<<"objectId">> := ChildId, <<"devaddr">> := Devaddr, <<"product">> := #{<<"objectId">> := ProductId}}, _Acc) ->
case dgiot_data:get({metetda, ChildId}) of
not_find ->
pass;
Da ->
{Da, _Dtuaddr} ->
DA = dgiot_utils:binary_to_hex(dlt376_decoder:pn_to_da(dgiot_utils:to_int(Da))),
case maps:find(DA, ChildValue) of
error ->
Expand Down Expand Up @@ -761,6 +760,7 @@ pn_to_da(Pn) ->
%% ConAddr = <<"000033010048">>,
frame_write_param(#{<<"concentrator">> := ConAddr, <<"payload">> := Frame}) ->
Length = length(maps:keys(Frame)),
%% io:format("~s ~p SortFrame ~p.~n", [?FILE, ?LINE, Length]),
{BitList, Afn, Da, Fn} =
lists:foldl(fun(Index, {Acc, A, D, F}) ->
case maps:find(dgiot_utils:to_binary(Index), Frame) of
Expand All @@ -770,7 +770,9 @@ frame_write_param(#{<<"concentrator">> := ConAddr, <<"payload">> := Frame}) ->
{Acc, A, D, F}
end
end, {[], 0, <<>>, <<>>}, lists:seq(1, Length)),
%% io:format("~s ~p BitList = ~p.~n", [?FILE, ?LINE, BitList]),
UserZone = <<<<V:BitLen>> || {V, BitLen} <- BitList>>,
%% io:format("~s ~p UserZone ~p. Afn ~p ~n", [?FILE, ?LINE, dgiot_utils:binary_to_hex(UserZone), Afn]),
UserData = add_to_userzone(UserZone, Afn, Fn),
dlt376_decoder:to_frame(#{<<"command">> => 16#4B,
<<"addr">> => concentrator_to_addr(ConAddr),
Expand Down
13 changes: 12 additions & 1 deletion apps/dgiot_task/src/dgiot_instruct.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
createsub/6,
create/6,
create_group/7,
get_instruct/4
get_instruct/4,
get_child_instruct/3
]).

createsub(ProductId, DeviceId, DtuAddr, ACL, Rotation, #{<<"parentDtu">> := ParentDtu}) ->
Expand Down Expand Up @@ -224,6 +225,16 @@ get_instruct(ProductId, Round) ->
[]
end.

get_child_instruct(DeviceId, Round, thing) ->
case dgiot_parse:query_object(<<"Device">>, #{<<"where">> => #{<<"parentId">> => DeviceId}}) of
{ok, #{<<"results">> := ChildDevices}} ->
lists:foldl(fun(#{<<"product">> := #{<<"objectId">> := ProductId}}, Acc) ->
Acc ++ dgiot_instruct:get_instruct(ProductId, DeviceId, Round, thing)
end, [], ChildDevices);
_ ->
[]
end.

get_que(_ProductId, DeviceId, Round) ->
case dgiot_data:get({instuct, DeviceId}) of
not_find ->
Expand Down
6 changes: 6 additions & 0 deletions apps/dgiot_task/src/dgiot_task_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -262,5 +262,11 @@ stop(_ChannelType, ChannelId, #state{env = #{<<"product">> := ProductId, <<"args
dgiot_task:stop(Args#{<<"product">> => ProductId, <<"channel">> => ChannelId})
end),
%% ?LOG(warning, "channel stop ~p,~p", [ChannelType, ChannelId]),
ok;


stop(_ChannelType, _ChannelId, _State) ->

ok.


44 changes: 20 additions & 24 deletions apps/dgiot_task/src/dgiot_task_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -66,20 +66,17 @@ init([#{<<"app">> := App, <<"channel">> := ChannelId, <<"dtuid">> := DtuId, <<"m
?LOG(info, "not_find ~p", [DtuId]),
pass;
{ProductId, DevAddr} ->
%% io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, DtuId]),
DeviceId = dgiot_parse:get_deviceid(ProductId, DevAddr),
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, 1, dgiot_utils:to_atom(thing)),
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, 1, dgiot_utils:to_atom(Mode)),
%% ChildQue = dgiot_instruct:get_child_instruct(DeviceId, 1, dgiot_utils:to_atom(Mode)),
Tsendtime = dgiot_datetime:localtime_to_unixtime(dgiot_datetime:to_localtime(Endtime)),
Nowstamp = dgiot_datetime:nowstamp(),
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after(300, self(), stop)
end
case Tsendtime > Nowstamp of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after(300, self(), stop)
end,
Topic = <<"thing/", ProductId/binary, "/", DevAddr/binary, "/post">>,
dgiot_mqtt:subscribe(Topic),
Expand All @@ -106,6 +103,10 @@ handle_info({'EXIT', _From, Reason}, State) ->
erlang:garbage_collect(self()),
{stop, Reason, State};

handle_info(stop, State) ->
erlang:garbage_collect(self()),
{stop, normal, State};

handle_info(init, #task{dtuid = DtuId, mode = Mode, round = Round, ts = Oldstamp, freq = Freq, endtime = Tsendtime} = State) ->
dgiot_datetime:now_secs(),
%% io:format("~s ~p DtuId = ~p.~n", [?FILE, ?LINE, DtuId]),
Expand All @@ -119,21 +120,16 @@ handle_info(init, #task{dtuid = DtuId, mode = Mode, round = Round, ts = Oldstamp
Que = dgiot_instruct:get_instruct(ProductId, DeviceId, NewRound, dgiot_utils:to_atom(Mode)),
Nowstamp = dgiot_datetime:nowstamp(),
Newfreq = Nowstamp - Oldstamp,
case length(Que) of
0 ->
erlang:send_after(300, self(), stop);
_ ->
case Tsendtime > Nowstamp of
case Tsendtime > Nowstamp of
true ->
case Newfreq > Freq of
true ->
case Newfreq > Freq of
true ->
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after((Freq - Newfreq) * 1000, self(), retry)
end;
erlang:send_after(1000, self(), retry);
false ->
erlang:send_after(300, self(), stop)
end
erlang:send_after((Freq - Newfreq) * 1000, self(), retry)
end;
false ->
erlang:send_after(300, self(), stop)
end,
{noreply, State#task{product = ProductId, devaddr = DevAddr, round = NewRound, firstid = DeviceId, que = Que, ts = Nowstamp}}
end;
Expand Down
10 changes: 5 additions & 5 deletions apps/dgiot_tdengine/src/dgiot_tdengine.erl
Original file line number Diff line number Diff line change
Expand Up @@ -370,12 +370,12 @@ get_chartdata(Channel, TableName, Query) ->
transaction(Channel,
fun(Context) ->
Database = maps:get(<<"db">>, Query),
Function = maps:get(<<"function">>, Query),
Keys = maps:get(<<"keys">>, Query),
Function = maps:get(<<"function">>, Query, <<"last">>),
Keys = maps:get(<<"keys">>, Query, <<"*">>),
Limit = format_limit(Query),
Interval = maps:get(<<"interval">>, Query),
Starttime = maps:get(<<"starttime">>, Query),
Endtime = maps:get(<<"endtime">>, Query),
Interval = maps:get(<<"interval">>, Query, <<"1d">>),
Starttime = maps:get(<<"starttime">>, Query, dgiot_datetime:now_ms() - 604800000),
Endtime = maps:get(<<"endtime">>, Query, dgiot_datetime:now_ms()),
{Names, Newkeys} = get_keys(Database, Function, Keys),
DB = format_db(?Database(Database)),
Tail = <<" where createdat >= ", Starttime/binary, " AND createdat <= ", Endtime/binary, " INTERVAL(", Interval/binary, ") ", Limit/binary, ";">>,
Expand Down

0 comments on commit f3450e4

Please sign in to comment.