From 4e0f80f1eca2651323b8dd86725425094a4299a7 Mon Sep 17 00:00:00 2001 From: JingShang Lu Date: Wed, 25 May 2022 08:42:45 +0800 Subject: [PATCH] Fix autocommit (#17904) * fix issue-16894 * fix * fix * Create pom.xml --- .../proxy/backend/communication/BackendConnection.java | 6 ++++-- .../jdbc/connection/JDBCBackendConnection.java | 8 ++------ .../backend/text/TextProtocolBackendHandlerFactory.java | 7 +++++++ .../jdbc/connection/JDBCBackendConnectionTest.java | 2 +- .../proxy/frontend/command/CommandExecutorTask.java | 1 - .../proxy/frontend/command/CommandExecutorTaskTest.java | 7 +++++-- .../reactive/command/ReactiveCommandExecuteTaskTest.java | 7 ++++--- 7 files changed, 23 insertions(+), 15 deletions(-) diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java index 3d853921a30c7..94c18688fe1ec 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/BackendConnection.java @@ -20,6 +20,8 @@ import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException; import org.apache.shardingsphere.proxy.backend.session.ConnectionSession; +import java.sql.SQLException; + /** * Backend connection for Proxy. * @@ -38,9 +40,9 @@ public interface BackendConnection { * Prepare for task execution. * * @return can be Void or Future - * @throws BackendConnectionException backend connection exception + * @throws SQLException SQL exception */ - T prepareForTaskExecution() throws BackendConnectionException; + T prepareForTaskExecution() throws SQLException; /** * Close resources used in execution. diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java index c59d9ac80b2d7..be80251091384 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnection.java @@ -175,16 +175,12 @@ public void unmarkResourceInUse(final JDBCDatabaseCommunicationEngine databaseCo } @Override - public Void prepareForTaskExecution() throws BackendConnectionException { + public Void prepareForTaskExecution() throws SQLException { synchronized (this) { connectionReferenceCount++; if (!connectionSession.isAutoCommit() && !connectionSession.getTransactionStatus().isInTransaction()) { JDBCBackendTransactionManager transactionManager = new JDBCBackendTransactionManager(this); - try { - transactionManager.begin(); - } catch (SQLException ex) { - throw new BackendConnectionException(ex); - } + transactionManager.begin(); } return null; } diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java index 83366e2aa1fe9..5e8e18ead6c5c 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/main/java/org/apache/shardingsphere/proxy/backend/text/TextProtocolBackendHandlerFactory.java @@ -101,6 +101,7 @@ public static TextProtocolBackendHandler newInstance(final DatabaseType database } return DistSQLBackendHandlerFactory.newInstance(databaseType, (DistSQLStatement) sqlStatement, connectionSession); } + handleAutoCommit(sqlStatement, connectionSession); SQLStatementContext sqlStatementContext = SQLStatementContextFactory.newInstance(ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMap(), sqlStatement, connectionSession.getDefaultDatabaseName()); Optional backendHandler = DatabaseAdminBackendHandlerFactory.newInstance(databaseType, sqlStatementContext, connectionSession, sql); @@ -138,6 +139,12 @@ private static DatabaseType getProtocolType(final DatabaseType defaultDatabaseTy : ProxyContext.getInstance().getContextManager().getMetaDataContexts().getDatabaseMetaData(databaseName).getProtocolType(); } + private static void handleAutoCommit(final SQLStatement sqlStatement, final ConnectionSession connectionSession) throws SQLException { + if (!(sqlStatement instanceof TCLStatement)) { + connectionSession.getBackendConnection().prepareForTaskExecution(); + } + } + private static Optional findExtraTextProtocolBackendHandler(final SQLStatement sqlStatement) { for (ExtraTextProtocolBackendHandler each : ShardingSphereServiceLoader.getServiceInstances(ExtraTextProtocolBackendHandler.class)) { if (each.accept(sqlStatement)) { diff --git a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java index 5c52358ff4bab..b9e10676a810b 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java +++ b/shardingsphere-proxy/shardingsphere-proxy-backend/src/test/java/org/apache/shardingsphere/proxy/backend/communication/jdbc/connection/JDBCBackendConnectionTest.java @@ -382,7 +382,7 @@ private Collection getInUseDatabaseCommunicatio } @Test - public void assertPrepareForTaskExecution() throws BackendConnectionException { + public void assertPrepareForTaskExecution() throws SQLException { backendConnection.prepareForTaskExecution(); verify(backendConnection).closeDatabaseCommunicationEngines(true); verify(backendConnection).closeConnections(false); diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java index 5d415b64590ad..bcba994d6f442 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/main/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTask.java @@ -73,7 +73,6 @@ public void run() { if (sqlShowEnabled) { fillLogMDC(); } - connectionSession.getBackendConnection().prepareForTaskExecution(); isNeedFlush = executeCommand(context, payload); // CHECKSTYLE:OFF } catch (final Exception ex) { diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java index 37d0faf84a07e..41c28c7b44e94 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-core/src/test/java/org/apache/shardingsphere/proxy/frontend/command/CommandExecutorTaskTest.java @@ -152,10 +152,13 @@ public void assertRunByCommandExecutor() throws SQLException, BackendConnectionE @SuppressWarnings("unchecked") @Test - public void assertRunWithError() throws BackendConnectionException { + public void assertRunWithError() throws BackendConnectionException, SQLException { RuntimeException mockException = new RuntimeException("mock"); - doThrow(mockException).when(backendConnection).prepareForTaskExecution(); + doThrow(mockException).when(commandExecutor).execute(); when(engine.getCodecEngine().createPacketPayload(message, StandardCharsets.UTF_8)).thenReturn(payload); + when(engine.getCommandExecuteEngine().getCommandPacket(payload, commandPacketType, connectionSession)).thenReturn(commandPacket); + when(engine.getCommandExecuteEngine().getCommandPacketType(payload)).thenReturn(commandPacketType); + when(engine.getCommandExecuteEngine().getCommandExecutor(commandPacketType, commandPacket, connectionSession)).thenReturn(commandExecutor); when(engine.getCommandExecuteEngine().getErrorPacket(mockException)).thenReturn(databasePacket); when(engine.getCommandExecuteEngine().getOtherPacket(connectionSession)).thenReturn(Optional.of(databasePacket)); CommandExecutorTask actual = new CommandExecutorTask(engine, connectionSession, handlerContext, message); diff --git a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java index 6e077b7d38407..01901789c2011 100644 --- a/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java +++ b/shardingsphere-proxy/shardingsphere-proxy-frontend/shardingsphere-proxy-frontend-reactive-core/src/test/java/org/apache/shardingsphere/proxy/frontend/reactive/command/ReactiveCommandExecuteTaskTest.java @@ -34,6 +34,7 @@ import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; +import java.sql.SQLException; import java.util.Arrays; import java.util.Collections; import java.util.Optional; @@ -67,7 +68,7 @@ public final class ReactiveCommandExecuteTaskTest { private ReactiveCommandExecutor reactiveCommandExecutor; @Before - public void setup() throws BackendConnectionException { + public void setup() throws BackendConnectionException, SQLException { reactiveCommandExecuteTask = new ReactiveCommandExecuteTask(frontendEngine, connectionSession, channelHandlerContext, message); when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenReturn(Future.succeededFuture()); when(connectionSession.getBackendConnection().closeExecutionResources()).thenReturn(Future.succeededFuture()); @@ -110,8 +111,8 @@ public void assertExecuteAndNoResponse() { } @Test - public void assertExecuteAndExceptionOccur() throws BackendConnectionException { - BackendConnectionException ex = new BackendConnectionException(Collections.emptyList()); + public void assertExecuteAndExceptionOccur() throws SQLException { + SQLException ex = new SQLException(""); when(connectionSession.getBackendConnection().prepareForTaskExecution()).thenThrow(ex); DatabasePacket errorPacket = mock(DatabasePacket.class); when(frontendEngine.getCommandExecuteEngine().getErrorPacket(ex)).thenReturn(errorPacket);