Skip to content

Commit

Permalink
[OAL refactor step 1]Refactor dispatcher manager (apache#2006)
Browse files Browse the repository at this point in the history
* Remove the hard codes about DispatcherManager.

* Add comments.
  • Loading branch information
wu-sheng authored Dec 6, 2018
1 parent 5f5be77 commit f2b1588
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,60 @@
import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
import org.apache.skywalking.oap.server.core.cache.*;
import org.apache.skywalking.oap.server.core.cluster.*;
import org.apache.skywalking.oap.server.core.config.*;
import org.apache.skywalking.oap.server.core.query.*;
import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
import org.apache.skywalking.oap.server.core.query.MetricQueryService;
import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
import org.apache.skywalking.oap.server.core.query.TraceQueryService;
import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
import org.apache.skywalking.oap.server.core.register.service.*;
import org.apache.skywalking.oap.server.core.remote.*;
import org.apache.skywalking.oap.server.core.remote.annotation.*;
import org.apache.skywalking.oap.server.core.remote.client.*;
import org.apache.skywalking.oap.server.core.server.*;
import org.apache.skywalking.oap.server.core.source.*;
import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
import org.apache.skywalking.oap.server.core.remote.client.Address;
import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
import org.apache.skywalking.oap.server.core.source.SourceReceiver;
import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
import org.apache.skywalking.oap.server.core.storage.model.IModelOverride;
import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
import org.apache.skywalking.oap.server.library.module.*;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.library.server.ServerException;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
import org.slf4j.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
Expand All @@ -59,6 +92,7 @@ public class CoreModuleProvider extends ModuleProvider {
private final StorageAnnotationListener storageAnnotationListener;
private final StreamAnnotationListener streamAnnotationListener;
private final StreamDataAnnotationContainer streamDataAnnotationContainer;
private final SourceReceiverImpl receiver;

public CoreModuleProvider() {
super();
Expand All @@ -67,6 +101,7 @@ public CoreModuleProvider() {
this.storageAnnotationListener = new StorageAnnotationListener();
this.streamAnnotationListener = new StreamAnnotationListener();
this.streamDataAnnotationContainer = new StreamDataAnnotationContainer();
receiver = new SourceReceiverImpl();
}

@Override public String name() {
Expand Down Expand Up @@ -101,7 +136,7 @@ public CoreModuleProvider() {

this.registerServiceImplementation(IComponentLibraryCatalogService.class, new ComponentLibraryCatalogService());

this.registerServiceImplementation(SourceReceiver.class, new SourceReceiverImpl());
this.registerServiceImplementation(SourceReceiver.class, receiver);

this.registerServiceImplementation(StreamDataClassGetter.class, streamDataAnnotationContainer);

Expand Down Expand Up @@ -143,10 +178,12 @@ public CoreModuleProvider() {
remoteClientManager.start();

try {
receiver.scan();

annotationScan.scan(() -> {
streamDataAnnotationContainer.generate(streamAnnotationListener.getStreamClasses());
});
} catch (IOException e) {
} catch (IOException | IllegalAccessException | InstantiationException e) {
throw new ModuleStartException(e.getMessage(), e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,91 @@

package org.apache.skywalking.oap.server.core.analysis;

import java.util.*;
import org.apache.skywalking.oap.server.core.analysis.generated.all.AllDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.endpoint.EndpointDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.endpointrelation.EndpointRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.service.ServiceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstance.ServiceInstanceDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmcpu.ServiceInstanceJVMCPUDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmgc.ServiceInstanceJVMGCDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemory.ServiceInstanceJVMMemoryDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancejvmmemorypool.ServiceInstanceJVMMemoryPoolDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.serviceinstancerelation.ServiceInstanceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.generated.servicerelation.ServiceRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.endpointrelation.EndpointCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentDispatcher;
import org.apache.skywalking.oap.server.core.analysis.manual.servicerelation.ServiceCallRelationDispatcher;
import org.apache.skywalking.oap.server.core.source.*;
import org.slf4j.*;
import com.google.common.collect.ImmutableSet;
import com.google.common.reflect.ClassPath;
import java.io.IOException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.source.Scope;
import org.apache.skywalking.oap.server.core.source.Source;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author peng-yongsheng
* @author peng-yongsheng, wusheng
*/
public class DispatcherManager {

private static final Logger logger = LoggerFactory.getLogger(DispatcherManager.class);

private Map<Scope, SourceDispatcher[]> dispatcherMap;
private Map<Scope, List<SourceDispatcher>> dispatcherMap;

public DispatcherManager() {
this.dispatcherMap = new HashMap<>();
}

this.dispatcherMap.put(Scope.All, new SourceDispatcher[] {new AllDispatcher()});
public void forward(Source source) {
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
}
}

this.dispatcherMap.put(Scope.Segment, new SourceDispatcher[] {new SegmentDispatcher()});
/**
* Scan all classes under `org.apache.skywalking` package,
*
* If it implement {@link org.apache.skywalking.oap.server.core.analysis.SourceDispatcher}, then, it will be added
* into this DispatcherManager based on the Source definition.
*
* @throws IOException
* @throws IllegalAccessException
* @throws InstantiationException
*/
public void scan() throws IOException, IllegalAccessException, InstantiationException {
ClassPath classpath = ClassPath.from(this.getClass().getClassLoader());
ImmutableSet<ClassPath.ClassInfo> classes = classpath.getTopLevelClassesRecursive("org.apache.skywalking");
for (ClassPath.ClassInfo classInfo : classes) {
Class<?> aClass = classInfo.load();

this.dispatcherMap.put(Scope.Service, new SourceDispatcher[] {new ServiceDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstance, new SourceDispatcher[] {new ServiceInstanceDispatcher()});
this.dispatcherMap.put(Scope.Endpoint, new SourceDispatcher[] {new EndpointDispatcher()});
if (!aClass.isInterface() && SourceDispatcher.class.isAssignableFrom(aClass)) {
Type[] genericInterfaces = aClass.getGenericInterfaces();
for (Type genericInterface : genericInterfaces) {
ParameterizedType anInterface = (ParameterizedType)genericInterface;
if (anInterface.getRawType().getTypeName().equals(SourceDispatcher.class.getName())) {
Type[] arguments = anInterface.getActualTypeArguments();

this.dispatcherMap.put(Scope.ServiceRelation, new SourceDispatcher[] {new ServiceRelationDispatcher(), new ServiceCallRelationDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceRelation, new SourceDispatcher[] {new ServiceInstanceRelationDispatcher()});
this.dispatcherMap.put(Scope.EndpointRelation, new SourceDispatcher[] {new EndpointRelationDispatcher(), new EndpointCallRelationDispatcher()});
if (arguments.length != 1) {
throw new UnexpectedException("unexpected type argument number, class " + aClass.getName());
}
Type argument = arguments[0];

this.dispatcherMap.put(Scope.ServiceInstanceJVMCPU, new SourceDispatcher[] {new ServiceInstanceJVMCPUDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMGC, new SourceDispatcher[] {new ServiceInstanceJVMGCDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMMemory, new SourceDispatcher[] {new ServiceInstanceJVMMemoryDispatcher()});
this.dispatcherMap.put(Scope.ServiceInstanceJVMMemoryPool, new SourceDispatcher[] {new ServiceInstanceJVMMemoryPoolDispatcher()});
}
Object source = ((Class)argument).newInstance();

public void forward(Source source) {
for (SourceDispatcher dispatcher : dispatcherMap.get(source.scope())) {
dispatcher.dispatch(source);
if (!Source.class.isAssignableFrom(source.getClass())) {
throw new UnexpectedException("unexpected type argument of class " + aClass.getName() + ", should be `org.apache.skywalking.oap.server.core.source.Source`. ");
}

Source dispatcherSource = (Source)source;
SourceDispatcher dispatcher = (SourceDispatcher)aClass.newInstance();

Scope scope = dispatcherSource.scope();

List<SourceDispatcher> dispatchers = this.dispatcherMap.get(scope);
if (dispatchers == null) {
dispatchers = new ArrayList<>();
this.dispatcherMap.put(scope, dispatchers);
}

dispatchers.add(dispatcher);

logger.info("Dispatcher {} is added into Scope {}.", dispatcher.getClass().getName(), scope);
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.skywalking.oap.server.core.source;

import java.io.IOException;
import org.apache.skywalking.oap.server.core.analysis.DispatcherManager;

/**
Expand All @@ -34,4 +35,8 @@ public SourceReceiverImpl() {
@Override public void receive(Source source) {
dispatcherManager.forward(source);
}

public void scan() throws IOException, InstantiationException, IllegalAccessException {
dispatcherManager.scan();
}
}

0 comments on commit f2b1588

Please sign in to comment.