Skip to content

Commit

Permalink
相关开发
Browse files Browse the repository at this point in the history
  • Loading branch information
yu.xiao committed Dec 12, 2017
1 parent 719726f commit 8391ce5
Show file tree
Hide file tree
Showing 58 changed files with 1,459 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,12 @@ public class CoordinatorRepositoryAdapter {
private String targetMethod;


/**
* 错误信息
*/
private String errorMsg;




}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

Expand All @@ -28,6 +29,7 @@
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MythParticipant implements Serializable {

private static final long serialVersionUID = -2590970715288987627L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public class MythTransaction implements Serializable {
*/
private String targetMethod;

/**
* 调用错误信息
*/
private String errorMsg;

/**
* 参与协调的方法集合
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,8 +220,13 @@ public Boolean submit(CoordinatorAction coordinatorAction) {
@Override
public Boolean processMessage(byte[] message) {
try {
final MessageEntity entity =
serializer.deSerialize(message, MessageEntity.class);
MessageEntity entity;
try {
entity = serializer.deSerialize(message, MessageEntity.class);
} catch (MythException e) {
e.printStackTrace();
return Boolean.FALSE;
}
/*
* 1 检查该事务有没被处理过,已经处理过的 则不处理
* 2 发起发射调用,调用接口,进行处理
Expand Down Expand Up @@ -251,17 +256,13 @@ public Boolean processMessage(byte[] message) {
//会进入LocalMythTransactionHandler 那里有保存

} catch (Exception e) {
e.printStackTrace();
return Boolean.FALSE;
throw new MythRuntimeException(e.getMessage());
} finally {
TransactionContextLocal.getInstance().remove();
}
}


} catch (Exception e) {
e.printStackTrace();
return Boolean.FALSE;
} finally {
LOCK.unlock();
}
Expand Down Expand Up @@ -296,11 +297,14 @@ public Boolean sendMessage(MythTransaction mythTransaction) {
getMythMqSendService().sendMessage(mythParticipant.getDestination(),
mythParticipant.getPattern(),
message);
} catch (MythException e) {
} catch (Exception e) {
e.printStackTrace();
return Boolean.FALSE;
}
}
//这里为什么要这么做呢? 主要是为了防止在极端情况下,发起者执行过程中,突然自身down 机
//造成消息未发送,新增一个状态标记,如果出现这种情况,通过定时任务发送消息
this.updateStatus(mythTransaction.getTransId(), MythStatusEnum.COMMIT.getCode());
}
return Boolean.TRUE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ public static String buildCreateTableSql(String driverClassName, String tableNam
.append(" `status` int(2) NOT NULL,\n")
.append(" `invocation` longblob,\n")
.append(" `role` int(2) NOT NULL,\n")
.append(" PRIMARY KEY (`trans_id`)\n")
.append(" KEY `status_last_time` (`last_time`,`status`) USING BTREE \n")
.append(" `error_msg` varchar(1000) ,\n")
.append(" PRIMARY KEY (`trans_id`),\n")
.append(" KEY `status_last_time` (`last_time`,`status`) USING BTREE \n")
.append(")");
break;

Expand All @@ -65,6 +66,7 @@ public static String buildCreateTableSql(String driverClassName, String tableNam
.append(" `status` int(2) NOT NULL,\n")
.append(" `invocation` BLOB ,\n")
.append(" `role` int(2) NOT NULL,\n")
.append(" `error_msg` varchar(1000) ,\n")
.append(" PRIMARY KEY (`trans_id`)\n")
.append(")");
break;
Expand All @@ -84,6 +86,7 @@ public static String buildCreateTableSql(String driverClassName, String tableNam
.append(" `status` int(2) NOT NULL,\n")
.append(" `invocation` varbinary ,\n")
.append(" `role` int(2) NOT NULL,\n")
.append(" `error_msg` varchar(1000) ,\n")
.append(" PRIMARY KEY (`trans_id`)\n")
.append(")");
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTran
//发起调用 执行try方法
final Object proceed = point.proceed();

//执行成功 记录日志信息,不成功就不记录,通过mq来执行
//执行成功 记录日志信息,通过mq来执行

mythTransactionManager.saveTransaction(point, mythTransactionContext);
mythTransactionManager.commitTransaction(point, mythTransactionContext);

return proceed;

} catch (Throwable throwable) {
mythTransactionManager.failureTransaction(point, mythTransactionContext.getTransId(), throwable.getMessage());
throw throwable;
} finally {
LOCK.unlock();
TransactionContextLocal.getInstance().remove();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTran

final Object proceed = point.proceed();

mythTransactionManager.saveTransaction(point, mythTransactionContext);
mythTransactionManager.commitTransaction(point, mythTransactionContext);

return proceed;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,27 +69,16 @@ public StartMythTransactionHandler(MythTransactionManager mythTransactionManager
public Object handler(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) throws Throwable {

try {
MythTransaction begin;
//主要防止并发问题,对事务日志的写造成压力,加了锁进行处理
try {
LOCK.lock();
begin = mythTransactionManager.begin(point);
mythTransactionManager.begin(point);
} finally {
LOCK.unlock();
}
//发起调用 执行try方法
final Object proceed = point.proceed();

try {
//这里为什么要这么做呢? 主要是为了防止在极端情况下,发起者执行过程中,突然自身down 机
//造成消息未发送,新增一个状态标记,如果出现这种情况,通过定时任务发送消息
LOCK.lock();
mythTransactionManager.updateStatus(begin.getTransId(), MythStatusEnum.COMMIT.getCode());
} finally {
LOCK.unlock();
}

return proceed;
return point.proceed();

} finally {
//发送消息
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,7 @@ public MythTransaction begin(ProceedingJoinPoint point) {
}


public MythTransaction saveTransaction(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) {
LogUtil.debug(LOGGER, () -> "开始执行 参与者分布式事务!start");
public MythTransaction commitTransaction(ProceedingJoinPoint point, MythTransactionContext mythTransactionContext) {
MythTransaction mythTransaction = new MythTransaction(mythTransactionContext.getTransId());

MethodSignature signature = (MethodSignature) point.getSignature();
Expand All @@ -140,6 +139,23 @@ public MythTransaction saveTransaction(ProceedingJoinPoint point, MythTransactio

}

public void failureTransaction(ProceedingJoinPoint point, String transId,String errorMessage) {
MythTransaction mythTransaction = new MythTransaction(transId);

MethodSignature signature = (MethodSignature) point.getSignature();
Method method = signature.getMethod();

Class<?> clazz = point.getTarget().getClass();

mythTransaction.setStatus(MythStatusEnum.FAILURE.getCode());
mythTransaction.setRole(MythRoleEnum.PROVIDER.getCode());
mythTransaction.setTargetClass(clazz.getName());
mythTransaction.setTargetMethod(method.getName());
mythTransaction.setErrorMsg(errorMessage);
//保存当前事务信息
coordinatorCommand.execute(new CoordinatorAction(CoordinatorActionEnum.SAVE, mythTransaction));
}


public void sendMessage() {
MythTransaction mythTransaction = getCurrentTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ public int create(MythTransaction mythTransaction) {
StringBuilder sql = new StringBuilder()
.append("insert into ")
.append(tableName)
.append("(trans_id,target_class,target_method,retried_count,create_time,last_time,version,status,invocation,role)")
.append(" values(?,?,?,?,?,?,?,?,?,?)");
.append("(trans_id,target_class,target_method,retried_count,create_time,last_time,version,status,invocation,role,error_msg)")
.append(" values(?,?,?,?,?,?,?,?,?,?,?)");
try {

final byte[] serialize = serializer.serialize(mythTransaction.getMythParticipants());
Expand All @@ -88,7 +88,8 @@ public int create(MythTransaction mythTransaction) {
mythTransaction.getVersion(),
mythTransaction.getStatus(),
serialize,
mythTransaction.getRole());
mythTransaction.getRole(),
mythTransaction.getErrorMsg());

} catch (MythException e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public int create(MythTransaction mythTransaction) {
mongoBean.setTargetMethod(mythTransaction.getTargetMethod());
final byte[] cache = objectSerializer.serialize(mythTransaction.getMythParticipants());
mongoBean.setContents(cache);
mongoBean.setErrorMsg(mythTransaction.getErrorMsg());
template.save(mongoBean, collectionName);
return CommonConstant.SUCCESS;
} catch (MythException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.github.myth.annotation.Myth;
import com.github.myth.demo.dubbo.account.api.dto.AccountDTO;
import com.github.myth.demo.dubbo.account.api.entity.AccountDO;

/**
* @author xiaoyu
Expand All @@ -36,4 +37,12 @@ public interface AccountService {
*/
@Myth(destination = "account")
boolean payment(AccountDTO accountDTO);


/**
* 获取用户资金信息
* @param userId 用户id
* @return AccountDO
*/
AccountDO findByUserId(Integer userId);
}
28 changes: 26 additions & 2 deletions myth-demo/myth-demo-dubbo/myth-demo-dubbo-account/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,22 @@
<groupId>com.github.myth</groupId>
<artifactId>myth-dubbo</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>



<!-- RocketMQ 客户端相关依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-common -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-common</artifactId>
</dependency>

<!--spring boot的核心启动器-->
<dependency>
Expand All @@ -59,6 +72,17 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.3.0</version>
</dependency>
<!--自动配置-->
<dependency>
<groupId>org.springframework.boot</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

Expand All @@ -14,18 +15,19 @@
* @since JDK 1.8
*/
@Component
public class AmqConsumer {
@ConditionalOnProperty(prefix = "spring.activemq", name = "broker-url")
public class ActivemqConsumer {

/**
* logger
*/
private static final Logger LOGGER = LoggerFactory.getLogger(AmqConsumer.class);
private static final Logger LOGGER = LoggerFactory.getLogger(ActivemqConsumer.class);


private final MythMqReceiveService mythMqReceiveService;

@Autowired
public AmqConsumer(MythMqReceiveService mythMqReceiveService) {
public ActivemqConsumer(MythMqReceiveService mythMqReceiveService) {
this.mythMqReceiveService = mythMqReceiveService;
}

Expand Down
Loading

0 comments on commit 8391ce5

Please sign in to comment.