Skip to content

Commit

Permalink
Fix autocommit (apache#17904)
Browse files Browse the repository at this point in the history
* fix issue-16894

* fix

* fix

* Create pom.xml
  • Loading branch information
jingshanglu authored May 25, 2022
1 parent 01f94d8 commit 4e0f80f
Show file tree
Hide file tree
Showing 7 changed files with 23 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;

import java.sql.SQLException;

/**
* Backend connection for Proxy.
*
Expand All @@ -38,9 +40,9 @@ public interface BackendConnection<T> {
* Prepare for task execution.
*
* @return can be Void or Future
* @throws BackendConnectionException backend connection exception
* @throws SQLException SQL exception
*/
T prepareForTaskExecution() throws BackendConnectionException;
T prepareForTaskExecution() throws SQLException;

/**
* Close resources used in execution.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,12 @@ public void unmarkResourceInUse(final JDBCDatabaseCommunicationEngine databaseCo
}

@Override
public Void prepareForTaskExecution() throws BackendConnectionException {
public Void prepareForTaskExecution() throws SQLException {
synchronized (this) {
connectionReferenceCount++;
if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) {
JDBCBackendTransactionManager transactionManager = new JDBCBackendTransactionManager(this);
try {
transactionManager.begin();
} catch (SQLException ex) {
throw new BackendConnectionException(ex);
}
transactionManager.begin();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ public static TextProtocolBackendHandler newInstance(final DatabaseType database
}
return DistSQLBackendHandlerFactory.newInstance(databaseType, (DistSQLStatement) sqlStatement, connectionSession);
}
handleAutoCommit(sqlStatement, connectionSession);
SQLStatementContext<?> sqlStatementContext = SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMap(),
sqlStatement, connectionSession.getDefaultDatabaseName());
Optional<TextProtocolBackendHandler> backendHandler = DatabaseAdminBackendHandlerFactory.newInstance(databaseType, sqlStatementContext, connectionSession, sql);
Expand Down Expand Up @@ -138,6 +139,12 @@ private static DatabaseType getProtocolType(final DatabaseType defaultDatabaseTy
: ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMetaData(databaseName).getProtocolType();
}

private static void handleAutoCommit(final SQLStatement sqlStatement, final ConnectionSession connectionSession) throws SQLException {
if (!(sqlStatement instanceof TCLStatement)) {
connectionSession.getBackendConnection().prepareForTaskExecution();
}
}

private static Optional<ExtraTextProtocolBackendHandler> findExtraTextProtocolBackendHandler(final SQLStatement sqlStatement) {
for (ExtraTextProtocolBackendHandler each : ShardingSphereServiceLoader.getServiceInstances(ExtraTextProtocolBackendHandler.class)) {
if (each.accept(sqlStatement)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ private Collection<JDBCDatabaseCommunicationEngine> getInUseDatabaseCommunicatio
}

@Test
public void assertPrepareForTaskExecution() throws BackendConnectionException {
public void assertPrepareForTaskExecution() throws SQLException {
backendConnection.prepareForTaskExecution();
verify(backendConnection).closeDatabaseCommunicationEngines(true);
verify(backendConnection).closeConnections(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ public void run() {
if (sqlShowEnabled) {
fillLogMDC();
}
connectionSession.getBackendConnection().prepareForTaskExecution();
isNeedFlush = executeCommand(context, payload);
// CHECKSTYLE:OFF
} catch (final Exception ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,13 @@ public void assertRunByCommandExecutor() throws SQLException, BackendConnectionE

@SuppressWarnings("unchecked")
@Test
public void assertRunWithError() throws BackendConnectionException {
public void assertRunWithError() throws BackendConnectionException, SQLException {
RuntimeException mockException = new RuntimeException("mock");
doThrow(mockException).when(backendConnection).prepareForTaskExecution();
doThrow(mockException).when(commandExecutor).execute();
when(engine.getCodecEngine().createPacketPayload(message, StandardCharsets.UTF_8)).thenReturn(payload);
when(engine.getCommandExecuteEngine().getCommandPacket(payload, commandPacketType, connectionSession)).thenReturn(commandPacket);
when(engine.getCommandExecuteEngine().getCommandPacketType(payload)).thenReturn(commandPacketType);
when(engine.getCommandExecuteEngine().getCommandExecutor(commandPacketType, commandPacket, connectionSession)).thenReturn(commandExecutor);
when(engine.getCommandExecuteEngine().getErrorPacket(mockException)).thenReturn(databasePacket);
when(engine.getCommandExecuteEngine().getOtherPacket(connectionSession)).thenReturn(Optional.of(databasePacket));
CommandExecutorTask actual = new CommandExecutorTask(engine, connectionSession, handlerContext, message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Optional;
Expand Down Expand Up @@ -67,7 +68,7 @@ public final class ReactiveCommandExecuteTaskTest {
private ReactiveCommandExecutor reactiveCommandExecutor;

@Before
public void setup() throws BackendConnectionException {
public void setup() throws BackendConnectionException, SQLException {
reactiveCommandExecuteTask = new ReactiveCommandExecuteTask(frontendEngine, connectionSession, channelHandlerContext, message);
when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenReturn(Future.succeededFuture());
when(connectionSession.getBackendConnection().closeExecutionResources()).thenReturn(Future.succeededFuture());
Expand Down Expand Up @@ -110,8 +111,8 @@ public void assertExecuteAndNoResponse() {
}

@Test
public void assertExecuteAndExceptionOccur() throws BackendConnectionException {
BackendConnectionException ex = new BackendConnectionException(Collections.emptyList());
public void assertExecuteAndExceptionOccur() throws SQLException {
SQLException ex = new SQLException("");
when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenThrow(ex);
DatabasePacket errorPacket = mock(DatabasePacket.class);
when(frontendEngine.getCommandExecuteEngine().getErrorPacket(ex)).thenReturn(errorPacket);
Expand Down

0 comments on commit 4e0f80f

Please sign in to comment.