Skip to content

Commit

Permalink
Bugfix an unload non-existent topic (apache#3946)
Browse files Browse the repository at this point in the history
*Motivation*

Fixes apache#3935

When an unload operation is being made to an non-existent topic a null pointer
exception is thrown due to bad handling exception on general catch clause.

*Modifications*

  - Refactor getTopicReference() for a simplified version.
  - Simplify exception thrown when partitioned topics was not found.
  - Fix NPE when unload operation is being made with a non-existent topic.
  - Add test in order to assert unload on normal situation.
  - Add test to assert npe from unload with non existent topic.
  • Loading branch information
lovelle authored and jiazhai committed Apr 1, 2019
1 parent b6c509a commit e36ae70
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1309,24 +1309,25 @@ public static CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMet
*/
private Topic getTopicReference(TopicName topicName) {
return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
.orElseThrow(() -> {
if (topicName.toString().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
TopicName partitionTopicName = TopicName.get(topicName.getPartitionedTopicName());
PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(partitionTopicName, false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String errSrc;
if (partitionedTopicMetadata != null) {
errSrc = " has zero partitions";
} else {
errSrc = " has no metadata";
}
return new RestException(Status.NOT_FOUND, "Partitioned Topic not found: " + topicName.toString() + errSrc);
} else if (!internalGetList().contains(topicName.toString())) {
return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
}
return new RestException(Status.NOT_FOUND, "Topic not found");
});
.orElseThrow(() -> topicNotFoundReason(topicName));
}

private RestException topicNotFoundReason(TopicName topicName) {
if (!topicName.isPartitioned()) {
return new RestException(Status.NOT_FOUND, "Topic not found");
}

PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata(
TopicName.get(topicName.getPartitionedTopicName()), false);
if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) {
final String topicErrorType = partitionedTopicMetadata == null ?
"has no metadata" : "has zero partitions";
return new RestException(Status.NOT_FOUND, String.format(
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
} else if (!internalGetList().contains(topicName.toString())) {
return new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
}
return new RestException(Status.NOT_FOUND, "Partitioned Topic not found");
}

private Topic getOrCreateTopic(TopicName topicName) {
Expand Down Expand Up @@ -1463,8 +1464,8 @@ protected void unloadTopic(TopicName topicName, boolean authoritative) {
log.error("[{}] topic {} not found", clientAppId(), topicName);
throw new RestException(Status.NOT_FOUND, "Topic does not exist");
} catch (Exception e) {
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getCause().getMessage(), e);
throw new RestException(e.getCause());
log.error("[{}] Failed to unload topic {}, {}", clientAppId(), topicName, e.getMessage(), e);
throw new RestException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import javax.ws.rs.core.Response;
import javax.ws.rs.core.UriInfo;
import java.lang.reflect.Field;
import java.util.List;
Expand Down Expand Up @@ -140,4 +141,21 @@ public void testCreateNonPartitionedTopic() {
testTenant, testNamespace, topicName, true);
Assert.assertEquals(pMetadata.partitions, 0);
}

@Test
public void testUnloadTopic() {
final String topicName = "standard-topic-to-be-unload";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
persistentTopics.unloadTopic(testTenant, testNamespace, topicName, true);
}

@Test(expectedExceptions = RestException.class)
public void testUnloadTopicShallThrowNotFoundWhenTopicNotExist() {
try {
persistentTopics.unloadTopic(testTenant, testNamespace,"non-existent-topic", true);
} catch (RestException e) {
Assert.assertEquals(e.getResponse().getStatus(), Response.Status.NOT_FOUND.getStatusCode());
throw e;
}
}
}

0 comments on commit e36ae70

Please sign in to comment.