Skip to content

Commit

Permalink
Optimize service discovery and metadata service mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
chickenlj committed Oct 22, 2019
1 parent 3e78bd8 commit 19ab541
Show file tree
Hide file tree
Showing 18 changed files with 184 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import org.apache.dubbo.metadata.report.MetadataReportInstance;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.support.AbstractRegistryFactory;
import org.apache.dubbo.rpc.Exporter;
Expand Down Expand Up @@ -115,6 +116,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
Expand Down Expand Up @@ -682,11 +684,12 @@ private void initEventListener() {
}

private List<ServiceDiscovery> getServiceDiscoveries() {
return Collections.unmodifiableList(
ExtensionLoader
.getExtensionLoader(ServiceDiscovery.class)
.getLoadedExtensionInstances()
);
return AbstractRegistryFactory.getRegistries()
.stream()
.filter(registry -> registry instanceof ServiceDiscoveryRegistry)
.map(registry -> (ServiceDiscoveryRegistry) registry)
.map(ServiceDiscoveryRegistry::getServiceDiscovery)
.collect(Collectors.toList());
}

/**
Expand Down Expand Up @@ -933,6 +936,9 @@ private void unreferServices() {
}

private void registerServiceInstance() {
if (CollectionUtils.isEmpty(getServiceDiscoveries())) {
return;
}

ApplicationConfig application = getApplication();

Expand All @@ -957,6 +963,9 @@ private URL selectMetadataServiceExportedURL() {

for (String urlValue : urlValues) {
URL url = URL.valueOf(urlValue);
if (MetadataService.class.getName().equals(url.getServiceInterface())) {
continue;
}
if ("rest".equals(url.getProtocol())) { // REST first
selectedURL = url;
break;
Expand All @@ -965,6 +974,10 @@ private URL selectMetadataServiceExportedURL() {
}
}

if (selectedURL == null && CollectionUtils.isNotEmpty(urlValues)) {
selectedURL = URL.valueOf(urlValues.iterator().next());
}

return selectedURL;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.ArrayList;
import java.util.List;

import static java.util.Arrays.asList;
import static java.util.Collections.emptyList;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;

Expand Down Expand Up @@ -76,7 +75,7 @@ public ConfigurableMetadataServiceExporter export() {
ServiceConfig<MetadataService> serviceConfig = new ServiceConfig<>();
serviceConfig.setApplication(getApplicationConfig());
serviceConfig.setRegistries(getRegistries());
serviceConfig.setProtocols(getProtocols());
serviceConfig.setProtocol(generateMetadataProtocol());
serviceConfig.setInterface(MetadataService.class);
serviceConfig.setRef(metadataService);
serviceConfig.setGroup(getApplicationConfig().getName());
Expand Down Expand Up @@ -127,13 +126,10 @@ private List<RegistryConfig> getRegistries() {
return new ArrayList<>(ApplicationModel.getConfigManager().getRegistries());
}

private List<ProtocolConfig> getProtocols() {
return asList(getDefaultProtocol());
}

private ProtocolConfig getDefaultProtocol() {
private ProtocolConfig generateMetadataProtocol() {
ProtocolConfig defaultProtocol = new ProtocolConfig();
defaultProtocol.setName(DUBBO);
// defaultProtocol.setHost() ?
// auto-increment port
defaultProtocol.setPort(-1);
return defaultProtocol;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,16 @@
*/
package org.apache.dubbo.config.metadata;

import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.config.ProtocolConfig;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstanceCustomizer;
import org.apache.dubbo.rpc.model.ApplicationModel;

import java.util.Collection;
import java.util.stream.Stream;

/**
* The {@link ServiceInstanceCustomizer} to customize the {@link ServiceInstance#getPort() port} of service instance.
*
Expand All @@ -35,17 +40,25 @@ public void customize(ServiceInstance serviceInstance) {
return;
}

ApplicationModel.getConfigManager()
.getProtocols()
.stream()
Collection<ProtocolConfig> protocols = ApplicationModel.getConfigManager()
.getProtocols();

if (CollectionUtils.isEmpty(protocols)) {
throw new IllegalStateException("We should have at least one protocol configured at this point.");
}

Stream<ProtocolConfig> protocolStream = protocols.stream();
ProtocolConfig protocolConfig = protocolStream
// use rest as service instance's default protocol.
.filter(protocol -> "rest".equals(protocol.getName()))
.findFirst()
.ifPresent(protocolConfig -> {
if (serviceInstance instanceof DefaultServiceInstance) {
DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance;
if (protocolConfig.getPort() != null) {
instance.setPort(protocolConfig.getPort());
}
}
});
.orElseGet(() -> protocolStream.findFirst().get());

if (serviceInstance instanceof DefaultServiceInstance) {
DefaultServiceInstance instance = (DefaultServiceInstance) serviceInstance;
if (protocolConfig.getPort() != null) {
instance.setPort(protocolConfig.getPort());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ private InvocationHandler buildInvocationHandler(String referencedBeanName, Refe
localReferenceBeanInvocationHandlerCache.put(referencedBeanName, handler);
} else {
// Remote Reference Bean should initialize immediately
handler.init();
// FIXME
// handler.init();
}

return handler;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ boolean addURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {
boolean removeURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {
return executeMutually(() -> {
String key = url.getServiceKey();
SortedSet<URL> urls = serviceURLs.getOrDefault(key, emptySortedSet());
SortedSet<URL> urls = serviceURLs.getOrDefault(key, null);
if (urls == null) {
return true;
}
boolean r = urls.remove(url);
// if it is empty
if (urls.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ public ServiceDiscoveryRegistry(URL registryURL) {
this.subscribedURLsSynthesizers = initSubscribedURLsSynthesizers();
}

public ServiceDiscovery getServiceDiscovery() {
return serviceDiscovery;
}

/**
* Get the subscribed services from the specified registry {@link URL url}
*
Expand Down Expand Up @@ -744,7 +748,7 @@ private List<URL> getExportedURLs(ServiceInstance providerServiceInstance) {
exportedURLs = toURLs(urls);
} catch (Throwable e) {
if (logger.isErrorEnabled()) {
logger.error(format("It's failed to get the exported URLs from the target service instance[%s]",
logger.error(format("Failed to get the exported URLs from the target service instance[%s]",
providerServiceInstance), e);
}
exportedURLs = null; // set the result to be null if failed to get
Expand Down Expand Up @@ -792,6 +796,12 @@ protected Set<String> getServices(URL subscribedURL) {
if (isEmpty(serviceNames)) {
serviceNames = findMappedServices(subscribedURL);
}
if (isEmpty(serviceNames)) {
throw new IllegalStateException(String.format("Could not resolve interface %s for its application (service), " +
"you should either specify service name explicitly or make sure there's at least one active provider " +
" instance and it has registered the service-app mapping info to config center automatically. \n" +
"The full subscribing url is %s", subscribedURL.getServiceInterface(), subscribedURL));
}
return serviceNames;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,9 @@
import org.apache.dubbo.registry.client.ServiceInstanceCustomizer;
import org.apache.dubbo.registry.client.event.ServiceInstancePreRegisteredEvent;

import java.util.ServiceLoader;

/**
* An {@link EventListener event listener} to customize {@link ServiceInstance the service instance} by the instances of
* {@link ServiceInstanceCustomizer} {@link ServiceLoader SPI}.
* Customize the {@link ServiceInstance} before registering to Registry.
*
* @see EventListener
* @see ServiceInstancePreRegisteredEvent
* @see ServiceInstanceCustomizer
* @since 2.7.4
*/
public class CustomizableServiceInstanceListener implements EventListener<ServiceInstancePreRegisteredEvent> {
Expand All @@ -39,6 +33,7 @@ public class CustomizableServiceInstanceListener implements EventListener<Servic
public void onEvent(ServiceInstancePreRegisteredEvent event) {
ExtensionLoader<ServiceInstanceCustomizer> loader =
ExtensionLoader.getExtensionLoader(ServiceInstanceCustomizer.class);
// FIXME, sort customizer before apply
loader.getSupportedExtensionInstances().forEach(customizer -> {
// customizes
customizer.customize(event.getServiceInstance());
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,13 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.registry.client.ServiceInstance;

import java.util.List;
import java.util.ServiceLoader;

/**
* The builder interface of {@link MetadataService} to build {@link URL URLs}, the multiple implementations
* will be loaded by Java standard {@link ServiceLoader} and {@link #composite() composited},
* whose building {@link URL URLs} will be aggregated
* Used to build metadata service url from ServiceInstance.
*
* @see CompositeMetadataServiceURLBuilder
* @since 2.7.4
*/
@SPI
Expand All @@ -39,17 +34,7 @@ public interface MetadataServiceURLBuilder {
* Build the {@link URL URLs} from the specified {@link ServiceInstance}
*
* @param serviceInstance {@link ServiceInstance}
* @return non-null
* @return TODO, usually, we generate one metadata url from one instance. There's no scenario to return a metadta url list.
*/
List<URL> build(ServiceInstance serviceInstance);

/**
* Get the composite implementation of {@link MetadataServiceURLBuilder}
*
* @return the instance of {@link CompositeMetadataServiceURLBuilder}
* @see CompositeMetadataServiceURLBuilder
*/
static MetadataServiceURLBuilder composite() {
return new CompositeMetadataServiceURLBuilder();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
package org.apache.dubbo.registry.client.metadata;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstanceCustomizer;
import org.apache.dubbo.rpc.Protocol;

import java.util.HashMap;
import java.util.Map;

import static org.apache.dubbo.metadata.WritableMetadataService.getExtension;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getMetadataStorageType;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setProtocolPort;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setEndpoints;

/**
* A Class to customize the ports of {@link Protocol protocols} into
Expand All @@ -41,11 +45,16 @@ public void customize(ServiceInstance serviceInstance) {

WritableMetadataService writableMetadataService = getExtension(metadataStoredType);

Map<String, Integer> protocols = new HashMap<>();
writableMetadataService.getExportedURLs()
.stream()
.map(URL::valueOf)
.filter(url -> !MetadataService.class.getName().equals(url.getServiceInterface()))
.forEach(url -> {
setProtocolPort(serviceInstance, url.getProtocol(), url.getPort());
// TODO, same protocol listen on different ports will override with each other.
protocols.put(url.getProtocol(), url.getPort());
});

setEndpoints(serviceInstance, protocols);
}
}
Loading

0 comments on commit 19ab541

Please sign in to comment.