diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 98d370f26084f..cafb183c332c8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -282,7 +282,7 @@ private CompletableFuture tryCreatePartitionAsync(final int partition, Com } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) { log.info("[{}] Topic partition {} is exists, doing nothing.", clientAppId(), topicName.getPartition(partition)); - result.completeExceptionally(KeeperException.create(KeeperException.Code.NODEEXISTS)); + result.complete(null); } else if (KeeperException.Code.BADVERSION.intValue() == rc) { log.warn("[{}] Fail to create topic partition {} with concurrent modification, retry now.", clientAppId(), topicName.getPartition(partition)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 26dcaebd4cb62..ff4463d96a90d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -66,6 +66,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.admin.AdminResource; import org.apache.pulsar.broker.admin.ZkAdminPaths; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; @@ -527,11 +528,12 @@ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateL } return; } - + if (numPartitions <= 0) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } try { + tryCreatePartitionsAsync(numPartitions).get(); updatePartitionedTopic(topicName, numPartitions).get(); } catch (Exception e) { if (e.getCause() instanceof RestException) { @@ -1663,7 +1665,8 @@ protected void internalCreateSubscription(AsyncResponse asyncResponse, String su if (topicName.isPartitioned()) { internalCreateSubscriptionForNonPartitionedTopic(asyncResponse, subscriptionName, targetMessageId, authoritative, replicated); } else { - getPartitionedTopicMetadataAsync(topicName, authoritative, false).thenAccept(partitionMetadata -> { + boolean allowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation(); + getPartitionedTopicMetadataAsync(topicName, authoritative, allowAutoTopicCreation).thenAccept(partitionMetadata -> { final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final CompletableFuture future = new CompletableFuture<>(); @@ -1740,7 +1743,10 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn MessageIdImpl targetMessageId, boolean authoritative, boolean replicated) { try { validateAdminAccessForSubscriber(subscriptionName, authoritative); - PersistentTopic topic = (PersistentTopic) getOrCreateTopic(topicName); + + boolean isAllowAutoTopicCreation = pulsar().getConfiguration().isAllowAutoTopicCreation(); + PersistentTopic topic = (PersistentTopic) pulsar().getBrokerService() + .getTopic(topicName.toString(), isAllowAutoTopicCreation).thenApply(Optional::get).join(); if (topic.getSubscriptions().containsKey(subscriptionName)) { asyncResponse.resume(new RestException(Status.CONFLICT, "Subscription already exists for topic")); return; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java index 05b6f03064f0c..3e906ef07633a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -320,4 +320,48 @@ public void testAutoCreationNamespaceAllowOverridesBrokerOnProduce() throws Exce assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicString)); assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicString)); } + + + @Test + public void testNotAllowSubscriptionTopicCreation() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(false); + String topicName = "persistent://prop/ns-abc/non-partitioned-topic" + System.currentTimeMillis(); + String subscriptionName = "non-partitioned-topic-sub"; + + try { + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + fail("should fail to create subscription once not allowAutoTopicCreation"); + } catch (Exception e) { + // expected + } + + try { + admin.topics().createSubscription(topicName + "-partition-0", + subscriptionName, MessageId.earliest); + fail("should fail to create subscription once not allowAutoTopicCreation"); + } catch (Exception e) { + // expected + } + + assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); + assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName)); + + try { + admin.topics().createNonPartitionedTopic(topicName); + admin.topics().createSubscription(topicName, subscriptionName, MessageId.earliest); + } catch (Exception e) { + // expected + fail("should success to create subscription once topic created"); + } + + try { + String partitionTopic = "persistent://prop/ns-abc/partitioned-topic" + System.currentTimeMillis(); + admin.topics().createPartitionedTopic(partitionTopic, 1); + admin.topics().createSubscription(partitionTopic + "-partition-0", subscriptionName, MessageId.earliest); + } catch (Exception e) { + // expected + fail("should success to create subscription once topic created"); + } + + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java index 8fad844bc8a8e..45033b591359a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionCreationTest.java @@ -104,7 +104,7 @@ public void testCreateConsumerForNonPartitionedTopicWhenEnableTopicAutoCreation( } @Test - public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws PulsarClientException, PulsarAdminException { + public void testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation() throws Exception { conf.setAllowAutoTopicCreation(false); final String topic = "testCreateConsumerForPartitionedTopicUpdateWhenDisableTopicAutoCreation-" + System.currentTimeMillis(); admin.topics().createPartitionedTopic(topic, 3);