Skip to content

Commit

Permalink
IoTSharp#779 重构消息处理
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed Sep 17, 2022
1 parent 1c177c3 commit 3078606
Show file tree
Hide file tree
Showing 23 changed files with 75 additions and 65 deletions.
11 changes: 9 additions & 2 deletions IoTSharp.EventBus/CAP/CapPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ public void PublishAttributeData(PlayloadData msg)
_queue.Publish("iotsharp.services.datastream.attributedata", msg);
}


public void PublishTelemetryData( PlayloadData msg)
{
_queue.Publish("iotsharp.services.datastream.telemetrydata", msg);
Expand All @@ -39,6 +38,14 @@ public void PublishDeviceAlarm( CreateAlarmDto alarmDto)
_queue.Publish("iotsharp.services.datastream.alarm", alarmDto);
}


public void PublishCreateDevice(Guid devid)
{
_queue.Publish("iotsharp.services.platform.createdevice", devid);
}

public void PublishDeleteDevice(Guid devid)
{
_queue.Publish("iotsharp.services.platform.deleteDevice", devid);
}
}
}
4 changes: 4 additions & 0 deletions IoTSharp.EventBus/IPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ namespace IoTSharp.EventBus
{
public interface IPublisher
{

public void PublishCreateDevice(Guid devid);
public void PublishDeleteDevice(Guid devid);

public void PublishAttributeData(PlayloadData msg);
public void PublishTelemetryData(PlayloadData msg);
public void PublishDeviceStatus(Guid devid, DeviceStatus devicestatus);
Expand Down
8 changes: 4 additions & 4 deletions IoTSharp.Extensions/StringExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ public static bool IsNullOrEmpty(this object obj)
/// </summary>
/// <param name="obj">需要序列化的对象</param>
/// <returns></returns>
public static string ToJson(this object obj)
public static string ToJson<T>(this T obj) where T:class
{
return JsonConvert.SerializeObject(obj);
}
Expand Down Expand Up @@ -492,9 +492,9 @@ public static T DeepClone<T>(this T obj) where T : class
/// <typeparam name="T">对象类型</typeparam>
/// <param name="obj">对象</param>
/// <returns></returns>
public static string ToXmlStr<T>(this T obj)
public static string ToXmlStr<T>(this T obj) where T:class
{
var jsonStr = obj.ToJson();
var jsonStr = obj.ToJson<T>();
var xmlDoc = JsonConvert.DeserializeXmlNode(jsonStr);
string xmlDocStr = xmlDoc.InnerXml;

Expand All @@ -508,7 +508,7 @@ public static string ToXmlStr<T>(this T obj)
/// <param name="obj">对象</param>
/// <param name="rootNodeName">根节点名(建议设为xml)</param>
/// <returns></returns>
public static string ToXmlStr<T>(this T obj, string rootNodeName)
public static string ToXmlStr<T>(this T obj, string rootNodeName) where T:class
{
var jsonStr = obj.ToJson();
var xmlDoc = JsonConvert.DeserializeXmlNode(jsonStr, rootNodeName);
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp.TaskActions/CustomeAlarmPullExcutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
using System.Linq;
using System.Net;
using System.Threading.Tasks;
using DotNetCore.CAP;
using IoTSharp.EventBus;
using IoTSharp.Contracts;
using IoTSharp.Data;
using IoTSharp.Extensions;
Expand All @@ -19,9 +19,9 @@ namespace IoTSharp.TaskActions
public class CustomeAlarmPullExcutor : TaskAction
{

private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly ApplicationDbContext _context;
public CustomeAlarmPullExcutor(ApplicationDbContext context, ICapPublisher queue)
public CustomeAlarmPullExcutor(ApplicationDbContext context, IPublisher queue)
{
this._context = context; _queue = queue;
}
Expand Down
4 changes: 2 additions & 2 deletions IoTSharp.TaskActions/IoTSharp.TaskActions.csproj
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
Expand All @@ -10,12 +10,12 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="DotNetCore.CAP" Version="6.1.0" />
<PackageReference Include="RestSharp" Version="108.0.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\IoTSharp.Data\IoTSharp.Data.csproj" />
<ProjectReference Include="..\IoTSharp.EventBus\IoTSharp.EventBus.csproj" />
</ItemGroup>

</Project>
9 changes: 4 additions & 5 deletions IoTSharp/Controllers/DevicesController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using EasyCaching.Core;
using Esprima.Ast;
using IoTSharp.Contracts;
Expand Down Expand Up @@ -57,13 +57,13 @@ public class DevicesController : ControllerBase
private readonly IStorage _storage;
private readonly MqttServer _serverEx;
private readonly AppSettings _setting;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
private readonly IServiceScopeFactory _scopeFactor;

public DevicesController(UserManager<IdentityUser> userManager,
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, MqttServer serverEx, ApplicationDbContext context, MqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, ICapPublisher queue
SignInManager<IdentityUser> signInManager, ILogger<DevicesController> logger, MqttServer serverEx, ApplicationDbContext context, MqttClientOptions mqtt, IStorage storage, IOptions<AppSettings> options, IPublisher queue
, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor, IServiceScopeFactory scopeFactor)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(options.Value.CachingUseIn)}";
Expand Down Expand Up @@ -728,8 +728,7 @@ public async Task<ApiResult<Device>> PostDevice(DevicePostDto device)
_context.DeviceIdentities.Update(identity);
await _context.SaveChangesAsync();
}

await this._queue.PublishAsync("iotsharp.services.platform.addnewdevice", devvalue);
_queue.PublishCreateDevice(devvalue.Id) ;
return new ApiResult<Device>(ApiCode.Success, "Ok", await FoundAsync(devvalue.Id));
}

Expand Down
2 changes: 1 addition & 1 deletion IoTSharp/Extensions/IoTSharpExtension.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using HealthChecks.UI.Configuration;
using IoTSharp.Contracts;
using IoTSharp.Data;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Gateways/RawDataGateway.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
Expand Down Expand Up @@ -33,13 +33,13 @@ public class RawDataGateway
private const string _map_var_ts_field = "_map_var_ts_field";
private readonly AppSettings _setting;
private readonly ILogger _logger;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IEasyCachingProvider _caching;
private readonly ApplicationDbContext _context;

public RawDataGateway(ILogger<RawDataGateway> logger, IServiceScopeFactory scopeFactor
, IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory
, IOptions<AppSettings> options, IPublisher queue, IEasyCachingProviderFactory factory
, ApplicationDbContext context
)
{
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Jobs/CheckDevices.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using EasyCaching.Core;
using IoTSharp.Contracts;
using IoTSharp.Data;
Expand All @@ -24,10 +24,10 @@ public class CheckDevices : IJob
private readonly ILogger<CheckDevices> _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly MqttServer _serverEx;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;

public CheckDevices(ILogger<CheckDevices> logger, IServiceScopeFactory scopeFactor, MqttServer serverEx
, IOptions<AppSettings> options, ICapPublisher queue)
, IOptions<AppSettings> options, IPublisher queue)
{
_mcsetting = options.Value.MqttClient;
_logger = logger;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/CoAPService.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
using CoAP.Server;
using DotNetCore.CAP;
using IoTSharp.EventBus;
using IoTSharp.Contracts;
using IoTSharp.Data;
using IoTSharp.Services.CoApResources;
Expand All @@ -22,11 +22,11 @@ public class CoAPService : IHostedService
private readonly ILogger _logger;

private ApplicationDbContext _dbContext;
private readonly ICapPublisher _capBus;
private readonly IPublisher _capBus;
private IServiceScope _serviceScope;
private CoapServer server;
private readonly AppSettings _settings;
public CoAPService(ILogger<CoAPService> logger, IServiceScopeFactory scopeFactor, IOptions<AppSettings> options, ICapPublisher capBus)
public CoAPService(ILogger<CoAPService> logger, IServiceScopeFactory scopeFactor, IOptions<AppSettings> options, IPublisher capBus)
{
_settings = options.Value;
server = new CoapServer();
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/CoApResources/CoAPResource.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using CoAP;
using CoAP.Server.Resources;
using DotNetCore.CAP;
using IoTSharp.EventBus;
using IoTSharp.Contracts;
using IoTSharp.Data;
using IoTSharp.Extensions;
Expand All @@ -26,9 +26,9 @@ public class CoApResource : Resource
};

private readonly ILogger _logger;
private readonly ICapPublisher _eventBus;
private readonly IPublisher _eventBus;

public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger, ICapPublisher eventBus)
public CoApResource(string name, ApplicationDbContext dbContext, ILogger logger, IPublisher eventBus)
: base(name)
{
Attributes.Title = name;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/AlarmController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
Expand All @@ -22,12 +22,12 @@ public class AlarmController : MqttBaseController
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;

private string _devname;
private Device device;

public AlarmController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, ICapPublisher queue)
public AlarmController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, IPublisher queue)
{
_logger = logger;
_scopeFactor = scopeFactor;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/AttributesController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
Expand Down Expand Up @@ -26,7 +26,7 @@ public class AttributesController : MqttBaseController
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IEasyCachingProviderFactory _factory;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
private readonly MQTTService _service;
Expand All @@ -36,7 +36,7 @@ public class AttributesController : MqttBaseController
private Device device;

public AttributesController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, MQTTService mqttService,
IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
IOptions<AppSettings> options, IPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(options.Value.CachingUseIn)}";
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/DataController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
Expand All @@ -23,7 +23,7 @@ public class DataController : MqttBaseController
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IEasyCachingProviderFactory _factory;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
private readonly MQTTService _service;
Expand All @@ -34,7 +34,7 @@ public class DataController : MqttBaseController
private Device device;

public DataController(ILogger<DataController> logger, IServiceScopeFactory scopeFactor, MQTTService mqttService,
IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
IOptions<AppSettings> options, IPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(options.Value.CachingUseIn)}";
Expand Down
8 changes: 4 additions & 4 deletions IoTSharp/Services/MQTTControllers/GatewayController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using IoTSharp.Contracts;
using IoTSharp.Data;
Expand All @@ -25,7 +25,7 @@ namespace IoTSharp.Services.MQTTControllers
public class V1GatewayController : GatewayController
{
public V1GatewayController(ILogger<GatewayController> logger, IServiceScopeFactory scopeFactor,
IOptions<AppSettings> options, ICapPublisher queue, RawDataGateway rawDataGateway
IOptions<AppSettings> options, IPublisher queue, RawDataGateway rawDataGateway
) : base(logger, scopeFactor, options, queue, rawDataGateway)
{
}
Expand All @@ -37,11 +37,11 @@ public class GatewayController : MqttBaseController
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly RawDataGateway _rawData;

public GatewayController(ILogger<GatewayController> logger, IServiceScopeFactory scopeFactor,
IOptions<AppSettings> options, ICapPublisher queue, RawDataGateway rawDataGateway
IOptions<AppSettings> options, IPublisher queue, RawDataGateway rawDataGateway
)
{
_logger = logger;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/RpcController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
Expand All @@ -22,7 +22,7 @@ public class RpcController : MqttBaseController
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly IEasyCachingProviderFactory _factory;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private readonly FlowRuleProcessor _flowRuleProcessor;
private readonly IEasyCachingProvider _caching;
private readonly MQTTService _service;
Expand All @@ -33,7 +33,7 @@ public class RpcController : MqttBaseController
private Device device;

public RpcController(ILogger<RpcController> logger, IServiceScopeFactory scopeFactor, MQTTService mqttService,
IOptions<AppSettings> options, ICapPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
IOptions<AppSettings> options, IPublisher queue, IEasyCachingProviderFactory factory, FlowRuleProcessor flowRuleProcessor
)
{
string _hc_Caching = $"{nameof(CachingUseIn)}-{Enum.GetName(options.Value.CachingUseIn)}";
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/StatusController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Contracts;
Expand All @@ -20,11 +20,11 @@ public class StatusController : MqttBaseController
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private string _devname;
private Device device;

public StatusController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, ICapPublisher queue)
public StatusController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, IPublisher queue)
{
_logger = logger;
_scopeFactor = scopeFactor;
Expand Down
6 changes: 3 additions & 3 deletions IoTSharp/Services/MQTTControllers/TelemetryController.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using DotNetCore.CAP;
using IoTSharp.EventBus;
using Dynamitey.DynamicObjects;
using EasyCaching.Core;
using IoTSharp.Data;
Expand All @@ -21,11 +21,11 @@ public class TelemetryController : MqttBaseController
{
private readonly ILogger _logger;
private readonly IServiceScopeFactory _scopeFactor;
private readonly ICapPublisher _queue;
private readonly IPublisher _queue;
private string _devname;
private Device device;

public TelemetryController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, ICapPublisher queue)
public TelemetryController(ILogger<TelemetryController> logger, IServiceScopeFactory scopeFactor, IPublisher queue)
{

_logger = logger;
Expand Down
Loading

0 comments on commit 3078606

Please sign in to comment.