Skip to content

Commit

Permalink
Fix incorrect port of advertisedListener (apache#10961)
Browse files Browse the repository at this point in the history
Fixes apache#10951

### Motivation
The advertisedListener has its own port, and now we have no way to obtain the port of TLS and non-TLS advertisedListener except by setting the listenerName through the client.
Therefore, brokerServiceUrl and webServiceUrl do not return the address and port of the advertisedListener


(cherry picked from commit 99c84c4)
  • Loading branch information
315157973 authored and codelipenghui committed Jun 25, 2021
1 parent b7259cd commit d7bff1e
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@ public static String unsafeLocalhostResolve() {
* Get the address of Broker, first try to get it from AdvertisedAddress.
* If it is not set, try to get the address set by advertisedListener.
* If it is still not set, get it through InetAddress.getLocalHost().
* @param configuration
* @param ignoreAdvertisedListener Sometimes we can’t use the default key of AdvertisedListener,
* setting it to true can ignore AdvertisedListener.
* @return
*/
public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration) {
public static String getAppliedAdvertisedAddress(ServiceConfiguration configuration,
boolean ignoreAdvertisedListener) {
Map<String, AdvertisedListener> result = MultipleListenerValidator
.validateAndAnalysisAdvertisedListener(configuration);

Expand All @@ -66,7 +70,7 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat
}

AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName());
if (advertisedListener != null) {
if (advertisedListener != null && !ignoreAdvertisedListener) {
String address = advertisedListener.getBrokerServiceUrl().getHost();
if (address != null) {
return address;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.validator;

import java.net.InetAddress;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.ServiceConfigurationUtils;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -46,15 +47,23 @@ public void testGetAppliedAdvertised() throws Exception {
config.setBrokerServicePortTls(Optional.of(6651));
config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651");
config.setInternalListenerName("internal");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.1");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
"192.0.0.1");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
InetAddress.getLocalHost().getCanonicalHostName());

config = new ServiceConfiguration();
config.setBrokerServicePortTls(Optional.of(6651));
config.setAdvertisedAddress("192.0.0.2");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config), "192.0.0.2");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
"192.0.0.2");
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
"192.0.0.2");

config.setAdvertisedAddress(null);
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(null));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public PulsarService(ServiceConfiguration config,
PulsarConfigurationLoader.isComplete(config);
// validate `advertisedAddress`, `advertisedListeners`, `internalListenerName`
this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config);
this.advertisedAddress = ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false);
state = State.Init;
// use `internalListenerName` listener as `advertisedAddress`
this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress());
Expand Down Expand Up @@ -1361,7 +1361,7 @@ public ShutdownService getShutdownService() {

protected String brokerUrl(ServiceConfiguration config) {
if (config.getBrokerServicePort().isPresent()) {
return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
return brokerUrl(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getBrokerListenPort().get());
} else {
return null;
Expand All @@ -1374,7 +1374,7 @@ public static String brokerUrl(String host, int port) {

public String brokerUrlTls(ServiceConfiguration config) {
if (config.getBrokerServicePortTls().isPresent()) {
return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
return brokerUrlTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getBrokerListenPortTls().get());
} else {
return null;
Expand All @@ -1387,7 +1387,7 @@ public static String brokerUrlTls(String host, int port) {

public String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
return webAddress(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getListenPortHTTP().get());
} else {
return null;
Expand All @@ -1400,7 +1400,7 @@ public static String webAddress(String host, int port) {

public String webAddressTls(ServiceConfiguration config) {
if (config.getWebServicePortTls().isPresent()) {
return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config),
return webAddressTls(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
getListenPortHTTPS().get());
} else {
return null;
Expand Down Expand Up @@ -1546,7 +1546,7 @@ public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfigu

// worker talks to local broker
String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(
ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig));
brokerConfig.getAdvertisedAddress());
workerConfig.setWorkerHostname(hostname);
workerConfig.setPulsarFunctionsCluster(brokerConfig.getClusterName());
// inherit broker authorization setting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ private void updateLoadBalancingMetrics(final SystemResourceUsage systemResource
List<Metrics> metrics = Lists.newArrayList();
Map<String, String> dimensions = new HashMap<>();

dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf));
dimensions.put("broker", ServiceConfigurationUtils.getAppliedAdvertisedAddress(conf, true));
dimensions.put("metric", "loadBalancing");

Metrics m = Metrics.create(dimensions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,15 @@ public static void main(String[] args) throws Exception {
log.info("Found `brokerServicePortTls` in configuration file. \n"
+ "Will connect pulsar use TLS.");
clientBuilder
.serviceUrl(PulsarService.brokerUrlTls(ServiceConfigurationUtils
.getAppliedAdvertisedAddress(brokerConfig),
.serviceUrl(PulsarService.brokerUrlTls(
ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
brokerConfig.getBrokerServicePortTls().get()))
.allowTlsInsecureConnection(brokerConfig.isTlsAllowInsecureConnection())
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());

} else {
clientBuilder.serviceUrl(PulsarService.brokerUrl(ServiceConfigurationUtils
.getAppliedAdvertisedAddress(brokerConfig),
clientBuilder.serviceUrl(PulsarService.brokerUrl(
ServiceConfigurationUtils.getAppliedAdvertisedAddress(brokerConfig, true),
brokerConfig.getBrokerServicePort().get()));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,22 @@ public void testGetWorkerServiceException() throws Exception {
@Test
public void testAppliedAdvertised() throws Exception {
useListenerName = true;
conf.setAdvertisedListeners("internal:pulsar://127.0.0.1, internal:pulsar+ssl://127.0.0.1");
conf.setAdvertisedListeners("internal:pulsar://127.0.0.1:6650, internal:pulsar+ssl://127.0.0.1:6651");
conf.setInternalListenerName("internal");
setup();

AssertJUnit.assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
assertEquals(pulsar.getAdvertisedAddress(), "127.0.0.1");
assertNull(pulsar.getConfiguration().getAdvertisedAddress());
assertEquals(conf, pulsar.getConfiguration());
assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://127.0.0.1:6651");
assertEquals(pulsar.brokerUrl(conf), "pulsar://127.0.0.1:6660");
assertEquals(pulsar.webAddress(conf), "http://127.0.0.1:8081");
assertEquals(pulsar.webAddressTls(conf), "https://127.0.0.1:8082");

cleanup();
resetConfig();
setup();
assertEquals(pulsar.getAdvertisedAddress(), "localhost");
assertEquals(conf, pulsar.getConfiguration());
assertEquals(pulsar.brokerUrlTls(conf), "pulsar+ssl://localhost:" + pulsar.getBrokerListenPortTls().get());
assertEquals(pulsar.brokerUrl(conf), "pulsar://localhost:" + pulsar.getBrokerListenPort().get());
assertEquals(pulsar.webAddress(conf), "http://localhost:" + pulsar.getWebService().getListenPortHTTP().get());
assertEquals(pulsar.webAddressTls(conf), "https://localhost:" + pulsar.getWebService().getListenPortHTTPS().get());
}

}

0 comments on commit d7bff1e

Please sign in to comment.