Skip to content

Commit

Permalink
fix wrong result for looking up a non-exist topic by rest api (apache…
Browse files Browse the repository at this point in the history
…#13055)

Fixes apache#13028

### Motivation

For now, the result of the `lookup` command for a topic not existing by the rest API will return a brokerUrl.
This pull request resolves this problem.

### Modifications
Add topic exist check before get the brokerUrl
1. if not exist, return a ResetException with status code NOT_FOUND
2. if exist, get the brokerUrl

### Verifying this change

This change added tests and can be verified as follows:
HttpTopicLookupv2Test.testLookupTopicNotExist
  • Loading branch information
aloyszhang authored Dec 14, 2021
1 parent 1891f6e commit c94c81c
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,54 +80,66 @@ protected void internalLookupTopicAsync(TopicName topicName, boolean authoritati
return;
}

CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
LookupOptions.builder().advertisedListenerName(listenerName)
.authoritative(authoritative).loadTopicsInBundle(false).build());

lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
log.warn("No broker was found available for topic {}", topicName);
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
pulsar().getNamespaceService().checkTopicExists(topicName).thenAccept(exist -> {
if (!exist && !pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
completeLookupResponseExceptionally(asyncResponse, new RestException(Response.Status.NOT_FOUND,
"Topic not found."));
return;
}
CompletableFuture<Optional<LookupResult>> lookupFuture = pulsar().getNamespaceService()
.getBrokerServiceUrlAsync(topicName,
LookupOptions.builder().advertisedListenerName(listenerName)
.authoritative(authoritative).loadTopicsInBundle(false).build());

LookupResult result = optionalResult.get();
// We have found either a broker that owns the topic, or a broker to which we should redirect the client to
if (result.isRedirect()) {
boolean newAuthoritative = result.isAuthoritativeRedirect();
URI redirect;
try {
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
: result.getLookupData().getHttpUrl();
checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
String path = String.format("%s%s%s?authoritative=%s",
redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
path = listenerName == null ? path : path + "&listenerName=" + listenerName;
redirect = new URI(path);
} catch (URISyntaxException | NullPointerException e) {
log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
completeLookupResponseExceptionally(asyncResponse, e);
lookupFuture.thenAccept(optionalResult -> {
if (optionalResult == null || !optionalResult.isPresent()) {
log.warn("No broker was found available for topic {}", topicName);
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.Status.SERVICE_UNAVAILABLE));
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
}
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
LookupResult result = optionalResult.get();
// We have found either a broker that owns the topic, or a broker to
// which we should redirect the client to
if (result.isRedirect()) {
boolean newAuthoritative = result.isAuthoritativeRedirect();
URI redirect;
try {
String redirectUrl = isRequestHttps() ? result.getLookupData().getHttpUrlTls()
: result.getLookupData().getHttpUrl();
checkNotNull(redirectUrl, "Redirected cluster's service url is not configured");
String lookupPath = topicName.isV2() ? LOOKUP_PATH_V2 : LOOKUP_PATH_V1;
String path = String.format("%s%s%s?authoritative=%s",
redirectUrl, lookupPath, topicName.getLookupName(), newAuthoritative);
path = listenerName == null ? path : path + "&listenerName=" + listenerName;
redirect = new URI(path);
} catch (URISyntaxException | NullPointerException e) {
log.error("Error in preparing redirect url for {}: {}", topicName, e.getMessage(), e);
completeLookupResponseExceptionally(asyncResponse, e);
return;
}
if (log.isDebugEnabled()) {
log.debug("Redirect lookup for topic {} to {}", topicName, redirect);
}
completeLookupResponseExceptionally(asyncResponse,
new WebApplicationException(Response.temporaryRedirect(redirect).build()));

} else {
// Found broker owning the topic
if (log.isDebugEnabled()) {
log.debug("Lookup succeeded for topic {} -- broker: {}", topicName, result.getLookupData());
}
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
completeLookupResponseSuccessfully(asyncResponse, result.getLookupData());
}
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
completeLookupResponseExceptionally(asyncResponse, exception);
}).exceptionally(exception -> {
log.warn("Failed to lookup broker for topic {}: {}", topicName, exception.getMessage(), exception);
completeLookupResponseExceptionally(asyncResponse, exception);
return null;
});
}).exceptionally(e -> {
log.warn("Failed to check exist for topic {}: {} when lookup", topicName, e.getMessage(), e);
completeLookupResponseExceptionally(asyncResponse, e);
return null;
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.lookup.http;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -49,6 +50,7 @@
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.Policies;
import org.mockito.ArgumentCaptor;
Expand Down Expand Up @@ -124,7 +126,44 @@ public void crossColoLookup() throws Exception {
WebApplicationException wae = (WebApplicationException) arg.getValue();
assertEquals(wae.getResponse().getStatus(), Status.TEMPORARY_REDIRECT.getStatusCode());
}


@Test
public void testLookupTopicNotExist() throws Exception {

MockTopicLookup destLookup = spy(new MockTopicLookup());
doReturn(false).when(destLookup).isRequestHttps();
destLookup.setPulsar(pulsar);
doReturn("null").when(destLookup).clientAppId();
Field uriField = PulsarWebResource.class.getDeclaredField("uri");
uriField.setAccessible(true);
UriInfo uriInfo = mock(UriInfo.class);
uriField.set(destLookup, uriInfo);
URI uri = URI.create("http://localhost:8080/lookup/v2/destination/topic/myprop/usc/ns2/topic1");
doReturn(uri).when(uriInfo).getRequestUri();
doReturn(true).when(config).isAuthorizationEnabled();

NamespaceService namespaceService = pulsar.getNamespaceService();
CompletableFuture<Boolean> future = new CompletableFuture<>();
future.complete(false);
doReturn(future).when(namespaceService).checkTopicExists(any(TopicName.class));

AsyncResponse asyncResponse1 = mock(AsyncResponse.class);
destLookup.lookupTopicAsync(TopicDomain.persistent.value(), "myprop", "usc", "ns2", "topic_not_exist", false,
asyncResponse1, null, null);

ArgumentCaptor<Throwable> arg = ArgumentCaptor.forClass(Throwable.class);
verify(asyncResponse1).resume(arg.capture());
assertEquals(arg.getValue().getClass(), RestException.class);
RestException restException = (RestException) arg.getValue();
assertEquals(restException.getResponse().getStatus(), Status.NOT_FOUND.getStatusCode());
}

static class MockTopicLookup extends TopicLookup {
@Override
protected void validateClusterOwnership(String s) {
// do nothing
}
}

@Test
public void testNotEnoughLookupPermits() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.pulsar.zookeeper.ZookeeperServerTest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
Expand Down Expand Up @@ -112,12 +113,16 @@ public void testNonPersistentPartitionsAreNotAutoCreatedWhenThePartitionedTopicD
final String topicPartitionName = "non-persistent://public/default/issue-9173-partition-0";

// Then error when subscribe to a partition of a non-persistent topic that does not exist
assertThrows(PulsarClientException.TopicDoesNotExistException.class,
assertThrows(PulsarClientException.NotFoundException.class,
() -> pulsarClient.newConsumer().topic(topicPartitionName).subscriptionName("sub-issue-9173").subscribe());

// Then error when produce to a partition of a non-persistent topic that does not exist
assertThrows(PulsarClientException.TopicDoesNotExistException.class,
() -> pulsarClient.newProducer().topic(topicPartitionName).create());
try {
pulsarClient.newProducer().topic(topicPartitionName).create();
Assert.fail("Should failed due to topic not exist");
} catch (Exception e) {
assertTrue(e instanceof PulsarClientException.NotFoundException);
}
} finally {
conf.setAllowAutoTopicCreation(defaultAllowAutoTopicCreation);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4112,8 +4112,7 @@ public void testTopicDoesNotExists() throws Exception {
.topic(topic).subscriptionName("sub").subscribe();
fail("should fail");
} catch (Exception e) {
String retryTopic = topic + "-sub-RETRY";
assertTrue(e.getMessage().contains("Topic " + retryTopic + " does not exist"));
assertTrue(e instanceof PulsarClientException.NotFoundException);
} finally {
conf.setAllowAutoTopicCreation(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public void testCreateProducerOnNotExistsTopic() throws PulsarClientException, I
.sendTimeout(100, TimeUnit.MILLISECONDS)
.create();
Assert.fail("Create producer should failed while topic does not exists.");
} catch (PulsarClientException ignore) {
} catch (PulsarClientException e) {
Assert.assertTrue(e instanceof PulsarClientException.NotFoundException);
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl) pulsarClient).timer();
Assert.assertEquals(timer.pendingTimeouts(), 0);
Assert.assertEquals(((PulsarClientImpl) pulsarClient).producersCount(), 0);
}

Expand Down

0 comments on commit c94c81c

Please sign in to comment.