Skip to content

Commit

Permalink
增加 spring cloud alibaba rocketmq 消息队列的示例
Browse files Browse the repository at this point in the history
  • Loading branch information
YunaiV committed Feb 20, 2020
1 parent d4c80a1 commit 73605ed
Show file tree
Hide file tree
Showing 8 changed files with 303 additions and 0 deletions.
66 changes: 66 additions & 0 deletions labx-06/labx-06-sca-stream-rocketmq-producer-transaction/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>labx-06</artifactId>
<groupId>cn.iocoder.springboot.labs</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>labx-06-sca-stream-rocketmq-producer-transaction</artifactId>

<properties>
<maven.compiler.target>1.8</maven.compiler.target>
<maven.compiler.source>1.8</maven.compiler.source>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<spring.cloud.version>Hoxton.SR1</spring.cloud.version>
<spring.cloud.alibaba.version>2.2.0.RELEASE</spring.cloud.alibaba.version>
</properties>

<!--
引入 Spring Boot、Spring Cloud、Spring Cloud Alibaba 三者 BOM 文件,进行依赖版本的管理,防止不兼容。
在 https://dwz.cn/mcLIfNKt 文章中,Spring Cloud Alibaba 开发团队推荐了三者的依赖关系
-->
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring.cloud.alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<!-- 引入 SpringMVC 相关依赖,并实现对其的自动配置 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 引入 Spring Cloud Alibaba Stream RocketMQ 相关依赖,将 RocketMQ 作为消息队列,并实现对其的自动配置 -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo;

import cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.message.MySource;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableBinding(MySource.class)
public class ProducerApplication {

public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.controller;

import cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.message.Demo01Message;
import cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.message.MySource;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.common.message.MessageConst;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Random;

@RestController
@RequestMapping("/demo01")
public class Demo01Controller {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private MySource mySource;

@GetMapping("/send")
public boolean send() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.build();
// 发送消息
return mySource.demo01Output().send(springMessage);
}

@GetMapping("/send_delay")
public boolean sendDelay() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_DELAY_TIME_LEVEL, "3") // 设置延迟级别为 3,10 秒后消费。
.build();
// 发送消息
boolean sendResult = mySource.demo01Output().send(springMessage);
logger.info("[sendDelay][发送消息完成, 结果 = {}]", sendResult);
return sendResult;
}

@GetMapping("/send_tag")
public boolean sendTag() {
for (String tag : new String[]{"yunai", "yutou", "tudou"}) {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader(MessageConst.PROPERTY_TAGS, tag) // 设置 Tag
.build();
// 发送消息
mySource.demo01Output().send(springMessage);
}
return true;
}

@GetMapping("/send_transaction")
public boolean sendTransaction() {
// 创建 Message
Demo01Message message = new Demo01Message()
.setId(new Random().nextInt());
// 创建 Spring Message 对象
Args args = new Args().setArgs1(1).setArgs2("2");
Message<Demo01Message> springMessage = MessageBuilder.withPayload(message)
.setHeader("args", JSON.toJSONString(args))
.build();
// 发送消息
return mySource.demo01Output().send(springMessage);
}

public static class Args {

private Integer args1;

private String args2;

public Integer getArgs1() {
return args1;
}

public Args setArgs1(Integer args1) {
this.args1 = args1;
return this;
}

public String getArgs2() {
return args2;
}

public Args setArgs2(String args2) {
this.args2 = args2;
return this;
}

@Override
public String toString() {
return "Args{" +
"args1=" + args1 +
", args2='" + args2 + '\'' +
'}';
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.listener;

import cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.controller.Demo01Controller;
import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;

@RocketMQTransactionListener(txProducerGroup = "test")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {

private Logger logger = LoggerFactory.getLogger(getClass());

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 从消息 Header 中解析到 args 参数
Demo01Controller.Args args = JSON.parseObject(msg.getHeaders().get("args", String.class),
Demo01Controller.Args.class);
// ... local transaction process, return rollback, commit or unknown
logger.info("[executeLocalTransaction][执行本地事务,消息:{} args:{}]", msg, args);
return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// ... check transaction status and return rollback, commit or unknown
logger.info("[checkLocalTransaction][回查消息:{}]", msg);
return RocketMQLocalTransactionState.COMMIT;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.message;

/**
* 示例 01 的 Message 消息
*/
public class Demo01Message {

/**
* 编号
*/
private Integer id;

public Demo01Message setId(Integer id) {
this.id = id;
return this;
}

public Integer getId() {
return id;
}

@Override
public String toString() {
return "Demo01Message{" +
"id=" + id +
'}';
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cn.iocoder.springcloudalibaba.labx6.rocketmqdemo.producerdemo.message;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface MySource {

@Output("demo01-output")
MessageChannel demo01Output();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
spring:
application:
name: demo-producer-application
cloud:
# Spring Cloud Stream 配置项,对应 BindingServiceProperties 类
stream:
# Binding 配置项,对应 BindingProperties Map
bindings:
demo01-output:
destination: DEMO-TOPIC-01 # 目的地。这里使用 RocketMQ Topic
content-type: application/json # 内容格式。这里使用 JSON
# Spring Cloud Stream RocketMQ 配置项
rocketmq:
# RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类
binder:
name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址
# RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map
bindings:
demo01-output:
# RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类
producer:
group: test # 生产者分组
sync: true # 是否同步发送消息,默认为 false 异步。
transactional: true # 是否发送事务消息,默认为 false。

server:
port: 18080
2 changes: 2 additions & 0 deletions labx-06/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
<module>labx-06-sca-stream-rocketmq-consumer-orderly</module>

<module>labx-06-sca-stream-rocketmq-consumer-filter</module>

<module>labx-06-sca-stream-rocketmq-producer-transaction</module>
</modules>


Expand Down

0 comments on commit 73605ed

Please sign in to comment.