Skip to content

Commit

Permalink
mqtt demo
Browse files Browse the repository at this point in the history
  • Loading branch information
smltq committed Mar 22, 2021
1 parent d43ef7c commit d06deac
Show file tree
Hide file tree
Showing 14 changed files with 469 additions and 246 deletions.
43 changes: 43 additions & 0 deletions mqtt/mqtt-publisher/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

<parent>
<artifactId>mqtt</artifactId>
<groupId>com.easy</groupId>
<version>1.0</version>
</parent>

<modelVersion>4.0.0</modelVersion>
<artifactId>mqtt-publisher</artifactId>
<packaging>jar</packaging>
<description>Demo project for Spring Boot</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<!--mqtt相关依赖 start-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mqtt相关依赖 end-->

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.easy.mqtt;
package com.easy.mqtt.publisher;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MqttApplication {
public class MqttPublisherApplication {
public static void main(String[] args) {
SpringApplication.run(MqttApplication.class, args);
SpringApplication.run(MqttPublisherApplication.class, args);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.easy.mqtt.publisher.config;

import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = MqttPublisherConfig.CHANNEL_NAME_OUT)
public interface MqttGateway {

/**
* 发送信息到MQTT服务器
*
* @param payload 发送的文本
*/
void sendToMqtt(String payload);

/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, String payload);

/**
* 发送信息到MQTT服务器
*
* @param topic 主题
* @param qos 对消息处理的几种机制。
* 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
* 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
* 2 多了一次去重的动作,确保订阅者收到的消息有一次。
* @param payload 消息主体
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
@Header(MqttHeaders.QOS) int qos,
String payload);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.easy.mqtt.publisher.config;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.PostConstruct;
import java.util.Objects;

@Slf4j
@Configuration
@IntegrationComponentScan
@Getter
@Setter
public class MqttPublisherConfig {

public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

// 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
private static final byte[] WILL_DATA;

static {
WILL_DATA = "offline".getBytes();
}

@Value("${mqtt.username}")
private String username;

@Value("${mqtt.password}")
private String password;

@Value("${mqtt.serverURIs}")
private String hostUrl;

@Value("${mqtt.client.id}")
private String clientId;

@Value("${mqtt.topic}")
private String defaultTopic;

@PostConstruct
public void init() {
log.debug("username:{} password:{} hostUrl:{} clientId :{} ",
this.username, this.password, this.hostUrl, this.clientId, this.defaultTopic);
}

/**
* MQTT连接器选项
*
* @return
*/
@Bean
public MqttConnectOptions getSenderMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
// 设置连接的用户名
if (!username.trim().equals("")) {
options.setUserName(username);
}
// 设置连接的密码
options.setPassword(password.toCharArray());
// 设置连接的地址
options.setServerURIs(new String[]{hostUrl});
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送心跳判断客户端是否在线
// 但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 设置“遗嘱”消息的话题,若客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息。
options.setWill("willTopic", WILL_DATA, 2, false);
return options;
}

/**
* MQTT客户端
*/
@Bean
public MqttPahoClientFactory senderMqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setConnectionOptions(getSenderMqttConnectOptions());
return factory;
}

/**
* MQTT信息通道(生产者)
*/
@Bean(name = CHANNEL_NAME_OUT)
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}

/**
* MQTT消息处理器(生产者)
*/
@Bean
@ServiceActivator(inputChannel = CHANNEL_NAME_OUT)
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
clientId,
senderMqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(defaultTopic);
return messageHandler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package com.easy.mqtt.publisher.controller;

import com.easy.mqtt.publisher.config.MqttGateway;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
* MQTT消息发送
*/
@RestController
@Slf4j
public class MqttTestController {
/**
* 注入发送MQTT的Bean
*/
@Resource
private MqttGateway mqttGateway;

/**
* 发送自定义消息内容(使用默认主题)
*
* @param msg 消息内容
* @return 返回
*/
@ResponseBody
@PostMapping(value = "/sendMqtt", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt(@RequestParam(value = "msg") String msg) {
log.info("================生产默认主题的MQTT消息===={}============", msg);
mqttGateway.sendToMqtt(msg);
return new ResponseEntity<>("发送成功", HttpStatus.OK);
}

/**
* 发送自定义消息内容,且指定主题
*
* @param msg 消息内容
* @return 返回
*/
@ResponseBody
@PostMapping(value = "/sendMqtt2", produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<String> sendMqtt2(@RequestParam("topic") String topic, @RequestParam(value = "msg") String msg) {
log.info("================生产自定义主题的MQTT消息===={}============", msg);
mqttGateway.sendToMqtt(topic, msg);
return new ResponseEntity<>("发送成功", HttpStatus.OK);
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
server:
port: 8081
# Mqtt配置
mqtt:
serverURIs: tcp://127.0.0.1:61613
Expand Down
47 changes: 47 additions & 0 deletions mqtt/mqtt-subscriber/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>mqtt</artifactId>
<groupId>com.easy</groupId>
<version>1.0</version>
</parent>

<groupId>com.easy</groupId>
<artifactId>mqtt-subscriber</artifactId>
<version>1.0</version>
<packaging>jar</packaging>

<name>mqtt-subscriber</name>
<description>Demo project for Spring Boot</description>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<!--mqtt相关依赖 start-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<!--mqtt相关依赖 end-->

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.easy.mqtt.subscriber;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MqttSubscriberApplication {
public static void main(String[] args) {
SpringApplication.run(MqttSubscriberApplication.class, args);
}
}
Loading

0 comments on commit d06deac

Please sign in to comment.