diff --git a/bom/pom.xml b/bom/pom.xml
index 2d0a40544..9f98f460c 100644
--- a/bom/pom.xml
+++ b/bom/pom.xml
@@ -348,7 +348,7 @@
com.ecwid.consul
consul-api
- 1.3.0
+ 1.4.2
joda-time
diff --git a/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/registry/utils/RegistryUtils.java b/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/registry/utils/RegistryUtils.java
index a16eff0b1..8f6cf96db 100644
--- a/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/registry/utils/RegistryUtils.java
+++ b/extension-impl/extension-common/src/main/java/com/alipay/sofa/rpc/registry/utils/RegistryUtils.java
@@ -26,6 +26,7 @@
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
+import com.alipay.sofa.rpc.config.ConfigUniqueNameGenerator;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;
@@ -259,4 +260,44 @@ public static void initOrAddList(Map> orginMap, K key, V needA
listeners.add(needAdd);
}
}
+
+ public static String convertInstanceToUrl(String host, int port, Map metaData) {
+ if (metaData == null) {
+ metaData = new HashMap();
+ }
+ String uri = "";
+ String protocol = metaData.get(RpcConstants.CONFIG_KEY_PROTOCOL);
+ if (StringUtils.isNotEmpty(protocol)) {
+ uri = protocol + "://";
+ }
+ uri += host + ":" + port;
+
+ StringBuilder sb = new StringBuilder();
+ for (Map.Entry entry : metaData.entrySet()) {
+ sb.append("&").append(entry.getKey()).append("=").append(entry.getValue());
+ }
+ if (sb.length() > 0) {
+ uri += sb.replace(0, 1, "?").toString();
+ }
+ return uri;
+ }
+
+ public static String getServerHost(ServerConfig server) {
+ String host = server.getVirtualHost();
+ if (host == null) {
+ host = server.getHost();
+ if (NetUtils.isLocalHost(host) || NetUtils.isAnyHost(host)) {
+ host = SystemInfo.getLocalHost();
+ }
+ }
+ return host;
+ }
+
+ public static String buildUniqueName(AbstractInterfaceConfig config, String protocol) {
+ if (RpcConstants.PROTOCOL_TYPE_BOLT.equals(protocol) || RpcConstants.PROTOCOL_TYPE_TR.equals(protocol)) {
+ return ConfigUniqueNameGenerator.getUniqueName(config) + "@DEFAULT";
+ } else {
+ return ConfigUniqueNameGenerator.getUniqueName(config) + "@" + protocol;
+ }
+ }
}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulConstants.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulConstants.java
new file mode 100644
index 000000000..630430094
--- /dev/null
+++ b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulConstants.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.registry.consul;
+
+/**
+ * All constants of the consul registry
+ *
+ * @author ScienJus
+ */
+public class ConsulConstants {
+
+ public static final String CONSUL_SERVICE_NAME_KEY = "consulServiceName";
+
+ public static final String HEARTBEAT_INTERVAL_KEY = "heartbeat.interval";
+
+ public static final String HEARTBEAT_CORE_SIZE_KEY = "heartbeat.coreSize";
+
+ public static final String LOOKUP_INTERVAL_KEY = "lookup.interval";
+
+ public static final String WATCH_TIMEOUT_KEY = "watch.timeout";
+
+ public static final String HEALTH_CHECK_TYPE_KEY = "healthCheck.type";
+
+ public static final String HEALTH_CHECK_TTL_KEY = "healthCheck.ttl";
+
+ public static final String HEALTH_CHECK_HOST_KEY = "healthCheck.host";
+
+ public static final String HEALTH_CHECK_PORT_KEY = "healthCheck.port";
+
+ public static final String HEALTH_CHECK_TIMEOUT_KEY = "healthCheck.timeout";
+
+ public static final String HEALTH_CHECK_INTERVAL_KEY = "healthCheck.interval";
+
+ public static final String HEALTH_CHECK_PROTOCOL_KEY = "healthCheck.protocol";
+
+ public static final String HEALTH_CHECK_PATH_KEY = "healthCheck.path";
+
+ public static final String HEALTH_CHECK_METHOD_KEY = "healthCheck.method";
+
+ public static final int DEFAULT_CONSUL_PORT = 8500;
+
+ public static final int DEFAULT_HEARTBEAT_INTERVAL = 3000;
+
+ public static final int DEFAULT_HEARTBEAT_CORE_SIZE = 1;
+
+ public static final int DEFAULT_LOOKUP_INTERVAL = 1000;
+
+ public static final int DEFAULT_WATCH_TIMEOUT = 5;
+
+ public static final ConsulRegistryProperties.HealthCheckType DEFAULT_HEALTH_CHECK_TYPE = ConsulRegistryProperties.HealthCheckType.TTL;
+
+ public static final String DEFAULT_HEALTH_CHECK_TTL = "10s";
+
+ public static final String DEFAULT_HEALTH_CHECK_TIMEOUT = "1s";
+
+ public static final String DEFAULT_HEALTH_CHECK_INTERVAL = "5s";
+
+ public static final String DEFAULT_HEALTH_CHECK_PROTOCOL = "http";
+
+ public static final String DEFAULT_HEALTH_CHECK_PATH = "/health";
+
+ public static final String DEFAULT_HEALTH_CHECK_METHOD = "GET";
+}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistry.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistry.java
index d4d6ccd59..298fe05b0 100644
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistry.java
+++ b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistry.java
@@ -18,12 +18,12 @@
import com.alipay.sofa.rpc.client.ProviderGroup;
import com.alipay.sofa.rpc.client.ProviderInfo;
-import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.config.ConsumerConfig;
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.RegistryConfig;
+import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.context.RpcRunningState;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.event.ConsumerSubEvent;
@@ -34,43 +34,59 @@
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.registry.Registry;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulConstants;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURL;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURLUtils;
-import com.alipay.sofa.rpc.registry.consul.internal.ConsulManager;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulEphemeralNode;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulService;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulServiceResp;
-import com.alipay.sofa.rpc.registry.consul.model.NotifyConsumerListener;
-import com.alipay.sofa.rpc.registry.consul.model.NotifyListener;
-import com.alipay.sofa.rpc.registry.consul.model.ThrallRoleType;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import java.util.ArrayList;
-import java.util.Collection;
+import com.alipay.sofa.rpc.registry.utils.RegistryUtils;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.agent.model.NewService;
+
+import java.net.URL;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
-import static com.alipay.sofa.rpc.common.utils.StringUtils.CONTEXT_SEP;
+import static com.alipay.sofa.rpc.registry.consul.ConsulUtils.buildServiceId;
+import static com.alipay.sofa.rpc.registry.consul.ConsulUtils.buildServiceIds;
+import static com.alipay.sofa.rpc.registry.consul.ConsulUtils.buildServiceName;
+import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.buildUniqueName;
+import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.getServerHost;
/**
- * CONSUL 注册中心
+ *
+ * Consul Registry. Features:
+ *
+ *
+ * - register publisher as instance to consul agent.
+ * - subscribe instances change event.
+ * - custom health check, e.g. tcp, http.
+ *
+ *
+ * The data structure in Consul consists of three parts: service name, service id, tag.
+ *
+ *
+ * - service name is the human-readable name of each service. In sofa-rpc, the default value is interfaceId.
+ * - tag can be used to filter a set of instances which can be subscribed, we use interfaceId + version + uniqueId + protocol to identify it.
+ * - each instance needs to have a unique service id so it won't be overwritten by other instances, we use tag + host + port to identify it.
+ *
+ *
+ * Here is an example:
+ *
+ * {
+ * Service: "com.alipay.sofa.rpc.registry.consul.TestService",
+ * Tags: [
+ * "com.alipay.sofa.rpc.registry.consul.TestService:1.0:default@DEFAULT"
+ * ],
+ * ID: "com.alipay.sofa.rpc.registry.consul.TestService:1.0:default@DEFAULT-127.0.0.1-12200"
+ * }
+ *
+ *
*
* @author dingpeng
+ * @author ScienJus
* @since 5.5.0
*/
@Extension("consul")
@@ -79,111 +95,45 @@ public class ConsulRegistry extends Registry {
/**
* Logger
*/
- private final static Logger LOGGER = LoggerFactory
- .getLogger(ConsulRegistry.class);
-
- private ConsulManager consulManager;
-
- /**
- * Root path of registry data
- */
- private String rootPath;
-
- /**
- * 保存服务发布者的url
- */
- private ConcurrentMap> providerUrls = new ConcurrentHashMap>();
+ private final static Logger LOGGER = LoggerFactory.getLogger(ConsulRegistry.class);
- /**
- * 保存服务消费者的url
- */
- private ConcurrentMap consumerUrls = new ConcurrentHashMap();
+ private final ConsulRegistryProperties properties;
- private Cache>> serviceCache;
+ private Map heartbeatFutures = new ConcurrentHashMap<>();
- private final ConcurrentMap lookupGroupServices = Maps
- .newConcurrentMap();
+ private Map healthServiceInformers = new ConcurrentHashMap<>();
- private final ConcurrentMap>> notifyServiceListeners = Maps
- .newConcurrentMap();
+ private ConsulClient consulClient;
- private final Set serviceGroupLookUped = Sets
- .newConcurrentHashSet();
+ private ScheduledExecutorService heartbeatExecutor;
- private ExecutorService notifyExecutor;
-
- /**
- * 注册中心配置
- *
- * @param registryConfig 注册中心配置
- */
protected ConsulRegistry(RegistryConfig registryConfig) {
super(registryConfig);
- }
-
- public String[] validateIp(RegistryConfig registryConfig) {
- String addressInput = registryConfig.getAddress(); // xxx:2181,yyy:2181/path1/paht2
-
- if (StringUtils.isEmpty(addressInput)) {
- throw new SofaRpcRuntimeException("Address of consul registry is empty.");
- }
-
- int idx = addressInput.indexOf(CONTEXT_SEP);
- String address; // IP地址
- if (idx > 0) {
- address = addressInput.substring(0, idx);
- rootPath = addressInput.substring(idx);
-
- } else {
- address = addressInput;
- rootPath = "/";
- }
-
- if (!ConsulURLUtils.isValidAddress(address)) {
- throw new SofaRpcRuntimeException("Address format of consul registry is wrong.");
- }
- if (!rootPath.endsWith(CONTEXT_SEP)) {
- rootPath += CONTEXT_SEP; // 保证以"/"结尾
- }
- String[] ipAndHost = StringUtils.split(address, ":");
- return ipAndHost;
- }
-
- private ConsulService buildConsulHealthService(ConsulURL url) {
- return ConsulService.newService()//
- .withAddress(url.getHost())//
- .withPort(Integer.toString(url.getPort()))//
- .withName(ConsulURLUtils.toServiceName(url.getGroup()))//
- .withTag(ConsulURLUtils.healthServicePath(url, ThrallRoleType.PROVIDER))//
- .withId(url.getHost() + ":" + url.getPort() + "-" + url.getPath() + "-" + url.getVersion())//
- .withCheckInterval(Integer.toString(ConsulConstants.TTL)).build();
- }
-
- private ConsulEphemeralNode buildEphemralNode(ConsulURL url, ThrallRoleType roleType) {
- return ConsulEphemeralNode.newEphemralNode().withUrl(url)//
- .withEphemralType(roleType)//
- .withCheckInterval(Integer.toString(ConsulConstants.TTL * 6))//
- .build();
+ this.properties = new ConsulRegistryProperties(registryConfig.getParameters());
}
@Override
public void init() {
-
- if (consulManager != null) {
+ if (consulClient != null) {
return;
}
- String[] address = validateIp(registryConfig);
- consulManager = new ConsulManager(address[0], Integer.parseInt(address[1]));
- serviceCache = CacheBuilder.newBuilder().maximumSize(1000).build();
- notifyExecutor = Executors.newCachedThreadPool(
- new NamedThreadFactory("NotifyConsumerListener", true));
+ String[] hostAndPort = StringUtils.split(registryConfig.getAddress(), ":");
+ String host = hostAndPort[0];
+ int port = hostAndPort.length > 1 ? Integer.parseInt(hostAndPort[1]) : ConsulConstants.DEFAULT_CONSUL_PORT;
+ consulClient = new ConsulClient(host, port);
+
+ int coreSize = properties.getHeartbeatCoreSize();
+
+ heartbeatExecutor = Executors.newScheduledThreadPool(coreSize);
}
@Override
public void destroy() {
- providerUrls.clear();
- consumerUrls.clear();
+ if (heartbeatExecutor != null) {
+ heartbeatExecutor.shutdown();
+ }
+ healthServiceInformers.values().forEach(HealthServiceInformer::shutdown);
}
@Override
@@ -195,7 +145,6 @@ public void destroy(DestroyHook hook) {
@Override
public boolean start() {
-
return true;
}
@@ -204,57 +153,48 @@ public void register(ProviderConfig config) {
String appName = config.getAppName();
if (!registryConfig.isRegister()) {
- //只订阅不注册
+ // 只订阅不注册
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
}
return;
}
- if (config.isRegister()) {
- //注册服务端节点
- try {
- List urls = ConsulRegistryHelper.convertProviderToUrls(config);
- if (CommonUtils.isNotEmpty(urls)) {
- String providerPath = ConsulRegistryHelper.buildProviderPath(rootPath, config);
- if (LOGGER.isInfoEnabled(appName)) {
- LOGGER.infoWithApp(appName,
- LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, providerPath));
- }
- for (String url : urls) {
- // url = URLEncoder.encode(url, "UTF-8");
- String providerUrl = providerPath + CONTEXT_SEP + url;
-
- ConsulURL providerConfigUrl = ConsulURL.valueOf(url);
- ConsulService service = this.buildConsulHealthService(providerConfigUrl);
- consulManager.registerService(service);
- ConsulEphemeralNode ephemralNode = this.buildEphemralNode(providerConfigUrl,
- ThrallRoleType.PROVIDER);
- consulManager.registerEphemralNode(ephemralNode);
- if (LOGGER.isInfoEnabled(appName)) {
- LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, providerUrl));
- }
- }
- providerUrls.put(config, urls);
+ if (!config.isRegister()) {
+ return;
+ }
+ // 注册服务端节点
+ try {
+ List services = buildNewServices(config);
+ if (CommonUtils.isNotEmpty(services)) {
+ if (LOGGER.isInfoEnabled(appName)) {
+ LOGGER.infoWithApp(appName,
+ LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_START, config.getInterfaceId()));
+ }
+ for (NewService service : services) {
+ registerConsulService(service);
if (LOGGER.isInfoEnabled(appName)) {
- LOGGER.infoWithApp(appName,
- LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, providerPath));
+ LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB, config.getInterfaceId()));
}
}
- } catch (Exception e) {
- throw new SofaRpcRuntimeException("Failed to register provider to consulRegistry!", e);
+ if (LOGGER.isInfoEnabled(appName)) {
+ LOGGER.infoWithApp(appName,
+ LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_PUB_OVER, config.getInterfaceId()));
+ }
}
+ } catch (Exception e) {
+ throw new SofaRpcRuntimeException("Failed to register provider to consulRegistry!", e);
+ }
- if (EventBus.isEnable(ProviderPubEvent.class)) {
- ProviderPubEvent event = new ProviderPubEvent(config);
- EventBus.post(event);
- }
+ if (EventBus.isEnable(ProviderPubEvent.class)) {
+ ProviderPubEvent event = new ProviderPubEvent(config);
+ EventBus.post(event);
}
}
@Override
public void unRegister(ProviderConfig config) {
-
String appName = config.getAppName();
+
if (!registryConfig.isRegister()) {
// 注册中心不注册
if (LOGGER.isInfoEnabled(appName)) {
@@ -263,43 +203,32 @@ public void unRegister(ProviderConfig config) {
return;
}
// 反注册服务端节点
- if (config.isRegister()) {
- try {
- List urls = providerUrls.remove(config);
-
- if (CommonUtils.isNotEmpty(urls)) {
- String providerPath = ConsulRegistryHelper.buildProviderPath(rootPath, config);
-
- for (String url : urls) {
- ConsulURL providerConfigUrl = ConsulURL.valueOf(url);
- ConsulService service = this.buildConsulHealthService(providerConfigUrl);
- consulManager.unregisterService(service);
- }
- if (LOGGER.isInfoEnabled(appName)) {
- LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_UNPUB,
- providerPath, "1"));
- }
- }
- } catch (Exception e) {
- if (!RpcRunningState.isShuttingDown()) {
- throw new SofaRpcRuntimeException("Failed to unregister provider to consulRegistry!", e);
+ if (!config.isRegister()) {
+ return;
+ }
+ try {
+ List ids = buildServiceIds(config);
+ if (CommonUtils.isNotEmpty(ids)) {
+ ids.forEach(this::deregisterConsulService);
+ if (LOGGER.isInfoEnabled(appName)) {
+ LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_ROUTE_REGISTRY_UNPUB,
+ config.getInterfaceId(), ids.size()));
}
}
+ } catch (Exception e) {
+ if (!RpcRunningState.isShuttingDown()) {
+ throw new SofaRpcRuntimeException("Failed to unregister provider to consulRegistry!", e);
+ }
}
-
}
@Override
public void batchUnRegister(List configs) {
-
- for (ProviderConfig providerConfig : configs) {
- unRegister(providerConfig);
- }
+ configs.forEach(this::unRegister);
}
@Override
public List subscribe(ConsumerConfig config) {
-
String appName = config.getAppName();
if (!registryConfig.isSubscribe()) {
// 注册中心不订阅
@@ -308,235 +237,151 @@ public List subscribe(ConsumerConfig config) {
}
return null;
}
- // 注册Consumer节点
- if (config.isRegister()) {
- try {
-
- String url = ConsulRegistryHelper.convertConsumerToUrl(config);
- ConsulURL consulURL = ConsulURL.valueOf(url);
-
- Iterator>>> it = serviceCache.asMap().entrySet()
- .iterator();
-
- Set result = new HashSet();
-
- List matchConsulUrls = new ArrayList();
- // find all providerInfos
- while (it.hasNext()) {
- Map.Entry>> entry = it.next();
- Collection> consulURLList = entry.getValue().values();
-
- List matchProviders = new ArrayList();
- for (List next : consulURLList) {
- matchConsulUrls.addAll(next);
- matchProviders.addAll(ConsulRegistryHelper.convertUrl2ProviderInfos(next));
- }
- result.addAll(ConsulRegistryHelper.matchProviderInfos(config, matchProviders));
- }
-
- NotifyConsumerListener listener = new NotifyConsumerListener(consulURL, matchConsulUrls);
-
- consumerUrls.put(config, url);
-
- Pair> listenersPair =
- notifyServiceListeners.get(consulURL.getServiceKey());
-
- if (listenersPair == null) {
- Set listeners = Sets.newConcurrentHashSet();
- listeners.add(listener);
- listenersPair =
- new ImmutablePair>(consulURL, listeners);
- } else {
- listenersPair.getValue().add(listener);
- }
-
- if (notifyServiceListeners.get(consulURL.getServiceKey()) == null) {
- notifyServiceListeners.put(consulURL.getServiceKey(), listenersPair);
- }
- if (!serviceGroupLookUped.contains(consulURL.getGroup())) {
- serviceGroupLookUped.add(consulURL.getGroup());
- ServiceLookUper serviceLookUper = new ServiceLookUper(consulURL.getGroup());
- serviceLookUper.setDaemon(true);
- serviceLookUper.start();
- ConsulEphemeralNode ephemralNode = this.buildEphemralNode(consulURL, ThrallRoleType.CONSUMER);
- consulManager.registerEphemralNode(ephemralNode);
- } else {
- notifyListener(consulURL, listener);
- }
-
- if (EventBus.isEnable(ConsumerSubEvent.class)) {
- ConsumerSubEvent event = new ConsumerSubEvent(config);
- EventBus.post(event);
- }
+ if (!config.isSubscribe()) {
+ return null;
+ }
+ try {
+ List providers = lookupHealthService(config);
- return Collections.singletonList(new ProviderGroup().addAll(result));
- } catch (Exception e) {
- throw new SofaRpcRuntimeException("Failed to register consumer to consulRegistry!", e);
+ if (EventBus.isEnable(ConsumerSubEvent.class)) {
+ ConsumerSubEvent event = new ConsumerSubEvent(config);
+ EventBus.post(event);
}
+
+ return Collections.singletonList(new ProviderGroup().addAll(providers));
+ } catch (Exception e) {
+ throw new SofaRpcRuntimeException("Failed to subscribe provider from consulRegistry!", e);
}
- return null;
}
@Override
public void unSubscribe(ConsumerConfig config) {
-
String appName = config.getAppName();
if (!registryConfig.isSubscribe()) {
// 注册中心不订阅
if (LOGGER.isInfoEnabled(appName)) {
- LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
+ LOGGER.infoWithApp(config.getAppName(), LogCodes.getLog(LogCodes.INFO_REGISTRY_IGNORE));
}
}
- // 注册Consumer节点
- if (config.isRegister()) {
- // 向服务器端发送取消订阅请求
- String url = ConsulRegistryHelper.convertConsumerToUrl(config);
- ConsulURL consulURL = ConsulURL.valueOf(url);
- consumerUrls.remove(config);
- notifyServiceListeners.remove(consulURL.getServiceKey());
-
+ if (!config.isSubscribe()) {
+ return;
+ }
+ String uniqueName = buildUniqueName(config, config.getProtocol());
+ HealthServiceInformer informer = healthServiceInformers.get(uniqueName);
+ if (informer == null) {
+ return;
+ }
+ informer.removeListener(config.getProviderInfoListener());
+ if (informer.getListenerSize() == 0) {
+ healthServiceInformers.remove(uniqueName);
+ informer.shutdown();
}
-
}
@Override
public void batchUnSubscribe(List configs) {
-
- for (ConsumerConfig consumerConfig : configs) {
- unSubscribe(consumerConfig);
- }
+ configs.forEach(this::unSubscribe);
}
- private void notifyListener(ConsulURL url, NotifyListener listener) {
- Map> groupCacheUrls = serviceCache.getIfPresent(url.getGroup());
- if (groupCacheUrls != null) {
- for (Map.Entry> entry : groupCacheUrls.entrySet()) {
- String cacheServiceKey = entry.getKey();
- if (url.getServiceKey().equals(cacheServiceKey)) {
- List newUrls = entry.getValue();
- ConsulRegistry.this.notify(url, listener, newUrls);
- }
- }
+ private List lookupHealthService(ConsumerConfig config) {
+ String uniqueName = buildUniqueName(config, config.getProtocol());
+ String serviceName = buildServiceName(config);
+ String informerKey = String.join("-", serviceName, uniqueName);
+ HealthServiceInformer informer = healthServiceInformers.get(informerKey);
+ if (informer == null) {
+ informer = new HealthServiceInformer(serviceName, uniqueName, consulClient, properties);
+ informer.init();
+ healthServiceInformers.put(informerKey, informer);
}
+ informer.addListener(config.getProviderInfoListener());
+ return informer.currentProviders();
}
- protected void notify(final ConsulURL url, final NotifyListener listener,
- final List urls) {
- if (url == null) {
- throw new IllegalArgumentException("notify url == null");
- }
- if (listener == null) {
- throw new IllegalArgumentException("notify listener == null");
- }
- try {
-
- notifyExecutor.submit(new Runnable() {
-
- @Override
- public void run() {
- listener.notify(url, urls);
- }
- });
- } catch (Exception t) {
- // 将失败的通知请求记录到失败列表,定时重试
- LOGGER.error(
- "Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(),
- t);
+ private void deregisterConsulService(String id) {
+ consulClient.agentServiceDeregister(id);
+ ScheduledFuture future = heartbeatFutures.remove(id);
+ if (future != null) {
+ future.cancel(true);
}
}
- private Map> lookupServiceUpdate(String group) {
- Long lastConsulIndexId =
- lookupGroupServices.get(group) == null ? Long.valueOf(0L) : lookupGroupServices.get(group);
- String serviceName = ConsulURLUtils.toServiceName(group);
- ConsulServiceResp consulResp = consulManager.lookupHealthService(serviceName, lastConsulIndexId);
- if (consulResp != null) {
- List consulServcies = consulResp.getConsulServices();
- boolean updated = consulServcies != null && !consulServcies.isEmpty()
- && consulResp.getConsulIndex() > lastConsulIndexId;
- if (updated) {
- Map> groupProviderUrls = Maps.newConcurrentMap();
- for (ConsulService service : consulServcies) {
- ConsulURL providerUrl = buildURL(service);
- String serviceKey = providerUrl.getServiceKey();
- List urlList = groupProviderUrls.get(serviceKey);
- if (urlList == null) {
- urlList = Lists.newArrayList();
- groupProviderUrls.put(serviceKey, urlList);
- }
- urlList.add(providerUrl);
- }
- lookupGroupServices.put(group, consulResp.getConsulIndex());
- return groupProviderUrls;
+ private void registerConsulService(NewService service) {
+ consulClient.agentServiceRegister(service);
+ if (service.getCheck().getTtl() != null) {
+ ScheduledFuture> scheduledFuture =
+ heartbeatExecutor.scheduleAtFixedRate(
+ () -> checkPass(service),
+ 0, properties.getHeartbeatInterval(), TimeUnit.MILLISECONDS);
+
+ // multiple heartbeat use the same service id, remove and cancel the old one, or still use it?
+ ScheduledFuture oldFuture = heartbeatFutures.remove(service.getId());
+ if (oldFuture != null) {
+ oldFuture.cancel(true);
}
+ heartbeatFutures.put(service.getId(), scheduledFuture);
}
- return null;
}
- private ConsulURL buildURL(ConsulService service) {
+ private void checkPass(NewService service) {
try {
- for (String tag : service.getTags()) {
- if (org.apache.commons.lang3.StringUtils.indexOf(tag, ConsulConstants.PROVIDERS_CATEGORY) != -1) {
- String toUrlPath = org.apache.commons.lang3.StringUtils.substringAfter(tag,
- ConsulConstants.PROVIDERS_CATEGORY);
- ConsulURL consulUrl = ConsulURL.valueOf(ConsulURL.decode(toUrlPath));
- return consulUrl;
- }
- }
+ consulClient.agentCheckPass("service:" + service.getId(), "TTL check passing by SOFA RPC");
} catch (Exception e) {
- LOGGER.error("convert consul service to url fail! service:" + service, e);
+ LOGGER.error("Consul check pass failed.", e);
}
- return null;
}
- private class ServiceLookUper extends Thread {
-
- private final String group;
-
- public ServiceLookUper(String group) {
- this.group = group;
+ private List buildNewServices(ProviderConfig> config) {
+ List servers = config.getServer();
+ if (CommonUtils.isEmpty(servers)) {
+ return Collections.emptyList();
}
+ return servers.stream().map(server -> {
+ NewService service = new NewService();
+ service.setId(buildServiceId(config, server));
+ service.setName(buildServiceName(config));
+
+ String host = getServerHost(server);
+ int port = server.getPort();
+ service.setAddress(host);
+ service.setPort(port);
+
+ Map metaData = RegistryUtils.convertProviderToMap(config, server).entrySet().stream()
+ .filter(e -> ConsulUtils.isValidMetaKey(e.getKey()))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ service.setMeta(metaData);
+ service.setTags(Collections.singletonList(buildUniqueName(config, server.getProtocol())));
+
+ service.setCheck(buildCheck(host, port));
+ return service;
+ }).collect(Collectors.toList());
+ }
- @Override
- public void run() {
- while (true) {
- try {
- // 最新拉取的值
- Map> groupNewUrls = lookupServiceUpdate(group);
- if (groupNewUrls != null && !groupNewUrls.isEmpty()) {
- // 缓存中的值
- Map> groupCacheUrls = serviceCache.getIfPresent(group);
- if (groupCacheUrls == null) {
- groupCacheUrls = Maps.newConcurrentMap();
- serviceCache.put(group, groupCacheUrls);
- }
- for (Map.Entry> entry : groupNewUrls.entrySet()) {
- List oldUrls = groupCacheUrls.get(entry.getKey());
- List newUrls = entry.getValue();
- boolean isSame = CommonUtils.listEquals(newUrls, oldUrls);
- if (!isSame) {
- groupCacheUrls.put(entry.getKey(), newUrls);
- Pair> listenerPair =
- notifyServiceListeners.get(entry.getKey());
- if (listenerPair != null) {
- ConsulURL subscribeUrl = listenerPair.getKey();
- Set listeners = listenerPair.getValue();
- for (NotifyListener listener : listeners) {
- ConsulRegistry.this.notify(subscribeUrl, listener, newUrls);
- }
- }
- }
- }
- }
- sleep(ConsulConstants.DEFAULT_LOOKUP_INTERVAL);
- } catch (Throwable e) {
- try {
- Thread.sleep(2000);
- } catch (InterruptedException ignored) {
- }
- }
+ private NewService.Check buildCheck(String serverHost, int serverPort) {
+ NewService.Check check = new NewService.Check();
+ ConsulRegistryProperties.HealthCheckType healthCheckType = properties.getHealthCheckType();
+ if (healthCheckType == ConsulRegistryProperties.HealthCheckType.TTL) {
+ check.setTtl(properties.getHealthCheckTTL());
+ } else if (healthCheckType == ConsulRegistryProperties.HealthCheckType.TCP) {
+ String host = properties.getHealthCheckHost(serverHost);
+ int port = properties.getHealthCheckPort(serverPort);
+ check.setTcp(host + ":" + port);
+ check.setInterval(properties.getHealthCheckInterval());
+ check.setTimeout(properties.getHealthCheckTimeout());
+ } else {
+ String host = properties.getHealthCheckHost(serverHost);
+ int port = properties.getHealthCheckPort(serverPort);
+ String address;
+ try {
+ address = new URL(properties.getHealthCheckProtocol(), host, port, properties.getHealthCheckPath()).toString();
+ } catch (Exception e) {
+ throw new SofaRpcRuntimeException("Invalid health check url!", e);
}
+ check.setHttp(address);
+ check.setMethod(properties.getHealthCheckMethod());
+ check.setInterval(properties.getHealthCheckInterval());
+ check.setTimeout(properties.getHealthCheckTimeout());
}
+ return check;
}
}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryHelper.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryHelper.java
deleted file mode 100644
index fdcaaa38c..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryHelper.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul;
-
-import com.alipay.sofa.rpc.client.ProviderHelper;
-import com.alipay.sofa.rpc.client.ProviderInfo;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURL;
-import com.alipay.sofa.rpc.registry.utils.RegistryUtils;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Helper for ConsulRegistry
- *
- * @author dingpeng
- */
-public class ConsulRegistryHelper extends RegistryUtils {
-
- static List convertUrl2ProviderInfos(List consulUrls) {
- List result = new ArrayList();
-
- for (ConsulURL consulUrl : consulUrls) {
- ProviderInfo providerInfo = new ProviderInfo();
-
- ProviderHelper.toProviderInfo(consulUrl.toString());
- providerInfo.setPath(consulUrl.getPath());
- providerInfo.setPort(consulUrl.getPort());
-
- result.add(providerInfo);
- }
-
- return result;
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryProperties.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryProperties.java
new file mode 100644
index 000000000..8958b136d
--- /dev/null
+++ b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryProperties.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.registry.consul;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_INTERVAL;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_METHOD;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_PATH;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_PROTOCOL;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_TIMEOUT;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_TTL;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEALTH_CHECK_TYPE;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEARTBEAT_CORE_SIZE;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_HEARTBEAT_INTERVAL;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_LOOKUP_INTERVAL;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.DEFAULT_WATCH_TIMEOUT;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_HOST_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_INTERVAL_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_METHOD_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_PATH_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_PORT_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_PROTOCOL_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_TIMEOUT_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_TTL_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEALTH_CHECK_TYPE_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEARTBEAT_CORE_SIZE_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.HEARTBEAT_INTERVAL_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.LOOKUP_INTERVAL_KEY;
+import static com.alipay.sofa.rpc.registry.consul.ConsulConstants.WATCH_TIMEOUT_KEY;
+
+/**
+ * All configurations of the consul registry
+ *
+ * @author ScienJus
+ */
+public class ConsulRegistryProperties {
+
+ private final Map registryParameters;
+
+ public ConsulRegistryProperties(Map registryParameters) {
+ if (registryParameters == null) {
+ registryParameters = Collections.emptyMap();
+ }
+ this.registryParameters = registryParameters;
+ }
+
+ public int getHeartbeatInterval() {
+ return getInt(HEARTBEAT_INTERVAL_KEY, DEFAULT_HEARTBEAT_INTERVAL);
+ }
+
+ public int getHeartbeatCoreSize() {
+ return getInt(HEARTBEAT_CORE_SIZE_KEY, DEFAULT_HEARTBEAT_CORE_SIZE);
+ }
+
+ public int getLookupInterval() {
+ return getInt(LOOKUP_INTERVAL_KEY, DEFAULT_LOOKUP_INTERVAL);
+ }
+
+ public int getWatchTimeout() {
+ return getInt(WATCH_TIMEOUT_KEY, DEFAULT_WATCH_TIMEOUT);
+ }
+
+ public HealthCheckType getHealthCheckType() {
+ return get(HEALTH_CHECK_TYPE_KEY, s -> HealthCheckType.valueOf(s.toUpperCase()), DEFAULT_HEALTH_CHECK_TYPE);
+ }
+
+ public String getHealthCheckTTL() {
+ return getString(HEALTH_CHECK_TTL_KEY, DEFAULT_HEALTH_CHECK_TTL);
+ }
+
+ public String getHealthCheckHost(String host) {
+ return getString(HEALTH_CHECK_HOST_KEY, host);
+ }
+
+ public int getHealthCheckPort(int port) {
+ return getInt(HEALTH_CHECK_PORT_KEY, port);
+ }
+
+ public String getHealthCheckTimeout() {
+ return getString(HEALTH_CHECK_TIMEOUT_KEY, DEFAULT_HEALTH_CHECK_TIMEOUT);
+ }
+
+ public String getHealthCheckInterval() {
+ return getString(HEALTH_CHECK_INTERVAL_KEY, DEFAULT_HEALTH_CHECK_INTERVAL);
+ }
+
+ public String getHealthCheckProtocol() {
+ return getString(HEALTH_CHECK_PROTOCOL_KEY, DEFAULT_HEALTH_CHECK_PROTOCOL);
+ }
+
+ public String getHealthCheckPath() {
+ return getString(HEALTH_CHECK_PATH_KEY, DEFAULT_HEALTH_CHECK_PATH);
+ }
+
+ public String getHealthCheckMethod() {
+ return getString(HEALTH_CHECK_METHOD_KEY, DEFAULT_HEALTH_CHECK_METHOD);
+ }
+
+ private int getInt(String key, int defaultValue) {
+ return get(key, Integer::parseInt, defaultValue);
+ }
+
+ private String getString(String key, String defaultValue) {
+ return get(key, Function.identity(), defaultValue);
+ }
+
+ private T get(String key, Function transform, T defaultValue) {
+ String value = registryParameters.get(key);
+ if (value != null) {
+ return transform.apply(value);
+ }
+ return defaultValue;
+ }
+
+ public enum HealthCheckType {
+ TTL, TCP, HTTP
+ }
+}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulUtils.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulUtils.java
new file mode 100644
index 000000000..5b89c3f07
--- /dev/null
+++ b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/ConsulUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.registry.consul;
+
+import com.alipay.sofa.rpc.common.utils.CommonUtils;
+import com.alipay.sofa.rpc.config.AbstractInterfaceConfig;
+import com.alipay.sofa.rpc.config.ProviderConfig;
+import com.alipay.sofa.rpc.config.ServerConfig;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.buildUniqueName;
+import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.getServerHost;
+
+/**
+ * @author ScienJus
+ */
+public class ConsulUtils {
+
+ private static final Pattern META_KEY_PATTERN = Pattern.compile("[a-zA-Z0-9\\-_]+");
+
+ /**
+ * Key can only contain A-Z a-z 0-9 _ and -.
+ * @param key
+ * @return
+ */
+ public static boolean isValidMetaKey(String key) {
+ return META_KEY_PATTERN.matcher(key).matches();
+ }
+
+ public static String buildServiceName(AbstractInterfaceConfig config) {
+ String consulServiceName = config.getParameter(ConsulConstants.CONSUL_SERVICE_NAME_KEY);
+ if (consulServiceName != null) {
+ return consulServiceName;
+ }
+ return config.getInterfaceId();
+ }
+
+ public static List buildServiceIds(ProviderConfig> config) {
+ List servers = config.getServer();
+ if (CommonUtils.isEmpty(servers)) {
+ return Collections.emptyList();
+ }
+ return servers.stream()
+ .map(server -> buildServiceId(config, server))
+ .collect(Collectors.toList());
+ }
+
+ public static String buildServiceId(ProviderConfig config, ServerConfig server) {
+ return String.join("-", buildUniqueName(config, server.getProtocol()), getServerHost(server),
+ String.valueOf(server.getPort()));
+ }
+}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/HealthServiceInformer.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/HealthServiceInformer.java
new file mode 100644
index 000000000..cd3743a93
--- /dev/null
+++ b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/HealthServiceInformer.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alipay.sofa.rpc.registry.consul;
+
+import com.alipay.sofa.rpc.client.ProviderGroup;
+import com.alipay.sofa.rpc.client.ProviderHelper;
+import com.alipay.sofa.rpc.client.ProviderInfo;
+import com.alipay.sofa.rpc.listener.ProviderInfoListener;
+import com.alipay.sofa.rpc.log.Logger;
+import com.alipay.sofa.rpc.log.LoggerFactory;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static com.alipay.sofa.rpc.registry.utils.RegistryUtils.convertInstanceToUrl;
+
+/**
+ * Observe the providers from consul and notify the consumers
+ *
+ * @author ScienJus
+ */
+public class HealthServiceInformer {
+
+ private static final Logger LOGGER = LoggerFactory
+ .getLogger(HealthServiceInformer.class);
+
+ private String serviceName;
+
+ private String tag;
+
+ private Response> currentData;
+
+ private ConsulClient consulClient;
+
+ private ConsulRegistryProperties properties;
+
+ private List listeners;
+
+ private ScheduledExecutorService watchExecutor;
+
+ public HealthServiceInformer(String serviceName, String tag, ConsulClient consulClient, ConsulRegistryProperties properties) {
+ this.serviceName = serviceName;
+ this.tag = tag;
+ this.consulClient = consulClient;
+ this.properties = properties;
+ this.listeners = new ArrayList<>();
+ }
+
+ private void watchHealthService() {
+ try {
+ HealthServicesRequest request = HealthServicesRequest.newBuilder()
+ .setTag(tag)
+ .setQueryParams(new QueryParams(properties.getWatchTimeout(), currentData.getConsulIndex()))
+ .setPassing(true)
+ .build();
+ Response> response = consulClient.getHealthServices(serviceName, request);
+ if (response.getConsulIndex().equals(currentData.getConsulIndex())) {
+ return;
+ }
+ this.currentData = response;
+ ProviderGroup providerGroup = new ProviderGroup(currentProviders());
+ listeners.stream().filter(Objects::nonNull).forEach(l -> l.updateProviders(providerGroup));
+ } catch (Exception e) {
+ LOGGER.error("Consul watch health service failed.", e);
+ }
+ }
+
+ public void init() {
+ HealthServicesRequest request = HealthServicesRequest.newBuilder()
+ .setTag(tag)
+ .setQueryParams(QueryParams.DEFAULT)
+ .setPassing(true)
+ .build();
+ this.currentData = consulClient.getHealthServices(serviceName, request);
+
+ this.watchExecutor = Executors.newSingleThreadScheduledExecutor();
+ this.watchExecutor.scheduleWithFixedDelay(this::watchHealthService, properties.getLookupInterval(), properties.getLookupInterval(), TimeUnit.MILLISECONDS);
+ }
+
+ public List currentProviders() {
+ return currentData.getValue().stream()
+ .map(HealthService::getService)
+ .map(service -> ProviderHelper.toProviderInfo(convertInstanceToUrl(service.getAddress(), service.getPort(), service.getMeta())))
+ .collect(Collectors.toList());
+ }
+
+ public void addListener(ProviderInfoListener listener) {
+ listeners.add(listener);
+ }
+
+ public void removeListener(ProviderInfoListener listener) {
+ listeners.remove(listener);
+ }
+
+ public int getListenerSize() {
+ return listeners.size();
+ }
+
+ public void shutdown() {
+ this.watchExecutor.shutdown();
+ }
+
+}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulConstants.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulConstants.java
deleted file mode 100644
index 4d228bd22..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulConstants.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.common;
-
-import java.util.regex.Pattern;
-
-/**
- * Consul Constants
- *
- * @author dingpeng
- */
-public class ConsulConstants {
-
- /**
- * service 最长存活周期(Time To Live),单位秒。 每个service会注册一个ttl类型的check,在最长TTL秒不发送心跳 就会将service变为不可用状态。
- */
- public static int TTL = 30;
-
- /**
- * 心跳周期,取ttl的2/3
- */
- public static int HEARTBEAT_CIRCLE = (TTL * 1000 * 2) / 3 / 10;
-
- /**
- * consul服务查询默认间隔时间。单位毫秒
- */
- public static int DEFAULT_LOOKUP_INTERVAL = 30000;
-
- /**
- * consul block 查询时 block的最长时间,单位,分钟
- */
- public static int CONSUL_BLOCK_TIME_MINUTES = 10;
-
- /**
- * consul block 查询时 block的最长时间,单位,秒
- */
- public static long CONSUL_BLOCK_TIME_SECONDS = CONSUL_BLOCK_TIME_MINUTES * 60;
-
- public static final Pattern COMMA_SPLIT_PATTERN = Pattern.compile("\\s*[,]+\\s*");
-
- public static final String DEFAULT_VERSION = "1.0.0";
- public static final String LOCALHOST_KEY = "localhost";
- public static final String ANYHOST_KEY = "anyhost";
- public static final String ANYHOST_VALUE = "0.0.0.0";
-
- public static final String CONSUL_SERVICE_PRE = "consul_";
- public static final String PATH_SEPARATOR = "/";
- public static final String PROVIDERS_CATEGORY = "providers";
- public static final String CONSUMERS_CATEGORY = "consumers";
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURL.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURL.java
deleted file mode 100644
index 6c95698fb..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURL.java
+++ /dev/null
@@ -1,437 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.common;
-
-import com.alipay.sofa.rpc.common.RpcConstants;
-
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-import java.net.InetAddress;
-import java.net.URLDecoder;
-import java.net.URLEncoder;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * ConsulURL
- *
- * @author dingpeng
- */
-public class ConsulURL implements Serializable {
-
- private static final long serialVersionUID = -1985165475234910535L;
-
- final static String UTF8_ENCODING = "UTF-8";
-
- final static String INTERFACE = "interface";
-
- final static String GROUP_KEY = "group";
-
- private final String protocol;
-
- private final String host;
-
- private final int port;
-
- private final String group;
-
- private final String interfaceId;
-
- private final String path;
-
- private final Map parameters;
-
- private volatile transient String ip;
-
- private volatile transient String full;
-
- private volatile transient String string;
-
- protected ConsulURL() {
- this.protocol = null;
- this.host = null;
- this.port = 0;
- this.path = null;
- this.group = null;
- this.interfaceId = null;
- this.parameters = null;
- }
-
- public ConsulURL(String protocol, String host, int port, String path, String group,
- String interfaceId,
- Map parameters) {
- this.protocol = protocol;
- this.host = host;
- this.port = (port < 0 ? 0 : port);
- this.path = path;
- this.interfaceId = interfaceId;
- this.group = group;
- while (path != null && path.startsWith("/")) {
- path = path.substring(1);
- }
- if (parameters == null) {
- parameters = new HashMap();
- } else {
- parameters = new HashMap(parameters);
- }
- this.parameters = Collections.unmodifiableMap(parameters);
- }
-
- public static ConsulURL valueOf(String url) {
- if (url == null || (url = url.trim()).length() == 0) {
- throw new IllegalArgumentException("url is null");
- }
- String protocol = null;
- String host = null;
- String interfaceId = null;
- String group = null;
- int port = 0;
- String path = null;
- Map parameters = null;
- int i = url.indexOf("?");
- if (i >= 0) {
- //seperate with & to key=value
- String[] parts = url.substring(i + 1).split("\\&");
- parameters = new HashMap();
- for (String part : parts) {
- part = part.trim();
- if (part.length() > 0) {
- int j = part.indexOf('=');
- if (j >= 0) {
- parameters.put(part.substring(0, j), part.substring(j + 1));
- } else {
- parameters.put(part, part);
- }
- }
- }
- interfaceId = parameters.get(INTERFACE);
- group = parameters.get(GROUP_KEY);
- url = url.substring(0, i);
- }
- i = url.indexOf("://");
- if (i >= 0) {
- if (i == 0) {
- throw new IllegalStateException("url missing protocol: \"" + url + "\"");
- }
- protocol = url.substring(0, i);
- url = url.substring(i + 3);
- } else {
- // maybe: file:/path/to/file
- i = url.indexOf(":/");
- if (i >= 0) {
- if (i == 0) {
- throw new IllegalStateException("url missing protocol: \"" + url + "\"");
- }
- protocol = url.substring(0, i);
- url = url.substring(i + 1);
- }
- }
-
- i = url.indexOf("/");
- if (i >= 0) {
- path = url.substring(i + 1);
- url = url.substring(0, i);
- }
- i = url.indexOf(":");
- if (i >= 0 && i < url.length() - 1) {
- port = Integer.parseInt(url.substring(i + 1));
- url = url.substring(0, i);
- }
- if (url.length() > 0) {
- host = url;
- }
- return new ConsulURL(protocol, host, port, path, group, interfaceId, parameters);
- }
-
- public String getProtocol() {
- return protocol;
- }
-
- public String getHost() {
- return host;
- }
-
- public String getIp() {
-
- try {
- return InetAddress.getByName(ip).getHostAddress();
- } catch (UnknownHostException e) {
- return ip;
- }
- }
-
- public int getPort() {
- return port;
- }
-
- public int getPort(int defaultPort) {
- return port <= 0 ? defaultPort : port;
- }
-
- public String getAddress() {
- return port <= 0 ? host : host + ":" + port;
- }
-
- public String getPath() {
- return path;
- }
-
- public ConsulURL setProtocol(String protocol) {
- return new ConsulURL(protocol, host, port, path, interfaceId, group, getParameters());
- }
-
- public ConsulURL setAddress(String address) {
- int i = address.lastIndexOf(':');
- String host;
- int port = this.port;
- if (i >= 0) {
- host = address.substring(0, i);
- port = Integer.parseInt(address.substring(i + 1));
- } else {
- host = address;
- }
- return new ConsulURL(protocol, host, port, path, interfaceId, group, getParameters());
- }
-
- public ConsulURL setPort(int port) {
- return new ConsulURL(protocol, host, port, path, interfaceId, group, getParameters());
- }
-
- public Map getParameters() {
- return parameters;
- }
-
- public String getParameter(String key) {
- String value = parameters.get(key);
- return value;
- }
-
- public String getParameter(String key, String defaultValue) {
- String value = getParameter(key);
- if (value == null || value.length() == 0) {
- return defaultValue;
- }
- return value;
- }
-
- @Override
- public String toString() {
- if (string != null) {
- return string;
- }
- return string = buildString(false, true);
- }
-
- public String toString(String... parameters) {
- return buildString(false, true, parameters);
- }
-
- public String toFullString() {
- if (full != null) {
- return full;
- }
- return full = buildString(true, true);
- }
-
- private void buildParameters(StringBuilder buf, boolean concat, String[] parameters) {
- if (getParameters() != null && getParameters().size() > 0) {
- List includes = (parameters == null || parameters.length == 0 ? null : Arrays.asList(parameters));
- boolean first = true;
- for (Map.Entry entry : new TreeMap(getParameters()).entrySet()) {
- if (entry.getKey() != null && entry.getKey().length() > 0 &&
- (includes == null || includes.contains(entry.getKey()))) {
- if (first) {
- if (concat) {
- buf.append("?");
- }
- first = false;
- } else {
- buf.append("&");
- }
- buf.append(entry.getKey());
- buf.append("=");
- buf.append(entry.getValue() == null ? "" : entry.getValue().trim());
- }
- }
- }
- }
-
- private String buildString(boolean appendUser, boolean appendParameter, String... parameters) {
- return buildString(appendUser, appendParameter, false, false, parameters);
- }
-
- private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService,
- String... parameters) {
- StringBuilder buf = new StringBuilder();
- if (protocol != null && protocol.length() > 0) {
- buf.append(protocol);
- buf.append("://");
- }
- String host;
- if (useIP) {
- host = getIp();
- } else {
- host = getHost();
- }
- if (host != null && host.length() > 0) {
- buf.append(host);
- if (port > 0) {
- buf.append(":");
- buf.append(port);
- }
- }
- String path;
- if (useService) {
- path = getServiceKey();
- } else {
- path = getPath();
- }
- if (path != null && path.length() > 0) {
- buf.append("/");
- buf.append(path);
- }
- if (appendParameter) {
- buildParameters(buf, true, parameters);
- }
- return buf.toString();
- }
-
- public String getServiceKey() {
- String inf = getServiceInterface();
- if (inf == null) {
- return null;
- }
- StringBuilder buf = new StringBuilder();
- String group = getGroup();
- if (group != null && group.length() > 0) {
- buf.append(group).append("/");
- }
- buf.append(inf);
- String version = getVersion();
- if (version != null && version.length() > 0) {
- buf.append(":").append(version);
- }
- return buf.toString();
- }
-
- public String getGroup() {
- return getParameter(RpcConstants.CONFIG_KEY_UNIQUEID, RpcConstants.ADDRESS_DEFAULT_GROUP);
- }
-
- public String getVersion() {
- return getParameter(RpcConstants.CONFIG_KEY_RPC_VERSION, ConsulConstants.DEFAULT_VERSION);
- }
-
- public String getServiceInterface() {
- return getParameter(RpcConstants.CONFIG_KEY_INTERFACE, "");
- }
-
- public static String encode(String value) {
- if (value == null || value.length() == 0) {
- return "";
- }
- try {
- return URLEncoder.encode(value, UTF8_ENCODING);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- public static String decode(String value) {
- if (value == null || value.length() == 0) {
- return "";
- }
- try {
- return URLDecoder.decode(value, UTF8_ENCODING);
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + ((path == null) ? 0 : path.hashCode());
- result = prime * result + port;
- result = prime * result + ((protocol == null) ? 0 : protocol.hashCode());
- result = prime * result + ((this.getGroup() == null) ? 0 : this.getGroup().hashCode());
- result = prime * result + ((this.getVersion() == null) ? 0 : this.getVersion().hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- ConsulURL other = (ConsulURL) obj;
- if (host == null) {
- if (other.host != null) {
- return false;
- }
- } else if (!host.equals(other.host)) {
- return false;
- }
- if (path == null) {
- if (other.path != null) {
- return false;
- }
- } else if (!path.equals(other.path)) {
- return false;
- }
- if (port != other.port) {
- return false;
- }
- if (protocol == null) {
- if (other.protocol != null) {
- return false;
- }
- } else if (!protocol.equals(other.protocol)) {
- return false;
- }
- if (this.getGroup() == null) {
- if (other.getGroup() != null) {
- return false;
- }
- } else if (!this.getGroup().equals(other.getGroup())) {
- return false;
- }
- if (this.getVersion() == null) {
- if (other.getVersion() != null) {
- return false;
- }
- } else if (!this.getVersion().equals(other.getVersion())) {
- return false;
- }
-
- return true;
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURLUtils.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURLUtils.java
deleted file mode 100644
index b650b0ee0..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/common/ConsulURLUtils.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.common;
-
-import com.alipay.sofa.rpc.common.utils.StringUtils;
-import com.alipay.sofa.rpc.registry.consul.model.ThrallRoleType;
-
-import java.util.regex.Pattern;
-
-/**
- * ConsulURL工具类
- *
- * @author dingpeng
- */
-public class ConsulURLUtils {
-
- private static final Pattern ADDRESS_PATTERN =
- Pattern
- .compile("^(25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)(\\.(25[0-5]|2[0-4]\\d|[0-1]?\\d?\\d)){3}$");
-
- private ConsulURLUtils() {
-
- }
-
- public static boolean isValidAddress(String address) {
- String[] ipAndHost = StringUtils.split(address, ":");
-
- return ipAndHost.length == 2 && ADDRESS_PATTERN.matcher(ipAndHost[0]).matches();
- }
-
- public static String toServiceName(String group) {
- return ConsulConstants.CONSUL_SERVICE_PRE + group;
- }
-
- private static String toServicePath(ConsulURL url) {
- String name = url.getServiceInterface();
- String group = url.getGroup();
- return group + ConsulConstants.PATH_SEPARATOR + ConsulURL.encode(name);
- }
-
- public static String toCategoryPathNotIncludeVersion(ConsulURL url, ThrallRoleType roleType) {
- switch (roleType) {
- case CONSUMER:
- return toServicePath(url) + ConsulConstants.PATH_SEPARATOR + ConsulConstants.CONSUMERS_CATEGORY;
- case PROVIDER:
- return toServicePath(url) + ConsulConstants.PATH_SEPARATOR + ConsulConstants.PROVIDERS_CATEGORY;
- default:
- throw new IllegalArgumentException("there is no role type");
- }
-
- }
-
- public static String toCategoryPathIncludeVersion(ConsulURL url, ThrallRoleType roleType) {
- switch (roleType) {
- case CONSUMER:
- return toServicePath(url) + ConsulConstants.PATH_SEPARATOR + url.getVersion()
- + ConsulConstants.PATH_SEPARATOR + ConsulConstants.CONSUMERS_CATEGORY;
- case PROVIDER:
- return toServicePath(url) + ConsulConstants.PATH_SEPARATOR + url.getVersion()
- + ConsulConstants.PATH_SEPARATOR + ConsulConstants.PROVIDERS_CATEGORY;
- default:
- throw new IllegalArgumentException("there is no role type");
- }
-
- }
-
- public static String healthServicePath(ConsulURL url, ThrallRoleType roleType) {
- return toCategoryPathNotIncludeVersion(url, roleType) + ConsulConstants.PATH_SEPARATOR
- + ConsulURL.encode(url.toFullString());
- }
-
- public static String ephemralNodePath(ConsulURL url, ThrallRoleType roleType) {
- return ConsulConstants.CONSUL_SERVICE_PRE + toCategoryPathIncludeVersion(url, roleType)
- + ConsulConstants.PATH_SEPARATOR + url.getAddress();
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/ConsulManager.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/ConsulManager.java
deleted file mode 100644
index b27f2ab7c..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/ConsulManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.internal;
-
-import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulConstants;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulEphemeralNode;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulRouterResp;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulService;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulServiceResp;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulSession;
-import com.alipay.sofa.rpc.registry.consul.model.HeartbeatService;
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
-import com.ecwid.consul.v1.Response;
-import com.ecwid.consul.v1.agent.model.NewService;
-import com.ecwid.consul.v1.health.model.HealthService;
-import com.ecwid.consul.v1.health.model.HealthService.Service;
-import com.ecwid.consul.v1.kv.model.GetValue;
-import com.ecwid.consul.v1.kv.model.PutParams;
-import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * 封装ecwid consul client
- *
- * @author dingpeng
- */
-public class ConsulManager {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ConsulManager.class);
-
- private final Object lock = new Object();
-
- private final ConsulClient client;
-
- private final TtlScheduler ttlScheduler;
-
- private final ScheduledExecutorService scheduleRegistry;
-
- public ConsulManager(String host, int port) {
- client = new ConsulClient(host, port);
- ttlScheduler = new TtlScheduler(client);
- scheduleRegistry = Executors.newScheduledThreadPool(1, new NamedThreadFactory("retryFailedTtl", true));
- scheduleRegistry.scheduleAtFixedRate(new Runnable() {
-
- @Override
- public void run() {
- try {
- retryFailedTtl();
- } catch (Throwable e) {
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("retry registry znode failed", e);
- }
- }
- }
- }, ConsulConstants.HEARTBEAT_CIRCLE, ConsulConstants.HEARTBEAT_CIRCLE, TimeUnit.MILLISECONDS);
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info("ConsulEcwidClient init finish. client host:" + host + ", port:" + port);
- }
- }
-
- private void retryFailedTtl() {
- Set failedService = ttlScheduler.getFailedService();
- Set failedSession = ttlScheduler.getFailedSession();
- if (failedSession.size() > 0 || failedService.size() > 0) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("retry to registry failed service %d or failed session %d",
- failedService.size(),
- failedSession.size()));
- }
- for (HeartbeatService heartbeatService : failedService) {
- registerService(heartbeatService.getService());
- }
- Set allSuccess = Sets.newHashSet();
- for (ConsulSession consulSession : failedSession) {
- allSuccess.add(registerEphemralNode(consulSession.getEphemralNode()));
- }
- if (!allSuccess.contains(Boolean.FALSE)) {
- ttlScheduler.cleanFailedTtl();
- }
- }
- }
-
- public void registerService(ConsulService service) {
- NewService newService = service.getNewService();
- client.agentServiceRegister(newService);
- HeartbeatService heartbeatService = new HeartbeatService(service, newService);
- ttlScheduler.addHeartbeatServcie(heartbeatService);
- }
-
- public void unregisterService(ConsulService service) {
- NewService newService = service.getNewService();
- client.agentServiceDeregister(newService.getId());
- HeartbeatService heartbeatService = new HeartbeatService(service, newService);
- ttlScheduler.removeHeartbeatServcie(heartbeatService);
- }
-
- public Boolean registerEphemralNode(ConsulEphemeralNode ephemralNode) {
- String sessionId = null;
- List sessions = client.getSessionList(QueryParams.DEFAULT).getValue();
- if (sessions != null && !sessions.isEmpty()) {
- for (Session session : sessions) {
- if (session.getName().equals(ephemralNode.getSessionName())) {
- sessionId = session.getId();
- }
- }
- }
- if (sessionId == null) {
- NewSession newSession = ephemralNode.getNewSession();
- synchronized (lock) {
- sessionId = client.sessionCreate(newSession, QueryParams.DEFAULT).getValue();
- }
- }
- ConsulSession session = new ConsulSession(sessionId, ephemralNode);
- ttlScheduler.addHeartbeatSession(session);
- PutParams kvPutParams = new PutParams();
- kvPutParams.setAcquireSession(sessionId);
-
- client.getKVValue(ephemralNode.getEphemralNodeKey());
-
- return client.setKVValue(ephemralNode.getEphemralNodeKey(), ephemralNode.getEphemralNodeValue(),
- kvPutParams).getValue();
- }
-
- public ConsulRouterResp lookupRouterMessage(String serviceName, long lastConsulIndex) {
- QueryParams queryParams = new QueryParams(ConsulConstants.CONSUL_BLOCK_TIME_SECONDS, lastConsulIndex);
- Response orgResponse = client.getKVValue(serviceName, queryParams);
- GetValue getValue = orgResponse.getValue();
- if (getValue != null && StringUtils.isNotBlank(getValue.getValue())) {
- String router = new String(Base64.decodeBase64(getValue.getValue()));
- ConsulRouterResp response = ConsulRouterResp.newResponse()//
- .withValue(router)//
- .withConsulIndex(orgResponse.getConsulIndex())//
- .withConsulLastContact(orgResponse.getConsulLastContact())//
- .withConsulKnowLeader(orgResponse.isConsulKnownLeader())//
- .build();
- return response;
- }
- return null;
- }
-
- public ConsulServiceResp lookupHealthService(String serviceName, long lastConsulIndex) {
- QueryParams queryParams = new QueryParams(ConsulConstants.CONSUL_BLOCK_TIME_SECONDS, lastConsulIndex);
- Response> orgResponse = client.getHealthServices(serviceName, true, queryParams);
- if (orgResponse != null && orgResponse.getValue() != null && !orgResponse.getValue().isEmpty()) {
- List healthServices = orgResponse.getValue();
- List consulServices = Lists.newArrayList();
- for (HealthService orgService : healthServices) {
- Service org = orgService.getService();
- ConsulService newService = ConsulService.newService()//
- .withAddress(org.getAddress())//
- .withName(org.getService())//
- .withId(org.getId())//
- .withPort(org.getPort().toString())//
- .withTags(org.getTags())//
- .build();
- consulServices.add(newService);
- }
- if (!consulServices.isEmpty()) {
- ConsulServiceResp response = ConsulServiceResp.newResponse()//
- .withValue(consulServices)//
- .withConsulIndex(orgResponse.getConsulIndex())//
- .withConsulLastContact(orgResponse.getConsulLastContact())//
- .withConsulKnowLeader(orgResponse.isConsulKnownLeader())//
- .build();
- return response;
- }
- }
- return null;
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/TtlScheduler.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/TtlScheduler.java
deleted file mode 100644
index 80e55566f..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/internal/TtlScheduler.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.internal;
-
-import com.alipay.sofa.rpc.common.struct.NamedThreadFactory;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulConstants;
-import com.alipay.sofa.rpc.registry.consul.model.ConsulSession;
-import com.alipay.sofa.rpc.registry.consul.model.HeartbeatService;
-import com.ecwid.consul.v1.ConsulClient;
-import com.ecwid.consul.v1.QueryParams;
-import com.google.common.collect.Sets;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ttl健康检查
- *
- * @author dingpeng
- */
-public class TtlScheduler {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(TtlScheduler.class);
-
- private final Set services = Sets.newConcurrentHashSet();
-
- private final Set sessions = Sets.newConcurrentHashSet();
-
- private final Set failedservices = Sets.newConcurrentHashSet();
-
- private final Set failedsessions = Sets.newConcurrentHashSet();
-
- private final ScheduledExecutorService heartbeatServiceExecutor = Executors.newScheduledThreadPool(1,
- new NamedThreadFactory("CheckServiceTimer",
- true));
-
- private final ScheduledExecutorService heartbeatSessionExecutor = Executors.newScheduledThreadPool(1,
- new NamedThreadFactory("CheckSessionTimer",
- true));
-
- private final ConsulClient client;
-
- public TtlScheduler(ConsulClient client) {
- this.client = client;
- heartbeatServiceExecutor.scheduleAtFixedRate(new ConsulHeartbeatServiceTask(),
- ConsulConstants.HEARTBEAT_CIRCLE,
- ConsulConstants.HEARTBEAT_CIRCLE, TimeUnit.MILLISECONDS);
- heartbeatSessionExecutor.scheduleAtFixedRate(new ConsulHeartbeatSessionTask(),
- ConsulConstants.HEARTBEAT_CIRCLE,
- ConsulConstants.HEARTBEAT_CIRCLE, TimeUnit.MILLISECONDS);
- }
-
- public void addHeartbeatServcie(final HeartbeatService service) {
- services.add(service);
- }
-
- public void addHeartbeatSession(final ConsulSession session) {
- sessions.add(session);
- }
-
- public void removeHeartbeatServcie(final HeartbeatService service) {
- services.remove(service);
- }
-
- public Set getFailedService() {
- return failedservices;
- }
-
- public Set getFailedSession() {
- return failedsessions;
- }
-
- public void cleanFailedTtl() {
- failedsessions.clear();
- failedservices.clear();
- }
-
- private class ConsulHeartbeatServiceTask implements Runnable {
-
- @Override
- public void run() {
- for (HeartbeatService service : services) {
- try {
- String checkId = service.getNewService().getId();
- if (!checkId.startsWith("service:")) {
- checkId = "service:" + checkId;
- }
- client.agentCheckPass(checkId);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending consul heartbeat for: {}", checkId);
- }
- } catch (Throwable e) {
- failedservices.add(service);
- services.remove(service);
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- }
-
- private class ConsulHeartbeatSessionTask implements Runnable {
-
- @Override
- public void run() {
- Set sessionIds = Sets.newHashSet();
- for (ConsulSession session : sessions) {
- try {
- String sessionId = session.getSessionId();
- if (!sessionIds.contains(sessionId)) {
- client.renewSession(sessionId, QueryParams.DEFAULT);
- sessionIds.add(sessionId);
- }
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug("Sending consul heartbeat for: {}", sessionId);
- }
- } catch (Throwable e) {
- failedsessions.addAll(sessions);
- sessions.clear();
- LOGGER.error(e.getMessage(), e);
- }
- }
- }
- }
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/AbstractBuilder.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/AbstractBuilder.java
deleted file mode 100644
index 868d1bcb7..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/AbstractBuilder.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import java.util.Map;
-
-abstract class AbstractBuilder {
-
- private static final String VARIABLE_START = "${";
- private static final char VARIABLE_END = '}';
- private static final char DEFAULT_VALUE_START = ':';
-
- private static Map environment = System.getenv();
-
- static void setEnvironmentForTesting(Map environment) {
- AbstractBuilder.environment = environment;
- }
-
- protected String substituteEnvironmentVariables(String value) {
- // It might not look pretty, but this is actually about the fastest way to do it!
- final StringBuilder result = new StringBuilder();
- final int length = value.length();
- int index = 0;
- while (index < length) {
- final int start = value.indexOf(VARIABLE_START, index);
- if (start == -1) {
- result.append(value.substring(index));
- return result.toString();
- }
- final int end = value.indexOf(VARIABLE_END, start);
- if (end == -1) {
- result.append(value.substring(index));
- return result.toString();
- }
- if (start > index) {
- result.append(value.substring(index, start));
- }
- String defaultValue = null;
- String variable = value.substring(start + 2, end);
- final int split = variable.indexOf(DEFAULT_VALUE_START);
- if (split != -1) {
- defaultValue = variable.substring(split + 1);
- variable = variable.substring(0, split);
- }
- if (environment.containsKey(variable)) {
- result.append(environment.get(variable));
- } else if (defaultValue != null) {
- result.append(defaultValue);
- }
- index = end + 1;
-
- }
- return result.toString();
- }
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulEphemeralNode.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulEphemeralNode.java
deleted file mode 100644
index 848ad1ad8..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulEphemeralNode.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURL;
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURLUtils;
-import com.ecwid.consul.v1.session.model.NewSession;
-import com.ecwid.consul.v1.session.model.Session;
-
-/**
- * Consul 临时节点
- *
- * @author dingpeng
- */
-public final class ConsulEphemeralNode {
-
- private final ConsulURL url;
-
- private final String interval;
-
- private final ThrallRoleType ephemeralType;
-
- private ConsulEphemeralNode(Builder builder) {
- this.url = builder.url;
- this.interval = builder.interval;
- this.ephemeralType = builder.ephemeralType;
- }
-
- public NewSession getNewSession() {
- NewSession newSersson = new NewSession();
- newSersson.setName(getSessionName());
- newSersson.setLockDelay(15);
- newSersson.setBehavior(Session.Behavior.DELETE);
- newSersson.setTtl(this.interval + "s");
- return newSersson;
- }
-
- public String getSessionName() {
- return ephemeralType.name() + "_" + url.getHost() + "_" + url.getPort();
- }
-
- public String getEphemralNodeKey() {
- return ConsulURLUtils.ephemralNodePath(url, ephemeralType);
- }
-
- public String getEphemralNodeValue() {
- return url.toFullString();
- }
-
- public static Builder newEphemralNode() {
- return new Builder();
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((ephemeralType == null) ? 0 : ephemeralType.hashCode());
- result = prime * result + ((interval == null) ? 0 : interval.hashCode());
- result = prime * result + ((url == null) ? 0 : url.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- ConsulEphemeralNode other = (ConsulEphemeralNode) obj;
- if (ephemeralType != other.ephemeralType) {
- return false;
- }
- if (interval == null) {
- if (other.interval != null) {
- return false;
- }
- } else if (!interval.equals(other.interval)) {
- return false;
- }
- if (url == null) {
- if (other.url != null) {
- return false;
- }
- } else if (!url.equals(other.url)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "ConsulEphemeralNode [url=" + url + ", interval=" + interval + ", ephemeralType=" + ephemeralType + "]";
- }
-
- public static class Builder extends AbstractBuilder {
-
- private ConsulURL url;
-
- private String interval;
-
- private ThrallRoleType ephemeralType;
-
- public Builder withUrl(ConsulURL url) {
- this.url = url;
- return this;
- }
-
- public Builder withEphemralType(ThrallRoleType ephemeralType) {
- this.ephemeralType = ephemeralType;
- return this;
- }
-
- public Builder withCheckInterval(String interval) {
- this.interval = substituteEnvironmentVariables(interval);
- return this;
- }
-
- public ConsulEphemeralNode build() {
- return new ConsulEphemeralNode(this);
- }
-
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulRouterResp.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulRouterResp.java
deleted file mode 100644
index fa5879010..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulRouterResp.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-/**
- * ConsulRouterResp
- *
- * @author dingpeng
- */
-public final class ConsulRouterResp {
-
- private final String consulRouter;
- private final Long consulIndex;
- private final Boolean consulKnownLeader;
- private final Long consulLastContact;
-
- private ConsulRouterResp(Builder builder) {
- this.consulRouter = builder.consulRouter;
- this.consulIndex = builder.consulIndex;
- this.consulKnownLeader = builder.consulKnownLeader;
- this.consulLastContact = builder.consulLastContact;
- }
-
- public String getConsulRouter() {
- return consulRouter;
- }
-
- public Long getConsulIndex() {
- return consulIndex;
- }
-
- public Boolean getConsulKnownLeader() {
- return consulKnownLeader;
- }
-
- public Long getConsulLastContact() {
- return consulLastContact;
- }
-
- public static Builder newResponse() {
- return new Builder();
- }
-
- public static class Builder extends AbstractBuilder {
-
- private String consulRouter;
- private Long consulIndex;
- private Boolean consulKnownLeader;
- private Long consulLastContact;
-
- public Builder withValue(String routerMessage) {
- this.consulRouter = routerMessage;
- return this;
- }
-
- public Builder withConsulIndex(Long consulIndex) {
- this.consulIndex = consulIndex;
- return this;
- }
-
- public Builder withConsulKnowLeader(Boolean consulKnownLeader) {
- this.consulKnownLeader = consulKnownLeader;
- return this;
- }
-
- public Builder withConsulLastContact(Long consulLastContact) {
- this.consulLastContact = consulLastContact;
- return this;
- }
-
- public ConsulRouterResp build() {
- return new ConsulRouterResp(this);
- }
-
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulService.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulService.java
deleted file mode 100644
index f92e74b6b..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulService.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import com.ecwid.consul.v1.agent.model.NewService;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import static java.util.Collections.unmodifiableList;
-import static java.util.Collections.unmodifiableSet;
-
-public final class ConsulService {
-
- private final String name;
- private final String id;
- private final String address;
- private final Integer port;
- private final Set tags;
- private final String interval;
-
- private ConsulService(Builder builder) {
- this.name = builder.name;
- this.id = builder.id != null ? builder.id : name + ":" + UUID.randomUUID().toString();
- this.address = builder.address;
- this.port = builder.port;
- this.tags = unmodifiableSet(new HashSet(builder.tags));
- this.interval = builder.interval;
- }
-
- public NewService getNewService() {
- NewService consulService = new NewService();
- consulService.setName(this.name);
- consulService.setId(this.id);
- consulService.setAddress(this.address);
- consulService.setPort(this.port);
- consulService.setTags(unmodifiableList(new ArrayList(this.tags)));
- NewService.Check check = new NewService.Check();
- check.setTtl(this.interval + "s");
- check.setDeregisterCriticalServiceAfter("3m");
- consulService.setCheck(check);
- return consulService;
- }
-
- public String getName() {
- return name;
- }
-
- public String getId() {
- return id;
- }
-
- public String getAddress() {
- return address;
- }
-
- public Integer getPort() {
- return port;
- }
-
- public Set getTags() {
- return tags;
- }
-
- public String getInterval() {
- return interval;
- }
-
- public static Builder newService() {
- return new Builder();
- }
-
- @Override
- public String toString() {
- return "Service{" + //
- "name=" + name + //
- ", id=" + id + //
- ", address=" + address + //
- ", port=" + port + //
- ", interval=" + interval + //
- ", tags=" + tags + '}';//
- }
-
- String toConsulRegistrationJson() {
- final StringBuilder builder = new StringBuilder();
- builder.append("{");
- append(builder, "ID", id);
- builder.append(",");
- append(builder, "Name", name);
- builder.append(",");
- if (!tags.isEmpty()) {
- builder.append("\"Tags\":[");
- StringBuilder sb = new StringBuilder(",");
- for (String tag : tags) {
- sb.append("\"").append(tag).append("\"");
- }
- // StringJoiner joiner = new StringJoiner(",");
- // tags.stream().map(t -> "\"" + t + "\"").forEach(newElement -> joiner.add(newElement));
- builder.append(sb.toString());
- builder.append("],");
- }
- if (address != null) {
- append(builder, "Address", address);
- builder.append(",");
- }
- append(builder, "Port", port);
- builder.append(",");
- builder.append("\"Check\":{");
- append(builder, "Interval", interval);
- builder.append("}");
- builder.append("}");
- return builder.toString();
- }
-
- private void append(StringBuilder builder, String key, String value) {
- builder.append("\"");
- builder.append(key);
- builder.append("\":\"");
- builder.append(value);
- builder.append("\"");
- }
-
- private void append(StringBuilder builder, String key, Integer value) {
- builder.append("\"");
- builder.append(key);
- builder.append("\":");
- builder.append(value);
- builder.append("");
- }
-
- public static class Builder extends AbstractBuilder {
-
- private String name;
- private String id;
- private String address;
- private Integer port;
- private Set tags = new HashSet();
- private String interval;
-
- public Builder withName(String name) {
- this.name = substituteEnvironmentVariables(name);
- return this;
- }
-
- public Builder withId(String id) {
- this.id = substituteEnvironmentVariables(id);
- return this;
- }
-
- public Builder withAddress(String address) {
- this.address = substituteEnvironmentVariables(address);
- return this;
- }
-
- public Builder withPort(String port) {
- this.port = Integer.parseInt(substituteEnvironmentVariables(port));
- return this;
- }
-
- public Builder withCheckInterval(String interval) {
- final String value = substituteEnvironmentVariables(interval);
- this.interval = value;
- return this;
- }
-
- public Builder withTag(String tag) {
- tags.add(substituteEnvironmentVariables(tag));
- return this;
- }
-
- public Builder withTags(List tags) {
- for (String tag : tags) {
- this.tags.add(substituteEnvironmentVariables(tag));
- }
- return this;
- }
-
- public ConsulService build() {
- if (name == null) {
- throw new IllegalArgumentException("Required service name is missing");
- }
- if (port == null) {
- throw new IllegalArgumentException("Required port is missing for service " + name);
- }
- return new ConsulService(this);
- }
-
- }
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceResp.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceResp.java
deleted file mode 100644
index 0111ac908..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceResp.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import java.util.List;
-
-public final class ConsulServiceResp {
-
- private final List consulServices;
- private final Long consulIndex;
- private final Boolean consulKnownLeader;
- private final Long consulLastContact;
-
- private ConsulServiceResp(Builder builder) {
- this.consulServices = builder.consulServices;
- this.consulIndex = builder.consulIndex;
- this.consulKnownLeader = builder.consulKnownLeader;
- this.consulLastContact = builder.consulLastContact;
- }
-
- public List getConsulServices() {
- return consulServices;
- }
-
- public Long getConsulIndex() {
- return consulIndex;
- }
-
- public Boolean getConsulKnownLeader() {
- return consulKnownLeader;
- }
-
- public Long getConsulLastContact() {
- return consulLastContact;
- }
-
- public static Builder newResponse() {
- return new Builder();
- }
-
- public static class Builder extends AbstractBuilder {
-
- private List consulServices;
- private Long consulIndex;
- private Boolean consulKnownLeader;
- private Long consulLastContact;
-
- public Builder withValue(List value) {
- this.consulServices = value;
- return this;
- }
-
- public Builder withConsulIndex(Long consulIndex) {
- this.consulIndex = consulIndex;
- return this;
- }
-
- public Builder withConsulKnowLeader(Boolean consulKnownLeader) {
- this.consulKnownLeader = consulKnownLeader;
- return this;
- }
-
- public Builder withConsulLastContact(Long consulLastContact) {
- this.consulLastContact = consulLastContact;
- return this;
- }
-
- public ConsulServiceResp build() {
- return new ConsulServiceResp(this);
- }
-
- }
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulSession.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulSession.java
deleted file mode 100644
index d9c0c8570..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ConsulSession.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-/**
- * sessions to store EphemralNode of Consul
- *
- * @author dingpeng
- */
-public final class ConsulSession {
-
- private String sessionId;
-
- private ConsulEphemeralNode ephemralNode;
-
- public ConsulSession(String sessionId, ConsulEphemeralNode ephemralNode) {
- super();
- this.sessionId = sessionId;
- this.ephemralNode = ephemralNode;
- }
-
- public String getSessionId() {
- return sessionId;
- }
-
- public void setSessionId(String sessionId) {
- this.sessionId = sessionId;
- }
-
- public ConsulEphemeralNode getEphemralNode() {
- return ephemralNode;
- }
-
- public void setEphemralNode(ConsulEphemeralNode ephemralNode) {
- this.ephemralNode = ephemralNode;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((ephemralNode == null) ? 0 : ephemralNode.hashCode());
- result = prime * result + ((sessionId == null) ? 0 : sessionId.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- ConsulSession other = (ConsulSession) obj;
- if (ephemralNode == null) {
- if (other.ephemralNode != null) {
- return false;
- }
- } else if (!ephemralNode.equals(other.ephemralNode)) {
- return false;
- }
- if (sessionId == null) {
- if (other.sessionId != null) {
- return false;
- }
- } else if (!sessionId.equals(other.sessionId)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "ConsulSession [sessionId=" + sessionId + ", ephemralNode=" + ephemralNode + "]";
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatService.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatService.java
deleted file mode 100644
index 39486add3..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatService.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import com.ecwid.consul.v1.agent.model.NewService;
-
-/**
- * ConsulService and NewService
- *
- * @author dingpeng
- */
-public class HeartbeatService {
-
- private ConsulService service;
-
- private NewService newService;
-
- public HeartbeatService(ConsulService service, NewService newService) {
- super();
- this.service = service;
- this.newService = newService;
- }
-
- public ConsulService getService() {
- return service;
- }
-
- public void setService(ConsulService service) {
- this.service = service;
- }
-
- public NewService getNewService() {
- return newService;
- }
-
- public void setNewService(NewService newService) {
- this.newService = newService;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((newService == null) ? 0 : newService.hashCode());
- result = prime * result + ((service == null) ? 0 : service.hashCode());
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- HeartbeatService other = (HeartbeatService) obj;
- if (newService == null) {
- if (other.newService != null) {
- return false;
- }
- } else if (!newService.equals(other.newService)) {
- return false;
- }
- if (service == null) {
- if (other.service != null) {
- return false;
- }
- } else if (!service.equals(other.service)) {
- return false;
- }
- return true;
- }
-
- @Override
- public String toString() {
- return "HeartbeatService [service=" + service + ", newService=" + newService + "]";
- }
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyConsumerListener.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyConsumerListener.java
deleted file mode 100644
index 028474ed8..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyConsumerListener.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURL;
-
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
-/**
- * 消费者通知器
- *
- * @author dingpeng
- */
-public class NotifyConsumerListener implements NotifyListener {
-
- private ConsulURL subscribeUrl;
-
- private AtomicReference> providerUrls;
-
- public NotifyConsumerListener(ConsulURL subscribeUrl, List urls) {
-
- this.subscribeUrl = subscribeUrl;
- this.providerUrls = new AtomicReference>(urls);
-
- }
-
- @Override
- public void notify(ConsulURL subscribeUrl, List urls) {
- this.providerUrls = new AtomicReference>(urls);
- }
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyListener.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyListener.java
deleted file mode 100644
index 7eebe5fc3..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/NotifyListener.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import com.alipay.sofa.rpc.registry.consul.common.ConsulURL;
-
-import java.util.List;
-
-/**
- * NotifyListener
- *
- * @author dingpeng
- */
-public interface NotifyListener {
-
- void notify(ConsulURL subscribeUrl, List urls);
-
-}
diff --git a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleType.java b/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleType.java
deleted file mode 100644
index c2427cf76..000000000
--- a/extension-impl/registry-consul/src/main/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleType.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-/**
- * Role Type of Thrall
- *
- * @author dingpeng
- */
-public enum ThrallRoleType {
- CONSUMER(0),
-
- PROVIDER(1);
-
- private final int value;
-
- private ThrallRoleType(int value) {
- this.value = value;
- }
-
- public final int getNumber() {
- return value;
- }
-
- public static ThrallRoleType forNumber(Integer value) {
- switch (value) {
- case 0:
- return CONSUMER;
- case 1:
- return PROVIDER;
- default:
- return null;
- }
- }
-}
diff --git a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryTest.java b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryTest.java
index 846d80b32..0d4e21df0 100644
--- a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryTest.java
+++ b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulRegistryTest.java
@@ -25,178 +25,227 @@
import com.alipay.sofa.rpc.config.ServerConfig;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
import com.alipay.sofa.rpc.registry.RegistryFactory;
+import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.Response;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
+import com.ecwid.consul.v1.health.model.HealthService;
import com.pszymczyk.consul.ConsulProcess;
import com.pszymczyk.consul.ConsulStarterBuilder;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
-import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
/**
- * Test of ConsulRegistry
+ * Consul Registry Tests
*
- * @author dingpeng
+ * @author ScienJus
*/
public class ConsulRegistryTest {
- private static ConsulProcess consul;
+ private static final String INTERFACE_ID = "com.alipay.sofa.rpc.registry.consul.TestService";
- private static RegistryConfig registryConfig;
+ private static final String CONSUL_SERVICE_NAME = "test-service";
- private static ConsulRegistry registry;
+ private ConsulProcess consul;
- @BeforeClass
- public static void setup() {
- //language=JSON
- String customConfiguration =
- "{\n" +
- " \"datacenter\": \"dc-test\",\n" +
- " \"log_level\": \"info\"\n" +
- "}\n";
+ private RegistryConfig registryConfig;
+ private ConsulRegistry registry;
+
+ @Before
+ public void setup() {
consul = ConsulStarterBuilder.consulStarter()
- .withConsulVersion("1.2.1")
- .withCustomConfig(customConfiguration)
+ .withConsulVersion("1.4.0")
.build()
.start();
registryConfig = new RegistryConfig()
.setProtocol("consul")
- .setSubscribe(true)
.setAddress("127.0.0.1:" + consul.getHttpPort())
- .setParameter("username", "test")
- .setParameter("interface", "testInterface")
.setRegister(true);
registry = (ConsulRegistry) RegistryFactory.getRegistry(registryConfig);
registry.init();
}
- @AfterClass
- public static void tearDown() throws Exception {
+ @After
+ public void tearDown() {
registry.destroy();
consul.close();
registry = null;
}
@Test
- public void testAll() throws Exception {
+ public void testRegister() {
+ ProviderConfig> providerConfig = providerConfig("consul-test-1", 12200, 12201, 12202);
+ registry.register(providerConfig);
+
+ ConsulClient consulClient = new ConsulClient("localhost:" + consul.getHttpPort());
+ HealthServicesRequest request = HealthServicesRequest.newBuilder().setPassing(true).build();
+ assertUntil(() -> {
+ Response> healthServices = consulClient.getHealthServices(INTERFACE_ID, request);
+ Assert.assertEquals(3, healthServices.getValue().size());
+ }, 10, TimeUnit.SECONDS);
+
+ registry.unRegister(providerConfig);
+
+ assertUntil(() -> {
+ Response> healthServices = consulClient.getHealthServices(INTERFACE_ID, request);
+ Assert.assertEquals(0, healthServices.getValue().size());
+ }, 10, TimeUnit.SECONDS);
+ }
- int timeoutPerSub = 1000;
+ @Test
+ public void testRegisterWithCustomName() {
+ ProviderConfig> providerConfig = providerConfig("consul-test-1", 12200, 12201, 12202);
+ providerConfig.setParameter(ConsulConstants.CONSUL_SERVICE_NAME_KEY, CONSUL_SERVICE_NAME);
+ registry.register(providerConfig);
+
+ ConsulClient consulClient = new ConsulClient("localhost:" + consul.getHttpPort());
+ HealthServicesRequest request = HealthServicesRequest.newBuilder().setPassing(true).build();
+ assertUntil(() -> {
+ Response> healthServices = consulClient.getHealthServices(CONSUL_SERVICE_NAME, request);
+ Assert.assertEquals(3, healthServices.getValue().size());
+ }, 10, TimeUnit.SECONDS);
+
+ registry.unRegister(providerConfig);
+
+ Response> healthServicesAfterUnRegister = consulClient.getHealthServices(INTERFACE_ID, request);
+ assertUntil(() -> {
+ Response> healthServices = consulClient.getHealthServices(CONSUL_SERVICE_NAME, request);
+ Assert.assertEquals(0, healthServicesAfterUnRegister.getValue().size());
+ }, 10, TimeUnit.SECONDS);
+ }
- ServerConfig serverConfig = new ServerConfig()
- .setProtocol("bolt")
- .setHost("localhost")
- .setPort(12200);
+ @Test
+ public void testSubscribe() {
+ ProviderConfig> providerConfig = providerConfig("consul-test-1", 12200, 12201, 12202);
+ registry.register(providerConfig);
- ProviderConfig> provider = new ProviderConfig();
- provider.setInterfaceId("com.alipay.xxx.TestService")
- .setUniqueId("unique123Id")
- .setApplication(new ApplicationConfig().setAppName("test-server"))
- .setProxy("javassist")
- .setRegister(true)
- .setRegistry(registryConfig)
- .setSerialization("hessian2")
- .setServer(serverConfig)
- .setWeight(222)
- .setTimeout(3000);
+ ConsumerConfig> consumerConfig = consumerConfig("consul-test-1");
- // 注册
- registry.register(provider);
+ assertUntil(() -> {
+ List providerGroups = registry.subscribe(consumerConfig);
+ Assert.assertEquals(1, providerGroups.size());
+ Assert.assertEquals(3, providerGroups.get(0).size());
+ }, 10, TimeUnit.SECONDS);
- ConsumerConfig> consumer = new ConsumerConfig();
- consumer.setInterfaceId("com.alipay.xxx.TestService")
- .setUniqueId("unique123Id")
- .setApplication(new ApplicationConfig().setAppName("test-server"))
- .setProxy("javassist")
- .setSubscribe(true)
- .setSerialization("java")
- .setInvokeType("sync")
- .setTimeout(4444);
+ ConsumerConfig> consumerConfigWithAnotherUniqueId = consumerConfig("consul-test-2");
+
+ assertUntil(() -> {
+ List providerGroups = registry.subscribe(consumerConfigWithAnotherUniqueId);
+ Assert.assertEquals(1, providerGroups.size());
+ Assert.assertEquals(0, providerGroups.get(0).size());
+ }, 10, TimeUnit.SECONDS);
+
+ registry.unSubscribe(consumerConfig);
+ registry.unSubscribe(consumerConfigWithAnotherUniqueId);
+ }
+
+ @Test
+ public void testSubscribeNotify() throws InterruptedException {
+ ProviderConfig> providerConfig = providerConfig("consul-test-1", 12200);
+ registry.register(providerConfig);
+
+ ConsumerConfig> consumerConfig = consumerConfig("consul-test-1");
+ MockProviderInfoListener listener = new MockProviderInfoListener();
+ consumerConfig.setProviderInfoListener(listener);
+
+ assertUntil(() -> {
+ List providerGroups = registry.subscribe(consumerConfig);
+ Assert.assertEquals(1, providerGroups.size());
+ Assert.assertEquals(1, providerGroups.get(0).size());
+ }, 10, TimeUnit.SECONDS);
- // 订阅
CountDownLatch latch = new CountDownLatch(1);
- MockProviderInfoListener providerInfoListener = new MockProviderInfoListener();
- providerInfoListener.setCountDownLatch(latch);
- consumer.setProviderInfoListener(providerInfoListener);
- List all = registry.subscribe(consumer);
- providerInfoListener.updateAllProviders(all);
- Map ps = providerInfoListener.getData();
-
- // 订阅 错误的uniqueId
- ConsumerConfig> consumerNoUniqueId = new ConsumerConfig();
- consumerNoUniqueId.setInterfaceId("com.alipay.xxx.TestService")
- .setApplication(new ApplicationConfig().setAppName("test-server"))
- .setProxy("javassist")
- .setSubscribe(true)
- .setSerialization("java")
- .setInvokeType("sync")
- .setTimeout(4444);
- latch = new CountDownLatch(1);
- providerInfoListener.setCountDownLatch(latch);
- consumerNoUniqueId.setProviderInfoListener(providerInfoListener);
- all = registry.subscribe(consumerNoUniqueId);
- providerInfoListener.updateAllProviders(all);
- ps = providerInfoListener.getData();
+ listener.setCountDownLatch(latch);
+
+ providerConfig = providerConfig("consul-test-1", 12201, 12202);
+ registry.register(providerConfig);
+
+ boolean ok = latch.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(ok);
+
+ assertUntil(() -> {
+ Map providers = listener.getData();
+ Assert.assertEquals(3, providers.size());
+ }, 10, TimeUnit.SECONDS);
- // 反注册
latch = new CountDownLatch(1);
- providerInfoListener.setCountDownLatch(latch);
- registry.unRegister(provider);
- latch.await(timeoutPerSub, TimeUnit.MILLISECONDS);
-
- // 一次发2个端口的再次注册
- latch = new CountDownLatch(2);
- providerInfoListener.setCountDownLatch(latch);
- provider.getServer().add(new ServerConfig()
- .setProtocol("bolt")
- .setHost("0.0.0.0")
- .setPort(12201));
- registry.register(provider);
- latch.await(timeoutPerSub * 2, TimeUnit.MILLISECONDS);
-
- // 重复订阅
- ConsumerConfig> consumer2 = new ConsumerConfig();
- consumer2.setInterfaceId("com.alipay.xxx.TestService")
- .setUniqueId("unique123Id")
- .setApplication(new ApplicationConfig().setAppName("test-server"))
+ listener.setCountDownLatch(latch);
+
+ registry.unRegister(providerConfig);
+
+ ok = latch.await(10, TimeUnit.SECONDS);
+ Assert.assertTrue(ok);
+
+ assertUntil(() -> {
+ Map providers = listener.getData();
+ Assert.assertEquals(1, providers.size());
+ }, 10, TimeUnit.SECONDS);
+ }
+
+ private ConsumerConfig> consumerConfig(String uniqueId) {
+ ConsumerConfig> consumer = new ConsumerConfig();
+ consumer.setInterfaceId(INTERFACE_ID)
+ .setUniqueId(uniqueId)
+ .setApplication(new ApplicationConfig().setAppName("consul-registry-test"))
.setProxy("javassist")
.setSubscribe(true)
.setSerialization("java")
.setInvokeType("sync")
.setTimeout(4444);
- CountDownLatch latch2 = new CountDownLatch(1);
- MockProviderInfoListener providerInfoListener2 = new MockProviderInfoListener();
- providerInfoListener2.setCountDownLatch(latch2);
- consumer2.setProviderInfoListener(providerInfoListener2);
- providerInfoListener2.updateAllProviders(registry.subscribe(consumer2));
- latch2.await(timeoutPerSub, TimeUnit.MILLISECONDS);
-
- Map ps2 = providerInfoListener2.getData();
-
- // 取消订阅者1
- registry.unSubscribe(consumer);
- // 批量反注册,判断订阅者2的数据
- latch = new CountDownLatch(2);
- providerInfoListener2.setCountDownLatch(latch);
- List providerConfigList = new ArrayList();
- providerConfigList.add(provider);
- registry.batchUnRegister(providerConfigList);
-
- latch.await(timeoutPerSub * 2, TimeUnit.MILLISECONDS);
+ return consumer;
+ }
- // 批量取消订阅
- List consumerConfigList = new ArrayList();
- consumerConfigList.add(consumer2);
- registry.batchUnSubscribe(consumerConfigList);
+ private ProviderConfig> providerConfig(String uniqueId, int... ports) {
+ ProviderConfig> provider = new ProviderConfig();
+ provider.setInterfaceId(INTERFACE_ID)
+ .setUniqueId(uniqueId)
+ .setApplication(new ApplicationConfig().setAppName("consul-registry-test"))
+ .setProxy("javassist")
+ .setRegister(true)
+ .setRegistry(registryConfig)
+ .setSerialization("hessian2")
+ .setWeight(222)
+ .setTimeout(3000);
+
+ IntStream.of(ports)
+ .mapToObj(port ->
+ new ServerConfig()
+ .setProtocol("bolt")
+ .setHost("localhost")
+ .setPort(port)
+ ).forEach(provider::setServer);
+ return provider;
+ }
+ private void assertUntil(Runnable f, long time, TimeUnit unit) {
+ long until = System.currentTimeMillis() + unit.toMillis(time);
+ while (true) {
+ try {
+ f.run();
+ return;
+ } catch (AssertionError e) {
+ if (until < System.currentTimeMillis()) {
+ throw e;
+ }
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e1) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
private static class MockProviderInfoListener implements ProviderInfoListener {
diff --git a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceTest.java b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulUtilsTest.java
similarity index 53%
rename from extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceTest.java
rename to extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulUtilsTest.java
index fcf3e5cd9..f157d4c38 100644
--- a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ConsulServiceTest.java
+++ b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/ConsulUtilsTest.java
@@ -14,26 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.alipay.sofa.rpc.registry.consul.model;
+package com.alipay.sofa.rpc.registry.consul;
import org.junit.Assert;
import org.junit.Test;
/**
- * @author bystander
- * @version $Id: ConsulServiceTest.java, v 0.1 2018年12月10日 20:24 bystander Exp $
+ * @author ScienJus
*/
-public class ConsulServiceTest {
+public class ConsulUtilsTest {
@Test
- public void testJson() {
- ConsulService consulService = ConsulService.newService()
- .withName("service")
- .withPort("102")
- .withAddress("aa")
- .withCheckInterval("3")
- .build();
- String json = consulService.toConsulRegistrationJson();
- Assert.assertTrue(json.contains("102"));
+ public void testMetaKey() {
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("tags"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("TAGS"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("TAGS1"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("TAGS-1"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("consul-tags"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("consul_tags"));
+ Assert.assertTrue(ConsulUtils.isValidMetaKey("consul_-_tags"));
+
+ Assert.assertFalse(ConsulUtils.isValidMetaKey("consul.tags"));
+ Assert.assertFalse(ConsulUtils.isValidMetaKey("consul:tags"));
}
}
\ No newline at end of file
diff --git a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatServiceTest.java b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatServiceTest.java
deleted file mode 100644
index 6024d5e88..000000000
--- a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/HeartbeatServiceTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @author bystander
- * @version $Id: HeartbeatServiceTest.java, v 0.1 2018年12月10日 20:24 bystander Exp $
- */
-public class HeartbeatServiceTest {
-
- @Test
- public void test() {
- HeartbeatService heartbeatService1 = new HeartbeatService(null, null);
- HeartbeatService heartbeatService2 = new HeartbeatService(null, null);
- Assert.assertEquals(heartbeatService1, heartbeatService2);
- }
-
-}
\ No newline at end of file
diff --git a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleTypeTest.java b/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleTypeTest.java
deleted file mode 100644
index 107c6c363..000000000
--- a/extension-impl/registry-consul/src/test/java/com/alipay/sofa/rpc/registry/consul/model/ThrallRoleTypeTest.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.alipay.sofa.rpc.registry.consul.model;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * @author bystander
- * @version $Id: ThrallRoleTypeTest.java, v 0.1 2018年12月10日 20:24 bystander Exp $
- */
-public class ThrallRoleTypeTest {
-
- @Test
- public void testConvert() {
- ThrallRoleType type = ThrallRoleType.forNumber(ThrallRoleType.CONSUMER.getNumber());
- Assert.assertEquals(type, ThrallRoleType.CONSUMER);
- type = ThrallRoleType.forNumber(ThrallRoleType.PROVIDER.getNumber());
- Assert.assertEquals(type, ThrallRoleType.PROVIDER);
- }
-}
\ No newline at end of file