Skip to content

Commit

Permalink
Support nacos registry. (sofastack#327)
Browse files Browse the repository at this point in the history
* Add nacos registry support extension.
  • Loading branch information
JervyShi authored and leizhiyuan committed Dec 2, 2018
1 parent cd0caf3 commit a45290e
Show file tree
Hide file tree
Showing 22 changed files with 1,220 additions and 136 deletions.
6 changes: 6 additions & 0 deletions all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,11 @@
<artifactId>sofa-rpc-registry-zk</artifactId>
<version>${sofa.rpc.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-registry-nacos</artifactId>
<version>${sofa.rpc.version}</version>
</dependency>
<dependency>
<groupId>com.alipay.sofa</groupId>
<artifactId>sofa-rpc-remoting-bolt</artifactId>
Expand Down Expand Up @@ -361,6 +366,7 @@
<include>com.alipay.sofa:sofa-rpc-registry-consul</include>
<include>com.alipay.sofa:sofa-rpc-registry-local</include>
<include>com.alipay.sofa:sofa-rpc-registry-zk</include>
<include>com.alipay.sofa:sofa-rpc-registry-nacos</include>
<include>com.alipay.sofa:sofa-rpc-remoting-bolt</include>
<include>com.alipay.sofa:sofa-rpc-remoting-http</include>
<include>com.alipay.sofa:sofa-rpc-remoting-resteasy</include>
Expand Down
17 changes: 17 additions & 0 deletions bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
<curator.version>2.9.1</curator.version>
<opentracing.version>0.22.0</opentracing.version>
<dubbo.version>2.4.10</dubbo.version>
<nacos.version>0.5.0</nacos.version>
<!-- serialization -->
<hessian.version>3.3.3</hessian.version>
<thrift.version>0.9.2</thrift.version>
Expand All @@ -32,6 +33,7 @@
<commons.fileupload.version>1.3.3</commons.fileupload.version>
<!-- Log libs -->
<slf4j.version>1.7.21</slf4j.version>
<logback.version>1.2.3</logback.version>
<!-- Test libs -->
<junit.version>4.10</junit.version>
<!-- alipay libs -->
Expand Down Expand Up @@ -115,6 +117,11 @@
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
</dependency>

<!-- resteasy -->
<dependency>
Expand Down Expand Up @@ -310,6 +317,16 @@
<version>1.1.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-api</artifactId>
<version>${nacos.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.nacos</groupId>
<artifactId>nacos-client</artifactId>
<version>${nacos.version}</version>
</dependency>

<!-- alipay -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
*/
package com.alipay.sofa.rpc.common;

import java.nio.charset.Charset;

import static com.alipay.sofa.rpc.common.RpcConfigs.getStringValue;

import java.nio.charset.Charset;

/**
* Rpc Constants
*
Expand Down Expand Up @@ -416,6 +416,35 @@ public class RpcConstants {
*/
public static final String CONFIG_KEY_LOADBALANCER = "loadBalancer";

/**
* 配置key:delay
*/
public static final String CONFIG_KEY_DELAY = "delay";

/**
* 配置key:id
*/
public static final String CONFIG_KEY_ID = "id";

/**
* 配置key:accepts
*/
public static final String CONFIG_KEY_ACCEPTS = "accepts";

/**
* 配置key:pid
*/
public static final String CONFIG_KEY_PID = "pid";

/**
* 配置key:language
*/
public static final String CONFIG_KEY_LANGUAGE = "language";

/**
* 配置key:protocol
*/
public static final String CONFIG_KEY_PROTOCOL = "protocol";
/*--------配置项相关结束---------*/

/*--------客户端相关开始---------*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
*/
package com.alipay.sofa.rpc.registry.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;

import com.alipay.sofa.rpc.client.ProviderInfo;
import com.alipay.sofa.rpc.client.ProviderInfoAttrs;
import com.alipay.sofa.rpc.client.ProviderStatus;
Expand All @@ -31,24 +37,23 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Common Utils for Registry extensions
*
* @author <a href=mailto:[email protected]>dingpeng</a>
*/
public class RegistryUtils {

private static final String JAVA = "java";

/**
* Convert provider to url.
*
* @param providerConfig the ProviderConfig
* @return the url list
*/
public static List<String> convertProviderToUrls(ProviderConfig providerConfig) {
@SuppressWarnings("unchecked")
List<ServerConfig> servers = providerConfig.getServer();
if (servers != null && !servers.isEmpty()) {
List<String> urls = new ArrayList<String>();
Expand All @@ -61,34 +66,45 @@ public static List<String> convertProviderToUrls(ProviderConfig providerConfig)
host = SystemInfo.getLocalHost();
}
}

Map<String, String> metaData = convertProviderToMap(providerConfig, server);
//noinspection unchecked
sb.append(server.getProtocol()).append("://").append(host).append(":")
.append(server.getPort()).append(server.getContextPath()).append("?version=1.0")
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_UNIQUEID, providerConfig.getUniqueId()))
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_INTERFACE, providerConfig.getInterfaceId()))
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_TIMEOUT, providerConfig.getTimeout()))
.append(getKeyPairs("delay", providerConfig.getDelay()))
.append(getKeyPairs("id", providerConfig.getId()))
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_DYNAMIC, providerConfig.isDynamic()))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_WEIGHT, providerConfig.getWeight()))
.append(getKeyPairs("accepts", server.getAccepts()))
.append(getKeyPairs(ProviderInfoAttrs.ATTR_START_TIME, RpcRuntimeContext.now()))
.append(
getKeyPairs(RpcConstants.CONFIG_KEY_APP_NAME, providerConfig.getAppName()))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_SERIALIZATION,
providerConfig.getSerialization()))
.append(convertMap2Pair(providerConfig.getParameters()));
addCommonAttrs(sb);
.append(convertMap2Pair(metaData));
urls.add(sb.toString());
}
return urls;
}
return null;
}

public static Map<String, String> convertProviderToMap(ProviderConfig providerConfig, ServerConfig server) {
Map<String, String> metaData = new HashMap<String, String>();
metaData.put(RpcConstants.CONFIG_KEY_UNIQUEID, providerConfig.getUniqueId());
metaData.put(RpcConstants.CONFIG_KEY_INTERFACE, providerConfig.getInterfaceId());
metaData.put(RpcConstants.CONFIG_KEY_TIMEOUT, String.valueOf(providerConfig.getTimeout()));
metaData.put(RpcConstants.CONFIG_KEY_DELAY, String.valueOf(providerConfig.getDelay()));
metaData.put(RpcConstants.CONFIG_KEY_ID, providerConfig.getId());
metaData.put(RpcConstants.CONFIG_KEY_DYNAMIC, String.valueOf(providerConfig.isDynamic()));
metaData.put(ProviderInfoAttrs.ATTR_WEIGHT, String.valueOf(providerConfig.getWeight()));
metaData.put(RpcConstants.CONFIG_KEY_ACCEPTS, String.valueOf(server.getAccepts()));
metaData.put(ProviderInfoAttrs.ATTR_START_TIME, String.valueOf(RpcRuntimeContext.now()));
metaData.put(RpcConstants.CONFIG_KEY_APP_NAME, providerConfig.getAppName());
metaData.put(RpcConstants.CONFIG_KEY_SERIALIZATION, providerConfig.getSerialization());
metaData.put(RpcConstants.CONFIG_KEY_PROTOCOL, server.getProtocol());
if (null != providerConfig.getParameters()) {
//noinspection unchecked
metaData.putAll(providerConfig.getParameters());
}

// add common attr
metaData.put(RpcConstants.CONFIG_KEY_LANGUAGE, JAVA);
metaData.put(RpcConstants.CONFIG_KEY_PID, RpcRuntimeContext.PID);
metaData.put(RpcConstants.CONFIG_KEY_RPC_VERSION, String.valueOf(Version.RPC_VERSION));
return metaData;
}

/**
* Convert consumer to url.
*
Expand All @@ -98,11 +114,12 @@ public static List<String> convertProviderToUrls(ProviderConfig providerConfig)
public static String convertConsumerToUrl(ConsumerConfig consumerConfig) {
StringBuilder sb = new StringBuilder(200);
String host = SystemInfo.getLocalHost();
//noinspection unchecked
sb.append(consumerConfig.getProtocol()).append("://").append(host).append("?version=1.0")
.append(getKeyPairs(RpcConstants.CONFIG_KEY_UNIQUEID, consumerConfig.getUniqueId()))
.append(getKeyPairs("pid", RpcRuntimeContext.PID))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_PID, RpcRuntimeContext.PID))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_TIMEOUT, consumerConfig.getTimeout()))
.append(getKeyPairs("id", consumerConfig.getId()))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_ID, consumerConfig.getId()))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_GENERIC, consumerConfig.isGeneric()))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_INTERFACE, consumerConfig.getInterfaceId()))
.append(getKeyPairs(RpcConstants.CONFIG_KEY_APP_NAME, consumerConfig.getAppName()))
Expand Down Expand Up @@ -135,8 +152,8 @@ public static String getKeyPairs(String key, Object value) {
* @param sb 属性
*/
private static void addCommonAttrs(StringBuilder sb) {
sb.append(getKeyPairs("pid", RpcRuntimeContext.PID));
sb.append(getKeyPairs("language", "java"));
sb.append(getKeyPairs(RpcConstants.CONFIG_KEY_PID, RpcRuntimeContext.PID));
sb.append(getKeyPairs(RpcConstants.CONFIG_KEY_LANGUAGE, JAVA));
sb.append(getKeyPairs(RpcConstants.CONFIG_KEY_RPC_VERSION, Version.RPC_VERSION + ""));
}

Expand Down Expand Up @@ -172,20 +189,20 @@ public static String buildConfigPath(String rootPath, AbstractInterfaceConfig co
}

/**
* Read the warmup weight parameter,
* Read the warmUp weight parameter,
* decide whether to switch the state to the preheating period,
* and set the corresponding parameters during the preheating period.
*
* @param providerInfo
* @param providerInfo the provider info
*/
public static void processWarmUpWeight(ProviderInfo providerInfo) {

String warmupTimeStr = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_WARMUP_TIME);
String warmupWeightStr = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT);
String startTimeStr = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_START_TIME);

if (StringUtils.isNotBlank(warmupTimeStr) && StringUtils.isNotBlank(warmupWeightStr)
&& StringUtils.isNotBlank(startTimeStr)) {
if (StringUtils.isNotBlank(warmupTimeStr) && StringUtils.isNotBlank(warmupWeightStr) &&
StringUtils.isNotBlank(startTimeStr)) {

long warmupTime = CommonUtils.parseLong(warmupTimeStr, 0);
int warmupWeight = CommonUtils.parseInt(warmupWeightStr,
Expand All @@ -204,4 +221,29 @@ public static void processWarmUpWeight(ProviderInfo providerInfo) {
providerInfo.getStaticAttrs().remove(ProviderInfoAttrs.ATTR_WARMUP_WEIGHT);

}

/**
* Init or add list.
*
* @param <K>
* the key parameter
* @param <V>
* the value parameter
* @param orginMap
* the orgin map
* @param key
* the key
* @param needAdd
* the need add
*/
public static <K, V> void initOrAddList(Map<K, List<V>> orginMap, K key, V needAdd) {
List<V> listeners = orginMap.get(key);
if (listeners == null) {
listeners = new CopyOnWriteArrayList<V>();
listeners.add(needAdd);
orginMap.put(key, listeners);
} else {
listeners.add(needAdd);
}
}
}
1 change: 1 addition & 0 deletions extension-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<module>registry-consul</module>
<module>registry-local</module>
<module>registry-zk</module>
<module>registry-nacos</module>
<module>remoting-bolt</module>
<module>remoting-http</module>
<module>remoting-resteasy</module>
Expand Down
Loading

0 comments on commit a45290e

Please sign in to comment.