Skip to content

Commit

Permalink
[Issuse 13640][broker] Fix non persistent topic subscription error. (a…
Browse files Browse the repository at this point in the history
…pache#13685)

Fixes apache#13640 

### Motivation

If pulsar broker started with `allowAutoSubscriptionCreation=false`, there are no way to subscribe non persistent topic.

### Modifications

Add `isPersistent` check in `ServerCnx`.
  • Loading branch information
dragonls authored Jan 18, 2022
1 parent 2262a5d commit 3dbe418
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ public void createSubscription(@Suspended final AsyncResponse asyncResponse, @Pa
try {
validateTopicName(property, cluster, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic"
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
internalCreateSubscription(asyncResponse, decode(encodedSubName), messageId, authoritative, replicated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1348,7 +1348,7 @@ public void createSubscription(
try {
validateTopicName(tenant, namespace, topic);
if (!topicName.isPersistent()) {
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic"
throw new RestException(Response.Status.BAD_REQUEST, "Create subscription on non-persistent topic "
+ "can only be done through client");
}
MessageIdImpl messageId = resetCursorData == null ? null :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) {

boolean rejectSubscriptionIfDoesNotExist = isDurable
&& !service.isAllowAutoSubscriptionCreation(topicName.toString())
&& !topic.getSubscriptions().containsKey(subscriptionName);
&& !topic.getSubscriptions().containsKey(subscriptionName)
&& topic.isPersistent();

if (rejectSubscriptionIfDoesNotExist) {
return FutureUtil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,8 @@ default boolean isSystemTopic() {
return false;
}

boolean isPersistent();

/* ------ Transaction related ------ */

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1106,4 +1106,9 @@ public CompletableFuture<Void> truncate() {
protected boolean isTerminated() {
return false;
}

@Override
public boolean isPersistent() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2905,6 +2905,11 @@ public boolean isSystemTopic() {
return false;
}

@Override
public boolean isPersistent() {
return true;
}

private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,18 @@ public void testAutoSubscriptionCreationNamespaceDisallowOverridesBroker() throw
assertFalse(admin.topics().getSubscriptions(topicName.toString()).contains(subscriptionName));
}

@Test
public void testNonPersistentTopicSubscriptionCreationWithAutoCreationDisable() throws Exception {
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(false);

final String topicName = "non-persistent://prop/ns-abc/test-subtopic-" + testId.getAndIncrement();
final String subscriptionName = "test-subtopic-sub";

admin.topics().createNonPartitionedTopic(topicName);

// Subscribe operation should be successful
pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
assertTrue(admin.topics().getSubscriptions(topicName).contains(subscriptionName));
}

}

0 comments on commit 3dbe418

Please sign in to comment.