Skip to content

Commit

Permalink
Optimize mesh support. (sofastack#722)
Browse files Browse the repository at this point in the history
* optimize mesh support

* add protocol type

* add more

* mesh api parameter settings
  • Loading branch information
leizhiyuan committed Aug 21, 2019
1 parent e8c00d5 commit 3b3fab1
Show file tree
Hide file tree
Showing 13 changed files with 336 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,7 @@ public boolean needToLoad(ConsumerBootstrap consumerBootstrap) {
}
}
}

boolean isBolt = consumerConfig.getProtocol().equalsIgnoreCase(RpcConstants.PROTOCOL_TYPE_BOLT);

return !isDirect && isMesh && isBolt;
return !isDirect && isMesh;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderHelper;
import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
Expand All @@ -28,6 +29,7 @@
import com.alipay.sofa.rpc.event.EventBus;
import com.alipay.sofa.rpc.event.ProviderPubEvent;
import com.alipay.sofa.rpc.ext.Extension;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
import com.alipay.sofa.rpc.log.LogCodes;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
Expand All @@ -44,6 +46,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* mesh registry
Expand All @@ -56,11 +61,29 @@ public class MeshRegistry extends Registry {
/**
* Logger
*/
private static final Logger LOGGER = LoggerFactory.getLogger(MeshRegistry.class);
private static final Logger LOGGER = LoggerFactory.getLogger(MeshRegistry.class);

private static final String VERSION = "4.0";
private static final String VERSION = "4.0";

private MeshApiClient client;
protected MeshApiClient client;

//init only once
protected boolean inited;

//has registed app info
protected boolean registedApp;

private static ThreadPoolExecutor asyncCreateConnectionExecutor = initThreadPoolExecutor();

private static ThreadPoolExecutor initThreadPoolExecutor() {
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(20, 20, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5000), new NamedThreadFactory(
"Mesh-Async-Registry", true));

//使用
threadPoolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return threadPoolExecutor;
}

/**
* 注册中心配置
Expand All @@ -71,12 +94,6 @@ protected MeshRegistry(RegistryConfig registryConfig) {
super(registryConfig);
}

//init only once
private boolean inited;

//has registed app info
private boolean registedApp;

@Override
public void init() {
synchronized (MeshRegistry.class) {
Expand Down Expand Up @@ -113,7 +130,7 @@ public void register(ProviderConfig config) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, serviceName));
}
doRegister(appName, serviceName, providerInfo);
doRegister(appName, serviceName, providerInfo, server.getProtocol());

if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, serviceName));
Expand All @@ -134,24 +151,41 @@ public void register(ProviderConfig config) {
* @param serviceName 服务关键字
* @param providerInfo 服务提供者数据
*/
protected void doRegister(String appName, String serviceName, ProviderInfo providerInfo) {
protected void doRegister(final String appName, final String serviceName, final ProviderInfo providerInfo,
final String protocol) {

registerAppInfoOnce(appName);
asyncCreateConnectionExecutor.execute(new Runnable() {
@Override
public void run() {
registerAppInfoOnce(appName);

if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, serviceName));
}
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, serviceName));
}

PublishServiceRequest publishServiceRequest = buildPublishServiceRequest(serviceName, protocol,
providerInfo, appName);

client.publishService(publishServiceRequest);
}
});

}

protected PublishServiceRequest buildPublishServiceRequest(String serviceName, String protocol,
ProviderInfo providerInfo,
String appName) {
PublishServiceRequest publishServiceRequest = new PublishServiceRequest();
publishServiceRequest.setServiceName(serviceName);
publishServiceRequest.setProtocolType(protocol);
ProviderMetaInfo providerMetaInfo = new ProviderMetaInfo();
providerMetaInfo.setProtocol(providerInfo.getProtocolType());
providerMetaInfo.setSerializeType(providerInfo.getSerializationType());
providerMetaInfo.setAppName(appName);
providerMetaInfo.setVersion(VERSION);
providerMetaInfo.setProperties(providerInfo.getStaticAttrs());
publishServiceRequest.setProviderMetaInfo(providerMetaInfo);

client.publishService(publishServiceRequest);
return publishServiceRequest;
}

@Override
Expand Down Expand Up @@ -212,41 +246,59 @@ public void batchUnRegister(List<ProviderConfig> configs) {
}

@Override
public List<ProviderGroup> subscribe(ConsumerConfig config) {
final String appName = config.getAppName();
public List<ProviderGroup> subscribe(final ConsumerConfig config) {

registerAppInfoOnce(appName);
asyncCreateConnectionExecutor.execute(new Runnable() {
@Override
public void run() {
final String appName = config.getAppName();

String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest();
subscribeRequest.setServiceName(key);
SubscribeServiceResult subscribeServiceResult = client.subscribeService(subscribeRequest);
registerAppInfoOnce(appName);

if (subscribeServiceResult == null || !subscribeServiceResult.isSuccess()) {
throw new RuntimeException("regist consumer occors error," + subscribeRequest);
SubscribeServiceRequest subscribeRequest = buildSubscribeServiceRequest(config);
SubscribeServiceResult subscribeServiceResult = client.subscribeService(subscribeRequest);

}
if (subscribeServiceResult == null || !subscribeServiceResult.isSuccess()) {
throw new RuntimeException("regist consumer occors error," + subscribeRequest);

List<ProviderGroup> providerGroups = new ArrayList<ProviderGroup>();
}

ProviderGroup providerGroup = new ProviderGroup();
List<ProviderGroup> providerGroups = new ArrayList<ProviderGroup>();

List<ProviderInfo> providerInfos = new ArrayList<ProviderInfo>();
ProviderGroup providerGroup = new ProviderGroup();

String url = fillProtocolAndVersion(subscribeServiceResult, client.getHost(), "");
List<ProviderInfo> providerInfos = new ArrayList<ProviderInfo>();

ProviderInfo providerInfo = ProviderHelper.toProviderInfo(url);
providerInfos.add(providerInfo);
providerGroup.setProviderInfos(providerInfos);
String url = fillProtocolAndVersion(subscribeServiceResult, client.getHost(), "", config.getProtocol());

providerGroups.add(providerGroup);
ProviderInfo providerInfo = ProviderHelper.toProviderInfo(url);
providerInfos.add(providerInfo);
providerGroup.setProviderInfos(providerInfos);

if (EventBus.isEnable(ConsumerSubEvent.class)) {
ConsumerSubEvent event = new ConsumerSubEvent(config);
EventBus.post(event);
}
providerGroups.add(providerGroup);

if (EventBus.isEnable(ConsumerSubEvent.class)) {
ConsumerSubEvent event = new ConsumerSubEvent(config);
EventBus.post(event);
}

final ProviderInfoListener providerInfoListener = config.getProviderInfoListener();
if (providerInfoListener != null) {
providerInfoListener.updateAllProviders(providerGroups);
}
}
});

//async
return null;

}

return providerGroups;
protected SubscribeServiceRequest buildSubscribeServiceRequest(ConsumerConfig consumerConfig) {
String key = MeshRegistryHelper.buildMeshKey(consumerConfig, consumerConfig.getProtocol());
SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest();
subscribeRequest.setServiceName(key);
return subscribeRequest;
}

protected void registerAppInfoOnce(String appName) {
Expand Down Expand Up @@ -276,17 +328,24 @@ protected ApplicationInfoRequest buildApplicationRequest(String appName) {
}

protected String fillProtocolAndVersion(SubscribeServiceResult subscribeServiceResult, String targetURL,
String serviceName) {
String serviceName, String protocol) {

String meshPort = judgeMeshPort(protocol);

final List<String> datas = subscribeServiceResult.getDatas();

if (datas == null) {
targetURL = targetURL + ":" + MeshConstants.TCP_PORT;
if (CommonUtils.isEmpty(datas)) {
targetURL = targetURL + ":" + meshPort;
} else {
for (String data : subscribeServiceResult.getDatas()) {
String param = data.substring(data.indexOf("?"));
targetURL = targetURL + ":" + MeshConstants.TCP_PORT;
targetURL = targetURL + param;
final int indexOfParam = data.indexOf("?");
if (indexOfParam != -1) {
String param = data.substring(indexOfParam + 1);
targetURL = targetURL + ":" + meshPort;
targetURL = targetURL + "?" + param;
} else {
targetURL = targetURL + ":" + meshPort;
}
break;
}
}
Expand All @@ -295,11 +354,15 @@ protected String fillProtocolAndVersion(SubscribeServiceResult subscribeServiceR

@Override
public void unSubscribe(ConsumerConfig config) {
String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
UnSubscribeServiceRequest unsubscribeRequest = new UnSubscribeServiceRequest();
UnSubscribeServiceRequest unsubscribeRequest = buildUnSubscribeServiceRequest(config);
client.unSubscribeService(unsubscribeRequest);
}

protected UnSubscribeServiceRequest buildUnSubscribeServiceRequest(ConsumerConfig config) {
UnSubscribeServiceRequest unsubscribeRequest = new UnSubscribeServiceRequest();
String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol());
unsubscribeRequest.setServiceName(key);
client.unSubscribeService(unsubscribeRequest);
return unsubscribeRequest;
}

@Override
Expand All @@ -315,6 +378,10 @@ public void batchUnSubscribe(List<ConsumerConfig> configs) {
}
}

protected String judgeMeshPort(String protocol) {
return String.valueOf(MeshConstants.TCP_PORT);
}

@Override
public void destroy() {
// 销毁前备份一下
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public static ProviderInfo convertProviderToProviderInfo(ProviderConfig config,
.setWeight(config.getWeight())
.setSerializationType(config.getSerialization())
.setProtocolType(server.getProtocol())
.setPath(server.getContextPath());
.setPath(server.getContextPath())
.setStaticAttrs(config.getParameters());
String host = server.getHost();
if (NetUtils.isLocalHost(host) || NetUtils.isAnyHost(host)) {
host = SystemInfo.getLocalHost();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,7 @@
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.registry.mesh.model.ApplicationInfoRequest;
import com.alipay.sofa.rpc.registry.mesh.model.ApplicationInfoResult;
import com.alipay.sofa.rpc.registry.mesh.model.MeshEndpoint;
import com.alipay.sofa.rpc.registry.mesh.model.PublishServiceRequest;
import com.alipay.sofa.rpc.registry.mesh.model.PublishServiceResult;
import com.alipay.sofa.rpc.registry.mesh.model.SubscribeServiceRequest;
import com.alipay.sofa.rpc.registry.mesh.model.SubscribeServiceResult;
import com.alipay.sofa.rpc.registry.mesh.model.UnPublishServiceRequest;
import com.alipay.sofa.rpc.registry.mesh.model.UnPublishServiceResult;
import com.alipay.sofa.rpc.registry.mesh.model.UnSubscribeServiceRequest;
import com.alipay.sofa.rpc.registry.mesh.model.UnSubscribeServiceResult;
import com.alipay.sofa.rpc.registry.mesh.model.*;

import java.io.BufferedReader;
import java.io.DataOutputStream;
Expand All @@ -41,7 +31,8 @@
import java.net.URL;

/**
* @author <a href=mailto:[email protected]>leizhiyuan</a>
* @author bystander
* @version $Id: MeshApiClient.java, v 0.1 2018年03月27日 2:24 PM bystander Exp $
*/
public class MeshApiClient {

Expand All @@ -52,12 +43,14 @@ public class MeshApiClient {
/**
* 连接超时
*/
private static int connectTimeout = 1000;
private static int connectTimeout = Integer.parseInt(System.getProperty(
"mesh_http_connect_timeout", "3000")); ;

/**
* 读取超时
*/
private static int readTimeout = 1000;
private static int readTimeout = Integer.parseInt(System.getProperty(
"mesh_http_read_timeout", "15000"));

private static String errorMessage = "ERROR";

Expand Down Expand Up @@ -93,7 +86,8 @@ public boolean registeApplication(ApplicationInfoRequest applicationInfoRequest)
String result = httpPost(MeshEndpoint.CONFIGS, json);

if (!StringUtils.equals(result, errorMessage)) {
final ApplicationInfoResult parse = JSON.parseObject(result, ApplicationInfoResult.class);
final ApplicationInfoResult parse = JSON.parseObject(result,
ApplicationInfoResult.class);
if (parse.isSuccess()) {
return true;
}
Expand All @@ -110,7 +104,8 @@ public int unPublishService(UnPublishServiceRequest request) {
String result = httpPost(MeshEndpoint.UN_PUBLISH, json);

if (!StringUtils.equals(result, errorMessage)) {
final UnPublishServiceResult parse = JSON.parseObject(result, UnPublishServiceResult.class);
final UnPublishServiceResult parse = JSON.parseObject(result,
UnPublishServiceResult.class);
if (parse.isSuccess()) {
return 1;
}
Expand All @@ -128,8 +123,7 @@ public SubscribeServiceResult subscribeService(SubscribeServiceRequest subscribe

SubscribeServiceResult subscribeServiceResult;
if (!StringUtils.equals(result, errorMessage)) {
subscribeServiceResult = JSON.parseObject(result,
SubscribeServiceResult.class);
subscribeServiceResult = JSON.parseObject(result, SubscribeServiceResult.class);
return subscribeServiceResult;
} else {
subscribeServiceResult = new SubscribeServiceResult();
Expand Down Expand Up @@ -168,7 +162,7 @@ private HttpURLConnection createConnection(URL url, String method, boolean doOut
con.setRequestProperty("Content-Type", "text/plain");
return con;
} catch (IOException e) {
e.printStackTrace();
LOGGER.errorWithApp(null, "uri:" + url, e);
return null;
}

Expand Down Expand Up @@ -206,7 +200,13 @@ private String readDataFromConnection(HttpURLConnection con) {
return result;
}

private String httpGet(String path) {
/**
* for get method
*
* @param path
* @return
*/
public String httpGet(String path) {
HttpURLConnection con = null;
String result = null;
try {
Expand Down
Loading

0 comments on commit 3b3fab1

Please sign in to comment.