Skip to content

Commit

Permalink
Support listenerName for HttpLookupService (apache#10319)
Browse files Browse the repository at this point in the history
### Motivation
Support listenerName for http lookup

### Modifications
Add listenerName to http API

### Verifying this change
1) test lookup
2) test redirect
  • Loading branch information
315157973 authored May 6, 2021
1 parent c1d5162 commit fdbc5b2
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public class TopicLookupBase extends PulsarWebResource {
private static final String LOOKUP_PATH_V1 = "/lookup/v2/destination/";
private static final String LOOKUP_PATH_V2 = "/lookup/v2/topic/";

protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative, AsyncResponse asyncResponse) {
protected void internalLookupTopicAsync(TopicName topicName, boolean authoritative,
AsyncResponse asyncResponse, String listenerName) {
if (!pulsar().getBrokerService().getLookupRequestSemaphore().tryAcquire()) {
log.warn("No broker was found available for topic {}", topicName);
asyncResponse.resume(new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
Expand All @@ -77,7 +78,8 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati

CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
LookupOptions.builder().authoritative(authoritative).loadTopicsInBundle(false).build());
LookupOptions.builder().advertisedListenerName(listenerName)
.authoritative(authoritative).loadTopicsInBundle(false).build());

lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
Expand All @@ -97,8 +99,10 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
: result.getLookupData().getHttpUrl();
checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
redirect = new URI(String.format("%s%s%s?authoritative=%s", redirectUrl, lookupPath,
topicName.getLookupName(), newAuthoritative));
String path = String.format("%s%s%s?authoritative=%s",
redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
path = listenerName == null ? path : path + "&listenerName=" + listenerName;
redirect = new URI(path);
} catch (URISyntaxException | NullPointerException e) {
log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
completeLookupResponseExceptionally(asyncResponse, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,10 @@ public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @Pat
@PathParam("cluster") String cluster, @PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended AsyncResponse asyncResponse) {
@Suspended AsyncResponse asyncResponse,
@QueryParam("listenerName") String listenerName) {
TopicName topicName = getTopicName(topicDomain, property, cluster, namespace, encodedTopic);
internalLookupTopicAsync(topicName, authoritative, asyncResponse);
internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ public class TopicLookup extends TopicLookupBase {
public void lookupTopicAsync(@PathParam("topic-domain") String topicDomain, @PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@Suspended AsyncResponse asyncResponse) {
@Suspended AsyncResponse asyncResponse,
@QueryParam("listenerName") String listenerName) {
TopicName topicName = getTopicName(topicDomain, tenant, namespace, encodedTopic);
internalLookupTopicAsync(topicName, authoritative, asyncResponse);
internalLookupTopicAsync(topicName, authoritative, asyncResponse, listenerName);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public void crossColoLookup() throws Exception {

AsyncResponse asyncResponse = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
asyncResponse);
asyncResponse, null);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse).resume(arg.capture());
Expand Down Expand Up @@ -162,7 +162,7 @@ public void testNotEnoughLookupPermits() throws Exception {

AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic1", false,
asyncResponse1);
asyncResponse1, null);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
Expand Down Expand Up @@ -198,15 +198,15 @@ public void testValidateReplicationSettingsOnNamespace() throws Exception {

AsyncResponse asyncResponse = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns1, "empty-cluster",
false, asyncResponse);
false, asyncResponse, null);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse).resume(arg.capture());
assertEquals(arg.getValue().getClass(), RestException.class);

AsyncResponse asyncResponse2 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), property, cluster, ns2,
"invalid-localCluster", false, asyncResponse2);
"invalid-localCluster", false, asyncResponse2, null);
ArgumentCaptor<Throwable> arg2 = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse2).resume(arg2.capture());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,25 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import lombok.Cleanup;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.lookup.LookupResult;
import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.HttpLookupService;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
Expand All @@ -42,12 +51,16 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;

@Test(groups = "broker-api")
public class PulsarMultiListenersWithInternalListenerNameTest extends MockedPulsarServiceBaseTest {
private final boolean withInternalListener;
private ExecutorService executorService;
private String hostAndBrokerPort;
private String hostAndBrokerPortSsl;
private EventLoopGroup eventExecutors;

public PulsarMultiListenersWithInternalListenerNameTest() {
this(true);
Expand All @@ -61,6 +74,7 @@ protected PulsarMultiListenersWithInternalListenerNameTest(boolean withInternalL
@Override
protected void setup() throws Exception {
this.executorService = Executors.newFixedThreadPool(1);
this.eventExecutors = new NioEventLoopGroup();
this.isTcpLookup = true;
String host = InetAddress.getLocalHost().getHostAddress();
int brokerPort = getFreePort();
Expand Down Expand Up @@ -93,16 +107,27 @@ protected void doInitConf() throws Exception {
protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
clientBuilder.listenerName("internal");
}

@Test
public void testFindBrokerWithListenerName() throws Throwable {
public void testFindBrokerWithListenerName() throws Exception {
admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
this.admin.tenants().createTenant("public", tenantInfo);
this.admin.namespaces().createNamespace("public/default");

doFindBrokerWithListenerName(true);
doFindBrokerWithListenerName(false);
}

private void doFindBrokerWithListenerName(boolean useHttp) throws Exception {
ClientConfigurationData conf = new ClientConfigurationData();
conf.setListenerName("internal");
conf.setServiceUrl(pulsar.getWebServiceAddress());
conf.setMaxLookupRedirects(10);

@Cleanup
LookupService lookupService = new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
LookupService lookupService = useHttp ? new HttpLookupService(conf, eventExecutors) :
new BinaryProtoLookupService((PulsarClientImpl) this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
// test request 1
{
Expand All @@ -122,12 +147,51 @@ public void testFindBrokerWithListenerName() throws Throwable {
}
}

@Test
public void testHttpLookupRedirect() throws Exception {
admin.clusters().createCluster("localhost", new ClusterData(pulsar.getWebServiceAddress()));
TenantInfo tenantInfo = new TenantInfo();
tenantInfo.setAllowedClusters(Sets.newHashSet("localhost"));
this.admin.tenants().createTenant("public", tenantInfo);
this.admin.namespaces().createNamespace("public/default");
ClientConfigurationData conf = new ClientConfigurationData();
conf.setListenerName("internal");
conf.setServiceUrl(pulsar.getWebServiceAddress());
conf.setMaxLookupRedirects(10);

@Cleanup
HttpLookupService lookupService = new HttpLookupService(conf, eventExecutors);
NamespaceService namespaceService = pulsar.getNamespaceService();

LookupResult lookupResult = new LookupResult(pulsar.getWebServiceAddress(), null,
pulsar.getBrokerServiceUrl(), null, true);
Optional<LookupResult> optional = Optional.of(lookupResult);
String address = "192.168.0.1:8080";
String httpAddress = "192.168.0.1:8081";
NamespaceEphemeralData namespaceEphemeralData = new NamespaceEphemeralData("pulsar://" + address,
null, "http://" + httpAddress, null, false);
LookupResult lookupResult2 = new LookupResult(namespaceEphemeralData);
Optional<LookupResult> optional2 = Optional.of(lookupResult2);
doReturn(CompletableFuture.completedFuture(optional), CompletableFuture.completedFuture(optional2))
.when(namespaceService).getBrokerServiceUrlAsync(any(), any());

CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> future =
lookupService.getBroker(TopicName.get("persistent://public/default/test"));

Pair<InetSocketAddress, InetSocketAddress> result = future.get(10, TimeUnit.SECONDS);
Assert.assertEquals(result.getKey().toString(), address);
Assert.assertEquals(result.getValue().toString(), address);
}

@AfterMethod(alwaysRun = true)
@Override
protected void cleanup() throws Exception {
if (this.executorService != null) {
this.executorService.shutdownNow();
}
if (eventExecutors != null) {
eventExecutors.shutdownGracefully();
}
super.internalCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.impl.schema.SchemaUtils;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
Expand All @@ -48,6 +49,7 @@
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -56,6 +58,7 @@ public class HttpLookupService implements LookupService {

private final HttpClient httpClient;
private final boolean useTls;
private final String listenerName;

private static final String BasePathV1 = "lookup/v2/destination/";
private static final String BasePathV2 = "lookup/v2/topic/";
Expand All @@ -64,6 +67,7 @@ public HttpLookupService(ClientConfigurationData conf, EventLoopGroup eventLoopG
throws PulsarClientException {
this.httpClient = new HttpClient(conf, eventLoopGroup);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
}

@Override
Expand All @@ -77,11 +81,14 @@ public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
* @param topicName topic-name
* @return broker-socket-address that serves given topic
*/
@Override
@SuppressWarnings("deprecation")
public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
String basePath = topicName.isV2() ? BasePathV2 : BasePathV1;

return httpClient.get(basePath + topicName.getLookupName(), LookupData.class).thenCompose(lookupData -> {
String path = basePath + topicName.getLookupName();
path = StringUtils.isBlank(listenerName) ? path : path + "?listenerName=" + Codec.encode(listenerName);
return httpClient.get(path, LookupData.class)
.thenCompose(lookupData -> {
// Convert LookupData into as SocketAddress, handling exceptions
URI uri = null;
try {
Expand Down

0 comments on commit fdbc5b2

Please sign in to comment.