Skip to content

Commit

Permalink
Simplify event publishing (apache#11863)
Browse files Browse the repository at this point in the history
  • Loading branch information
wxbty authored Mar 28, 2023
1 parent 20a713f commit 1435c23
Show file tree
Hide file tree
Showing 42 changed files with 581 additions and 493 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsEvent;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -131,8 +130,6 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
*/
private final int reconnectTaskPeriod;

private final GlobalMetricsEventMulticaster eventMulticaster;

private ApplicationModel applicationModel;

public AbstractDirectory(URL url) {
Expand Down Expand Up @@ -187,7 +184,6 @@ public AbstractDirectory(URL url, RouterChain<T> routerChain, boolean isUrlFromR
this.reconnectTaskPeriod = configuration.getInt(RECONNECT_TASK_PERIOD, DEFAULT_RECONNECT_TASK_PERIOD);
setRouterChain(routerChain);

eventMulticaster = applicationModel.getBeanFactory().getBean(GlobalMetricsEventMulticaster.class);
}

@Override
Expand Down Expand Up @@ -388,15 +384,9 @@ private void refreshInvokers(BitList<Invoker<T>> targetInvokers, Collection<Invo
invokersToRemove.removeAll(needToRemove);
}

private void publishMetricsEvent(MetricsEvent event) {
if (eventMulticaster != null) {
eventMulticaster.publishEvent(event);
}
}

@Override
public void addDisabledInvoker(Invoker<T> invoker) {
publishMetricsEvent(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_DISABLE));
MetricsEventBus.publish(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_DISABLE));
if (invokers.contains(invoker)) {
disabledInvokers.add(invoker);
removeValidInvoker(invoker);
Expand All @@ -406,7 +396,7 @@ public void addDisabledInvoker(Invoker<T> invoker) {

@Override
public void recoverDisabledInvoker(Invoker<T> invoker) {
publishMetricsEvent(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_RECOVER_DISABLE));
MetricsEventBus.publish(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_RECOVER_DISABLE));
if (disabledInvokers.remove(invoker)) {
try {
addValidInvoker(invoker);
Expand Down Expand Up @@ -470,7 +460,7 @@ protected void setInvokers(BitList<Invoker<T>> invokers) {
this.invokers = invokers;
refreshInvokerInternal();
this.invokersInitialized = true;
publishMetricsEvent(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_CURRENT, invokers.size()));
MetricsEventBus.publish(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_CURRENT, invokers.size()));
}

protected void destroyInvokers() {
Expand All @@ -481,14 +471,14 @@ protected void destroyInvokers() {
}

private boolean addValidInvoker(Invoker<T> invoker) {
publishMetricsEvent(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_VALID));
MetricsEventBus.publish(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_VALID));
synchronized (this.validInvokers) {
return this.validInvokers.add(invoker);
}
}

private boolean removeValidInvoker(Invoker<T> invoker) {
publishMetricsEvent(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_UN_VALID));
MetricsEventBus.publish(new RegistryEvent.MetricsDirectoryEvent(applicationModel, RegistryEvent.ApplicationType.D_UN_VALID));
synchronized (this.validInvokers) {
return this.validInvokers.remove(invoker);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.Holder;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.cluster.router.MockInvoker;
Expand Down Expand Up @@ -62,7 +62,7 @@ void testStaticDirectory() {


List<Invoker<String>> filteredInvokers = router.route(invokers.clone(), URL.valueOf("consumer://" + NetUtils.getLocalHost() + "/com.foo.BarService"), new RpcInvocation(), false, new Holder<>());
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
StaticDirectory<String> staticDirectory = new StaticDirectory<>(filteredInvokers);
boolean isAvailable = staticDirectory.isAvailable();
Assertions.assertTrue(!isAvailable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -61,7 +61,7 @@ class FileRouterEngineTest {

@BeforeAll
public static void setUpBeforeClass() throws Exception {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
System.setProperty(ENABLE_CONNECTIVITY_VALIDATION, "false");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
Expand Down Expand Up @@ -107,7 +107,7 @@ public static void afterClass() {
@SuppressWarnings({"unchecked"})
@BeforeEach
public void setUp() throws Exception {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
Map<String, Object> attributes = new HashMap<>();
attributes.put("application", "abstractClusterInvokerTest");
url = url.putAttribute(REFER_KEY, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
Expand Down Expand Up @@ -72,7 +72,7 @@ class ConnectivityValidationTest {

@BeforeEach
public void setup() {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
invoker1 = Mockito.mock(Invoker.class);
invoker2 = Mockito.mock(Invoker.class);
invoker3 = Mockito.mock(Invoker.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.dubbo.rpc.cluster.support;

import org.apache.dubbo.common.URL;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.AppResponse;
import org.apache.dubbo.rpc.AsyncRpcResult;
import org.apache.dubbo.rpc.Invocation;
Expand Down Expand Up @@ -67,7 +67,7 @@ class FailoverClusterInvokerTest {

@BeforeEach
public void setUp() throws Exception {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
dic = mock(Directory.class);

given(dic.getUrl()).willReturn(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
Expand Down Expand Up @@ -51,7 +51,7 @@ class MockClusterInvokerTest {

@BeforeEach
public void beforeMethod() {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
invokers.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.ProxyFactory;
Expand Down Expand Up @@ -48,7 +48,7 @@ class MockProviderRpcExceptionTest {

@BeforeEach
public void beforeMethod() {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
invokers.clear();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.event.MetricsDispatcher;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
Expand Down Expand Up @@ -64,7 +64,7 @@ class ScopeClusterInvokerTest {

@BeforeEach
void beforeMonth() {
ApplicationModel.defaultModel().getBeanFactory().registerBean(GlobalMetricsEventMulticaster.class);
ApplicationModel.defaultModel().getBeanFactory().registerBean(MetricsDispatcher.class);
}

@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,10 @@ public void destroy() {
}
}

public boolean isDestroyed() {
return destroyed.get();
}

private void checkDestroyed() {
if (destroyed.get()) {
throw new IllegalStateException("ScopeBeanFactory is destroyed");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,15 @@
import org.apache.dubbo.config.support.Parameter;
import org.apache.dubbo.config.utils.ConfigValidationUtils;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.model.TimePair;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.ProxyFactory;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.ServerService;
import org.apache.dubbo.rpc.cluster.ConfiguratorFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
import org.apache.dubbo.rpc.model.ModuleServiceRepository;
import org.apache.dubbo.rpc.model.ProviderModel;
Expand Down Expand Up @@ -427,34 +424,23 @@ private void doExportUrls() {

List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

TimePair timePair = TimePair.start();
ApplicationModel applicationModel = module.getApplicationModel();
try {
applicationModel.getBeanFactory().getBean(GlobalMetricsEventMulticaster.class);
} catch (Throwable t) {
applicationModel = ApplicationModel.defaultModel();
}
GlobalMetricsEventMulticaster eventMulticaster = applicationModel.getBeanFactory().getBean(GlobalMetricsEventMulticaster.class);
int size = protocols.size() * registryURLs.size();
eventMulticaster.publishEvent(new RegistryEvent.MetricsServiceRegisterEvent(applicationModel, timePair, getUniqueServiceName(), size));

for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
}
try {
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
} catch (RpcException e) {
eventMulticaster.publishErrorEvent(new RegistryEvent.MetricsServiceRegisterEvent(applicationModel, timePair, getUniqueServiceName(), registryURLs.size()));
throw e;
MetricsEventBus.post(new RegistryEvent.MetricsServiceRegisterEvent(module.getApplicationModel(), getUniqueServiceName(), protocols.size() * registryURLs.size()),
() -> {
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// stub service will use generated service name
if (!serverService) {
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
}
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
return null;
}
}
eventMulticaster.publishFinishEvent(new RegistryEvent.MetricsServiceRegisterEvent(applicationModel, timePair, getUniqueServiceName(), size));
);

providerModel.setServiceUrls(urls);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@
import org.apache.dubbo.metadata.report.MetadataReportInstance;
import org.apache.dubbo.metrics.collector.ConfigCenterMetricsCollector;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.event.GlobalMetricsEventMulticaster;
import org.apache.dubbo.metrics.model.TimePair;
import org.apache.dubbo.metrics.event.MetricsEventBus;
import org.apache.dubbo.metrics.registry.event.RegistryEvent;
import org.apache.dubbo.metrics.report.MetricsReporter;
import org.apache.dubbo.metrics.report.MetricsReporterFactory;
Expand Down Expand Up @@ -841,17 +840,18 @@ private DynamicConfiguration getDynamicConfiguration(URL connectionURL) {
private final AtomicInteger serviceRefreshState = new AtomicInteger(0);

private void registerServiceInstance() {
TimePair timePair = TimePair.start();
GlobalMetricsEventMulticaster eventMulticaster = applicationModel.getBeanFactory().getBean(GlobalMetricsEventMulticaster.class);
eventMulticaster.publishEvent(new RegistryEvent.MetricsApplicationRegisterEvent(applicationModel, timePair));
try {
registered = true;
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
eventMulticaster.publishFinishEvent(new RegistryEvent.MetricsApplicationRegisterEvent(applicationModel, timePair));
MetricsEventBus.post(new RegistryEvent.MetricsApplicationRegisterEvent(applicationModel),
() -> {
ServiceInstanceMetadataUtils.registerMetadataAndInstance(applicationModel);
return null;
}
);
} catch (Exception e) {
eventMulticaster.publishErrorEvent(new RegistryEvent.MetricsApplicationRegisterEvent(applicationModel, timePair));
logger.error(CONFIG_REGISTER_INSTANCE_ERROR, "configuration server disconnected", "", "Register instance error.", e);
}

if (registered) {
// scheduled task for updating Metadata and ServiceInstance
asyncMetadataFuture = frameworkExecutorRepository.getSharedScheduledExecutor().scheduleWithFixedDelay(() -> {
Expand Down
4 changes: 0 additions & 4 deletions dubbo-distribution/dubbo-all/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -460,10 +460,6 @@
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
</dependency>
<dependency>
<groupId>com.tdunning</groupId>
<artifactId>t-digest</artifactId>
</dependency>

<!-- Temporarily add this part to exclude transitive dependency -->
<dependency>
Expand Down
Loading

0 comments on commit 1435c23

Please sign in to comment.