Skip to content

Commit

Permalink
对接 Shashlik.EventBus 完成 更换消息组件 IoTSharp#779
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed Sep 17, 2022
1 parent 767a40c commit da68fc2
Show file tree
Hide file tree
Showing 25 changed files with 673 additions and 251 deletions.
2 changes: 2 additions & 0 deletions IoTSharp.Contracts/Enums.cs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,8 @@ public enum MountType
/// 告警挂载点
/// </summary>
Alarm = 7,
DeleteDevice = 8,
CreateDevice = 9,
}
/// <summary>
/// 折叠数据
Expand Down
52 changes: 52 additions & 0 deletions IoTSharp.EventBus.CAP/CapPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
using DotNetCore.CAP;
using Dynamitey;
using IoTSharp.Contracts;
using IoTSharp.Data;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace IoTSharp.EventBus
{
public class CapPublisher : IPublisher
{
private readonly ICapPublisher _queue;

public CapPublisher(ICapPublisher queue)
{
_queue = queue;
}
public async Task PublishAttributeData(PlayloadData msg)
{
await _queue.PublishAsync("iotsharp.services.datastream.attributedata", msg);
}

public async Task PublishTelemetryData( PlayloadData msg)
{
await _queue.PublishAsync("iotsharp.services.datastream.telemetrydata", msg);
}

public async Task PublishDeviceStatus( Guid devid, DeviceStatus devicestatus)
{
await _queue.PublishAsync("iotsharp.services.datastream.devicestatus", new PlayloadData { DeviceId = devid, DeviceStatus = devicestatus });
}

public async Task PublishDeviceAlarm( CreateAlarmDto alarmDto)
{
await _queue.PublishAsync("iotsharp.services.datastream.alarm", alarmDto);
}

public async Task PublishCreateDevice(Guid devid)
{
await _queue.PublishAsync("iotsharp.services.platform.createdevice", devid);
}

public async Task PublishDeleteDevice(Guid devid)
{
await _queue.PublishAsync("iotsharp.services.platform.deleteDevice", devid);
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,25 @@ public async Task alarm(CreateAlarmDto alarmDto)


[CapSubscribe("iotsharp.services.datastream.devicestatus")]
public void devicestatus(PlayloadData status)
public async Task devicestatus(PlayloadData status)
{
DeviceStatusEvent(status);
await DeviceStatusEvent(status);
}

[CapSubscribe("iotsharp.services.datastream.telemetrydata")]
public async Task telemetrydata(PlayloadData msg)
{
await StoreTelemetryData(msg);
}

[CapSubscribe("iotsharp.services.datastream.deletedevice")]
public async Task deletedevice(Guid deviceId)
{
await DeleteDevice(deviceId);
}
[CapSubscribe("iotsharp.services.datastream.createdevice")]
public async Task createdevice(Guid deviceId)
{
await CreateDevice(deviceId);
}
}
}
145 changes: 145 additions & 0 deletions IoTSharp.EventBus.CAP/DependencyInjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@

using DotNetCore.CAP;
using DotNetCore.CAP.Dashboard.NodeDiscovery;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using Savorboard.CAP.InMemoryMessageQueue;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using IoTSharp.Contracts;
using static IoTSharp.EventBus.EventBusOption;

namespace IoTSharp.EventBus.CAP
{
public static class DependencyInjection
{

public static IApplicationBuilder UseCAPEventBus(this IApplicationBuilder app)
{
var provider = app.ApplicationServices;
var options = provider.GetService<EventBusOption>();
app.UseCapDashboard();
return app;
}

public static void UserCAP(this EventBusOption opt)
{
var settings = opt.AppSettings;
var healthChecks = opt.HealthChecks;
var _EventBusStore = opt.EventBusStore;
var _EventBusMQ = opt.EventBusMQ;
var services = opt.services;
services.AddTransient<ISubscriber, CapSubscriber>();
services.AddTransient<IPublisher, CapPublisher>();
services.AddCap(x =>
{
string _hc_EventBusStore = $"{nameof(EventBusStore)}-{Enum.GetName(settings.EventBusStore)}";
x.SucceedMessageExpiredAfter = settings.SucceedMessageExpiredAfter;
x.ConsumerThreadCount = settings.ConsumerThreadCount;
switch (settings.EventBusStore)
{
case EventBusStore.PostgreSql:
x.UsePostgreSql(_EventBusStore);
healthChecks.AddNpgSql(_EventBusStore, name: _hc_EventBusStore);
break;

case EventBusStore.MongoDB:
x.UseMongoDB(_EventBusStore); //注意,仅支持MongoDB 4.0+集群
healthChecks.AddMongoDb(_EventBusStore, name: _hc_EventBusStore);
break;

case EventBusStore.LiteDB:
x.UseLiteDBStorage(_EventBusStore);
break;
case EventBusStore.MySql:
x.UseMySql(_EventBusStore);
break;
case EventBusStore.SqlServer:
x.UseSqlServer(_EventBusStore);
break;
case EventBusStore.InMemory:
default:
x.UseInMemoryStorage();
break;
}
string _hc_EventBusMQ = $"{nameof(EventBusMQ)}-{Enum.GetName(settings.EventBusMQ)}";
switch (settings.EventBusMQ)
{
case EventBusMQ.RabbitMQ:
var url = new Uri(_EventBusMQ);
x.UseRabbitMQ(cfg =>
{
cfg.ConnectionFactoryOptions = cf =>
{
cf.AutomaticRecoveryEnabled = true;
cf.Uri = new Uri(_EventBusMQ);
};
});
//amqp://guest:guest@localhost:5672
healthChecks.AddRabbitMQ(connectionFactory =>
{
var factory = new ConnectionFactory()
{
Uri = new Uri(_EventBusMQ),
AutomaticRecoveryEnabled = true
};
return factory.CreateConnection();
}, _hc_EventBusMQ);
break;

case EventBusMQ.Kafka:
x.UseKafka(_EventBusMQ);
healthChecks.AddKafka(cfg =>
{
cfg.BootstrapServers = _EventBusMQ;
}, name: _hc_EventBusMQ);
break;

case EventBusMQ.ZeroMQ:
x.UseZeroMQ(cfg =>
{
cfg.HostName = _EventBusMQ ?? "127.0.0.1";
cfg.Pattern = MaiKeBing.CAP.NetMQPattern.PushPull;
});
break;
case EventBusMQ.AzureServiceBus:
x.UseAzureServiceBus(_EventBusMQ);
break;
case EventBusMQ.AmazonSQS:
x.UseAmazonSQS(opts =>
{
var uri = new Uri(_EventBusMQ);
if (!string.IsNullOrEmpty(uri.UserInfo) && uri.UserInfo?.Split(':').Length == 2)
{
var userinfo = uri.UserInfo.Split(':');
opts.Credentials = new Amazon.Runtime.BasicAWSCredentials(userinfo[0], userinfo[1]);
}
opts.Region = Amazon.RegionEndpoint.GetBySystemName(uri.Host);
});
break;
case EventBusMQ.RedisStreams:
x.UseRedis(_EventBusMQ);
break;
case EventBusMQ.NATS:
x.UseNATS(_EventBusMQ);
break;
case EventBusMQ.Pulsar:
x.UsePulsar(_EventBusMQ);
break;
case EventBusMQ.InMemory:
default:
x.UseInMemoryMessageQueue();
break;
}
x.UseDashboard();
//x.UseDiscovery();
});
}


}
}
42 changes: 42 additions & 0 deletions IoTSharp.EventBus.CAP/IoTSharp.EventBus.CAP.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="AspNetCore.HealthChecks.NpgSql" Version="6.0.2" />
<PackageReference Include="CAP.Extensions" Version="1.0.35" />
<PackageReference Include="DotNetCore.CAP.Dashboard" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.InMemoryStorage" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.Kafka" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.MongoDB" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.MySql" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.PostgreSql" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.SqlServer" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.AzureServiceBus" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.AmazonSQS" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.NATS" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.RedisStreams" Version="6.1.0" />
<PackageReference Include="DotNetCore.CAP.Pulsar" Version="6.1.0" />
<PackageReference Include="EasyCaching.Core" Version="1.6.1" />
<PackageReference Include="AspNetCore.HealthChecks.Kafka" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.MongoDb" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.Rabbitmq" Version="6.0.2" />
<PackageReference Include="AspNetCore.HealthChecks.Redis" Version="6.0.4" />
<PackageReference Include="MaiKeBing.CAP.ZeroMQ" Version="1.0.35" />
<PackageReference Include="MaiKeBing.CAP.LiteDB" Version="1.0.35" />
<PackageReference Include="MaiKeBing.HostedService.ZeroMQ" Version="1.0.35" />
<PackageReference Include="Savorboard.CAP.InMemoryMessageQueue" Version="6.0.0" />

</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\IoTSharp.Contracts\IoTSharp.Contracts.csproj" />
<ProjectReference Include="..\IoTSharp.Data.TimeSeries\IoTSharp.Data.TimeSeries.csproj" />
<ProjectReference Include="..\IoTSharp.Data\IoTSharp.Data.csproj" />
<ProjectReference Include="..\IoTSharp.EventBus\IoTSharp.EventBus.csproj" />
</ItemGroup>

</Project>
118 changes: 118 additions & 0 deletions IoTSharp.EventBus.Shashlik/DependencyInjection.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@

using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using IoTSharp.Contracts;
using static IoTSharp.EventBus.EventBusOption;
using Shashlik.EventBus;
using Shashlik.EventBus.PostgreSQL;
using Shashlik.EventBus.MySql;
using Shashlik.EventBus.SqlServer;
using Shashlik.EventBus.MemoryStorage;
using Shashlik.EventBus.RabbitMQ;
using Shashlik.EventBus.Kafka;
using Shashlik.EventBus.MemoryQueue;

namespace IoTSharp.EventBus.Shashlik
{
public static class DependencyInjection
{

public static IApplicationBuilder UseShashlikEventBus(this IApplicationBuilder app)
{
var provider = app.ApplicationServices;
var options = provider.GetService<EventBusOption>();
return app;
}

public static void UserShashlik(this EventBusOption opt)
{
var settings = opt.AppSettings;
var healthChecks = opt.HealthChecks;
var _EventBusStore = opt.EventBusStore;
var _EventBusMQ = opt.EventBusMQ;
var services = opt.services;

services.AddTransient<ISubscriber, ShashlikSubscriber>();
services.AddTransient<IPublisher, ShashlikPublisher>();
string _hc_EventBusStore = $"{nameof(EventBusStore)}-{Enum.GetName(settings.EventBusStore)}";
var builder = services.AddEventBus((EventBusOptions opts) =>
{
opts.SucceedExpireHour = settings.SucceedMessageExpiredAfter;
// x.ConsumerThreadCount = settings.ConsumerThreadCount;
});

switch (settings.EventBusStore)
{
case EventBusStore.PostgreSql:
builder.AddNpgsql(_EventBusStore);
healthChecks.AddNpgSql(_EventBusStore, name: _hc_EventBusStore);
break;

case EventBusStore.MongoDB:

break;
case EventBusStore.LiteDB:
break;
case EventBusStore.MySql:
builder.AddMySql(_EventBusStore);
break;
case EventBusStore.SqlServer:
builder.AddSqlServer(_EventBusStore);
break;
case EventBusStore.InMemory:
default:
builder.AddMemoryStorage();
break;
}
string _hc_EventBusMQ = $"{nameof(EventBusMQ)}-{Enum.GetName(settings.EventBusMQ)}";
switch (settings.EventBusMQ)
{
case EventBusMQ.RabbitMQ:
var url = new Uri(_EventBusMQ);
builder.AddRabbitMQ(cfg =>
{
cfg.Host = url.Host;
var userinfo = url.UserInfo?.Split(':');
if (userinfo != null)
{
cfg.UserName = userinfo[0];
cfg.Password = userinfo[1];
}
});
break;

case EventBusMQ.Kafka:
builder.AddKafka(cfg =>
{
cfg.AddOrUpdate("bootstrap.servers", _EventBusMQ);
cfg.AddOrUpdate("allow.auto.create.topics", "true");
});
break;
case EventBusMQ.InMemory:
builder.AddMemoryQueue();
break;
case EventBusMQ.ZeroMQ:
break;
case EventBusMQ.AzureServiceBus:
break;
case EventBusMQ.AmazonSQS:
break;
case EventBusMQ.RedisStreams:
break;
case EventBusMQ.NATS:
break;
case EventBusMQ.Pulsar:
break;

default:
break;
}
}
}
}
Loading

0 comments on commit da68fc2

Please sign in to comment.