Skip to content

Commit

Permalink
feat:send_realtimedata
Browse files Browse the repository at this point in the history
  • Loading branch information
AvantLiu committed Dec 23, 2021
1 parent 591a444 commit 4664414
Show file tree
Hide file tree
Showing 5 changed files with 185 additions and 61 deletions.
3 changes: 2 additions & 1 deletion apps/dgiot_evidence/src/handler/dgiot_evidence_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -984,7 +984,6 @@ post_report(#{<<"name">> := Name, <<"product">> := ProductId, <<"parentId">> :=
<<"class">> => <<"Device">>}
}]
end, [], Dicts),
io:format("DictRequests ~p~n", [DictRequests]),
dgiot_parse:batch(DictRequests);
_R3 ->
?LOG(info, "R1 ~p", [_R3])
Expand Down Expand Up @@ -1015,6 +1014,8 @@ arrtojsonlist(_Data) ->
#{}.

python_drawxnqx(TaskId, NewData) ->
io:format("~s ~p TaskId = ~p.~n", [?FILE, ?LINE, TaskId]),
io:format("~s ~p NewData = ~p.~n", [?FILE, ?LINE, NewData]),
PythonBody = #{<<"name">> => <<TaskId/binary, ".png">>, <<"data">> => NewData, <<"path">> => <<"/data/dgiot/go_fastdfs/files/dgiot_file/pump_pytoh/">>},
Imagepath =
case catch base64:decode(os:cmd("python3 /data/dgiot/dgiot/lib/dgiot_evidence-4.3.0/priv/python/drawxnqx.py " ++ dgiot_utils:to_list(base64:encode(jsx:encode(PythonBody))))) of
Expand Down
100 changes: 50 additions & 50 deletions apps/dgiot_task/src/dgiot_task.erl
Original file line number Diff line number Diff line change
Expand Up @@ -198,56 +198,56 @@ get_calculated(ProductId, Ack) ->
get_collection(ProductId, [], Payload, Ack) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc2) ->
case Acc2 of
error ->
Acc2;
_ ->
case X of
#{<<"dataForm">> := #{<<"strategy">> := Strategy},
<<"dataType">> := #{<<"type">> := <<"geopoint">>},
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
case maps:find(Identifier, Payload) of
{ok, Value} ->
Addr = dgiot_topo:get_gpsaddr(Value),
dgiot_data:insert({topogps, dgiot_parse:get_shapeid(ProductId, Identifier)}, Addr),
Acc2#{Identifier => Value};
_ ->
dgiot_data:insert({topogps, dgiot_parse:get_shapeid(ProductId, Identifier)}, <<"无GPS信息"/utf8>>),
Acc2
end;
#{<<"dataForm">> := #{<<"address">> := Address, <<"strategy">> := Strategy, <<"collection">> := Collection},
<<"dataType">> := #{<<"type">> := Type, <<"specs">> := Specs},
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
case maps:find(Identifier, Payload) of
{ok, Value} ->
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
case string2value(Str1, Type, Specs) of
error ->
maps:without([Identifier], Acc2);
Value1 ->
Acc2#{Identifier => Value1}
end;
_ ->
case maps:find(Address, Payload) of
{ok, Value} ->
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
case string2value(Str1, Type, Specs) of
error ->
maps:without([Identifier], Acc2);
Value1 ->
Acc2#{Identifier => Value1}
end;
_ -> Acc2
end
end;
_ ->
Acc2
end
end
end, Ack, Props);
lists:foldl(fun(X, Acc2) ->
case Acc2 of
error ->
Acc2;
_ ->
case X of
#{<<"dataForm">> := #{<<"strategy">> := Strategy},
<<"dataType">> := #{<<"type">> := <<"geopoint">>},
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
case maps:find(Identifier, Payload) of
{ok, Value} ->
Addr = dgiot_topo:get_gpsaddr(Value),
dgiot_data:insert({topogps, dgiot_parse:get_shapeid(ProductId, Identifier)}, Addr),
Acc2#{Identifier => Value};
_ ->
dgiot_data:insert({topogps, dgiot_parse:get_shapeid(ProductId, Identifier)}, <<"无GPS信息"/utf8>>),
Acc2
end;
#{<<"dataForm">> := #{<<"address">> := Address, <<"strategy">> := Strategy, <<"collection">> := Collection},
<<"dataType">> := #{<<"type">> := Type, <<"specs">> := Specs},
<<"identifier">> := Identifier} when Strategy =/= <<"计算值"/utf8>> ->
case maps:find(Identifier, Payload) of
{ok, Value} ->
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
case string2value(Str1, Type, Specs) of
error ->
maps:without([Identifier], Acc2);
Value1 ->
Acc2#{Identifier => Value1}
end;
_ ->
case maps:find(Address, Payload) of
{ok, Value} ->
Str = re:replace(Collection, dgiot_utils:to_list(<<"%%", Identifier/binary>>), "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
Str1 = re:replace(Str, "%s", "(" ++ dgiot_utils:to_list(Value) ++ ")", [global, {return, list}]),
case string2value(Str1, Type, Specs) of
error ->
maps:without([Identifier], Acc2);
Value1 ->
Acc2#{Identifier => Value1}
end;
_ -> Acc2#{Identifier => 0}
end
end;
_ ->
Acc2
end
end
end, Ack, Props);
_Error ->
Ack
end;
Expand Down
14 changes: 7 additions & 7 deletions apps/dgiot_tdengine/src/handler/dgiot_tdengine_handler.erl
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
-include_lib("dgiot/include/logger.hrl").

%% API
-export([swagger_tdengine/0, get_props/1]).
-export([swagger_tdengine/0, get_props/1, get_time/2]).
-export([handle/4]).


Expand Down Expand Up @@ -395,12 +395,12 @@ get_app(ProductId, Results, DeviceId) ->
{Type5, <<Url/binary, "/dgiot_file/", DeviceId/binary, "/", BinV/binary, ".", Imagevalue/binary>>, Unit1, Ico1, Devicetype1};
Type6 when Type6 == <<"date">> ->
V1 =
case V of
<<"1970-01-01 08:00:00.000">> ->
<<"--">>;
_->
V
end,
case V of
<<"1970-01-01 08:00:00.000">> ->
<<"--">>;
_ ->
V
end,
Unit1 = maps:get(<<"unit">>, Specs, <<"">>),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Type6, V1, Unit1, Ico1, Devicetype1};
Expand Down
125 changes: 123 additions & 2 deletions apps/dgiot_topo/src/dgiot_topo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
-author("johnliu").
-include_lib("dgiot/include/logger.hrl").

-export([docroot/0, get_topo/2, send_topo/3, get_Product/0, get_name/3, put_topo/2, get_konva_thing/2, edit_konva/2, push/4, get_gpsaddr/1]).
-export([docroot/0, get_topo/2, send_topo/3, send_realtimedata/3, get_Product/0, get_name/3, put_topo/2, get_konva_thing/2, edit_konva/2, push/4, get_gpsaddr/1]).

docroot() ->
{file, Here} = code:is_loaded(?MODULE),
Expand Down Expand Up @@ -373,7 +373,6 @@ send_topo(ProductId, DeviceId, Payload) ->
Pubtopic = <<"thing/", DeviceId/binary, "/post">>,
dgiot_mqtt:publish(self(), Pubtopic, Base64).


get_optshape(ProductId, DeviceId, Payload) ->
Topo =
case dgiot_data:get({toponotext, ProductId}) of
Expand Down Expand Up @@ -447,3 +446,125 @@ get_gpsaddr(V) ->
_ ->
<<"无GPS信息"/utf8>>
end.

send_realtimedata(ProductId, DeviceId, Payload) ->
Base64 = get_realtimedata(ProductId, DeviceId, Payload),
Pubtopic = <<"thing/", DeviceId/binary, "/realtimedata/post">>,
dgiot_mqtt:publish(self(), Pubtopic, Base64).

get_realtimedata(ProductId, DeviceId, Payload) ->
Maps = get_prop(ProductId),
Props = get_props(ProductId),
Data =
maps:fold(fun(K, V, Acc) ->
Time = dgiot_datetime:now_secs(),
NewTime = dgiot_tdengine_handler:get_time(dgiot_utils:to_binary(Time), <<"111">>),
case maps:find(K, Maps) of
error ->
Acc;
{ok, Name} ->
{Type, NewV, Unit, Ico, Devicetype} =
case maps:find(K, Props) of
error ->
{V, <<"">>, <<"">>, <<"others">>};
{ok, #{<<"dataType">> := #{<<"type">> := Typea} = DataType} = Prop} ->
Devicetype1 = maps:get(<<"devicetype">>, Prop, <<"others">>),
Specs = maps:get(<<"specs">>, DataType, #{}),
case Typea of
Type1 when Type1 == <<"enum">>; Type1 == <<"bool">> ->
Value = maps:get(dgiot_utils:to_binary(V), Specs, V),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Type1, Value, <<"">>, Ico1, Devicetype1};
Type2 when Type2 == <<"struct">> ->
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Type2, V, <<"">>, Ico1, Devicetype1};
Type3 when Type3 == <<"geopoint">> ->
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
BinV = dgiot_utils:to_binary(V),
Addr =
case binary:split(BinV, <<$_>>, [global, trim]) of
[Longitude, Latitude] ->
case dgiot_gps:get_baidu_addr(Longitude, Latitude) of
#{<<"baiduaddr">> := #{<<"formatted_address">> := FormattedAddress}} ->
case dgiot_parse:get_object(<<"Device">>, DeviceId) of
{ok, #{<<"location">> := #{<<"__type">> := <<"GeoPoint">>, <<"longitude">> := Longitude, <<"latitude">> := Latitude}}} ->
pass;
{ok, #{<<"detail">> := Detail}} ->
dgiot_parse:update_object(<<"Device">>, DeviceId, #{
<<"location">> => #{<<"__type">> => <<"GeoPoint">>, <<"longitude">> => Longitude, <<"latitude">> => Latitude},
<<"detail">> => Detail#{<<"address">> => FormattedAddress}});
_ ->
pass
end,
FormattedAddress;
_ ->
<<"[", BinV/binary, "]经纬度解析错误"/utf8>>
end;
_ ->
<<"无GPS信息"/utf8>>
end,
{Type3, Addr, <<"">>, Ico1, Devicetype1};
Type4 when Type4 == <<"float">>; Type4 == <<"double">> ->
Unit1 = maps:get(<<"unit">>, Specs, <<"">>),
Precision = maps:get(<<"precision">>, Specs, 3),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Type4, dgiot_utils:to_float(V, Precision), Unit1, Ico1, Devicetype1};
Type5 when Type5 == <<"image">> ->
AppName = dgiot_device:get_appname(DeviceId),
Url = dgiot_device:get_url(AppName),
Unit1 = maps:get(<<"unit">>, Specs, <<"">>),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
Imagevalue = maps:get(<<"imagevalue">>, DataType, <<"">>),
BinV = dgiot_utils:to_binary(V),
{Type5, <<Url/binary, "/dgiot_file/", DeviceId/binary, "/", BinV/binary, ".", Imagevalue/binary>>, Unit1, Ico1, Devicetype1};
Type6 when Type6 == <<"date">> ->
V1 =
case V of
<<"1970-01-01 08:00:00.000">> ->
<<"--">>;
_ ->
V
end,
Unit1 = maps:get(<<"unit">>, Specs, <<"">>),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Type6, V1, Unit1, Ico1, Devicetype1};
_ ->
Unit1 = maps:get(<<"unit">>, Specs, <<"">>),
Ico1 = maps:get(<<"ico">>, Prop, <<"">>),
{Typea, V, Unit1, Ico1, Devicetype1}
end;
_ ->
{<<"others">>, V, <<"">>, <<"">>, <<"others">>}
end,
Acc ++ [#{<<"name">> => Name, <<"type">> => Type, <<"number">> => NewV, <<"time">> => NewTime, <<"unit">> => Unit, <<"imgurl">> => Ico, <<"devicetype">> => Devicetype}]
end
end, [], Payload),
base64:encode(jsx:encode(#{<<"data">> => Data})).

get_prop(ProductId) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier, <<"name">> := Name, <<"isshow">> := true} ->
Acc#{Identifier => Name};
_ -> Acc
end
end, #{}, Props);
_ ->
#{}
end.

get_props(ProductId) ->
case dgiot_product:lookup_prod(ProductId) of
{ok, #{<<"thing">> := #{<<"properties">> := Props}}} ->
lists:foldl(fun(X, Acc) ->
case X of
#{<<"identifier">> := Identifier, <<"isshow">> := true} ->
Acc#{Identifier => X};
_ -> Acc
end
end, #{}, Props);
_ ->
#{}
end.
4 changes: 3 additions & 1 deletion apps/dgiot_topo/src/dgiot_topo_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ handle_message({deliver, _Topic, Msg}, #state{id = ChannelId} = State) ->
Data = jsx:decode(Payload, [{labels, binary}, return_maps]),
DeviceId = dgiot_parse:get_deviceid(ProductId, DtuAddr),
Thingdata = maps:get(<<"thingdata">>, Data, #{}),
dgiot_topo:send_topo(ProductId, DeviceId, Thingdata);
dgiot_topo:send_topo(ProductId, DeviceId, Thingdata),
%% 发送实时数据
dgiot_topo:send_realtimedata(ProductId, DeviceId, Thingdata);
Other ->
?LOG(info, "Other ~p", [Other]),
pass
Expand Down

0 comments on commit 4664414

Please sign in to comment.