Skip to content

Commit

Permalink
optimize: abstract common undolog manager method (apache#1712)
Browse files Browse the repository at this point in the history
  • Loading branch information
github-ygy authored and slievrly committed Oct 10, 2019
1 parent 78aeabb commit b2d695f
Show file tree
Hide file tree
Showing 3 changed files with 229 additions and 418 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,36 @@
*/
package io.seata.rm.datasource.undo;

import io.seata.common.Constants;
import io.seata.common.util.BlobUtils;
import io.seata.common.util.CollectionUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ClientTableColumnsName;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.exception.BranchTransactionException;
import io.seata.core.exception.TransactionException;
import io.seata.rm.datasource.ConnectionContext;
import io.seata.rm.datasource.ConnectionProxy;
import io.seata.rm.datasource.DataSourceProxy;
import io.seata.rm.datasource.sql.struct.TableMeta;
import io.seata.rm.datasource.sql.struct.TableMetaCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static io.seata.core.exception.TransactionExceptionCode.BranchRollbackFailed_Retriable;

/**
* @author jsbxyyx
* @date 2019/09/07
Expand Down Expand Up @@ -184,14 +200,204 @@ protected static boolean canUndo(int state) {
return state == State.Normal.getValue();
}

protected static String buildContext(String serializer) {
protected String buildContext(String serializer) {
Map<String, String> map = new HashMap<>();
map.put(UndoLogConstants.SERIALIZER_KEY, serializer);
return CollectionUtils.encodeMap(map);
}

protected static Map<String, String> parseContext(String data) {
protected Map<String, String> parseContext(String data) {
return CollectionUtils.decodeMap(data);
}

/**
* Flush undo logs.
*
* @param cp the cp
* @throws SQLException the sql exception
*/
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
ConnectionContext connectionContext = cp.getContext();
String xid = connectionContext.getXid();
long branchID = connectionContext.getBranchId();

BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchID);
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());

UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}

insertUndoLogWithNormal(xid, branchID, buildContext(parser.getName()), undoLogContent,
cp.getTargetConnection());
}

/**
*
* Undo.
*
* @param dataSourceProxy the data source proxy
* @param xid the xid
* @param branchId the branch id
* @throws TransactionException the transaction exception
*/
@Override
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
boolean originalAutoCommit = true;

for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();

// The entire undo process should run in a local transaction.
if (originalAutoCommit = conn.getAutoCommit()) {
conn.setAutoCommit(false);
}

// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
rs = selectPST.executeQuery();

boolean exists = false;
while (rs.next()) {
exists = true;

// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log",
xid, branchId, state);
}
return;
}

String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
byte[] rollbackInfo = BlobUtils.blob2Bytes(b);

String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
UndoLogParserFactory.getInstance(serializer);
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

try {
// put serializer name to local
setCurrentSerializer(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(),
sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
removeCurrentSerializer();
}
}

// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489

if (exists) {
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}",
xid, branchId, State.GlobalFinished.name());
}
} else {
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}",
xid, branchId, State.GlobalFinished.name());
}
}

return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback",
xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new BranchTransactionException(BranchRollbackFailed_Retriable,
String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid, branchId, e.getMessage()), e);

} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}

/**
* insert uodo log when global finished
* @param xid the xid
* @param branchId the branchId
* @param undoLogParser the undoLogParse
* @param conn sql connection
* @throws SQLException
*/
protected abstract void insertUndoLogWithGlobalFinished(String xid, long branchId, UndoLogParser undoLogParser, Connection conn) throws SQLException;

/**
* insert uodo log when normal
* @param xid the xid
* @param branchId the branchId
* @param rollbackCtx the rollbackContext
* @param undoLogContent the undoLogContent
* @param conn sql connection
* @throws SQLException
*/
protected abstract void insertUndoLogWithNormal(String xid, long branchId, String rollbackCtx, byte[] undoLogContent, Connection conn) throws SQLException;
}
Loading

0 comments on commit b2d695f

Please sign in to comment.