Skip to content

Commit

Permalink
Merge branch 'apache-3.1' into apache-3.2
Browse files Browse the repository at this point in the history
# Conflicts:
#	dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
  • Loading branch information
AlbumenJ committed Apr 10, 2023
2 parents 8efd069 + e3e7ce8 commit 1d28205
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import static org.apache.dubbo.common.constants.CommonConstants.REGISTRY_LOCAL_FILE_CACHE_ENABLED;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.INTERNAL_ERROR;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_FETCH_INSTANCE;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_FAILED_LOAD_METADATA;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_CLUSTER_KEY;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
Expand Down Expand Up @@ -99,32 +98,32 @@ private AbstractServiceDiscovery(ApplicationModel applicationModel, String servi
this.metadataInfo = new MetadataInfo(serviceName);
boolean localCacheEnabled = registryURL.getParameter(REGISTRY_LOCAL_FILE_CACHE_ENABLED, true);
this.metaCacheManager = new MetaCacheManager(localCacheEnabled, getCacheNameSuffix(),
applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getCacheRefreshingScheduledExecutor());
int metadataInfoCacheExpireTime = registryURL.getParameter(METADATA_INFO_CACHE_EXPIRE_KEY, DEFAULT_METADATA_INFO_CACHE_EXPIRE);
int metadataInfoCacheSize = registryURL.getParameter(METADATA_INFO_CACHE_SIZE_KEY, DEFAULT_METADATA_INFO_CACHE_SIZE);
this.refreshCacheFuture = applicationModel.getFrameworkModel().getBeanFactory()
.getBean(FrameworkExecutorRepository.class).getSharedScheduledExecutor()
.scheduleAtFixedRate(() -> {
try {
while (metadataInfos.size() > metadataInfoCacheSize) {
AtomicReference<String> oldestRevision = new AtomicReference<>();
AtomicReference<MetadataInfoStat> oldestStat = new AtomicReference<>();
metadataInfos.forEach((k, v) -> {
if (System.currentTimeMillis() - v.getUpdateTime() > metadataInfoCacheExpireTime &&
(oldestStat.get() == null || oldestStat.get().getUpdateTime() > v.getUpdateTime())) {
oldestRevision.set(k);
oldestStat.set(v);
.getBean(FrameworkExecutorRepository.class).getSharedScheduledExecutor()
.scheduleAtFixedRate(() -> {
try {
while (metadataInfos.size() > metadataInfoCacheSize) {
AtomicReference<String> oldestRevision = new AtomicReference<>();
AtomicReference<MetadataInfoStat> oldestStat = new AtomicReference<>();
metadataInfos.forEach((k, v) -> {
if (System.currentTimeMillis() - v.getUpdateTime() > metadataInfoCacheExpireTime &&
(oldestStat.get() == null || oldestStat.get().getUpdateTime() > v.getUpdateTime())) {
oldestRevision.set(k);
oldestStat.set(v);
}
});
if (oldestStat.get() != null) {
metadataInfos.remove(oldestRevision.get(), oldestStat.get());
}
});
if (oldestStat.get() != null) {
metadataInfos.remove(oldestRevision.get(), oldestStat.get());
}
} catch (Throwable t) {
logger.error(INTERNAL_ERROR, "", "", "Error occurred when clean up metadata info cache.", t);
}
} catch (Throwable t) {
logger.error(INTERNAL_ERROR, "", "", "Error occurred when clean up metadata info cache.", t);
}
}, metadataInfoCacheExpireTime / 2, metadataInfoCacheExpireTime / 2, TimeUnit.MILLISECONDS);
}, metadataInfoCacheExpireTime / 2, metadataInfoCacheExpireTime / 2, TimeUnit.MILLISECONDS);
}


Expand All @@ -133,12 +132,11 @@ public synchronized void register() throws RuntimeException {
if (isDestroy) {
return;
}
this.serviceInstance = createServiceInstance(this.metadataInfo);
if (!isValidInstance(this.serviceInstance)) {
logger.warn(REGISTRY_FAILED_FETCH_INSTANCE, "", "", "No valid instance found, stop registering instance address to registry.");
ServiceInstance serviceInstance = createServiceInstance(this.metadataInfo);
if (!isValidInstance(serviceInstance)) {
return;
}

this.serviceInstance = serviceInstance;
boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
if (revisionUpdated) {
reportMetadata(this.metadataInfo);
Expand All @@ -158,15 +156,12 @@ public synchronized void update() throws RuntimeException {
}

if (this.serviceInstance == null) {
this.serviceInstance = createServiceInstance(this.metadataInfo);
} else if (!isValidInstance(this.serviceInstance)) {
ServiceInstanceMetadataUtils.customizeInstance(this.serviceInstance, this.applicationModel);
register();
}

if (!isValidInstance(this.serviceInstance)) {
return;
}

ServiceInstance oldServiceInstance = this.serviceInstance;
DefaultServiceInstance newServiceInstance = new DefaultServiceInstance((DefaultServiceInstance) oldServiceInstance);
boolean revisionUpdated = calOrUpdateInstanceRevision(newServiceInstance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_NACOS_EXCEPTION;
import static org.apache.dubbo.common.function.ThrowableConsumer.execute;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
import static org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils.createNamingService;
import static org.apache.dubbo.registry.nacos.util.NacosNamingServiceUtils.getGroup;
Expand Down Expand Up @@ -78,7 +79,7 @@ public NacosServiceDiscovery(ApplicationModel applicationModel, URL registryURL)
this.namingService = createNamingService(registryURL);
// backward compatibility for 3.0.x
this.group = Boolean.parseBoolean(ConfigurationUtils.getProperty(applicationModel, NACOS_SD_USE_DEFAULT_GROUP_KEY, "false")) ?
DEFAULT_GROUP : getGroup(registryURL);
DEFAULT_GROUP : getGroup(registryURL);
}

@Override
Expand All @@ -105,13 +106,14 @@ public void doUnregister(ServiceInstance serviceInstance) throws RuntimeExceptio

@Override
protected void doUpdate(ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) throws RuntimeException {
if (EMPTY_REVISION.equals(getExportedServicesRevision(newServiceInstance))) {
if (EMPTY_REVISION.equals(getExportedServicesRevision(newServiceInstance))
|| EMPTY_REVISION.equals(oldServiceInstance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME))) {
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
}

if (!Objects.equals(newServiceInstance.getHost(), oldServiceInstance.getHost()) ||
!Objects.equals(newServiceInstance.getPort(), oldServiceInstance.getPort())) {
!Objects.equals(newServiceInstance.getPort(), oldServiceInstance.getPort())) {
// Ignore if id changed. Should unregister first.
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
Expand Down Expand Up @@ -143,15 +145,15 @@ public Set<String> getServices() {
@Override
public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException {
return ThrowableFunction.execute(namingService, service ->
service.selectInstances(serviceName, group, true)
.stream().map((i) -> NacosNamingServiceUtils.toServiceInstance(registryURL, i))
.collect(Collectors.toList())
service.selectInstances(serviceName, group, true)
.stream().map((i) -> NacosNamingServiceUtils.toServiceInstance(registryURL, i))
.collect(Collectors.toList())
);
}

@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
throws NullPointerException, IllegalArgumentException {
// check if listener has already been added through another interface/service
if (!instanceListeners.add(listener)) {
return;
Expand Down Expand Up @@ -228,9 +230,9 @@ public URL getUrl() {
private void handleEvent(NamingEvent event, ServiceInstancesChangedListener listener) {
String serviceName = event.getServiceName();
List<ServiceInstance> serviceInstances = event.getInstances()
.stream()
.map((i) -> NacosNamingServiceUtils.toServiceInstance(registryURL, i))
.collect(Collectors.toList());
.stream()
.map((i) -> NacosNamingServiceUtils.toServiceInstance(registryURL, i))
.collect(Collectors.toList());
listener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.x.discovery.ServiceCache;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.function.ThrowableConsumer;
import org.apache.dubbo.common.function.ThrowableFunction;
Expand All @@ -44,6 +45,7 @@
import static org.apache.dubbo.common.constants.LoggerCodeConstants.REGISTRY_ZOOKEEPER_EXCEPTION;
import static org.apache.dubbo.common.function.ThrowableFunction.execute;
import static org.apache.dubbo.metadata.RevisionResolver.EMPTY_REVISION;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
Expand Down Expand Up @@ -111,15 +113,16 @@ public void doUnregister(ServiceInstance serviceInstance) throws RuntimeExceptio

@Override
protected void doUpdate(ServiceInstance oldServiceInstance, ServiceInstance newServiceInstance) throws RuntimeException {
if (EMPTY_REVISION.equals(getExportedServicesRevision(newServiceInstance))) {
if (EMPTY_REVISION.equals(getExportedServicesRevision(newServiceInstance))
|| EMPTY_REVISION.equals(oldServiceInstance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME))) {
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
}

org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> oldInstance = build(oldServiceInstance);
org.apache.curator.x.discovery.ServiceInstance<ZookeeperInstance> newInstance = build(newServiceInstance);
if (!Objects.equals(newInstance.getName(), oldInstance.getName()) ||
!Objects.equals(newInstance.getId(), oldInstance.getId())) {
!Objects.equals(newInstance.getId(), oldInstance.getId())) {
// Ignore if id changed. Should unregister first.
super.doUpdate(oldServiceInstance, newServiceInstance);
return;
Expand All @@ -146,7 +149,7 @@ public List<ServiceInstance> getInstances(String serviceName) throws NullPointer

@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
throws NullPointerException, IllegalArgumentException {
// check if listener has already been added through another interface/service
if (!instanceListeners.add(listener)) {
return;
Expand All @@ -169,7 +172,7 @@ public void removeServiceInstancesChangedListener(ServiceInstancesChangedListene
watcher.getCacheInstance().close();
} catch (IOException e) {
logger.error(REGISTRY_ZOOKEEPER_EXCEPTION, "curator stop watch failed", "",
"Curator Stop service discovery watch failed. Service Name: " + serviceName);
"Curator Stop service discovery watch failed. Service Name: " + serviceName);
}
}
}
Expand All @@ -190,8 +193,8 @@ protected void registerServiceWatcher(String serviceName, ServiceInstancesChange

ZookeeperServiceDiscoveryChangeWatcher watcher = watcherCaches.computeIfAbsent(serviceName, name -> {
ServiceCache<ZookeeperInstance> serviceCache = serviceDiscovery.serviceCacheBuilder()
.name(name)
.build();
.name(name)
.build();
ZookeeperServiceDiscoveryChangeWatcher newer = new ZookeeperServiceDiscoveryChangeWatcher(this, serviceCache, name, latch);
serviceCache.addListener(newer);

Expand Down

0 comments on commit 1d28205

Please sign in to comment.