Skip to content

Commit

Permalink
issue apache#110 bug fix for RM channel management (apache#169)
Browse files Browse the repository at this point in the history
* issue apache#110 fix RM channel management

* fix merge mistake

* issue apache#110 enhance: if no channel found on my application set, try other application on the same resource.

* bug fix and enhance

* issue apache#110 enhance the original fix
  • Loading branch information
sharajava authored and slievrly committed Jan 18, 2019
1 parent cf10190 commit 105e24a
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 208 deletions.
217 changes: 154 additions & 63 deletions core/src/main/java/com/alibaba/fescar/core/rpc/ChannelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.ConcurrentMap;

import com.alibaba.fescar.common.Constants;
import com.alibaba.fescar.common.exception.FrameworkException;
import com.alibaba.fescar.core.protocol.*;
import com.alibaba.fescar.core.protocol.RegisterRMRequest;
import com.alibaba.fescar.core.protocol.RegisterTMRequest;
Expand All @@ -50,21 +51,16 @@ public class ChannelManager {
= new ConcurrentHashMap<Channel, RpcContext>();

/**
* dbkey+appname+ip port context
* resourceId -> applicationId -> ip -> port -> RpcContext
*/
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>
RM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
private static final ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>>>
RM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>>>();

/**
* ip+appname,port
*/
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS
= new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

private static final ConcurrentMap<String, String> DB_GROUP_MAPPING = new ConcurrentHashMap<String, String>();
private static final ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> TM_CHANNELS = new ConcurrentHashMap<String, ConcurrentMap<Integer, RpcContext>>();

/**
* Is registered boolean.
Expand Down Expand Up @@ -103,12 +99,16 @@ private static String buildClientId(String applicationId, Channel channel) {
return applicationId + Constants.CLIENT_ID_SPLIT_CHAR + getAddressFromChannel(channel);
}

private static String[] readClientId(String clientId) {
return clientId.split(Constants.CLIENT_ID_SPLIT_CHAR);
}

private static RpcContext buildChannelHolder(TransactionRole clientRole, String version, String applicationId,
String txServiceGroup, String dbkeys, Channel channel) {
RpcContext holder = new RpcContext();
holder.setClientRole(clientRole);
holder.setVersion(version);
String clientId = buildClientId(applicationId, channel);
holder.setClientId(buildClientId(applicationId, channel));
holder.setApplicationId(applicationId);
holder.setTransactionServiceGroup(txServiceGroup);
holder.addResources(dbKeytoSet(dbkeys));
Expand Down Expand Up @@ -314,84 +314,175 @@ private static Channel getChannelFromSameClientMap(Map<Integer, RpcContext> clie
/**
* Gets get channel.
*
* @param resourceId the db key
* @param clientIp the client ip
* @param clientAppName the client app name
* @return the get channel
* @param resourceId Resource ID
* @param clientId Client ID - ApplicationId:IP:Port
* @return Corresponding channel, NULL if not found.
*/
public static Channel getChannel(String resourceId, String clientIp, String clientAppName) {
public static Channel getChannel(String resourceId, String clientId) {
Channel resultChannel = null;

String[] clientIdInfo = readClientId(clientId);

if (clientIdInfo == null || clientIdInfo.length != 3) {
throw new FrameworkException("Invalid Client ID: " + clientId);
}

String targetApplicationId = clientIdInfo[0];
String targetIP = clientIdInfo[1];
int targetPort = Integer.parseInt(clientIdInfo[2]);

ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>> applicationIdMap = RM_CHANNELS.get(resourceId);
if (null != applicationIdMap && !applicationIdMap.isEmpty()) {
ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> clientIpMap = applicationIdMap.get(clientAppName);
if (null != clientIpMap && !clientIpMap.isEmpty()) {
ConcurrentMap<Integer, RpcContext> portMap = clientIpMap.get(clientIp);
if (null != portMap && !portMap.isEmpty()) {
for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
Channel channel = portMapEntry.getValue().getChannel();
if (channel.isActive()) {
resultChannel = channel;
break;

if (targetApplicationId == null || applicationIdMap.isEmpty()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available for resource[" + resourceId + "]");
}
return null;
}

ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> ipMap = applicationIdMap.get(targetApplicationId);

if (null != ipMap && !ipMap.isEmpty()) {

// Firstly, try to find the original channel through which the branch was registered.
ConcurrentMap<Integer, RpcContext> portMapOnTargetIP = ipMap.get(targetIP);
if (portMapOnTargetIP != null && !portMapOnTargetIP.isEmpty()) {

RpcContext exactRpcContext = portMapOnTargetIP.get(targetPort);
if (exactRpcContext != null) {
Channel channel = exactRpcContext.getChannel();
if (channel.isActive()) {
resultChannel = channel;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Just got exactly the one " + channel + " for " + clientId);
}
} else {
if (portMapOnTargetIP.remove(targetPort, exactRpcContext)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive " + channel);
}
}
portMap.remove(portMapEntry.getKey());
}
}
if (null == resultChannel) {
for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> clientIpMapEntry : clientIpMap
.entrySet()) {
if (clientIpMapEntry.getKey().equals(clientIp)) { continue; }
for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : clientIpMapEntry.getValue()
.entrySet()) {
Channel channel = portMapEntry.getValue().getChannel();
if (channel.isActive()) {
resultChannel = channel;

// The original channel was broken, try another one.
if (resultChannel == null) {
for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnTargetIPEntry : portMapOnTargetIP.entrySet()) {
Channel channel = portMapOnTargetIPEntry.getValue().getChannel();

if (channel.isActive()) {
resultChannel = channel;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Choose " + channel + " on the same IP[" + targetIP + "] as alternative of "
+ clientId);
}
break;
} else {
if (portMapOnTargetIP.remove(portMapOnTargetIPEntry.getKey(), portMapOnTargetIPEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(
"use same appname:" + clientAppName + ",channel:" + channel + ",replace ip:"
+ clientIp);
LOGGER.info("Removed inactive " + channel);
}
break;
}
clientIpMapEntry.getValue().remove(portMapEntry.getKey());
}
if (null != resultChannel) {
break;
}
}
}
}
}
if (null == resultChannel) {
for (ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>> appIdEntryMap : RM_CHANNELS.values()) {
for (ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>> clientIpEntryMap : appIdEntryMap.values()) {
if (clientIpEntryMap.containsKey(clientIp)) {
for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : clientIpEntryMap.get(clientIp)
.entrySet()) {
Channel channel = portMapEntry.getValue().getChannel();
if (channel.isActive()) {
resultChannel = channel;

// No channel on the this app node, try another one.
if (resultChannel == null) {
for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> ipMapEntry : ipMap
.entrySet()) {
if (ipMapEntry.getKey().equals(targetIP)) { continue; }

ConcurrentMap<Integer, RpcContext> portMapOnOtherIP = ipMapEntry.getValue();
if (portMapOnOtherIP == null || portMapOnOtherIP.isEmpty()) {
continue;
}

for (ConcurrentMap.Entry<Integer, RpcContext> portMapOnOtherIPEntry : portMapOnOtherIP.entrySet()) {
Channel channel = portMapOnOtherIPEntry.getValue().getChannel();

if (channel.isActive()) {
resultChannel = channel;
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Choose " + channel + " on the same application[" + targetApplicationId
+ "] as alternative of "
+ clientId);
}
break;
} else {
if (portMapOnOtherIP.remove(portMapOnOtherIPEntry.getKey(), portMapOnOtherIPEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("use another dbkey,channel:" + channel);
LOGGER.info("Removed inactive " + channel);
}
break;
}
clientIpEntryMap.get(clientIp).remove(portMapEntry.getKey());
}
}
if (null != resultChannel) {
break;
}
if (resultChannel != null) { break; }
}
}
}

if (resultChannel == null) {
resultChannel = tryOtherApp(applicationIdMap, targetApplicationId);

if (resultChannel == null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("No channel is available for resource[" + resourceId
+ "] as alternative of "
+ clientId);
}
if (null != resultChannel) {
break;
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Choose " + resultChannel + " on the same resource[" + resourceId
+ "] as alternative of "
+ clientId);
}
}
}

return resultChannel;

}

private static Channel tryOtherApp(ConcurrentMap<String, ConcurrentMap<String, ConcurrentMap<Integer,
RpcContext>>> applicationIdMap, String myApplicationId) {
Channel chosenChannel = null;
for (ConcurrentMap.Entry<String, ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>>> applicationIdMapEntry : applicationIdMap.entrySet()) {
if (applicationIdMapEntry.getKey().equals(myApplicationId)) {
continue;
}

ConcurrentMap<String, ConcurrentMap<Integer, RpcContext>> targetIPMap = applicationIdMapEntry.getValue();
if (targetIPMap == null || targetIPMap.isEmpty()) {
continue;
}

for (ConcurrentMap.Entry<String, ConcurrentMap<Integer, RpcContext>> targetIPMapEntry : targetIPMap.entrySet()) {
ConcurrentMap<Integer, RpcContext> portMap = targetIPMapEntry.getValue();
if (portMap == null || portMap.isEmpty()) {
continue;
}

for (ConcurrentMap.Entry<Integer, RpcContext> portMapEntry : portMap.entrySet()) {
Channel channel = portMapEntry.getValue().getChannel();
if (channel.isActive()) {
chosenChannel = channel;
break;
} else {
if (portMap.remove(portMapEntry.getKey(), portMapEntry.getValue())) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Removed inactive " + channel);
}
}
}
}
if (chosenChannel != null) { break; }
}
if (chosenChannel != null) { break; }
}
return chosenChannel;

}
}
41 changes: 14 additions & 27 deletions core/src/main/java/com/alibaba/fescar/core/rpc/RpcContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public class RpcContext {

private String transactionServiceGroup;

private String clientId;

private Channel channel;

private Set<String> resourceSets;
Expand Down Expand Up @@ -175,7 +177,7 @@ public Map<Integer, RpcContext> getPortMap(String resourceId) {
* @return the get client id
*/
public String getClientId() {
return getClientIp();
return clientId;
}

/**
Expand Down Expand Up @@ -277,20 +279,6 @@ private static String getAddressFromChannel(Channel channel) {
return address;
}

/**
* Gets get client ip.
*
* @return the get client ip
*/
public String getClientIp() {
String address = getAddressFromChannel(channel);
String clientIp = address;
if (clientIp.contains(Constants.IP_PORT_SPLIT_CHAR)) {
clientIp = clientIp.substring(0, clientIp.lastIndexOf(Constants.IP_PORT_SPLIT_CHAR));
}
return clientIp;
}

private static Integer getClientPortFromChannel(Channel channel) {
String address = getAddressFromChannel(channel);
Integer port = 0;
Expand Down Expand Up @@ -347,19 +335,18 @@ public void addResources(Set<String> resource) {
this.resourceSets.addAll(resource);
}

/**
* To string string.
*
* @return the string
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("vgroup:" + transactionServiceGroup);
sb.append(",appname:" + applicationId);
sb.append(",channel:" + channel);
sb.append(",version:" + version);
sb.append(",type:" + clientRole.name());
return sb.toString();
return "RpcContext{" +
"applicationId='" + applicationId + '\'' +
", transactionServiceGroup='" + transactionServiceGroup + '\'' +
", clientId='" + clientId + '\'' +
", channel=" + channel +
", resourceSets=" + resourceSets +
'}';
}
}
Loading

0 comments on commit 105e24a

Please sign in to comment.