Skip to content

Commit

Permalink
增加 rabbitmq 示例
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Dec 10, 2019
1 parent 112b42f commit b196a5f
Show file tree
Hide file tree
Showing 40 changed files with 1,104 additions and 0 deletions.
30 changes: 30 additions & 0 deletions lab-04/lab-04-rabbitmq-demo-batch-consume-02/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-04-rabbitmq-demo-batch-consume-02</artifactId>

<dependencies>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync // 开启异步
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo06Message;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

/**
* Direct Exchange 示例的配置类
*/
public static class DirectExchangeDemoConfiguration {

// 创建 Queue
@Bean
public Queue demo06Queue() {
return new Queue(Demo06Message.QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false); // autoDelete: 是否自动删除
}

// 创建 Direct Exchange
@Bean
public DirectExchange demo06Exchange() {
return new DirectExchange(Demo06Message.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}

// 创建 Binding
// Exchange:Demo06Message.EXCHANGE
// Routing key:Demo06Message.ROUTING_KEY
// Queue:Demo06Message.QUEUE
@Bean
public Binding demo06Binding() {
return BindingBuilder.bind(demo06Queue()).to(demo06Exchange()).with(Demo06Message.ROUTING_KEY);
}

}

@Bean(name = "consumerBatchContainerFactory")
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 额外添加批量消费的属性
factory.setBatchListener(true);
factory.setBatchSize(10);
factory.setReceiveTimeout(10 * 1000L);
factory.setConsumerBatchEnabled(true);
return factory;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.consumer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo06Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@RabbitListener(queues = Demo06Message.QUEUE,
containerFactory = "consumerBatchContainerFactory")
public class Demo06Consumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@RabbitHandler
public void onMessage(List<Demo06Message> messages) {
logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());
}

// @RabbitHandler(isDefault = true)
// public void onMessageX(List<Message> messages) {
// logger.info("[onMessage][线程编号:{} 消息数量:{}]", Thread.currentThread().getId(), messages.size());
// }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.message;

import java.io.Serializable;

public class Demo06Message implements Serializable {

public static final String QUEUE = "QUEUE_DEMO_06";

public static final String EXCHANGE = "EXCHANGE_DEMO_06";

public static final String ROUTING_KEY = "ROUTING_KEY_06";

/**
* 编号
*/
private Integer id;

public Demo06Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo06Message{" +
"id=" + id +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo06Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Demo06Producer {

@Autowired
private RabbitTemplate rabbitTemplate;

public void syncSend(Integer id) {
// 创建 Demo06Message 消息
Demo06Message message = new Demo06Message();
message.setId(id);
// 同步发送消息
rabbitTemplate.convertAndSend(Demo06Message.EXCHANGE, Demo06Message.ROUTING_KEY, message);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.producer;

import cn.iocoder.springboot.lab04.rabbitmqdemo.Application;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.CountDownLatch;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class Demo06ProducerTest {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private Demo06Producer producer;

@Test
public void testSyncSend() throws InterruptedException {
for (int i = 0; i < 3; i++) {
// 同步发送消息
int id = (int) (System.currentTimeMillis() / 1000);
producer.syncSend(id);

// 故意每条消息之间,隔离 10 秒
logger.info("[testASyncSend][发送编号:[{}] 发送成功]", id);
// Thread.sleep(10 * 1000L);
}

// 阻塞等待,保证消费
new CountDownLatch(1).await();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
rabbitmq:
host: 127.0.0.1 # RabbitMQ 服务的地址
port: 5672 # RabbitMQ 服务的端口
username: guest # RabbitMQ 服务的账号
password: guest # RabbitMQ 服务的密码
30 changes: 30 additions & 0 deletions lab-04/lab-04-rabbitmq-demo-batch-consume/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>lab-04-rabbitmq-demo-batch-consume</artifactId>

<dependencies>
<!-- 实现对 RabbitMQ 的自动化配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<!-- 方便等会写单元测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync // 开启异步
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package cn.iocoder.springboot.lab04.rabbitmqdemo.config;

import cn.iocoder.springboot.lab04.rabbitmqdemo.message.Demo05Message;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.BatchingRabbitTemplate;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;

@Configuration
public class RabbitConfig {

/**
* Direct Exchange 示例的配置类
*/
public static class DirectExchangeDemoConfiguration {

// 创建 Queue
@Bean
public Queue demo05Queue() {
return new Queue(Demo05Message.QUEUE, // Queue 名字
true, // durable: 是否持久化
false, // exclusive: 是否排它
false); // autoDelete: 是否自动删除
}

// 创建 Direct Exchange
@Bean
public DirectExchange demo05Exchange() {
return new DirectExchange(Demo05Message.EXCHANGE,
true, // durable: 是否持久化
false); // exclusive: 是否排它
}

// 创建 Binding
// Exchange:Demo05Message.EXCHANGE
// Routing key:Demo05Message.ROUTING_KEY
// Queue:Demo05Message.QUEUE
@Bean
public Binding demo05Binding() {
return BindingBuilder.bind(demo05Queue()).to(demo05Exchange()).with(Demo05Message.ROUTING_KEY);
}

}

@Bean
public BatchingRabbitTemplate batchRabbitTemplate(ConnectionFactory connectionFactory) {
// 创建 BatchingStrategy 对象,代表批量策略
int batchSize = 16384; // 超过收集的消息数量的最大条数。
int bufferLimit = 33554432; // 每次批量发送消息的最大内存
int timeout = 30000; // 超过收集的时间的最大等待时长,单位:毫秒
BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);

// 创建 TaskScheduler 对象,用于实现超时发送的定时器
TaskScheduler taskScheduler = new ConcurrentTaskScheduler();

// 创建 BatchingRabbitTemplate 对象
BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
batchTemplate.setConnectionFactory(connectionFactory);
return batchTemplate;
}

@Bean(name = "consumerBatchContainerFactory")
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
// 创建 SimpleRabbitListenerContainerFactory 对象
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
// 额外添加批量消费的属性
factory.setBatchListener(true);
return factory;
}

}
Loading

0 comments on commit b196a5f

Please sign in to comment.