Skip to content

Commit

Permalink
Not allow sub auto create by admin when disable topic auto create (ap…
Browse files Browse the repository at this point in the history
…ache#6685)

### Motivation

Not allow sub auto create by admin when disable topic auto create

### Modifications

change admin code to not allow sub auto create by admin when disable topic auto create
add tests

### Verifying this change
ut passed

* fix sub auto created by admin
* fix test error: create sub partition when update it
* fix flaky test
  • Loading branch information
jiazhai authored Apr 8, 2020
1 parent 4d29bc4 commit 21f6dcd
Show file tree
Hide file tree
Showing 4 changed files with 55 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ private CompletableFuture<Void> tryCreatePartitionAsync(final int partition, Com
} else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(),
topicName.getPartition(partition));
result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS));
result.complete(null);
} else if (KeeperException.Code.BADVERSION.intValue() == rc) {
log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.",
clientAppId(), topicName.getPartition(partition));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.admin.ZkAdminPaths;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
Expand Down Expand Up @@ -527,11 +528,12 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL
}
return;
}

if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
try {
tryCreatePartitionsAsync(numPartitions).get();
updatePartitionedTopic(topicName, numPartitions).get();
} catch (Exception e) {
if (e.getCause() instanceof RestException) {
Expand Down Expand Up @@ -1663,7 +1665,8 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su
if (topicName.isPartitioned()) {
internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated);
} else {
getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> {
boolean allowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> {
final int numPartitions = partitionMetadata.partitions;
if (numPartitions > 0) {
final CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -1740,7 +1743,10 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) {
try {
validateAdminAccessForSubscriber(subscriptionName, authoritative);
PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName);

boolean isAllowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation();
PersistentTopic topic = (PersistentTopic) pulsar().getBrokerService()
.getTopic(topicName.toString(), isAllowAutoTopicCreation).thenApply(Optional::get).join();
if (topic.getSubscriptions().containsKey(subscriptionName)) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic"));
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,4 +320,48 @@ public void testAutoCreationNamespaceAllowOverridesBrokerOnProduce() throws Exce
assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString));
}


@Test
public void testNotAllowSubscriptionTopicCreation() throws Exception{
pulsar.getConfiguration().setAllowAutoTopicCreation(false);
String topicName = "persistent://prop/ns-abc/non-partitioned-topic" + System.currentTimeMillis();
String subscriptionName = "non-partitioned-topic-sub";

try {
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
fail("should fail to create subscription once not allowAutoTopicCreation");
} catch (Exception e) {
// expected
}

try {
admin.topics().createSubscription(topicName + "-partition-0",
subscriptionName, MessageId.earliest);
fail("should fail to create subscription once not allowAutoTopicCreation");
} catch (Exception e) {
// expected
}

assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName));
assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName));

try {
admin.topics().createNonPartitionedTopic(topicName);
admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest);
} catch (Exception e) {
// expected
fail("should success to create subscription once topic created");
}

try {
String partitionTopic = "persistent://prop/ns-abc/partitioned-topic" + System.currentTimeMillis();
admin.topics().createPartitionedTopic(partitionTopic, 1);
admin.topics().createSubscription(partitionTopic + "-partition-0", subscriptionName, MessageId.earliest);
} catch (Exception e) {
// expected
fail("should success to create subscription once topic created");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation(
}

@Test
public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException {
public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws Exception {
conf.setAllowAutoTopicCreation(false);
final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis();
admin.topics().createPartitionedTopic(topic, 3);
Expand Down

0 comments on commit 21f6dcd

Please sign in to comment.