Skip to content

Commit

Permalink
Merge pull request MyCATApache#687 from digdeep126/master
Browse files Browse the repository at this point in the history
fix当update语句的where中有子查询时无法处理的bug
  • Loading branch information
mycatmerger committed Dec 25, 2015
2 parents 4ae453c + 4a86c09 commit ba0c9c8
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 4 deletions.
5 changes: 3 additions & 2 deletions src/main/java/io/mycat/MycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ public void startup() throws IOException {
dataNodeIldeCheckPeriod);
timer.schedule(dataNodeHeartbeat(), 0L,
system.getDataNodeHeartbeatPeriod());
timer.schedule(glableTableConsistencyCheck(), 0L,
system.getGlableTableCheckPeriod());
if(system.isGlobalTableCheckSwitchOn()) // 全局表一致性检测是否开启
timer.schedule(glableTableConsistencyCheck(), 0L,
system.getGlableTableCheckPeriod());
timer.schedule(catletClassClear(), 30000);

}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/mycat/server/MySQLFrontConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ public MySQLFrontConnection(SocketChannel channel) throws IOException {
remoteAddr = (InetSocketAddress) ((SocketChannel) channel)
.getRemoteAddress();
this.host = remoteAddr.getHostString();
this.port = localAddr.getPort();
this.localPort = remoteAddr.getPort();
this.port = remoteAddr.getPort();
this.localPort = localAddr.getPort();
loadDataInfileHandler = new ServerLoadDataInfileHandler(this);
prepareHandler = new ServerPrepareHandler(this);
}
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/mycat/server/config/node/SystemConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public final class SystemConfig {
private static final int DEFAULT_SQL_RECORD_COUNT = 10;
// 全局表一致性检测任务,默认24小时调度一次
private static final long DEFAULT_GLOBAL_TABLE_CHECK_PERIOD = 24 * 60 * 60 * 1000L;
private boolean globalTableCheckSwitchOn = true; // 全局表一致性检查开关
private int maxStringLiteralLength = 65535;
private int frontWriteQueueSize = 2048;
private String bindIp = "0.0.0.0";
Expand Down Expand Up @@ -578,6 +579,14 @@ public void setMycatNodeId(int mycatNodeId) {
this.mycatNodeId = mycatNodeId;
}

public boolean isGlobalTableCheckSwitchOn() {
return globalTableCheckSwitchOn;
}

public void setGlobalTableCheckSwitchOn(boolean globalTableCheckSwitchOn) {
this.globalTableCheckSwitchOn = globalTableCheckSwitchOn;
}

@Override
public String toString() {
return "SystemConfig [processorBufferLocalPercent="
Expand All @@ -603,6 +612,7 @@ public String toString() {
+ ", clusterHeartbeatUser=" + clusterHeartbeatUser
+ ", clusterHeartbeatPass=" + clusterHeartbeatPass
+ ", clusterHeartbeatPeriod=" + clusterHeartbeatPeriod
+ ", globalTableCheckSwitchOn=" + globalTableCheckSwitchOn
+ ", glableTableCheckPeriod=" + glableTableCheckPeriod
+ ", clusterHeartbeatTimeout=" + clusterHeartbeatTimeout
+ ", clusterHeartbeatRetry=" + clusterHeartbeatRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.alibaba.druid.sql.ast.SQLOrderBy;
import com.alibaba.druid.sql.ast.SQLOrderingSpecification;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.expr.SQLInSubQueryExpr;
import com.alibaba.druid.sql.ast.statement.SQLSelect;
import com.alibaba.druid.sql.ast.statement.SQLSelectOrderByItem;
import com.alibaba.druid.sql.ast.statement.SQLTableSource;
import com.alibaba.druid.sql.ast.statement.SQLUpdateSetItem;
Expand Down Expand Up @@ -282,6 +284,16 @@ public static String convertUpdateSQL(String sql){

StringBuilder sb = new StringBuilder(150);

SQLExpr se = update.getWhere();
// where中有子查询: update company set name='com' where id in (select id from xxx where ...)
if(se instanceof SQLInSubQueryExpr){
// return sql;
int idx = sql.toUpperCase().indexOf(" SET ") + 5;
sb.append(sql.substring(0, idx)).append(GLOBAL_TABLE_MYCAT_COLUMN)
.append("=").append(operationTimestamp)
.append(",").append(sql.substring(idx));
return sb.toString();
}
String where = null;
if(update.getWhere() != null)
where = update.getWhere().toString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package io.mycat.sqlengine;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

/**
* 当SQLJob的结果有多行时,利用该处理器进行处理
* @author [email protected]
*/
public class MultiRowSQLQueryResultHandler extends OneRawSQLQueryResultHandler{
private static final Logger LOGGER = LoggerFactory
.getLogger(MultiRowSQLQueryResultHandler.class);
// 获得结果之后,利用该对象进行回调进行通知和处理结果
private final SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> callback;

private List<Map<String, String>> resultRows = new LinkedList<>(); // 保存结果行

public MultiRowSQLQueryResultHandler(String[] fetchCols,
SQLQueryResultListener<SQLQueryResult<List<Map<String, String>>>> callback) {
super(fetchCols, null);
this.callback = callback;
}

@Override
public boolean onRowData(String dataNode, byte[] rowData) {
super.onRowData(dataNode, rowData);
resultRows.add(getResult());

return false;
}

@Override
public void finished(String dataNode, boolean failed) {
SQLQueryResult<List<Map<String, String>>> queryResult =
new SQLQueryResult<List<Map<String, String>>>(this.resultRows, !failed);
if(callback != null)
this.callback.onResult(queryResult); // callback 是构造函数传进来,在得到结果是进行回调
else
LOGGER.warn(" callback is null ");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ public void finished(String dataNode, boolean failed) {

}

// 子类 MultiRowSQLQueryResultHandler 需要使用
protected Map<String, String> getResult() {
return result;
}

}

0 comments on commit ba0c9c8

Please sign in to comment.