Skip to content

Commit

Permalink
[fix]: mqtt protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
cxntsh committed Dec 21, 2021
1 parent d2b55de commit 5012901
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 36 deletions.
4 changes: 4 additions & 0 deletions iot-server/server-camel-receiver/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@
<version>1.0.3-RELEASE</version>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-paho-mqtt5</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty-http</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ public MqttClientRouteBuilder(String routeId, Map<String, Object> options) {

@Override
public void configure() throws Exception {
fromF("mqtt:zeus-iot-mqtt?host=tcp://%s:%s&subscribeTopicNames=%s", options.get("hostIp"), options.get("port"), options.get("topicNames"))
fromF("paho-mqtt5:%s?brokerUrl=tcp://%s:%s", options.get("topicNames"), options.get("hostIp"), options.get("port"))
.routeId(routeId)
.log(LoggingLevel.DEBUG, log, ">>> Message received from Mqtt Client : \n${body}")
.dynamicRouter(method(RouteJudge.class, "slip")).to("Zabbix");
.dynamicRouter(method(RouteJudge.class, "slip")).to("Zabbix:mqtt");
}


Expand All @@ -36,12 +36,12 @@ public static String slip(Exchange exchange) {
return null;
}

String topicName = exchange.getIn().getHeader("CamelMQTTSubscribeTopic").toString();
String topicName = exchange.getIn().getHeader("CamelMqttTopic").toString();

//TODO 这里需要动态加载规则

if (topicName.equals("zeusiot/123")) {
return "ArkBiz?uniqueId=198909118";
return "ArkBiz:mqtt?uniqueId=19890918";
}

if (topicName.equals("zeusiot/1234")) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
import org.apache.camel.CamelContext;
import org.apache.camel.Route;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.model.FromDefinition;

import java.util.List;

/**
* @author nantian created at 2021/8/17 0:24
Expand Down Expand Up @@ -46,37 +43,18 @@ public void addRoutes(RoutesBuilder builder) {
* @param routeId 路由ID
*/
public void routeShutDown(String routeId) {
// EventDrivenConsumerRoute route = (EventDrivenConsumerRoute) camelContext.getRoute(routeId);
// List<FromDefinition> fromDefinitions = camelContext.getRouteDefinition(routeId).getInputs();
//
//
// try {
// camelContext.stopRoute(routeId);
// camelContext.removeRoute(routeId);
//// camelContext.removeRouteDefinition(camelContext.getRouteDefinition(routeId));
// } catch (Exception e) {
// e.printStackTrace();
// }

Route route = camelContext.getRoute(routeId);
if (route == null) {
return;
}

// if (route == null) {
// log.error("当前 routeId : {} 对应的路由不存在", routeId);
// return;
// }
// try {
// route.getEndpoint().stop();
// route.getConsumer().stop();
//
// } catch (Exception e) {
// e.printStackTrace();
// }
//
//
// try {
// camelContext.removeRoute(routeId);
// } catch (Exception e) {
// e.printStackTrace();
// }
try {
camelContext.getRouteController().stopRoute(routeId);
camelContext.removeRoute(routeId);
} catch (Exception e) {
e.printStackTrace();
}
}


Expand Down

0 comments on commit 5012901

Please sign in to comment.