Skip to content

Commit

Permalink
Return more informative error message when trying to create subscript…
Browse files Browse the repository at this point in the history
…ion on non-persistent throug Rest API or pulsar-admin CLI. (apache#7831)

Fixes apache#7397

Motivation
When use pulsar-admin to create a subscription on a non-persistent topic, get the server error
This change return more informative error message when trying to create subscript ion on non-persistent through Rest API or pulsar-admin CLI.

Modifications
Currently when creating subscription is called with non-persistent topic service will try to create the subscription which will fail with casting exception when trying to cast NonPersistentSubscription to PersistentSubscription and client will see internal error.
Add check if create subscription command is called for a non-persistent topic before actually

Verifying this change
This change added tests and can be verified as follows:

Added unit test
Verified with local standalone
  • Loading branch information
MarvinCai authored Aug 21, 2020
1 parent dcc84c9 commit 7b60e2d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,7 @@ public void resetCursorOnPosition(@Suspended final AsyncResponse asyncResponse,
@ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
@ApiResponse(code = 405, message = "Not supported for partitioned topics") })
Expand All @@ -532,6 +533,10 @@ public void createSubscription(@Suspended final AsyncResponse asyncResponse, @Pa
@QueryParam("replicated") boolean replicated) {
try {
validateTopicName(property, cluster, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" +
"can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -971,6 +971,7 @@ public void expireMessagesForAllSubscriptions(
@ApiOperation(value = "Create a subscription on the topic.", notes = "Creates a subscription on the topic at the specified message id")
@ApiResponses(value = {
@ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
@ApiResponse(code = 400, message = "Create subscription on non persistent topic is not supported"),
@ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
"subscriber is not authorized to access this operation"),
@ApiResponse(code = 403, message = "Don't have admin permission"),
Expand Down Expand Up @@ -1001,6 +1002,10 @@ public void createSubscription(
) {
try {
validateTopicName(tenant, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic" +
"can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response.Status;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
Expand Down Expand Up @@ -205,6 +206,18 @@ public void testGetSubscriptions() {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

@Test
public void testCreateSubscriptionForNonPersistentTopic() throws InterruptedException {
doReturn(TopicDomain.non_persistent.value()).when(persistentTopics).domain();
AsyncResponse response = mock(AsyncResponse.class);
ArgumentCaptor<WebApplicationException> responseCaptor = ArgumentCaptor.forClass(RestException.class);
persistentTopics.createSubscription(response, testTenant, testNamespace,
"testCreateSubscriptionForNonPersistentTopic", "sub",
true, (MessageIdImpl) MessageId.earliest, false);
verify(response, timeout(5000).times(1)).resume(responseCaptor.capture());
Assert.assertEquals(responseCaptor.getValue().getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
}

@Test
public void testTerminatePartitionedTopic() {
String testLocalTopicName = "topic-not-found";
Expand Down

0 comments on commit 7b60e2d

Please sign in to comment.