Skip to content

Commit

Permalink
增加 rabbitmq 消费模式
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Dec 11, 2019
1 parent 7e9a6eb commit fca8209
Show file tree
Hide file tree
Showing 27 changed files with 681 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.consumer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo07Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = Demo07Message.DEAD_QUEUE)
public class Demo07DeadConsumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@RabbitHandler
public void onMessage(Demo07Message message) {
logger.info("[onMessage][【死信队列】线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,16 @@ spring:
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
template:
# 对应 RabbitProperties.Retry 类
retry:
enabled: true # 开启发送机制
max-attempts: 3 # 最大重试次数。默认为 3 。
initial-interval: 1000 # 重试间隔,单位为毫秒。默认为 1000 。
listener:
simple:
# 对应 RabbitProperties.ListenerRetry 类
retry:
enabled: true

enabled: true # 开启消费重试机制
max-attempts: 3 # 最大重试次数。默认为 3 。
initial-interval: 1000 # 重试间隔,单位为毫秒。默认为 1000 。
30 changes: 30 additions & 0 deletions lab-04/lab-04-rabbitmq-demo-delay/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-04-rabbitmq-demo-dely</artifactId>

<dependencies>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo08Message;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

/**
* Direct Exchange 示例的配置类
*/
public static class DirectExchangeDemoConfiguration {

// 创建 Queue
@Bean
public Queue demo08Queue() {
return QueueBuilder.durable(Demo08Message.QUEUE) // durable: 是否持久化
.exclusive() // exclusive: 是否排它
.autoDelete() // autoDelete: 是否自动删除
.ttl(10 * 1000) // 设置队列里的默认过期时间为 10 秒
.deadLetterExchange(Demo08Message.EXCHANGE)
.deadLetterRoutingKey(Demo08Message.DELAY_ROUTING_KEY)
.build();
}

// 创建 Delay Queue
@Bean
public Queue demo08DelayQueue() {
return new Queue(Demo08Message.DELAY_QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false); // autoDelete: 是否自动删除
}

// 创建 Direct Exchange
@Bean
public DirectExchange demo08Exchange() {
return new DirectExchange(Demo08Message.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}

// 创建 Binding
// Exchange:Demo08Message.EXCHANGE
// Routing key:Demo08Message.ROUTING_KEY
// Queue:Demo08Message.QUEUE
@Bean
public Binding demo08Binding() {
return BindingBuilder.bind(demo08Queue()).to(demo08Exchange()).with(Demo08Message.ROUTING_KEY);
}

// 创建 Delay Binding
// Exchange:Demo08Message.EXCHANGE
// Routing key:Demo08Message.DELAY_ROUTING_KEY
// Queue:Demo08Message.DELAY_QUEUE
@Bean
public Binding demo08DelayBinding() {
return BindingBuilder.bind(demo08DelayQueue()).to(demo08Exchange()).with(Demo08Message.DELAY_ROUTING_KEY);
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.consumer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo08Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = Demo08Message.DELAY_QUEUE)
public class Demo08Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@RabbitHandler
public void onMessage(Demo08Message message) {
logger.info("[onMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.message;

import java.io.Serializable;

public class Demo08Message implements Serializable {

public static final String QUEUE = "QUEUE_DEMO_08"; // 正常队列
public static final String DELAY_QUEUE = "DELAY_QUEUE_DEMO_08"; // 延迟队列(死信队列)

public static final String EXCHANGE = "EXCHANGE_DEMO_08";

public static final String ROUTING_KEY = "ROUTING_KEY_08"; // 正常路由键
public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY_08"; // 延迟路由键(死信路由键)

/**
* 编号
*/
private Integer id;

public Demo08Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo08Message{" +
"id=" + id +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo08Message;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo08Producer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void syncSend(Integer id, Integer delay) {
// 创建 Demo07Message 消息
Demo08Message message = new Demo08Message();
message.setId(id);
// 同步发送消息
rabbitTemplate.convertAndSend(Demo08Message.EXCHANGE, Demo08Message.ROUTING_KEY, message, new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息的 TTL 过期时间
if (delay != null && delay > 0) {
message.getMessageProperties().setExpiration(String.valueOf(delay)); // Spring-AMQP API 设计有问题,所以传入了 String = =
}
return message;
}

});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo08ProducerTest {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private Demo08Producer producer;

@Test
public void testSyncSend01() throws InterruptedException {
// 不设置消息的过期时间,使用队列默认的消息过期时间
this.testSyncSendDelay(null);
}

@Test
public void testSyncSend02() throws InterruptedException {
// 设置发送消息的过期时间为 5000 毫秒
this.testSyncSendDelay(5000);
}

private void testSyncSendDelay(Integer delay) throws InterruptedException {
int id = (int) (System.currentTimeMillis() / 1000);
producer.syncSend(id, delay);
logger.info("[testSyncSendDelay][发送编号:[{}] 发送成功]", id);

// 阻塞等待,保证消费
new CountDownLatch(1).await();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
30 changes: 30 additions & 0 deletions lab-04/lab-04-rabbitmq-demo-message-model/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-04-rabbitmq-demo-message-consume</artifactId>

<dependencies>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.BroadcastMessage;
import cn.iocoder.springboot.lab04.rabbitmqdemo.message.ClusteringMessage;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

/**
* 广播消费的示例的配置
*/
public static class BroadcastingConfiguration {

// 创建 Topic Exchange
@Bean
public TopicExchange broadcastingExchange() {
return new TopicExchange(BroadcastMessage.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}

}

/**
* 集群消费的示例的配置
*/
public static class ClusteringConfiguration {

// 创建 Topic Exchange
@Bean
public TopicExchange clusteringExchange() {
return new TopicExchange(ClusteringMessage.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}

}

}
Loading

0 comments on commit fca8209

Please sign in to comment.