diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java index 1fd1d0770213a..40dd53b3bafca 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/PulsarServerException.java @@ -44,4 +44,10 @@ public NotFoundException(Throwable t) { super(t); } } + public static class InvalidTopicNameException extends PulsarServerException { + + public InvalidTopicNameException(String message) { + super(message); + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index fe5d1056e18b0..b00226e3a2991 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -38,6 +38,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.BrokerServiceException; @@ -504,6 +505,9 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo if (e.getCause() instanceof RestException) { throw (RestException) e.getCause(); } + if (e.getCause() instanceof PulsarServerException.InvalidTopicNameException) { + throw new RestException(Status.PRECONDITION_FAILED, e.getCause().getMessage()); + } throw new RestException(e); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6f1ccc3b1ace1..0e66d3246ec0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2647,6 +2647,11 @@ public CompletableFuture fetchPartitionedTopicMetadata @SuppressWarnings("deprecation") private CompletableFuture createDefaultPartitionedTopicAsync(TopicName topicName) { + if (topicName.getLocalName().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) { + return FutureUtil.failedFuture(new PulsarServerException. + InvalidTopicNameException( + String.format("Invalid topic name: %s , should not contain -partition-", topicName))); + } final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName); final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); checkArgument(defaultNumPartitions > 0, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index b906941770413..6d3ec63778582 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -29,14 +29,14 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; - import java.lang.reflect.Field; +import java.net.URI; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.TimeUnit; - import com.google.common.collect.Sets; +import lombok.Cleanup; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; @@ -44,6 +44,8 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageRoutingMode; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.naming.NamespaceBundle; @@ -51,6 +53,7 @@ import org.apache.pulsar.common.policies.data.Policies; import org.apache.pulsar.common.policies.data.TopicStats; import org.awaitility.Awaitility; +import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -271,4 +274,38 @@ public void testPersistentPartitionedTopicUnload() throws Exception { producer.close(); } } + + @Test + public void testAutoCreatePartitionedTopicThatNameIncludePartition() throws Exception { + final String topicName = "persistent://prop/autoNs/failedcreate-partition-abcde"; + final String ns = "prop/autoNs"; + admin.namespaces().createNamespace(ns); + pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned"); + try { + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topicName) + .create(); + Assert.fail("unexpected operation"); + } catch (PulsarClientException ex) { + Assert.assertTrue(ex.getMessage() + .contains("Invalid topic name")); + } + Assert.assertEquals(admin.topics().getList(ns).size(), 0); + URI tcpLookupUrl = new URI(pulsar.getBrokerServiceUrl()); + PulsarClient client = PulsarClient.builder() + .serviceUrl(tcpLookupUrl.toString()) + .build(); + try { + @Cleanup + Producer producer = client.newProducer() + .topic(topicName) + .create(); + Assert.fail("unexpected operation"); + } catch (PulsarClientException ex) { + Assert.assertTrue(ex.getMessage() + .contains("Invalid topic name")); + } + Assert.assertEquals(admin.topics().getList(ns).size(), 0); + pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned"); + } }