Skip to content

Commit

Permalink
分布式事务-阿里云MQ实现
Browse files Browse the repository at this point in the history
  • Loading branch information
刘正航 committed Mar 14, 2018
1 parent df0f995 commit 67410e7
Show file tree
Hide file tree
Showing 24 changed files with 433 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,15 @@

/**
* The destination name for this listener, resolved through the container-wide
* 消息队列的唯一标识(在rocketmq或者aliyunmq中是topic)
*/
String destination();

/**
* rocketmq特有的tag区分方式,tags的值需要完全满足rocketmq规则
* @return
*/
String tags() default "";

/**
* 目标接口类
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class MythParticipant implements Serializable {


/**
* 队列
* 队列(TOPIC,如果是rocketmq或者aliyunmq,这里包含TOPIC和TAG),用,区分
*/
private String destination;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.github.myth.common.bean.entity.MythTransaction;
import com.github.myth.common.config.MythConfig;
import com.github.myth.common.config.MythDbConfig;
import com.github.myth.common.enums.MythStatusEnum;
import com.github.myth.common.enums.RepositorySupportEnum;
import com.github.myth.common.exception.MythException;
import com.github.myth.common.exception.MythRuntimeException;
Expand Down Expand Up @@ -228,7 +229,7 @@ public MythTransaction findByTransId(String transId) {
public List<MythTransaction> listAllByDelay(Date date) {
String sb = "select * from " +
tableName +
" where last_time <? and status = 2";
" where last_time <? and status = " + MythStatusEnum.BEGIN.getCode();

List<Map<String, Object>> list = executeQuery(sb, date);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public interface AccountService {
* @param accountDTO 参数dto
* @return true
*/
@Myth(destination = "account")
@Myth(destination = "ORDER_FLOW_BQ",tags = "account")
boolean payment(AccountDTO accountDTO);


Expand Down
5 changes: 5 additions & 0 deletions myth-demo/myth-demo-dubbo/myth-demo-dubbo-account/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
<artifactId>rocketmq-common</artifactId>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
</dependency>

<!--spring boot的核心启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package com.github.myth.demo.dubbo.account.mq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.github.myth.core.service.MythMqReceiveService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.Properties;

/**
* <p>Description: .</p>
*
* @author xiaoyu(Myth)
* @version 1.0
* @date 2017/12/12 14:29
* @since JDK 1.8
*/
@Configuration
@ConditionalOnProperty(prefix = "spring.aliyunmq", name = "broker-url")
public class AliyunmqConsumer {


private static final String TAG = "account";

@Autowired
private Environment env;

@Autowired
private MythMqReceiveService mythMqReceiveService;

@Bean
public Consumer pushConsumer() throws MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ConsumerId,env.getProperty("spring.aliyunmq.consumerId"));
properties.setProperty(PropertyKeyConst.AccessKey,env.getProperty("spring.aliyunmq.accessKey"));
properties.setProperty(PropertyKeyConst.SecretKey,env.getProperty("spring.aliyunmq.secretKey"));
properties.setProperty(PropertyKeyConst.ONSAddr,env.getProperty("spring.aliyunmq.broker-url"));

Consumer consumer = ONSFactory.createConsumer(properties);

String topic = env.getProperty("spring.aliyunmq.topic");
consumer.subscribe(topic, TAG, (message, consumeContext) -> {
try {
final byte[] body = message.getBody();
mythMqReceiveService.processMessage(body);
return Action.CommitMessage;
}catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
});

/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();

return consumer;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.github.myth.demo.dubbo.account.mq;

import com.aliyun.openservices.ons.api.Consumer;
import com.github.myth.common.config.MythConfig;
import com.github.myth.core.service.MythMqReceiveService;

Expand Down Expand Up @@ -30,7 +31,7 @@
public class RocketmqConsumer {


private static final String TOPIC = "account";
private static final String TAGS = "account";

@Autowired
private Environment env;
Expand All @@ -51,16 +52,16 @@ public DefaultMQPushConsumer pushConsumer() throws MQClientException {
new DefaultMQPushConsumer(env.getProperty("spring.rocketmq.consumerGroupName"));
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr(env.getProperty("spring.rocketmq.namesrvAddr"));
consumer.setInstanceName(env.getProperty("spring.rocketmq.instanceName"));
// consumer.setInstanceName(env.getProperty("spring.rocketmq.instanceName"));
//设置批量消费,以提升消费吞吐量,默认是1
consumer.setConsumeMessageBatchMaxSize(1);
//RECONSUME_LATER的重试次数,RocketMQ默认是16次
consumer.setMaxReconsumeTimes(mythConfig.getRetryMax());

/**
* 订阅指定topic下tags
*/
consumer.subscribe(TOPIC, TOPIC);
String topic = env.getProperty("spring.rocketmq.topic");
consumer.subscribe(topic, TAGS);

consumer.registerMessageListener((List<MessageExt> msgList, ConsumeConcurrentlyContext context) -> {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public AccountServiceImpl(AccountMapper accountMapper) {
* @return true
*/
@Override
@Myth(destination = "account")
@Myth(destination = "ORDER_FLOW_BQ",tags = "account")
@Transactional(rollbackFor = Exception.class)
public boolean payment(AccountDTO accountDTO) {
final AccountDO accountDO = accountMapper.findByUserId(accountDTO.getUserId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ spring:
# port: 5672
# username: guest
# password: guest
rocketmq:
namesrvAddr: localhost:9876
consumerGroupName: account
instanceName: account
aliyunmq:
broker-url: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
accessKey: LTAI44DGK0oAutoj
secretKey: RBXTnPSVxHYfrn2IFFoVtgExZdw1M2
topic: ORDER_FLOW_BQ
consumerId: CID_ORDER_FLOW_BQ_ACCOUNT
# rocketmq:
# namesrvAddr: localhost:9876
# consumerGroupName: CID_ORDER_FLOW_BQ_ACCOUNT
# instanceName: ORDER_FLOW_BQ_CONSUMER
# topic: ORDER_FLOW_BQ
#kafka:
# consumer:
# bootstrap-servers: localhost:9092
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public interface InventoryService {
* @param inventoryDTO 库存DTO对象
* @return true
*/
@Myth(destination = "inventory")
@Myth(destination = "ORDER_FLOW_BQ",tags = "inventory")
Boolean decrease(InventoryDTO inventoryDTO);


Expand Down
5 changes: 5 additions & 0 deletions myth-demo/myth-demo-dubbo/myth-demo-dubbo-inventory/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@
<artifactId>rocketmq-common</artifactId>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
</dependency>

<!--spring boot的核心启动器-->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package com.github.myth.demo.dubbo.inventory.mq;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.shade.com.alibaba.rocketmq.client.exception.MQClientException;
import com.github.myth.core.service.MythMqReceiveService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;

import java.util.Properties;

/**
* <p>Description: .</p>
*
* @author xiaoyu(Myth)
* @version 1.0
* @date 2017/12/12 14:29
* @since JDK 1.8
*/
@Configuration
@ConditionalOnProperty(prefix = "spring.aliyunmq", name = "broker-url")
public class AliyunmqConsumer {


private static final String TAG = "inventory";

@Autowired
private Environment env;

@Autowired
private MythMqReceiveService mythMqReceiveService;

@Bean
public Consumer pushConsumer() throws MQClientException {
/**
* 一个应用创建一个Consumer,由应用来维护此对象,可以设置为全局对象或者单例<br>
* 注意:ConsumerGroupName需要由应用来保证唯一
*/
Properties properties = new Properties();
properties.setProperty(PropertyKeyConst.ConsumerId,env.getProperty("spring.aliyunmq.consumerId"));
properties.setProperty(PropertyKeyConst.AccessKey,env.getProperty("spring.aliyunmq.accessKey"));
properties.setProperty(PropertyKeyConst.SecretKey,env.getProperty("spring.aliyunmq.secretKey"));
properties.setProperty(PropertyKeyConst.ONSAddr,env.getProperty("spring.aliyunmq.broker-url"));

Consumer consumer = ONSFactory.createConsumer(properties);

String topic = env.getProperty("spring.aliyunmq.topic");
consumer.subscribe(topic, TAG, (message, consumeContext) -> {
try {
final byte[] body = message.getBody();
mythMqReceiveService.processMessage(body);
return Action.CommitMessage;
}catch (Exception e) {
//消费失败
return Action.ReconsumeLater;
}
});

/**
* Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
*/
consumer.start();

return consumer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
public class RocketmqConsumer {


private static final String QUEUE = "inventory";
private static final String TAGS = "inventory";

@Autowired
private Environment env;
Expand All @@ -51,16 +51,16 @@ public DefaultMQPushConsumer pushConsumer() throws MQClientException {
new DefaultMQPushConsumer(env.getProperty("spring.rocketmq.consumerGroupName"));
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.setNamesrvAddr(env.getProperty("spring.rocketmq.namesrvAddr"));
consumer.setInstanceName(env.getProperty("spring.rocketmq.instanceName"));
// consumer.setInstanceName(env.getProperty("spring.rocketmq.instanceName"));
//设置批量消费,以提升消费吞吐量,默认是1
consumer.setConsumeMessageBatchMaxSize(2);
//RECONSUME_LATER的重试次数,RocketMQ默认是16次
consumer.setMaxReconsumeTimes(mythConfig.getRetryMax());

/**
* 订阅指定topic下tags
*/
consumer.subscribe(QUEUE, QUEUE);
String topic = env.getProperty("spring.rocketmq.topic");
consumer.subscribe(topic, TAGS);

consumer.registerMessageListener((List<MessageExt> msgList,
ConsumeConcurrentlyContext context) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public InventoryServiceImpl(InventoryMapper inventoryMapper) {
* @return true
*/
@Override
@Myth(destination = "inventory")
@Myth(destination = "ORDER_FLOW_BQ",tags = "inventory")
@Transactional(rollbackFor = Exception.class)
public Boolean decrease(InventoryDTO inventoryDTO) {
final Inventory entity = inventoryMapper.findByProductId(inventoryDTO.getProductId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ spring:
# port: 5672
# username: guest
# password: guest
rocketmq:
namesrvAddr: localhost:9876
consumerGroupName: inventory
instanceName: inventory
aliyunmq:
broker-url: http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet
accessKey: LTAI44DGK0oAutoj
secretKey: RBXTnPSVxHYfrn2IFFoVtgExZdw1M2
topic: ORDER_FLOW_BQ
consumerId: CID_ORDER_FLOW_BQ
# rocketmq:
# namesrvAddr: localhost:9876
# consumerGroupName: CID_ORDER_FLOW_BQ_INVENTORY
# instanceName: ORDER_FLOW_BQ_CONSUMER
# topic: ORDER_FLOW_BQ
#kafka:
# consumer:
# bootstrap-servers: localhost:9092
Expand Down
9 changes: 9 additions & 0 deletions myth-demo/myth-demo-dubbo/myth-demo-dubbo-order/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<artifactId>myth-rocketmq</artifactId>
</dependency>

<dependency>
<groupId>com.github.myth</groupId>
<artifactId>myth-aliyunmq</artifactId>
</dependency>

<!--spring boot的核心启动器-->
<dependency>
Expand Down Expand Up @@ -174,6 +178,11 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.aliyun.openservices</groupId>
<artifactId>ons-client</artifactId>
</dependency>

</dependencies>


Expand Down
Loading

0 comments on commit 67410e7

Please sign in to comment.