Skip to content

Commit

Permalink
add ExecuteResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jul 21, 2018
1 parent 8e03c73 commit 42a43a5
Show file tree
Hide file tree
Showing 12 changed files with 212 additions and 158 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package io.shardingsphere.proxy.backend.common;

import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.SQLExecuteResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse;

import java.sql.SQLException;

Expand All @@ -34,8 +34,8 @@ public interface SQLExecuteEngine {
*
* @param routeResult route result
* @param isReturnGeneratedKeys is return generated keys
* @return SQL execute responses
* @return execute response
* @throws SQLException SQL exception
*/
SQLExecuteResponse execute(SQLRouteResult routeResult, boolean isReturnGeneratedKeys) throws SQLException;
ExecuteResponse execute(SQLRouteResult routeResult, boolean isReturnGeneratedKeys) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.proxy.backend.common.BackendHandler;
import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.SQLExecuteResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteUpdateResponse;
import io.shardingsphere.proxy.config.RuleRegistry;
import io.shardingsphere.proxy.metadata.ProxyShardingRefreshHandler;
import io.shardingsphere.proxy.transport.common.packet.DatabasePacket;
Expand Down Expand Up @@ -71,7 +73,7 @@ public abstract class JDBCBackendHandler implements BackendHandler {

private final JDBCExecuteEngine executeEngine;

private SQLExecuteResponse responses;
private ExecuteResponse executeResponse;

private MergedResult mergedResult;

Expand Down Expand Up @@ -111,8 +113,8 @@ private CommandResponsePackets execute(final SQLRouteResult routeResult) throws
return new CommandResponsePackets(new ErrPacket(1,
ServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "unknown_table"));
}
responses = executeEngine.execute(routeResult, isReturnGeneratedKeys);
CommandResponsePackets result = merge(sqlStatement, responses.getCommandResponsePacketsList());
executeResponse = executeEngine.execute(routeResult, isReturnGeneratedKeys);
CommandResponsePackets result = merge(sqlStatement);
if (!ruleRegistry.isMasterSlaveOnly()) {
ProxyShardingRefreshHandler.build(routeResult).execute();
}
Expand All @@ -124,32 +126,30 @@ private boolean isUnsupportedXA(final SQLType sqlType) throws SystemException {
return TransactionType.XA == ruleRegistry.getTransactionType() && SQLType.DDL == sqlType && Status.STATUS_NO_TRANSACTION != AtomikosUserTransaction.getInstance().getStatus();
}

private CommandResponsePackets merge(final SQLStatement sqlStatement, final Collection<CommandResponsePackets> packets) {
Collection<DatabasePacket> headPackets = new LinkedList<>();
for (CommandResponsePackets each : packets) {
if (null != each) {
private CommandResponsePackets merge(final SQLStatement sqlStatement) {
if (executeResponse instanceof ExecuteUpdateResponse) {
Collection<DatabasePacket> headPackets = new LinkedList<>();
for (CommandResponsePackets each : ((ExecuteUpdateResponse) executeResponse).getPacketsList()) {
if (each.getHeadPacket() instanceof ErrPacket) {
return new CommandResponsePackets(each.getHeadPacket());
}
headPackets.add(each.getHeadPacket());
}
return mergeUpdate(headPackets);
}
CommandResponsePackets firstCommandResponsePackets = packets.iterator().next();
if (firstCommandResponsePackets instanceof QueryResponsePackets) {
currentSequenceId += firstCommandResponsePackets.getPackets().size();
try {
mergedResult = mergeQuery(sqlStatement);
return firstCommandResponsePackets;
} catch (final SQLException ex) {
return new CommandResponsePackets(new ErrPacket(1, ex));
}
QueryResponsePackets result = ((ExecuteQueryResponse) executeResponse).getQueryResponsePackets();
currentSequenceId += result.getPackets().size();
try {
mergedResult = mergeQuery(sqlStatement);
return result;
} catch (final SQLException ex) {
return new CommandResponsePackets(new ErrPacket(1, ex));
}
return mergeUpdate(headPackets);
}

private MergedResult mergeQuery(final SQLStatement sqlStatement) throws SQLException {
isMerged = true;
return MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), responses.getQueryResults(), sqlStatement, ruleRegistry.getShardingMetaData()).merge();
return MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), ((ExecuteQueryResponse) executeResponse).getQueryResults(), sqlStatement, ruleRegistry.getShardingMetaData()).merge();
}

private CommandResponsePackets mergeUpdate(final Collection<DatabasePacket> packets) {
Expand Down Expand Up @@ -194,12 +194,13 @@ public final DatabasePacket getResultValue() {
if (!hasMoreResultValueFlag) {
return new EofPacket(++currentSequenceId);
}
QueryResponsePackets queryResponsePackets = ((ExecuteQueryResponse) executeResponse).getQueryResponsePackets();
try {
List<Object> data = new ArrayList<>(responses.getColumnCount());
for (int i = 1; i <= responses.getColumnCount(); i++) {
List<Object> data = new ArrayList<>(queryResponsePackets.getColumnCount());
for (int i = 1; i <= queryResponsePackets.getColumnCount(); i++) {
data.add(mergedResult.getValue(i, Object.class));
}
return newDatabasePacket(++currentSequenceId, data, responses.getColumnCount(), responses.getColumnTypes());
return newDatabasePacket(++currentSequenceId, data, queryResponsePackets.getColumnCount(), queryResponsePackets.getColumnTypes());
} catch (final SQLException ex) {
return new ErrPacket(1, ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.proxy.backend.common.SQLExecuteEngine;
import io.shardingsphere.proxy.backend.common.jdbc.BackendConnection;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteUpdateResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteUpdateResponseUnit;
import io.shardingsphere.proxy.transport.mysql.constant.ColumnType;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryResponsePackets;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.SQLUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.SQLExecuteResponse;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteQueryResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.ExecuteUpdateResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteQueryResponseUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit;

import java.sql.Connection;
import java.sql.ResultSet;
Expand All @@ -49,13 +50,13 @@
public abstract class ConnectionStrictlyExecuteEngine extends JDBCExecuteEngine {

@Override
public final SQLExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException {
public final ExecuteResponse execute(final SQLRouteResult routeResult, final boolean isReturnGeneratedKeys) throws SQLException {
Map<String, Collection<SQLUnit>> sqlExecutionUnits = routeResult.getSQLUnitGroups();
Entry<String, Collection<SQLUnit>> firstEntry = sqlExecutionUnits.entrySet().iterator().next();
sqlExecutionUnits.remove(firstEntry.getKey());
List<Future<Collection<ExecuteResponseUnit>>> futureList = asyncExecute(isReturnGeneratedKeys, sqlExecutionUnits);
Collection<ExecuteResponseUnit> firstJDBCExecuteResponses = syncExecute(isReturnGeneratedKeys, firstEntry.getKey(), firstEntry.getValue());
return buildCommandResponsePackets(firstJDBCExecuteResponses, futureList);
Collection<ExecuteResponseUnit> firstExecuteResponseUnits = syncExecute(isReturnGeneratedKeys, firstEntry.getKey(), firstEntry.getValue());
return buildCommandResponsePackets(firstExecuteResponseUnits, futureList);
}

private List<Future<Collection<ExecuteResponseUnit>>> asyncExecute(final boolean isReturnGeneratedKeys, final Map<String, Collection<SQLUnit>> sqlUnitGroups) throws SQLException {
Expand Down Expand Up @@ -98,33 +99,45 @@ private Collection<ExecuteResponseUnit> syncExecute(final boolean isReturnGenera
return result;
}

private SQLExecuteResponse buildCommandResponsePackets(final Collection<ExecuteResponseUnit> firstJDBCExecuteResponses, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
List<CommandResponsePackets> commandResponsePackets = new LinkedList<>();
List<QueryResult> queryResults = new LinkedList<>();
for (ExecuteResponseUnit each : firstJDBCExecuteResponses) {
if (null != each.getCommandResponsePackets()) {
commandResponsePackets.add(each.getCommandResponsePackets());
}
if (each instanceof ExecuteQueryResponseUnit) {
queryResults.add(((ExecuteQueryResponseUnit) each).getQueryResult());
}
private ExecuteResponse buildCommandResponsePackets(final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
ExecuteResponseUnit firstExecuteResponseUnit = firstExecuteResponseUnits.iterator().next();
return firstExecuteResponseUnit instanceof ExecuteQueryResponseUnit
? getExecuteQueryResponse((ExecuteQueryResponseUnit) firstExecuteResponseUnit, firstExecuteResponseUnits, futureList) : getExecuteUpdateResponse(firstExecuteResponseUnits, futureList);
}

private ExecuteResponse getExecuteQueryResponse(
final ExecuteQueryResponseUnit firstExecuteResponseUnit, final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
ExecuteQueryResponse result = new ExecuteQueryResponse(firstExecuteResponseUnit.getCommandResponsePackets());
for (ExecuteResponseUnit each : firstExecuteResponseUnits) {
result.getQueryResults().add(((ExecuteQueryResponseUnit) each).getQueryResult());
}
for (Future<Collection<ExecuteResponseUnit>> each : futureList) {
try {
Collection<ExecuteResponseUnit> executeResponses = each.get();
for (ExecuteResponseUnit executeResponse : executeResponses) {
if (executeResponse instanceof ExecuteQueryResponseUnit) {
queryResults.add(((ExecuteQueryResponseUnit) executeResponse).getQueryResult());
} else {
commandResponsePackets.add(executeResponse.getCommandResponsePackets());
result.getQueryResults().add(((ExecuteQueryResponseUnit) executeResponse).getQueryResult());
}
commandResponsePackets.add(executeResponse.getCommandResponsePackets());
}
} catch (final InterruptedException | ExecutionException ex) {
throw new ShardingException(ex.getMessage(), ex);
}
}
return new SQLExecuteResponse(commandResponsePackets, queryResults);
return result;
}

private ExecuteResponse getExecuteUpdateResponse(final Collection<ExecuteResponseUnit> firstExecuteResponseUnits, final List<Future<Collection<ExecuteResponseUnit>>> futureList) {
ExecuteUpdateResponse result = new ExecuteUpdateResponse(firstExecuteResponseUnits);
for (Future<Collection<ExecuteResponseUnit>> each : futureList) {
try {
for (ExecuteResponseUnit executeResponse : each.get()) {
result.getPacketsList().add(executeResponse.getCommandResponsePackets());
}
} catch (final InterruptedException | ExecutionException ex) {
throw new ShardingException(ex.getMessage(), ex);
}
}
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.proxy.backend.common.jdbc.execute.response;

import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryResponsePackets;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

import java.util.LinkedList;
import java.util.List;

/**
* Execute query response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class ExecuteQueryResponse implements ExecuteResponse {

private final QueryResponsePackets queryResponsePackets;

private final List<QueryResult> queryResults = new LinkedList<>();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.proxy.backend.common.jdbc.execute.response;

/**
* Execute response.
*
* @author zhangliang
*/
public interface ExecuteResponse {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.proxy.backend.common.jdbc.execute.response;

import io.shardingsphere.proxy.backend.common.jdbc.execute.response.unit.ExecuteResponseUnit;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import lombok.Getter;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

/**
* Execute update response.
*
* @author zhangliang
*/
@Getter
public final class ExecuteUpdateResponse implements ExecuteResponse {

private final List<CommandResponsePackets> packetsList = new LinkedList<>();

private final CommandResponsePackets firstPackets;

public ExecuteUpdateResponse(final CommandResponsePackets packets) {
packetsList.add(packets);
firstPackets = packetsList.iterator().next();
}

public ExecuteUpdateResponse(final Collection<ExecuteResponseUnit> responseUnits) {
for (ExecuteResponseUnit each : responseUnits) {
packetsList.add(each.getCommandResponsePackets());
}
firstPackets = packetsList.iterator().next();
}
}
Loading

0 comments on commit 42a43a5

Please sign in to comment.