Skip to content

Commit

Permalink
reload @@config_all BUG修复及代码重构
Browse files Browse the repository at this point in the history
  • Loading branch information
zhuam committed Sep 18, 2016
1 parent 5bc150b commit 14a9d9f
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 98 deletions.
5 changes: 3 additions & 2 deletions src/main/java/io/mycat/MycatServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -459,7 +459,7 @@ public void run() {

//根据 lastTime 确认事务的执行, 超过 sqlExecuteTimeout 阀值 close connection
long currentTime = TimeUtil.currentTimeMillis();
Iterator<BackendConnection> iter = PhysicalDBPool.oldCons.iterator();
Iterator<BackendConnection> iter = NIOProcessor.backends_old.iterator();
while( iter.hasNext() ) {
BackendConnection con = iter.next();
long lastTime = con.getLastTime();
Expand Down Expand Up @@ -681,12 +681,13 @@ public void run() {
node.heartbeatCheck(heartPeriod);
}

/*
Map<String, PhysicalDBPool> _nodes = config.getBackupDataHosts();
if (_nodes != null) {
for (PhysicalDBPool node : _nodes.values()) {
node.heartbeatCheck(heartPeriod);
}
}
}*/
}
});
}
Expand Down
38 changes: 1 addition & 37 deletions src/main/java/io/mycat/backend/ConMap.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package io.mycat.backend;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -135,38 +133,4 @@ public void clearConnections(String reason, PhysicalDatasource dataSouce) {
}
items.clear();
}

/**
* 从 NIOProcessor 将 BackendConnection 移除到 old connection 待回收区
* @param dataSouce
* @return
*/
public List<BackendConnection> shiftConnections(PhysicalDatasource dataSouce) {

List<BackendConnection> backends = new ArrayList<BackendConnection>();

for (NIOProcessor processor : MycatServer.getInstance().getProcessors()) {
ConcurrentMap<Long, BackendConnection> map = processor.getBackends();
Iterator<Entry<Long, BackendConnection>> itor = map.entrySet().iterator();
while (itor.hasNext()) {
Entry<Long, BackendConnection> entry = itor.next();
BackendConnection con = entry.getValue();
if (con instanceof MySQLConnection) {
if (((MySQLConnection) con).getPool() == dataSouce) {
backends.add( con );
itor.remove();
}
}else if(con instanceof JDBCConnection){
if(((JDBCConnection) con).getPool() == dataSouce){
backends.add( con );
itor.remove();
}
}
}
}
items.clear();

return backends;
}

}
}
27 changes: 0 additions & 27 deletions src/main/java/io/mycat/backend/datasource/PhysicalDBPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -48,10 +46,6 @@ public class PhysicalDBPool {

protected static final Logger LOGGER = LoggerFactory.getLogger(PhysicalDBPool.class);

// TODO: add by zhuam
// reload @@config_all 后, 老的后端connection 全部移往 oldCons, 待检测进程销毁
public final static ConcurrentLinkedQueue<BackendConnection> oldCons = new ConcurrentLinkedQueue<BackendConnection>();

public static final int BALANCE_NONE = 0;
public static final int BALANCE_ALL_BACK = 1;
public static final int BALANCE_ALL = 2;
Expand Down Expand Up @@ -381,27 +375,6 @@ public void stopHeartbeat() {
}
}

/**
* 转移 dataSources 数据库连接到回收区
*/
public void shiftDatasourcesOldCons() {

// 清除前一次 reload 转移出去的 old Cons, 避免后端太多的问题
//can't connect to mysql server ,errmsg:Too many connections
Iterator<BackendConnection> iter = oldCons.iterator();
while( iter.hasNext() ) {
BackendConnection con = iter.next();
con.close("clear old datasources");
iter.remove();
}

// 转移本次 old Cons 进入回收区
for (PhysicalDatasource source : this.allDs) {
List<BackendConnection> shiftCons = source.shiftCons();
oldCons.addAll(shiftCons);
}
}

/**
* 强制清除 dataSources
* @param reason
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,6 @@ public int getActiveCount() {
public void clearCons(String reason) {
this.conMap.clearConnections(reason, this);
}

/**
* 转移 Cons
*/
public List<BackendConnection> shiftCons() {
return this.conMap.shiftConnections(this);
}

public void startHeartbeat() {
heartbeat.start();
Expand Down
42 changes: 21 additions & 21 deletions src/main/java/io/mycat/config/MycatConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -201,15 +201,15 @@ public long getRollbackTime() {
}

public void reload(
Map<String, UserConfig> users,
Map<String, SchemaConfig> schemas,
Map<String, PhysicalDBNode> dataNodes,
Map<String, PhysicalDBPool> dataHosts,
MycatCluster cluster,
FirewallConfig firewall,
Map<String, UserConfig> newUsers,
Map<String, SchemaConfig> newSchemas,
Map<String, PhysicalDBNode> newDataNodes,
Map<String, PhysicalDBPool> newDataHosts,
MycatCluster newCluster,
FirewallConfig newFirewall,
boolean reloadAll) {

apply(users, schemas, dataNodes, dataHosts, cluster, firewall, reloadAll);
apply(newUsers, newSchemas, newDataNodes, newDataHosts, newCluster, newFirewall, reloadAll);
this.reloadTime = TimeUtil.currentTimeMillis();
this.status = reloadAll?RELOAD_ALL:RELOAD;
}
Expand Down Expand Up @@ -237,12 +237,12 @@ public void rollback(
this.status = ROLLBACK;
}

private void apply(Map<String, UserConfig> users,
Map<String, SchemaConfig> schemas,
Map<String, PhysicalDBNode> dataNodes,
Map<String, PhysicalDBPool> dataHosts,
MycatCluster cluster,
FirewallConfig firewall,
private void apply(Map<String, UserConfig> newUsers,
Map<String, SchemaConfig> newSchemas,
Map<String, PhysicalDBNode> newDataNodes,
Map<String, PhysicalDBPool> newDataHosts,
MycatCluster newCluster,
FirewallConfig newFirewall,
boolean isLoadAll) {

final ReentrantLock lock = this.lock;
Expand Down Expand Up @@ -276,20 +276,20 @@ private void apply(Map<String, UserConfig> users,
// 2、执行新的配置
//---------------------------------------------------
if (isLoadAll) {
if (dataNodes != null) {
for (PhysicalDBPool newDbPool : dataHosts.values()) {
if (newDataHosts != null) {
for (PhysicalDBPool newDbPool : newDataHosts.values()) {
if ( newDbPool != null) {
newDbPool.startHeartbeat();
}
}
}
this.dataNodes = dataNodes;
this.dataHosts = dataHosts;
this.dataNodes = newDataNodes;
this.dataHosts = newDataHosts;
}
this.users = users;
this.schemas = schemas;
this.cluster = cluster;
this.firewall = firewall;
this.users = newUsers;
this.schemas = newSchemas;
this.cluster = newCluster;
this.firewall = newFirewall;

} finally {
lock.unlock();
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/mycat/manager/response/ShowBackendOld.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
import java.nio.ByteBuffer;

import io.mycat.backend.BackendConnection;
import io.mycat.backend.datasource.PhysicalDBPool;
import io.mycat.backend.mysql.PacketUtil;
import io.mycat.backend.mysql.nio.MySQLConnection;
import io.mycat.config.Fields;
import io.mycat.manager.ManagerConnection;
import io.mycat.net.NIOProcessor;
import io.mycat.net.mysql.EOFPacket;
import io.mycat.net.mysql.FieldPacket;
import io.mycat.net.mysql.ResultSetHeaderPacket;
Expand Down Expand Up @@ -66,7 +66,7 @@ public static void execute(ManagerConnection c) {
byte packetId = eof.packetId;
String charset = c.getCharset();

for (BackendConnection bc : PhysicalDBPool.oldCons) {
for (BackendConnection bc : NIOProcessor.backends_old) {
if ( bc != null) {
RowDataPacket row = getRow(bc, charset);
row.packetId = ++packetId;
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/io/mycat/net/NIOProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

import io.mycat.buffer.BufferPool;
import io.mycat.buffer.DirectByteBufferPool;
import org.slf4j.Logger; import org.slf4j.LoggerFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.MycatServer;
import io.mycat.backend.BackendConnection;
Expand All @@ -44,7 +46,9 @@
* @author mycat
*/
public final class NIOProcessor {

private static final Logger LOGGER = LoggerFactory.getLogger("NIOProcessor");

private final String name;
private final BufferPool bufferPool;
private final NameableExecutor executor;
Expand All @@ -54,6 +58,10 @@ public final class NIOProcessor {
private long netInBytes;
private long netOutBytes;

// TODO: add by zhuam
// reload @@config_all 后, 老的backends 全部移往 backends_old, 待检测任务进行销毁
public final static ConcurrentLinkedQueue<BackendConnection> backends_old = new ConcurrentLinkedQueue<BackendConnection>();

//前端已连接数
private AtomicInteger frontendsLength = new AtomicInteger(0);

Expand Down

0 comments on commit 14a9d9f

Please sign in to comment.