From d7bff1e5aa3e4086fcb2d5c9b65507d597b00246 Mon Sep 17 00:00:00 2001 From: feynmanlin Date: Tue, 22 Jun 2021 10:40:47 +0800 Subject: [PATCH] Fix incorrect port of advertisedListener (#10961) Fixes #10951 ### Motivation The advertisedListener has its own port, and now we have no way to obtain the port of TLS and non-TLS advertisedListener except by setting the listenerName through the client. Therefore, brokerServiceUrl and webServiceUrl do not return the address and port of the advertisedListener (cherry picked from commit 99c84c4cf029c4fa3cd3e6dfa47ed0cb5272bedc) --- .../broker/ServiceConfigurationUtils.java | 8 ++++++-- .../MultipleListenerValidatorTest.java | 15 ++++++++++++--- .../apache/pulsar/broker/PulsarService.java | 12 ++++++------ .../impl/ModularLoadManagerImpl.java | 2 +- .../pulsar/compaction/CompactorTool.java | 8 ++++---- .../pulsar/broker/PulsarServiceTest.java | 19 ++++++++++++------- 6 files changed, 41 insertions(+), 23 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java index fe23e6b363a41..6ce69c5aefe46 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java @@ -54,9 +54,13 @@ public static String unsafeLocalhostResolve() { * Get the address of Broker, first try to get it from AdvertisedAddress. * If it is not set, try to get the address set by advertisedListener. * If it is still not set, get it through InetAddress.getLocalHost(). + * @param configuration + * @param ignoreAdvertisedListener Sometimes we can’t use the default key of AdvertisedListener, + * setting it to true can ignore AdvertisedListener. * @return */ - public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration) { + public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration, + boolean ignoreAdvertisedListener) { Map result = MultipleListenerValidator .validateAndAnalysisAdvertisedListener(configuration); @@ -66,7 +70,7 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat } AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName()); - if (advertisedListener != null) { + if (advertisedListener != null && !ignoreAdvertisedListener) { String address = advertisedListener.getBrokerServiceUrl().getHost(); if (address != null) { return address; diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java index 8928e8223eeb3..f41ed92628571 100644 --- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java +++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.validator; +import java.net.InetAddress; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.testng.annotations.Test; @@ -46,15 +47,23 @@ public void testGetAppliedAdvertised() throws Exception { config.setBrokerServicePortTls(Optional.of(6651)); config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651"); config.setInternalListenerName("internal"); - assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.1"); + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false), + "192.0.0.1"); + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), + InetAddress.getLocalHost().getCanonicalHostName()); config = new ServiceConfiguration(); config.setBrokerServicePortTls(Optional.of(6651)); config.setAdvertisedAddress("192.0.0.2"); - assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.2"); + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false), + "192.0.0.2"); + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), + "192.0.0.2"); config.setAdvertisedAddress(null); - assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false), + ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null)); + assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index be85f545e62ba..cbd3eb3e8cdb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -280,7 +280,7 @@ public PulsarService(ServiceConfiguration config, PulsarConfigurationLoader.isComplete(config); // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName` this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); - this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config); + this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false); state = State.Init; // use `internalListenerName` listener as `advertisedAddress` this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress()); @@ -1361,7 +1361,7 @@ public ShutdownService getShutdownService() { protected String brokerUrl(ServiceConfiguration config) { if (config.getBrokerServicePort().isPresent()) { - return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), + return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), getBrokerListenPort().get()); } else { return null; @@ -1374,7 +1374,7 @@ public static String brokerUrl(String host, int port) { public String brokerUrlTls(ServiceConfiguration config) { if (config.getBrokerServicePortTls().isPresent()) { - return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), + return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), getBrokerListenPortTls().get()); } else { return null; @@ -1387,7 +1387,7 @@ public static String brokerUrlTls(String host, int port) { public String webAddress(ServiceConfiguration config) { if (config.getWebServicePort().isPresent()) { - return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), + return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), getListenPortHTTP().get()); } else { return null; @@ -1400,7 +1400,7 @@ public static String webAddress(String host, int port) { public String webAddressTls(ServiceConfiguration config) { if (config.getWebServicePortTls().isPresent()) { - return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), + return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true), getListenPortHTTPS().get()); } else { return null; @@ -1546,7 +1546,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu // worker talks to local broker String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( - ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig)); + brokerConfig.getAdvertisedAddress()); workerConfig.setWorkerHostname(hostname); workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName()); // inherit broker authorization setting diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java index 6fd2884033f66..5b7867e388a19 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/ModularLoadManagerImpl.java @@ -931,7 +931,7 @@ private void updateLoadBalancingMetrics(final SystemResourceUsage systemResource List metrics = Lists.newArrayList(); Map dimensions = new HashMap<>(); - dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf)); + dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true)); dimensions.put("metric", "loadBalancing"); Metrics m = Metrics.create(dimensions); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index fe7a5b885ff04..ee2b374e56b92 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -104,15 +104,15 @@ public static void main(String[] args) throws Exception { log.info("Found `brokerServicePortTls` in configuration file. \n" + "Will connect pulsar use TLS."); clientBuilder - .serviceUrl(PulsarService.brokerUrlTls(ServiceConfigurationUtils - .getAppliedAdvertisedAddress(brokerConfig), + .serviceUrl(PulsarService.brokerUrlTls( + ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true), brokerConfig.getBrokerServicePortTls().get())) .allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection()) .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath()); } else { - clientBuilder.serviceUrl(PulsarService.brokerUrl(ServiceConfigurationUtils - .getAppliedAdvertisedAddress(brokerConfig), + clientBuilder.serviceUrl(PulsarService.brokerUrl( + ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true), brokerConfig.getBrokerServicePort().get())); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java index 9da4aa7492a5d..1f80221cb47d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/PulsarServiceTest.java @@ -106,17 +106,22 @@ public void testGetWorkerServiceException() throws Exception { @Test public void testAppliedAdvertised() throws Exception { useListenerName = true; - conf.setAdvertisedListeners("internal:pulsar://127.0.0.1, internal:pulsar+ssl://127.0.0.1"); + conf.setAdvertisedListeners("internal:pulsar://127.0.0.1:6650, internal:pulsar+ssl://127.0.0.1:6651"); conf.setInternalListenerName("internal"); setup(); - - AssertJUnit.assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1"); + assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1"); assertNull(pulsar.getConfiguration().getAdvertisedAddress()); assertEquals(conf, pulsar.getConfiguration()); - assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://127.0.0.1:6651"); - assertEquals(pulsar.brokerUrl(conf), "pulsar://127.0.0.1:6660"); - assertEquals(pulsar.webAddress(conf), "http://127.0.0.1:8081"); - assertEquals(pulsar.webAddressTls(conf), "https://127.0.0.1:8082"); + + cleanup(); + resetConfig(); + setup(); + assertEquals(pulsar.getAdvertisedAddress(), "localhost"); + assertEquals(conf, pulsar.getConfiguration()); + assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://localhost:" + pulsar.getBrokerListenPortTls().get()); + assertEquals(pulsar.brokerUrl(conf), "pulsar://localhost:" + pulsar.getBrokerListenPort().get()); + assertEquals(pulsar.webAddress(conf), "http://localhost:" + pulsar.getWebService().getListenPortHTTP().get()); + assertEquals(pulsar.webAddressTls(conf), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get()); } }