diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java index c47b8ca79812d..7507181f34bdd 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java @@ -85,10 +85,20 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat * Gets the internal advertised listener for broker-to-broker communication. * @return a non-null advertised listener */ - public static AdvertisedListener getInternalListener(ServiceConfiguration config) { + public static AdvertisedListener getInternalListener(ServiceConfiguration config, String protocol) { Map result = MultipleListenerValidator .validateAndAnalysisAdvertisedListener(config); AdvertisedListener internal = result.get(config.getInternalListenerName()); + if (internal == null || !internal.hasUriForProtocol(protocol)) { + // Search for an advertised listener for same protocol + for (AdvertisedListener l : result.values()) { + if (l.hasUriForProtocol(protocol)) { + internal = l; + break; + } + } + } + if (internal == null) { // synthesize an advertised listener based on legacy configuration properties String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java index cb9eaf76e9fea..05688f0660b18 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java @@ -80,7 +80,7 @@ public static Map validateAndAnalysisAdvertisedListe throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`"); } - URI pulsarAddress = null, pulsarSslAddress = null; + URI pulsarAddress = null, pulsarSslAddress = null, pulsarHttpAddress = null, pulsarHttpsAddress = null; for (final String strUri : entry.getValue()) { try { URI uri = URI.create(strUri); @@ -98,7 +98,22 @@ public static Map validateAndAnalysisAdvertisedListe throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`"); } + } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "http")) { + if (pulsarHttpAddress == null) { + pulsarHttpAddress = uri; + } else { + throw new IllegalArgumentException("there are redundant configure for listener `" + + entry.getKey() + "`"); + } + } else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "https")) { + if (pulsarHttpsAddress == null) { + pulsarHttpsAddress = uri; + } else { + throw new IllegalArgumentException("there are redundant configure for listener `" + + entry.getKey() + "`"); + } } + String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort()); Set sets = reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>()); sets.add(entry.getKey()); @@ -108,11 +123,15 @@ public static Map validateAndAnalysisAdvertisedListe } } catch (Throwable cause) { throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` " - + "configure is invalid"); + + "configure is invalid", cause); } } - result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress) - .brokerServiceUrlTls(pulsarSslAddress).build()); + result.put(entry.getKey(), AdvertisedListener.builder() + .brokerServiceUrl(pulsarAddress) + .brokerServiceUrlTls(pulsarSslAddress) + .brokerHttpUrl(pulsarHttpAddress) + .brokerHttpsUrl(pulsarHttpsAddress) + .build()); } return result; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index 88c7384975a02..730fe789e9fd0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -1459,7 +1459,7 @@ public TransactionBufferClient getTransactionBufferClient() { * Gets the broker service URL (non-TLS) associated with the internal listener. */ protected String brokerUrl(ServiceConfiguration config) { - AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config); + AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar"); return internalListener.getBrokerServiceUrl() != null ? internalListener.getBrokerServiceUrl().toString() : null; } @@ -1472,7 +1472,7 @@ public static String brokerUrl(String host, int port) { * Gets the broker service URL (TLS) associated with the internal listener. */ public String brokerUrlTls(ServiceConfiguration config) { - AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config); + AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar+ssl"); return internalListener.getBrokerServiceUrlTls() != null ? internalListener.getBrokerServiceUrlTls().toString() : null; } @@ -1483,7 +1483,10 @@ public static String brokerUrlTls(String host, int port) { public String webAddress(ServiceConfiguration config) { if (config.getWebServicePort().isPresent()) { - return webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get()); + AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http"); + return internalListener.getBrokerHttpUrl() != null + ? internalListener.getBrokerHttpUrl().toString() + : webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get()); } else { return null; } @@ -1495,7 +1498,10 @@ public static String webAddress(String host, int port) { public String webAddressTls(ServiceConfiguration config) { if (config.getWebServicePortTls().isPresent()) { - return webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get()); + AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https"); + return internalListener.getBrokerHttpsUrl() != null + ? internalListener.getBrokerHttpsUrl().toString() + : webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get()); } else { return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java index 263fedca623f0..8e397afcf910d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactorTool.java @@ -110,7 +110,7 @@ public static void main(String[] args) throws Exception { brokerConfig.getBrokerClientAuthenticationParameters()); } - AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig); + AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl"); if (internalListener.getBrokerServiceUrlTls() != null) { log.info("Found a TLS-based advertised listener in configuration file. \n" + "Will connect pulsar use TLS."); @@ -120,6 +120,7 @@ public static void main(String[] args) throws Exception { .tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath()); } else { + internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar"); clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString()); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java new file mode 100644 index 0000000000000..6ff49674e2e77 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/AdvertisedListenersTest.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.loadbalance; + +import static org.testng.Assert.assertEquals; +import java.net.URI; +import java.util.Optional; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.util.PortManager; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHeaders; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.apache.pulsar.broker.MultiBrokerBaseTest; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.lookup.data.LookupData; +import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.util.ObjectMapperFactory; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class AdvertisedListenersTest extends MultiBrokerBaseTest { + @Override + protected int numberOfAdditionalBrokers() { + return 1; + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + + updateConfig(conf, "BROKER-X"); + } + + @Override + protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) { + ServiceConfiguration conf = super.createConfForAdditionalBroker(additionalBrokerIndex); + updateConfig(conf, "BROKER-" + additionalBrokerIndex); + return conf; + } + + private void updateConfig(ServiceConfiguration conf, String advertisedAddress) { + int pulsarPort = PortManager.nextFreePort(); + int httpPort = PortManager.nextFreePort(); + int httpsPort = PortManager.nextFreePort(); + + // Use invalid domain name as identifier and instead make sure the advertised listeners work as intended + this.conf.setAdvertisedAddress(advertisedAddress); + this.conf.setAdvertisedListeners( + "public:pulsar://localhost:" + pulsarPort + + ",public_http:http://localhost:" + httpPort + + ",public_https:https://localhost:" + httpsPort); + this.conf.setBrokerServicePort(Optional.of(pulsarPort)); + this.conf.setWebServicePort(Optional.of(httpPort)); + this.conf.setWebServicePortTls(Optional.of(httpsPort)); + } + + @Test + public void testLookup() throws Exception { + HttpGet request = + new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic"); + request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json"); + request.addHeader(HttpHeaders.ACCEPT, "application/json"); + + @Cleanup + CloseableHttpClient httpClient = HttpClients.createDefault(); + + @Cleanup + CloseableHttpResponse response = httpClient.execute(request); + + HttpEntity entity = response.getEntity(); + LookupData ld = ObjectMapperFactory.getThreadLocal().readValue(EntityUtils.toString(entity), LookupData.class); + System.err.println("Lookup data: " + ld); + + assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost"); + assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost"); + assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost"); + + + // Produce data + @Cleanup + Producer p = pulsarClient.newProducer(Schema.STRING) + .topic("my-topic") + .create(); + + p.send("hello"); + + // Verify we can get the correct HTTP redirect to the advertised listener + for (PulsarAdmin a : getAllAdmins()) { + TopicStats s = a.topics().getStats("my-topic"); + assertEquals(s.getPublishers().size(), 1); + } + } + +} diff --git a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java index a310974ffa358..b73fdab4483be 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/AdvertisedListener.java @@ -44,4 +44,29 @@ public class AdvertisedListener { @Setter // the broker service uri with ssl private URI brokerServiceUrlTls; + + // + @Getter + @Setter + // the broker service uri without ssl + private URI brokerHttpUrl; + // + @Getter + @Setter + // the broker service uri with ssl + private URI brokerHttpsUrl; + + public boolean hasUriForProtocol(String protocol) { + if ("pulsar".equals(protocol)) { + return brokerServiceUrl != null; + } else if ("pulsar+ssl".equals(protocol)) { + return brokerServiceUrlTls != null; + } else if ("http".equals(protocol)) { + return brokerHttpUrl != null; + } else if ("https".equals(protocol)) { + return brokerHttpsUrl != null; + } else { + return false; + } + } }