Skip to content

Commit

Permalink
统一topic
Browse files Browse the repository at this point in the history
  • Loading branch information
maikebing committed Mar 20, 2022
1 parent ea5cda1 commit caef717
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
12 changes: 6 additions & 6 deletions IoTSharp.SDKs/IoTSharp.Sdk.MQTT/MQTTClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public async Task<bool> ConnectAsync(Uri uri, string username, string password)
.Build();
Client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceived;
Client.ConnectedAsync += e => {
Client.SubscribeAsync($"/devices/{DeviceId}/rpc/request/+/+");
Client.SubscribeAsync($"/devices/{DeviceId}/attributes/update/", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync($"devices/{DeviceId}/rpc/request/+/+");
Client.SubscribeAsync($"devices/{DeviceId}/attributes/update/", MqttQualityOfServiceLevel.ExactlyOnce);
LogInformation?.Invoke($"CONNECTED WITH SERVER ");
return Task.CompletedTask;
};
Expand Down Expand Up @@ -90,11 +90,11 @@ private Task Client_ApplicationMessageReceived( MqttApplicationMessageReceivedEv
LogDebug?.Invoke($"ApplicationMessageReceived Topic {e.ApplicationMessage.Topic} QualityOfServiceLevel:{e.ApplicationMessage.QualityOfServiceLevel} Retain:{e.ApplicationMessage.Retain} ");
try
{
if (e.ApplicationMessage.Topic.StartsWith($"/devices/") && e.ApplicationMessage.Topic.Contains("/response/"))
if (e.ApplicationMessage.Topic.StartsWith($"devices/") && e.ApplicationMessage.Topic.Contains("/response/"))
{
ReceiveAttributes(e);
}
else if (e.ApplicationMessage.Topic.StartsWith($"/devices/") && e.ApplicationMessage.Topic.Contains("/rpc/request/"))
else if (e.ApplicationMessage.Topic.StartsWith($"devices/") && e.ApplicationMessage.Topic.Contains("/rpc/request/"))
{
var tps = e.ApplicationMessage.Topic.Split(new char[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
var rpcmethodname = tps[4];
Expand Down Expand Up @@ -165,7 +165,7 @@ public Task UploadTelemetryDataAsync(string _devicename, object obj)
public Task ResponseExecommand(RpcResponse rpcResult)
{
///IoTSharp/Clients/RpcClient.cs#L65 var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
string topic = $"/devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method.ToString()}/{rpcResult.ResponseId}";
string topic = $"devices/{rpcResult.DeviceId}/rpc/response/{rpcResult.Method.ToString()}/{rpcResult.ResponseId}";
return Client.PublishAsync(new MqttApplicationMessageBuilder().WithTopic( topic).WithPayload( rpcResult.Data.ToString()).WithQualityOfServiceLevel( MqttQualityOfServiceLevel.ExactlyOnce).Build());
}
public Task RequestAttributes(params string[] args) => RequestAttributes("me", false, args);
Expand All @@ -178,7 +178,7 @@ public Task RequestAttributes(string _device, bool anySide , params string[] ar
string topic = $"devices/{_device}/attributes/request/{id}";
Dictionary<string, string> keys = new Dictionary<string, string>();
keys.Add(anySide ? "anySide" : "server", string.Join(",", args));
Client.SubscribeAsync($"/devices/{_device}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce);
Client.SubscribeAsync($"devices/{_device}/attributes/response/{id}", MqttQualityOfServiceLevel.ExactlyOnce);
return Client.PublishStringAsync(topic, Newtonsoft.Json.JsonConvert.SerializeObject(keys), MqttQualityOfServiceLevel.ExactlyOnce);
}
}
Expand Down
4 changes: 2 additions & 2 deletions IoTSharp/Clients/RpcClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public async Task<byte[]> ExecuteAsync(TimeSpan timeout, string deviceid, string
throw new ArgumentException("The method name cannot contain /, + or #.");
}
string rpcid = $"{Guid.NewGuid():N}";
var requestTopic = $"/devices/{deviceid}/rpc/request/{methodName}/{rpcid}";
var responseTopic = $"/devices/{deviceid}/rpc/response/{methodName}/{rpcid}";
var requestTopic = $"devices/{deviceid}/rpc/request/{methodName}/{rpcid}";
var responseTopic = $"devices/{deviceid}/rpc/response/{methodName}/{rpcid}";

var requestMessage = new MqttApplicationMessageBuilder()
.WithTopic(requestTopic)
Expand Down
8 changes: 5 additions & 3 deletions IoTSharp/Handlers/MQTTServerHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,13 @@ private async Task RequestAttributes(string[] tpary, string senderClientId, Dict
{
var qf = from at in _dbContext.AttributeLatest where at.Type == DataType.XML && at.KeyName == tpary[5] select at;
await qf.LoadAsync();
await _serverEx.PublishAsync(senderClientId, $"/devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_XML);
await _serverEx.PublishAsync(senderClientId, $"devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_XML);
}
else if (tpary.Length > 5 && tpary[4] == "binary")
{
var qf = from at in _dbContext.AttributeLatest where at.Type == DataType.Binary && at.KeyName == tpary[5] select at;
await qf.LoadAsync();
await _serverEx.PublishAsync(senderClientId, $"/devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_Binary );
await _serverEx.PublishAsync(senderClientId, $"devices/me/attributes/response/{tpary[5]}", qf.FirstOrDefault()?.Value_Binary );
}
else
{
Expand Down Expand Up @@ -365,6 +365,8 @@ private async Task RequestAttributes(string[] tpary, string senderClientId, Dict
break;
}
}

await _serverEx.PublishAsync(senderClientId, $"devices/me/attributes/response/{tpary[4]}", reps);
}
}

Expand Down Expand Up @@ -413,7 +415,7 @@ internal Task Server_ClientSubscribedTopic( ClientSubscribedTopicEventArgs e)


}
if (e.TopicFilter.Topic.ToLower().StartsWith("/devices/telemetry"))
if (e.TopicFilter.Topic.ToLower().StartsWith("devices/telemetry"))
{


Expand Down

0 comments on commit caef717

Please sign in to comment.