From f2b158862cc6525e83ca5c0eb007ccf95b52d677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=90=B4=E6=99=9F=20Wu=20Sheng?= Date: Thu, 6 Dec 2018 14:40:29 +0800 Subject: [PATCH] [OAL refactor step 1]Refactor dispatcher manager (#2006) * Remove the hard codes about DispatcherManager. * Add comments. --- .../oap/server/core/CoreModuleProvider.java | 65 ++++++++--- .../core/analysis/DispatcherManager.java | 104 ++++++++++++------ .../core/source/SourceReceiverImpl.java | 5 + 3 files changed, 125 insertions(+), 49 deletions(-) diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java index 20ea0bcbf968..bfdf48cfa300 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java @@ -22,27 +22,60 @@ import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener; import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener; import org.apache.skywalking.oap.server.core.annotation.AnnotationScan; -import org.apache.skywalking.oap.server.core.cache.*; -import org.apache.skywalking.oap.server.core.cluster.*; -import org.apache.skywalking.oap.server.core.config.*; -import org.apache.skywalking.oap.server.core.query.*; +import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer; +import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache; +import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache; +import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache; +import org.apache.skywalking.oap.server.core.cluster.ClusterModule; +import org.apache.skywalking.oap.server.core.cluster.ClusterRegister; +import org.apache.skywalking.oap.server.core.cluster.RemoteInstance; +import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService; +import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService; +import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService; +import org.apache.skywalking.oap.server.core.query.AggregationQueryService; +import org.apache.skywalking.oap.server.core.query.AlarmQueryService; +import org.apache.skywalking.oap.server.core.query.MetadataQueryService; +import org.apache.skywalking.oap.server.core.query.MetricQueryService; +import org.apache.skywalking.oap.server.core.query.TopologyQueryService; +import org.apache.skywalking.oap.server.core.query.TraceQueryService; import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener; -import org.apache.skywalking.oap.server.core.register.service.*; -import org.apache.skywalking.oap.server.core.remote.*; -import org.apache.skywalking.oap.server.core.remote.annotation.*; -import org.apache.skywalking.oap.server.core.remote.client.*; -import org.apache.skywalking.oap.server.core.server.*; -import org.apache.skywalking.oap.server.core.source.*; +import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister; +import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister; +import org.apache.skywalking.oap.server.core.remote.RemoteSenderService; +import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer; +import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter; +import org.apache.skywalking.oap.server.core.remote.client.Address; +import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister; +import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister; +import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl; +import org.apache.skywalking.oap.server.core.source.SourceReceiver; +import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl; import org.apache.skywalking.oap.server.core.storage.PersistenceTimer; import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener; import org.apache.skywalking.oap.server.core.storage.model.IModelGetter; import org.apache.skywalking.oap.server.core.storage.model.IModelOverride; import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer; -import org.apache.skywalking.oap.server.library.module.*; +import org.apache.skywalking.oap.server.library.module.ModuleConfig; +import org.apache.skywalking.oap.server.library.module.ModuleDefine; +import org.apache.skywalking.oap.server.library.module.ModuleProvider; +import org.apache.skywalking.oap.server.library.module.ModuleStartException; +import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException; import org.apache.skywalking.oap.server.library.server.ServerException; import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer; import org.apache.skywalking.oap.server.library.server.jetty.JettyServer; -import org.slf4j.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * @author peng-yongsheng @@ -59,6 +92,7 @@ public class CoreModuleProvider extends ModuleProvider { private final StorageAnnotationListener storageAnnotationListener; private final StreamAnnotationListener streamAnnotationListener; private final StreamDataAnnotationContainer streamDataAnnotationContainer; + private final SourceReceiverImpl receiver; public CoreModuleProvider() { super(); @@ -67,6 +101,7 @@ public CoreModuleProvider() { this.storageAnnotationListener = new StorageAnnotationListener(); this.streamAnnotationListener = new StreamAnnotationListener(); this.streamDataAnnotationContainer = new StreamDataAnnotationContainer(); + receiver = new SourceReceiverImpl(); } @Override public String name() { @@ -101,7 +136,7 @@ public CoreModuleProvider() { this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService()); - this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl()); + this.registerServiceImplementation(SourceReceiver.class, receiver); this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer); @@ -143,10 +178,12 @@ public CoreModuleProvider() { remoteClientManager.start(); try { + receiver.scan(); + annotationScan.scan(() -> { streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses()); }); - } catch (IOException e) { + } catch (IOException | IllegalAccessException | InstantiationException e) { throw new ModuleStartException(e.getMessage(), e); } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java index 5e55a75b723e..46977923313f 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/DispatcherManager.java @@ -18,57 +18,91 @@ package org.apache.skywalking.oap.server.core.analysis; -import java.util.*; -import org.apache.skywalking.oap.server.core.analysis.generated.all.AllDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.endpoint.EndpointDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.endpointrelation.EndpointRelationDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.service.ServiceDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstance.ServiceInstanceDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmcpu.ServiceInstanceJVMCPUDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmgc.ServiceInstanceJVMGCDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemory.ServiceInstanceJVMMemoryDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher; -import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher; -import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher; -import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher; -import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher; -import org.apache.skywalking.oap.server.core.source.*; -import org.slf4j.*; +import com.google.common.collect.ImmutableSet; +import com.google.common.reflect.ClassPath; +import java.io.IOException; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.skywalking.oap.server.core.UnexpectedException; +import org.apache.skywalking.oap.server.core.source.Scope; +import org.apache.skywalking.oap.server.core.source.Source; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * @author peng-yongsheng + * @author peng-yongsheng, wusheng */ public class DispatcherManager { private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class); - private Map dispatcherMap; + private Map> dispatcherMap; public DispatcherManager() { this.dispatcherMap = new HashMap<>(); + } - this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()}); + public void forward(Source source) { + for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) { + dispatcher.dispatch(source); + } + } - this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()}); + /** + * Scan all classes under `org.apache.skywalking` package, + * + * If it implement {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}, then, it will be added + * into this DispatcherManager based on the Source definition. + * + * @throws IOException + * @throws IllegalAccessException + * @throws InstantiationException + */ + public void scan() throws IOException, IllegalAccessException, InstantiationException { + ClassPath classpath = ClassPath.from(this.getClass().getClassLoader()); + ImmutableSet classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking"); + for (ClassPath.ClassInfo classInfo : classes) { + Class aClass = classInfo.load(); - this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()}); - this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()}); - this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()}); + if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) { + Type[] genericInterfaces = aClass.getGenericInterfaces(); + for (Type genericInterface : genericInterfaces) { + ParameterizedType anInterface = (ParameterizedType)genericInterface; + if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) { + Type[] arguments = anInterface.getActualTypeArguments(); - this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()}); - this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()}); - this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()}); + if (arguments.length != 1) { + throw new UnexpectedException("unexpected type argument number, class " + aClass.getName()); + } + Type argument = arguments[0]; - this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()}); - this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()}); - this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()}); - this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()}); - } + Object source = ((Class)argument).newInstance(); - public void forward(Source source) { - for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) { - dispatcher.dispatch(source); + if (!Source.class.isAssignableFrom(source.getClass())) { + throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. "); + } + + Source dispatcherSource = (Source)source; + SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance(); + + Scope scope = dispatcherSource.scope(); + + List dispatchers = this.dispatcherMap.get(scope); + if (dispatchers == null) { + dispatchers = new ArrayList<>(); + this.dispatcherMap.put(scope, dispatchers); + } + + dispatchers.add(dispatcher); + + logger.info("Dispatcher {} is added into Scope {}.", dispatcher.getClass().getName(), scope); + } + } + } } } } diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java index 853c99bc7b10..4c48efdfbb6a 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/SourceReceiverImpl.java @@ -18,6 +18,7 @@ package org.apache.skywalking.oap.server.core.source; +import java.io.IOException; import org.apache.skywalking.oap.server.core.analysis.DispatcherManager; /** @@ -34,4 +35,8 @@ public SourceReceiverImpl() { @Override public void receive(Source source) { dispatcherManager.forward(source); } + + public void scan() throws IOException, InstantiationException, IllegalAccessException { + dispatcherManager.scan(); + } }