Skip to content

Latest commit

 

History

History
529 lines (528 loc) · 31.1 KB

SpringBoot整合RabbitMQ.md

File metadata and controls

529 lines (528 loc) · 31.1 KB
1.安装: (windows)
    [1]安装Erlang:
        (1)下载: http://www.erlang.org/downloads
        (2)配置Erlang环境变量: 
            1)新建系统变量: 
                变量名: ERLANG_HOME  变量值: D:\Develop\Erlang10.5
            2)修改系统变量: (在Path变量里添加以下内容)
                %ERLANG_HOME%\bin;
        (3)在cmd中通过命令测试Erlang是否安装成功: erl
    [2]安装RabbitMQ:
        (1)下载: http://www.rabbitmq.com/download.html
            1)exe安装地址: http://www.rabbitmq.com/install-windows.html
            2)解压缩安装地址: http://www.rabbitmq.com/install-windows-manual.html
        (2)配置RabbitMQ环境变量:
            1)新建系统变量: 
                变量名: RABBITMQ_SERVER 变量值: D:\Develop\rabbitmq_server-3.8.1
            2)修改系统变量: (在Path变量里添加以下内容)
                %RABBITMQ_SERVER%\sbin
        (3)安装插件: rabbitmq-plugins.bat enable rabbitmq_management
        (4)启动rabbitmq: rabbitmq-server.bat 
        (5)浏览器中访问rabbitmq控制台: http://localhost:15672 (默认用户名密码: guest,guest)
        (RabbitMQ的状态查询命令: rabbitmqctl status)
2.RabbitMQ的相关概念:
    [1]生产者和消费者:
        (1)Producer(生产者): 投递消息(包含两部分: 消息体和标签)的一方;
            1)消息体: 也称为payload,一般是带有业务逻辑结构的数据,如一个JSON字符串;
            2)标签: 用来标书该消息,如一个交换机的名称和一个路由键;
            (生产者把消息交由RabbitMQ,RabbitMQ之后会根据标签把消息发送给感兴趣的消费者)
        (2)Consumer(消费者): 接收消息的一方;
            1)消费消息时,只是消费消息体(payload),消息的标签在消息路由的过程中被丢弃,存入队列的消息只有消息体;
        (3)Broker(消息中间件的服务节点): RabbitMQ服务节点或服务实例;
    [2]Queue(队列): RabbitMQ的内部对象,用于存储消息;
    [3]交换器,路由键,绑定:
        (1)Exchange(交换器): 
            1)交换器的四种类型:
                ①direct(直连交换机): 
                    把消息路由到哪些BindingKey和RoutingKey完全匹配的队列中;
                ②fanout(扇型交换机): 
                    把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中;
                ③topic(主题交换机):    
                    把消息路由到BindingKey和RoutingKey相匹配的队列中;
                ④headers(头交换机): 
                    不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配;(性能差)
            2)生产者将消息发送到Exchange,由交换器将消息路由到一个或多个队列中,若路由不到则返回给生产者或丢弃;
    (2)RoutingKey(路由键):
        1)生产者将消息发给交换器的时候,一般指定一个RoutingKey,用来指定该消息的路由规则;
        2)此RoutingKey需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效;
        3)在交换器类型和绑定键固定时,生产者可以在发送消息给交换器时,通过指定RoutingKey来决定消息流向哪里;
    (3)Binding(绑定): 通过绑定将交换器与队列关联起来,在绑定时指定绑定键可使消息正确路由到队列;
3.RabbitMQ运转流程:
    [1]生产者发送消息:
        (1)生产者连接到RabbitMQ Broker,建立连接(Connection),开启一个信道(Channel);
        (2)生产者声明一个交换器,并设置相关属性,比如交换机类型,是否持久化等;
        (3)生产者声明一个队列并设置相关属性,比如是否排他,是否持久化,是否自动删除等;
        (4)生产者通过路由键将交换器和队列绑定起来;
        (5)生产者发送消息至RabbitMQ Broker,其中包含路由键,交换器等信息;
        (6)相应的交换器根据接收到的路由键查找相匹配的队列;
        (7)如果找到,则将从生产者发送过来的消息存入相应的队列中;
        (8)如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者;
        (9)关闭信道;
        (10)关闭连接;
    [2]消费者接收消息的过程:
        (1)消费者连接到RabbitMQ Broker,建立连接(Connection),开启一个信道(Channel);
        (2)消费者向RabbitMQ Broker请求消费相应队列中的消息,可能会设置相应的回调函数,以及做一些准备工作;
        (3)等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息;
        (4)消费者确认(ack)接收的消息;
        (5)RabbitMQ从队列中删除相应已经被确认的消息;
        (6)关闭信道;
        (7)关闭连接;
4.SpringBoot整合RabbitMQ:
    [1]引入依赖:
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    [2]编写application.yml配置:
        spring:
          rabbitmq:
            #RabbitMQ的用户名,默认为guest
            username: guest
            #RabbitMQ的密码,默认为guest
            password: guest
            addresses: localhost:5672
            cache:
              connection:
                #缓存连接模式,默认一个连接,多个channel
                mode: channel
                #多个连接,多个channel
                #mode: connection
    [3]几种交换机的使用:
        (1)Direct(直连交换机):
            1)申明一个消费者,申明一个queue和Exchange(若不存在),并binding;
                @Component
                @RabbitListener(bindings = @QueueBinding(
                    value = @Queue("myDirectQueue"),
                    exchange = @Exchange(value = "myDirectExchange", type = ExchangeTypes.DIRECT),
                    key = "mine.direct"
                ))
                public class MyDirectListener {
                    //listenerAdapter
                    //@param msg 消息内容,当只有一个参数的时候可以不加@Payload注解
                    @RabbitHandler
                    public void onMessage(@Payload String msg) {
                        System.out.println("来自 myDirectExchange 的消息:" + msg);
                    }
                }
            2)生产者:
                @Autowired
                private RabbitTemplate rabbitTemplate;
                @GetMapping("/direct")
                public String sendMsgByDirect() {
                    //参数: 交换机,routingKey,消息内容
                    rabbitTemplate.convertAndSend("myDirectExchange","mine.direct","this is a message");
                    return "success";
                }
        (2)Default(默认交换机):
            (名称为空字符串的直连交换机,一个queue若不指定binding的交换机,就被绑定到默认交换机上,routingKey为queue的名称)
            1)消费者: 
                @Component
                @RabbitListener(queuesToDeclare = @Queue("myDefaultExchange"))
                public class MyDefaultListener {
                    @RabbitHandler
                    public void onMessage(String msg) {
                        System.out.println("来自 myDefaultExchange 的消息:" + msg);
                    }
                }
            2)生产者:
                @Autowired
                private RabbitTemplate rabbitTemplate;
                @GetMapping("/default")
                public String sendMsgByDefault() {
                    //参数: 队列,消息内容
                    rabbitTemplate.convertAndSend("myDefaultExchange","this is a message");
                    return "success";
                }
        (3)Fanout(扇型交换机):
            1)申明两个消费者,对应queue-one和queue-two 都与 myFanoutExchange绑定,routingKey随意
                @Component
                @RabbitListeners({
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("myFanoutQueue-one"),
                        exchange = @Exchange(value = "myFanoutExchange", type = ExchangeTypes.FANOUT),
                        key = "key.one")),
                     @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("myFanoutQueue-two"),
                        exchange = @Exchange(value = "myFanoutExchange", type = ExchangeTypes.FANOUT),
                        key = "key.two")),
                })
                public class MyFanoutListener {
                    @RabbitHandler
                    public void onMessage(@Payload String msg, @Headers Map<String, Object> headers) {
                        System.out.println("来自 " + headers.get(AmqpHeaders.CONSUMER_QUEUE) + " 的消息:" + msg);
                    }
                }
            2)生产者:
                @Autowired
                private RabbitTemplate rabbitTemplate;
                @GetMapping("/fanout")
                public String sendMsgByFanout() {
                    //参数: 交换机,routingKey(随意),消息内容
                    rabbitTemplate.convertAndSend("myFanoutExchange","key.one","this is a message");
                    return "success";
                }
        (4)Topic(主题交换机):
            1)申明多个消费者: ("#": 匹配一个或多个单词,"*": 匹配一个单词)
                @Component
                public class MyTopicListener {
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("province-news-queue"),
                        exchange = @Exchange(value = "news-exchange", type = ExchangeTypes.TOPIC),
                        key = "province.#"))
                    @RabbitHandler
                    public void provinceNews(String msg) {
                        System.out.println("来自省TV的消息:" + msg);
                    }
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("city-news-queue"),
                        exchange = @Exchange(value = "news-exchange", type = ExchangeTypes.TOPIC),
                        key = "province.city.#"))
                    @RabbitHandler
                    public void cityNews(String msg) {
                        System.out.println("来自市TV的消息:" + msg);
                    }
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("street-news-queue"),
                        exchange = @Exchange(value = "news-exchange", type = ExchangeTypes.TOPIC),
                        key = "province.city.street.*"))
                    @RabbitHandler
                    public void streetNews(String msg) {
                        System.out.println("来自街区TV的消息:" + msg);
                    }
                }
            2)生产者:
                @GetMapping("/topic")
                public String sendMsgByTopic() {
                    //模拟某人在商店买彩票中奖了
                    rabbitTemplate.convertAndSend("news-exchange","province.city.street.shop","有人中了大奖");
                    return "success";
                }
        (5)Headers(头交换机):
            1)定义两个消费者: (一个全匹配,一个任意匹配)
                @Component
                public class MyHeadListener {
                    //任意匹配
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("headQueue-one"),
                        exchange = @Exchange(value = "myHeadExchange", type = ExchangeTypes.HEADERS),
                        arguments = {
                            @Argument(name = "key-one", value = "1"),
                            @Argument(name = "key-two", value = "2"),
                            @Argument(name = "x-match", value = "any")
                    }))
                    public void anyMatchOnMessage(String msg) {
                        System.out.println("来自 headQueue-one 的消息:" + msg);
                    }
                    //全匹配
                    @RabbitListener(
                        bindings = @QueueBinding(value = @Queue("headQueue-two"),
                        exchange = @Exchange(value = "myHeadExchange", type = ExchangeTypes.HEADERS),
                        arguments = {
                            @Argument(name = "key-one", value = "1"),
                            @Argument(name = "x-match", value = "all")
                    }))
                    public void allMatchOnMessage(String msg) {
                        System.out.println("来自 headQueue-two 的消息:" + msg);
                    }
                }
            2)生产者:
                @GetMapping("/head")
                public String sendMsgByHead() {
                    rabbitTemplate.convertAndSend("myHeadExchange", "", "this is a message", message -> {
                        MessageProperties properties = message.getMessageProperties();
                        properties.setHeader("key-one", "1");
                        return message;
                    });
                    return "success";
                }
    [4]手动ack:
        (1)消息确认模式:
            1)在amqp协议中消息确认有两种模式:
                ①自动确认模式(automatic acknowledgement model): 当消息代理将消息发送给应用后立即删除;
                ②显式确认模式(explicit acknowledgement model): 待应用发送一个确认回执后再删除消息;
            2)而在spring-boot-starter-amqp,spring定义了三种:
                ①NONE: 没有ack的意思,对应rabbitMQ的自动确认模式;
                ②MANUAL: 手动模式,对应rabbitMQ的显式确认模式;
                ③AUTO: 自动模式,对应rabbitMQ的显式确认模式;
            3)注意:
                ①spring-amqp中的自动模式与rabbit中的自动模式是不一样的;
                ②在spring-amqp中MANUAL 与 AUTO的关系有点类似于在spring中手动提交事务与自动提交事务的区别,
                    一个是手动发送ack,一个是在方法执行完,没有异常的情况下自动发送ack;
        (2)代码实现:
            1)设置消费者的消息确认模式:
                @Configuration
                public class ListenerConfig {
                    @Bean("myListenerFactory")
                    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
                        SimpleRabbitListenerContainerFactory containerFactory = 
                            new SimpleRabbitListenerContainerFactory();
                        containerFactory.setConnectionFactory(connectionFactory);
                        //设置消费者的消息确认模式
                        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
                        return containerFactory;
                    }
                }
            2)手动确认/拒绝消息:
                @Component
                @RabbitListener(
                    containerFactory = "myListenerFactory",
                    bindings = @QueueBinding(value = @Queue("myManualAckQueue"),
                    exchange = @Exchange(value = "myManualAckExchange", type = ExchangeTypes.DIRECT),
                    key = "mine.manual"))
                public class MyAckListener {
                    @RabbitHandler
                    public void onMessage(@Payload String msg, 
                          @Headers Map<String, Object> headers, 
                          Channel channel) throws Exception{
                        try {
                            System.out.println(msg);
                            //消息确认,(deliveryTag,multiple是否确认所有消息)
                            channel.basicAck((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false);
                        } catch (Exception e) {
                            //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
                            channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
                            // 拒绝一条
                            // channel.basicReject();
                        }
                    }
                }
            3)设置消息拒绝策略:
                (拒绝策略是指:当消息被消费者拒绝时该如何处理,丢弃或者是重新回到队列;)
                ①在MANUAL模式下,在拒绝消息的方法中设置:
                    //消息拒绝(deliveryTag,multiple,requeue拒绝后是否重新回到队列)
                    channel.basicNack((Long) headers.get(AmqpHeaders.DELIVERY_TAG), false, false);
                ②在AUTO 模式下可通过RabbitListenerContainerFactory或是ListenerContainer设置:
                    @Bean("myListenerFactory")
                    public RabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory){
                        SimpleRabbitListenerContainerFactory containerFactory =
                            new SimpleRabbitListenerContainerFactory();
                        containerFactory.setConnectionFactory(connectionFactory);
                        //自动ack
                        containerFactory.setAcknowledgeMode(AcknowledgeMode.AUTO);
                        //拒绝策略,true回到队列 false丢弃
                        containerFactory.setDefaultRequeueRejected(false);
                        return containerFactory;
                    }
                    (注: 默认的拒绝策略是回到队列,所以,如果队列只有一个消费者的话就会产生死循环;)
            4)测试手动确认及消息拒绝:
                @GetMapping("/ack")
                public String sendAckMsg() {
                    //模拟某人在商店买彩票中奖了
                    rabbitTemplate.convertAndSend("myManualAckExchange","mine.manual","this is a message");
                    return "success";
                }
    [5]work模式(能者多劳):
        (1)默认情况: 
            有多个消费者在一个队列上,消息是公平的分发给消费者,一人一个轮着来,不考虑各消费者之间处理能力的差异,
            这可以通过设置预处理消息数(prefetchCount)缓解,或是使用work-能者多劳模式;
        (2)work-能者多劳模式: 
            1)在配置类中添加一下配置:
                @Bean("workListenerFactory")
                public RabbitListenerContainerFactory workListenerFactory(ConnectionFactory connectionFactory) {
                    SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
                    containerFactory.setConnectionFactory(connectionFactory);
                    //最大的并发的消费者数量
                    containerFactory.setMaxConcurrentConsumers(10);
                    //最小的并发消费者的数量
                    containerFactory.setConcurrentConsumers(1);
                    //消息确认机制更改为手动(MANUAL或AUTO)
                    containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
                    //work模式下公平分配(每次读取1条消息,在消费者未回执确认之前,不在进行下一条消息的投送)
                    containerFactory.setPrefetchCount(1);
                    return containerFactory;
                }
            2)定义两个消费者:(消费同一个queue上的消息)
                @Component
                public class WorkListener {
                    private volatile static AtomicInteger one = new AtomicInteger(0);
                    private volatile static AtomicInteger two = new AtomicInteger(0);
                    @RabbitListener(containerFactory = "workListenerFactory",queuesToDeclare = @Queue("workQueue"))
                    public void onMessageOne(@Payload Message message, Channel channel) throws InterruptedException, IOException {
                        Thread.sleep(300);
                        System.out.println("consumer-one 第 " + one.incrementAndGet() + " 个消息 :" + new String(message.getBody()));
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                    }

                    @RabbitListener(containerFactory = "workListenerFactory",queuesToDeclare = @Queue("workQueue"))
                    public void onMessageTwo(@Payload Message message, Channel channel) throws InterruptedException, IOException {
                        Thread.sleep(600);
                        System.out.println("consumer-two 第 " + two.incrementAndGet() + " 个消息 :" + new String(message.getBody()));
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                    }
                }
            2)生产者:
                @Autowired
                private RabbitTemplate rabbitTemplate;
                @GetMapping("/work")
                public String work() {
                    for (int i = 0; i < 66; i++) {
                        rabbitTemplate.convertAndSend("workQueue", "this is a message");
                    }
                    return "success";
                }
    [6]消息格式转换:
        rabbirMQ中的消息对应到java中对应的实体类是 org.springframework.amqp.core.Message;
        消息转换接口MessageConverter有两个主要方法toMessage 和fromMessage,将发送的内容与Message的互转;
        (1)SimpleMessageConverter:(spring中默认使用)
            1)转化方法toMessage: 根据 object instanceof xxx 转化,自行查看源码;
            2)转化方法fromMessage: 根据MessageProperties的ContentType转换,自行查看源码;
        (2)Jackson2JsonMessageConverter: (常用的将object与json互转)
            1)生产者:
                //①直接设置转化器
                @Autowired
                private RabbitTemplate rabbitTemplate;
                @GetMapping("/messageConverter")
                private String messageConverter() {
                    //实际项目不建议这么干,spring 默认单例模式,
                    //所以最好自己构建一个"jasonRabbitTemplate",使用时注入
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    rabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
                    return "success";
                }
                //②在配置类中设置转化器:
                //配置类中定义支持消息转化的rabbitTemplate
                @Bean("jasonTemplate")
                public RabbitTemplate jasonRabbitTemplate(ConnectionFactory connectionFactory) {
                    RabbitTemplate rabbitTemplate = new RabbitTemplate();
                    rabbitTemplate.setConnectionFactory(connectionFactory);
                    //设置转化类
                    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
                    return rabbitTemplate;
                }
                //在发送消息处使用支持消息转化的rabbitTemplate
                @Qualifier("jasonTemplate")
                @Autowired
                private RabbitTemplate jasonRabbitTemplate;
                @GetMapping("/messageConverter")
                private String messageConverter() {
                     jasonRabbitTemplate.convertAndSend("jsonQueue", new Student("zhangSan",15,"男"));
                }
            2)消费者:
                //在配置类中设置支持消息转化的工厂类
                @Bean("jsonListenerFactory")
                public RabbitListenerContainerFactory jsonListenerFactory(ConnectionFactory connectionFactory){
                    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
                    factory.setConnectionFactory(connectionFactory);
                    //设置消费者接收时的消息转化类
                    factory.setMessageConverter(new Jackson2JsonMessageConverter());
                    return factory;
                }
                //消费类中使用支持消息转化的工厂类
                @Component
                @RabbitListener(containerFactory = "jsonListenerFactory",
                    queuesToDeclare = @Queue("jsonQueue"))
                public class JasonListener {
                    @RabbitHandler
                    public void onMessage(Student student) {
                        System.out.println(student);
                    }
                }
    [7]延迟队列:(基于消息TTL与死信交换或基于插件)
        (1)死信队列与延迟队列: 
            1)死信队列: 用来保存处理失败或者过期的消息,确保消息不被丢失以便排查问题;
            2)延迟队列: 顾名思义就是消息在队列中存在一定时间后再被消费;(如订单超时自动取消)
        (2)基于消息TTL与死信交换:
            1)死信交换: 为队列设置一个死信exchange和routingKey,当队列上产生死信时,死信会被投递到设置好的exchange及对应的routingKey;
            2)产生死信的方式:
                ①被拒绝,并且拒绝后没有重新进入队列的消息;
                ②消息TTL(Time To Live)过期的消息;
                ③超过队列长度而被删除的消息;
            3)思路:
                ①为消息设置过期时间;
                ②生产者发送消息时,把消息投递到一个没有消费者的队列(mine.ttl.queue),使消息成为死信;
                ③为队列(mine.ttl.queue)设置死信交换(mine.dead.letter.exchange);
                ④为死信交换设置死信队列(mine.dead.letter.queue),并添加消费者,监听消息;
            4)代码实现:
                //编写配置类:
                @Configuration
                public class TtlConfig {
                    @Bean("myTtlExchange")
                    public DirectExchange ttlExchange() {
                        return new DirectExchange("mine.ttl.exchange");
                    }
                    //定义队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间
                    @Bean("myTtlQueue")
                    public Queue ttlQueue() {
                        Map<String, Object> args = new HashMap<>(3);
                        //声明死信交换器
                        args.put("x-dead-letter-exchange", "mine.dead.letter.exchange");
                        //声明死信路由键
                        args.put("x-dead-letter-routing-key", "mine.dead.letter.key");
                        //可在发送消息时设置过期时间,也可在配置类中设置整个队列的过期时间,两个都设置以最早过期时间为准
                        //声明队列消息过期时间
                        args.put("x-message-ttl", 10000);
                        return new Queue("mine.ttl.queue", true, false, false, args);
                    }
                    //队列绑定
                    @Bean
                    @DependsOn({"myTtlExchange", "myTtlQueue"})
                    public Binding bindingOrderDirect(Queue myTtlQueue, DirectExchange myTtlExchange) {
                        return BindingBuilder.bind(myTtlQueue).to(myTtlExchange).with("mine.ttl.key");
                    }
                }
                //消费者
                @Component
                public class MyDeadLetterListener {
                    @RabbitListener(
                        containerFactory = "jsonListenerFactory",
                        bindings = {@QueueBinding(
                            value = @Queue(value = "mine.dead.letter.queue"),
                            exchange = @Exchange(value = "mine.dead.letter.exchange"),
                            key = {"mine.dead.letter.key"})})
                    public void getDLMessage(Student user) {
                        LocalDateTime now = LocalDateTime.now();
                        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                        // 模拟执行任务
                        System.out.println(now.format(dateTimeFormatter) +" 延迟队列之消费消息:" + user.toString() );
                    }
                }
                //生产者
                @Qualifier("jasonTemplate")
                @Autowired
                private RabbitTemplate jasonRabbitTemplate;
                @GetMapping("/ttlMsg")
                public List<Student> directDelayMQ() {
                    LocalDateTime now = LocalDateTime.now();
                    DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                    List<Student> students = new ArrayList<>();
                    students.add(new Student("张三", 20, "男"));
                    students.add(new Student("李四", 24, "男"));
                    students.add(new Student("王五", 21, "女"));
                    for (Student student : students) {
                        jasonRabbitTemplate.convertAndSend("mine.ttl.exchange", "mine.ttl.key", student,
                            message -> {
                                 //可在发送消息时设置过期时间,也可在配置类中设置整个队列的过期时间,两个都设置以最早过期时间为准
                                message.getMessageProperties().setExpiration("15000");
                                return message;
                        });
                        System.out.println(now.format(dateTimeFormatter) + " 消息发送:"+student.toString());
                    }
                    return students;
                }
        (3)基于rabbitmq-delayed-message-exchange插件:
            1)下载插件: https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
            2)选择与RabbitMQ版本匹配的版本,解压成.ez的文件,放到RabbitMQ安装目录的plugins文件夹下;
            3)停止服务器,开启插件,启动服务器: (插件前有[E*]表示插件启用了)
                开启插件: rabbitmq-plugins enable rabbitmq_delayed_message_exchange
                关闭插件: rabbitmq-plugins disable rabbitmq_delayed_message_exchange
                查看插件: rabbitmq-plugins list
            4)消费者:
                @Component
                public class MyPluginListener {
                    @RabbitListener(
                        containerFactory = "jsonListenerFactory",
                        bindings = {@QueueBinding(
                            value = @Queue(value = "mine.plugin.delay.queue"),
                            exchange = @Exchange(
                                value = "mine.plugin.delay.exchange",
                                type = "x-delayed-message",
                                arguments = {@Argument(name="x-delayed-type",value = ExchangeTypes.DIRECT)}),
                            key = {"mine.plugin.key"})})
                    public void getPDLMessage(Student user) {
                        LocalDateTime now = LocalDateTime.now();
                        DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
                        // 模拟执行任务
                        System.out.println(now.format(dateTimeFormatter) +" 延迟队列之消费消息:" + user.toString() );
                    }
                }