Skip to content

Commit

Permalink
Spring Cloud Stream消费失败后的处理策略(四):重新入队(RabbitMQ)
Browse files Browse the repository at this point in the history
  • Loading branch information
dyc87112 committed Dec 16, 2018
1 parent a296e0c commit 8f5f3d8
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 0 deletions.
5 changes: 5 additions & 0 deletions 4-Finchley/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
<module>stream-exception-handler-2</module>
<!-- 使用DLQ队列(RabbitMQ) -->
<module>stream-exception-handler-3</module>
<!-- 重入队列(RabbitMQ) -->
<module>stream-exception-handler-4</module>

<!-- @StreamListener根据内容路由 -->
<module>stream-content-route</module>

</modules>

Expand Down
66 changes: 66 additions & 0 deletions 4-Finchley/stream-exception-handler-4/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>com.didispace</groupId>
<artifactId>stream-exception-handler-4</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.5.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>Finchley.SR1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.didispace.stream;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;


@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {

public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}

@RestController
static class TestController {

@Autowired
private TestTopic testTopic;

/**
* 消息生产接口
*
* @param message
* @return
*/
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}

}

/**
* 消息消费逻辑
*/
@Slf4j
@Component
static class TestListener {

private int count = 1;

@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload + ", " + count);
throw new RuntimeException("Message consumer failed!");

// 进入DLQ的逻辑
// if (count == 3) {
// count = 1;
// throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
// } else {
// count ++;
// throw new RuntimeException("Message consumer failed!");
// }

}

}

interface TestTopic {

String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";

@Output(OUTPUT)
MessageChannel output();

@Input(INPUT)
SubscribableChannel input();

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
spring.application.name=stream-exception-handler-4
server.port=8080

spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1

spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true

spring.cloud.stream.bindings.example-topic-output.destination=test-topic

0 comments on commit 8f5f3d8

Please sign in to comment.