Skip to content

Commit

Permalink
Saga implementation base on state machine (apache#1608)
Browse files Browse the repository at this point in the history
  • Loading branch information
long187 authored and zhangthen committed Oct 11, 2019
1 parent 42cc591 commit 6864338
Show file tree
Hide file tree
Showing 217 changed files with 18,345 additions and 71 deletions.
41 changes: 41 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,38 @@
<version>${project.version}</version>
</dependency>

<!-- saga -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-processctrl</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-statelang</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-engine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-rm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-tm</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-saga-engine-store</artifactId>
<version>${project.version}</version>
</dependency>

<!-- spring -->
<dependency>
<groupId>org.springframework</groupId>
Expand Down Expand Up @@ -420,6 +452,7 @@
<artifactId>protostuff-runtime</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
Expand Down Expand Up @@ -545,6 +578,14 @@
<include>io.seata:seata-tm</include>
<include>io.seata:seata-codec-seata</include>
<include>io.seata:seata-codec-protobuf</include>
<!-- saga -->
<include>io.seata:seata-saga-processctrl</include>
<include>io.seata:seata-saga-statelang</include>
<include>io.seata:seata-saga-engine</include>
<include>io.seata:seata-saga-rm</include>
<include>io.seata:seata-saga-tm</include>
<include>io.seata:seata-saga-engine-store</include>

<include>io.seata:seata-codec-kryo</include>
</includes>
</artifactSet>
Expand Down
3 changes: 3 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,9 @@
<maven.compiler.target>1.8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<protobuf.version>3.7.1</protobuf.version>

<junit.version>4.12</junit.version>

<kryo.version>4.0.2</kryo.version>
<kryo-serializers.version>0.42</kryo-serializers.version>
</properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
import io.seata.core.protocol.transaction.GlobalCommitResponse;
import io.seata.core.protocol.transaction.GlobalLockQueryRequest;
import io.seata.core.protocol.transaction.GlobalLockQueryResponse;
import io.seata.core.protocol.transaction.GlobalReportRequest;
import io.seata.core.protocol.transaction.GlobalReportResponse;
import io.seata.core.protocol.transaction.GlobalRollbackRequest;
import io.seata.core.protocol.transaction.GlobalRollbackResponse;
import io.seata.core.protocol.transaction.GlobalStatusRequest;
Expand Down Expand Up @@ -160,6 +162,8 @@ public Kryo create() {
kryo.register(GlobalStatusRequest.class);
kryo.register(GlobalStatusResponse.class);
kryo.register(UndoLogDeleteRequest.class);
kryo.register(GlobalReportRequest.class);
kryo.register(GlobalReportResponse.class);

kryo.register(MergedWarpMessage.class);
kryo.register(MergeResultMessage.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.codec.protobuf.convertor;

import io.seata.codec.protobuf.generated.AbstractGlobalEndRequestProto;
import io.seata.codec.protobuf.generated.AbstractMessageProto;
import io.seata.codec.protobuf.generated.AbstractTransactionRequestProto;
import io.seata.codec.protobuf.generated.GlobalReportRequestProto;
import io.seata.codec.protobuf.generated.GlobalStatusProto;
import io.seata.codec.protobuf.generated.MessageTypeProto;
import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.transaction.GlobalReportRequest;

/**
* @author lorne.cl
*/
public class GlobalReportRequestConvertor implements PbConvertor<GlobalReportRequest, GlobalReportRequestProto> {
@Override
public GlobalReportRequestProto convert2Proto(GlobalReportRequest globalReportRequest) {
final short typeCode = globalReportRequest.getTypeCode();

final AbstractMessageProto abstractMessage = AbstractMessageProto.newBuilder().setMessageType(
MessageTypeProto.forNumber(typeCode)).build();

final AbstractTransactionRequestProto abstractTransactionRequestProto = AbstractTransactionRequestProto
.newBuilder().setAbstractMessage(
abstractMessage).build();

final String extraData = globalReportRequest.getExtraData();
AbstractGlobalEndRequestProto abstractGlobalEndRequestProto = AbstractGlobalEndRequestProto.newBuilder()
.setAbstractTransactionRequest(abstractTransactionRequestProto)
.setXid(globalReportRequest.getXid())
.setExtraData(extraData == null ? "" : extraData)
.build();

GlobalReportRequestProto result = GlobalReportRequestProto.newBuilder().setAbstractGlobalEndRequest(
abstractGlobalEndRequestProto)
.setGlobalStatus(GlobalStatusProto.valueOf(globalReportRequest.getGlobalStatus().name()))
.build();

return result;
}

@Override
public GlobalReportRequest convert2Model(GlobalReportRequestProto globalReportRequestProto) {
GlobalReportRequest globalReportRequest = new GlobalReportRequest();
globalReportRequest.setExtraData(globalReportRequestProto.getAbstractGlobalEndRequest().getExtraData());
globalReportRequest.setXid(globalReportRequestProto.getAbstractGlobalEndRequest().getXid());
globalReportRequest.setGlobalStatus(GlobalStatus.valueOf(globalReportRequestProto.getGlobalStatus().name()));
return globalReportRequest;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.codec.protobuf.convertor;

import io.seata.codec.protobuf.generated.AbstractGlobalEndResponseProto;
import io.seata.codec.protobuf.generated.AbstractMessageProto;
import io.seata.codec.protobuf.generated.AbstractResultMessageProto;
import io.seata.codec.protobuf.generated.AbstractTransactionResponseProto;
import io.seata.codec.protobuf.generated.GlobalReportResponseProto;
import io.seata.codec.protobuf.generated.GlobalStatusProto;
import io.seata.codec.protobuf.generated.MessageTypeProto;
import io.seata.codec.protobuf.generated.ResultCodeProto;
import io.seata.codec.protobuf.generated.TransactionExceptionCodeProto;
import io.seata.core.exception.TransactionExceptionCode;
import io.seata.core.model.GlobalStatus;
import io.seata.core.protocol.ResultCode;
import io.seata.core.protocol.transaction.GlobalReportResponse;

/**
* @author lorne.cl
*/
public class GlobalReportResponseConvertor implements PbConvertor<GlobalReportResponse, GlobalReportResponseProto> {
@Override
public GlobalReportResponseProto convert2Proto(GlobalReportResponse globalStatusResponse) {
final short typeCode = globalStatusResponse.getTypeCode();

final AbstractMessageProto abstractMessage = AbstractMessageProto.newBuilder().setMessageType(
MessageTypeProto.forNumber(typeCode)).build();

final String msg = globalStatusResponse.getMsg();
final AbstractResultMessageProto abstractResultMessageProto = AbstractResultMessageProto.newBuilder().setMsg(
msg == null ? "" : msg)
.setResultCode(ResultCodeProto.valueOf(globalStatusResponse.getResultCode().name())).setAbstractMessage(
abstractMessage).build();

AbstractTransactionResponseProto abstractTransactionResponseProto = AbstractTransactionResponseProto
.newBuilder().setAbstractResultMessage(abstractResultMessageProto)
.setTransactionExceptionCode(
TransactionExceptionCodeProto.valueOf(globalStatusResponse.getTransactionExceptionCode().name()))
.build();

AbstractGlobalEndResponseProto abstractGlobalEndResponseProto = AbstractGlobalEndResponseProto.newBuilder()
.setAbstractTransactionResponse(abstractTransactionResponseProto)
.setGlobalStatus(GlobalStatusProto.valueOf(globalStatusResponse.getGlobalStatus().name()))
.build();

GlobalReportResponseProto result = GlobalReportResponseProto.newBuilder().setAbstractGlobalEndResponse(
abstractGlobalEndResponseProto).build();
return result;
}

@Override
public GlobalReportResponse convert2Model(GlobalReportResponseProto globalStatusResponseProto) {
GlobalReportResponse branchRegisterResponse = new GlobalReportResponse();
final AbstractGlobalEndResponseProto abstractGlobalEndResponse = globalStatusResponseProto
.getAbstractGlobalEndResponse();
AbstractTransactionResponseProto abstractResultMessage = abstractGlobalEndResponse
.getAbstractTransactionResponse();
branchRegisterResponse.setMsg(
abstractResultMessage.getAbstractResultMessage().getMsg());
branchRegisterResponse.setResultCode(
ResultCode.valueOf(abstractResultMessage.getAbstractResultMessage().getResultCode().name()));
branchRegisterResponse.setTransactionExceptionCode(TransactionExceptionCode.valueOf(
abstractResultMessage.getTransactionExceptionCode().name()));
branchRegisterResponse.setGlobalStatus(
GlobalStatus.valueOf(abstractGlobalEndResponse.getGlobalStatus().name()));

return branchRegisterResponse;

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.seata.codec.protobuf.convertor.GlobalCommitResponseConvertor;
import io.seata.codec.protobuf.convertor.GlobalLockQueryRequestConvertor;
import io.seata.codec.protobuf.convertor.GlobalLockQueryResponseConvertor;
import io.seata.codec.protobuf.convertor.GlobalReportRequestConvertor;
import io.seata.codec.protobuf.convertor.GlobalReportResponseConvertor;
import io.seata.codec.protobuf.convertor.GlobalRollbackRequestConvertor;
import io.seata.codec.protobuf.convertor.GlobalRollbackResponseConvertor;
import io.seata.codec.protobuf.convertor.GlobalStatusRequestConvertor;
Expand All @@ -41,6 +43,15 @@
import io.seata.codec.protobuf.convertor.RegisterRMResponseConvertor;
import io.seata.codec.protobuf.convertor.RegisterTMRequestConvertor;
import io.seata.codec.protobuf.convertor.RegisterTMResponseConvertor;
import io.seata.codec.protobuf.generated.GlobalReportRequestProto;
import io.seata.codec.protobuf.generated.GlobalReportResponseProto;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.protocol.RegisterTMRequest;
import io.seata.core.protocol.RegisterTMResponse;
import io.seata.codec.protobuf.convertor.UndoLogDeleteRequestConvertor;
import io.seata.codec.protobuf.generated.BranchCommitRequestProto;
import io.seata.codec.protobuf.generated.BranchCommitResponseProto;
Expand Down Expand Up @@ -68,13 +79,6 @@
import io.seata.codec.protobuf.generated.RegisterTMRequestProto;
import io.seata.codec.protobuf.generated.RegisterTMResponseProto;
import io.seata.codec.protobuf.generated.UndoLogDeleteRequestProto;
import io.seata.core.protocol.HeartbeatMessage;
import io.seata.core.protocol.MergeResultMessage;
import io.seata.core.protocol.MergedWarpMessage;
import io.seata.core.protocol.RegisterRMRequest;
import io.seata.core.protocol.RegisterRMResponse;
import io.seata.core.protocol.RegisterTMRequest;
import io.seata.core.protocol.RegisterTMResponse;
import io.seata.core.protocol.transaction.BranchCommitRequest;
import io.seata.core.protocol.transaction.BranchCommitResponse;
import io.seata.core.protocol.transaction.BranchRegisterRequest;
Expand All @@ -89,6 +93,8 @@
import io.seata.core.protocol.transaction.GlobalCommitResponse;
import io.seata.core.protocol.transaction.GlobalLockQueryRequest;
import io.seata.core.protocol.transaction.GlobalLockQueryResponse;
import io.seata.core.protocol.transaction.GlobalReportRequest;
import io.seata.core.protocol.transaction.GlobalReportResponse;
import io.seata.core.protocol.transaction.GlobalRollbackRequest;
import io.seata.core.protocol.transaction.GlobalRollbackResponse;
import io.seata.core.protocol.transaction.GlobalStatusRequest;
Expand Down Expand Up @@ -150,8 +156,12 @@ private static class SingletonHolder {
new GlobalStatusRequestConvertor());
protobufConvertManager.convertorMap.put(GlobalStatusResponse.class.getName(),
new GlobalStatusResponseConvertor());
protobufConvertManager.convertorMap.put(GlobalReportRequest.class.getName(),
new GlobalReportRequestConvertor());
protobufConvertManager.convertorMap.put(GlobalReportResponse.class.getName(),
new GlobalReportResponseConvertor());
protobufConvertManager.convertorMap.put(UndoLogDeleteRequest.class.getName(),
new UndoLogDeleteRequestConvertor());
new UndoLogDeleteRequestConvertor());

protobufConvertManager.convertorMap.put(MergedWarpMessage.class.getName(),
new MergedWarpMessageConvertor());
Expand Down Expand Up @@ -203,8 +213,12 @@ private static class SingletonHolder {
GlobalStatusRequestProto.class);
protobufConvertManager.protoClazzMap.put(GlobalStatusResponseProto.getDescriptor().getFullName(),
GlobalStatusResponseProto.class);
protobufConvertManager.protoClazzMap.put(GlobalReportRequestProto.getDescriptor().getFullName(),
GlobalReportRequestProto.class);
protobufConvertManager.protoClazzMap.put(GlobalReportResponseProto.getDescriptor().getFullName(),
GlobalReportResponseProto.class);
protobufConvertManager.protoClazzMap.put(UndoLogDeleteRequestProto.getDescriptor().getFullName(),
UndoLogDeleteRequestProto.class);
UndoLogDeleteRequestProto.class);

protobufConvertManager.protoClazzMap.put(MergedWarpMessageProto.getDescriptor().getFullName(),
MergedWarpMessageProto.class);
Expand Down Expand Up @@ -257,6 +271,10 @@ private static class SingletonHolder {
new GlobalStatusRequestConvertor());
protobufConvertManager.reverseConvertorMap.put(GlobalStatusResponseProto.class.getName(),
new GlobalStatusResponseConvertor());
protobufConvertManager.reverseConvertorMap.put(GlobalReportRequestProto.class.getName(),
new GlobalReportRequestConvertor());
protobufConvertManager.reverseConvertorMap.put(GlobalReportResponseProto.class.getName(),
new GlobalReportResponseConvertor());
protobufConvertManager.reverseConvertorMap.put(UndoLogDeleteRequestProto.class.getName(),
new UndoLogDeleteRequestConvertor());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ enum BranchTypeProto {

TCC = 1;

SAGA = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";

package io.seata.protocol.protobuf;

import "abstractGlobalEndRequest.proto";
import "globalStatus.proto";

option java_multiple_files = true;
option java_outer_classname = "GlobalReportRequest";
option java_package = "io.seata.codec.protobuf.generated";

message GlobalReportRequestProto {
AbstractGlobalEndRequestProto abstractGlobalEndRequest = 1;
GlobalStatusProto globalStatus = 2;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";

package io.seata.protocol.protobuf;

import "abstractGlobalEndResponse.proto";

option java_multiple_files = true;
option java_outer_classname = "GlobalReportResponse";
option java_package = "io.seata.codec.protobuf.generated";

message GlobalReportResponseProto {
AbstractGlobalEndResponseProto abstractGlobalEndResponse = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ enum MessageTypeProto {
* The constant TYPE_GLOBAL_STATUS_RESULT.
*/
TYPE_GLOBAL_STATUS_RESULT = 16;
/**
* The constant TYPE_GLOBAL_REPORT.
*/
TYPE_GLOBAL_REPORT = 17;
/**
* The constant TYPE_GLOBAL_REPORT_RESULT.
*/
TYPE_GLOBAL_REPORT_RESULT = 18;
/**
* The constant TYPE_GLOBAL_LOCK_QUERY.
*/
Expand Down
Loading

0 comments on commit 6864338

Please sign in to comment.