Skip to content

Commit

Permalink
Support advertised listeners for HTTP and HTTPS services (apache#14839)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored Mar 28, 2022
1 parent 12faf2d commit 3f66c26
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,20 @@ public static String getAppliedAdvertisedAddress(ServiceConfiguration configurat
* Gets the internal advertised listener for broker-to-broker communication.
* @return a non-null advertised listener
*/
public static AdvertisedListener getInternalListener(ServiceConfiguration config) {
public static AdvertisedListener getInternalListener(ServiceConfiguration config, String protocol) {
Map<String, AdvertisedListener> result = MultipleListenerValidator
.validateAndAnalysisAdvertisedListener(config);
AdvertisedListener internal = result.get(config.getInternalListenerName());
if (internal == null || !internal.hasUriForProtocol(protocol)) {
// Search for an advertised listener for same protocol
for (AdvertisedListener l : result.values()) {
if (l.hasUriForProtocol(protocol)) {
internal = l;
break;
}
}
}

if (internal == null) {
// synthesize an advertised listener based on legacy configuration properties
String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey()
+ "`");
}
URI pulsarAddress = null, pulsarSslAddress = null;
URI pulsarAddress = null, pulsarSslAddress = null, pulsarHttpAddress = null, pulsarHttpsAddress = null;
for (final String strUri : entry.getValue()) {
try {
URI uri = URI.create(strUri);
Expand All @@ -98,7 +98,22 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
throw new IllegalArgumentException("there are redundant configure for listener `"
+ entry.getKey() + "`");
}
} else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "http")) {
if (pulsarHttpAddress == null) {
pulsarHttpAddress = uri;
} else {
throw new IllegalArgumentException("there are redundant configure for listener `"
+ entry.getKey() + "`");
}
} else if (StringUtils.equalsIgnoreCase(uri.getScheme(), "https")) {
if (pulsarHttpsAddress == null) {
pulsarHttpsAddress = uri;
} else {
throw new IllegalArgumentException("there are redundant configure for listener `"
+ entry.getKey() + "`");
}
}

String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>());
sets.add(entry.getKey());
Expand All @@ -108,11 +123,15 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
}
} catch (Throwable cause) {
throw new IllegalArgumentException("the value " + strUri + " in the `advertisedListeners` "
+ "configure is invalid");
+ "configure is invalid", cause);
}
}
result.put(entry.getKey(), AdvertisedListener.builder().brokerServiceUrl(pulsarAddress)
.brokerServiceUrlTls(pulsarSslAddress).build());
result.put(entry.getKey(), AdvertisedListener.builder()
.brokerServiceUrl(pulsarAddress)
.brokerServiceUrlTls(pulsarSslAddress)
.brokerHttpUrl(pulsarHttpAddress)
.brokerHttpsUrl(pulsarHttpsAddress)
.build());
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,7 @@ public TransactionBufferClient getTransactionBufferClient() {
* Gets the broker service URL (non-TLS) associated with the internal listener.
*/
protected String brokerUrl(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar");
return internalListener.getBrokerServiceUrl() != null
? internalListener.getBrokerServiceUrl().toString() : null;
}
Expand All @@ -1472,7 +1472,7 @@ public static String brokerUrl(String host, int port) {
* Gets the broker service URL (TLS) associated with the internal listener.
*/
public String brokerUrlTls(ServiceConfiguration config) {
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config);
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "pulsar+ssl");
return internalListener.getBrokerServiceUrlTls() != null
? internalListener.getBrokerServiceUrlTls().toString() : null;
}
Expand All @@ -1483,7 +1483,10 @@ public static String brokerUrlTls(String host, int port) {

public String webAddress(ServiceConfiguration config) {
if (config.getWebServicePort().isPresent()) {
return webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "http");
return internalListener.getBrokerHttpUrl() != null
? internalListener.getBrokerHttpUrl().toString()
: webAddress(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTP().get());
} else {
return null;
}
Expand All @@ -1495,7 +1498,10 @@ public static String webAddress(String host, int port) {

public String webAddressTls(ServiceConfiguration config) {
if (config.getWebServicePortTls().isPresent()) {
return webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(config, "https");
return internalListener.getBrokerHttpsUrl() != null
? internalListener.getBrokerHttpsUrl().toString()
: webAddressTls(ServiceConfigurationUtils.getWebServiceAddress(config), getListenPortHTTPS().get());
} else {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public static void main(String[] args) throws Exception {
brokerConfig.getBrokerClientAuthenticationParameters());
}

AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig);
AdvertisedListener internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar+ssl");
if (internalListener.getBrokerServiceUrlTls() != null) {
log.info("Found a TLS-based advertised listener in configuration file. \n"
+ "Will connect pulsar use TLS.");
Expand All @@ -120,6 +120,7 @@ public static void main(String[] args) throws Exception {
.tlsTrustCertsFilePath(brokerConfig.getTlsCertificateFilePath());

} else {
internalListener = ServiceConfigurationUtils.getInternalListener(brokerConfig, "pulsar");
clientBuilder.serviceUrl(internalListener.getBrokerServiceUrl().toString());
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.loadbalance;

import static org.testng.Assert.assertEquals;
import java.net.URI;
import java.util.Optional;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.util.PortManager;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.lookup.data.LookupData;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class AdvertisedListenersTest extends MultiBrokerBaseTest {
@Override
protected int numberOfAdditionalBrokers() {
return 1;
}

@Override
protected void doInitConf() throws Exception {
super.doInitConf();

updateConfig(conf, "BROKER-X");
}

@Override
protected ServiceConfiguration createConfForAdditionalBroker(int additionalBrokerIndex) {
ServiceConfiguration conf = super.createConfForAdditionalBroker(additionalBrokerIndex);
updateConfig(conf, "BROKER-" + additionalBrokerIndex);
return conf;
}

private void updateConfig(ServiceConfiguration conf, String advertisedAddress) {
int pulsarPort = PortManager.nextFreePort();
int httpPort = PortManager.nextFreePort();
int httpsPort = PortManager.nextFreePort();

// Use invalid domain name as identifier and instead make sure the advertised listeners work as intended
this.conf.setAdvertisedAddress(advertisedAddress);
this.conf.setAdvertisedListeners(
"public:pulsar://localhost:" + pulsarPort +
",public_http:http://localhost:" + httpPort +
",public_https:https://localhost:" + httpsPort);
this.conf.setBrokerServicePort(Optional.of(pulsarPort));
this.conf.setWebServicePort(Optional.of(httpPort));
this.conf.setWebServicePortTls(Optional.of(httpsPort));
}

@Test
public void testLookup() throws Exception {
HttpGet request =
new HttpGet(pulsar.getWebServiceAddress() + "/lookup/v2/topic/persistent/public/default/my-topic");
request.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
request.addHeader(HttpHeaders.ACCEPT, "application/json");

@Cleanup
CloseableHttpClient httpClient = HttpClients.createDefault();

@Cleanup
CloseableHttpResponse response = httpClient.execute(request);

HttpEntity entity = response.getEntity();
LookupData ld = ObjectMapperFactory.getThreadLocal().readValue(EntityUtils.toString(entity), LookupData.class);
System.err.println("Lookup data: " + ld);

assertEquals(new URI(ld.getBrokerUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrl()).getHost(), "localhost");
assertEquals(new URI(ld.getHttpUrlTls()).getHost(), "localhost");


// Produce data
@Cleanup
Producer<String> p = pulsarClient.newProducer(Schema.STRING)
.topic("my-topic")
.create();

p.send("hello");

// Verify we can get the correct HTTP redirect to the advertised listener
for (PulsarAdmin a : getAllAdmins()) {
TopicStats s = a.topics().getStats("my-topic");
assertEquals(s.getPublishers().size(), 1);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,29 @@ public class AdvertisedListener {
@Setter
// the broker service uri with ssl
private URI brokerServiceUrlTls;

//
@Getter
@Setter
// the broker service uri without ssl
private URI brokerHttpUrl;
//
@Getter
@Setter
// the broker service uri with ssl
private URI brokerHttpsUrl;

public boolean hasUriForProtocol(String protocol) {
if ("pulsar".equals(protocol)) {
return brokerServiceUrl != null;
} else if ("pulsar+ssl".equals(protocol)) {
return brokerServiceUrlTls != null;
} else if ("http".equals(protocol)) {
return brokerHttpUrl != null;
} else if ("https".equals(protocol)) {
return brokerHttpsUrl != null;
} else {
return false;
}
}
}

0 comments on commit 3f66c26

Please sign in to comment.