diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index a13884a21e63c..d8cde658d6ec2 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -376,6 +376,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) { if (numPartitions <= 0) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } + validatePartitionTopicName(topicName.getLocalName()); try { boolean topicExist = pulsar().getNamespaceService() .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) @@ -411,7 +412,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) { protected void internalCreateNonPartitionedTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); - + validateNonPartitionTopicName(topicName.getLocalName()); if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } @@ -439,6 +440,10 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) { */ protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) { validateAdminAccessForTenant(topicName.getTenant()); + // Only do the validation if it's the first hop. + if (!updateLocalTopicOnly) { + validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions); + } if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { Set clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject()); @@ -1903,6 +1908,127 @@ private void validateClientVersion() { return; } + /** + * Validate update of number of partition for partitioned topic. + * If there's already non partition topic with same name and contains partition suffix "-partition-" + * followed by numeric value X then the new number of partition of that partitioned topic can not be greater + * than that X else that non partition topic will essentially be overwritten and cause unexpected consequence. + * + * @param topicName + */ + private void validatePartitionTopicUpdate(String topicName, int numberOfPartition) { + List existingTopicList = internalGetList(); + TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName); + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false); + int oldPartition = metadata.partitions; + String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX; + for (String exsitingTopicName : existingTopicList) { + if (exsitingTopicName.contains(prefix)) { + try { + long suffix = Long.parseLong(exsitingTopicName.substring( + exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX) + + TopicName.PARTITIONED_TOPIC_SUFFIX.length())); + // Skip partition of partitioned topic by making sure the numeric suffix greater than old partition number. + if (suffix >= oldPartition && suffix <= (long) numberOfPartition) { + log.warn("[{}] Already have non partition topic {} which contains partition " + + "suffix '-partition-' and end with numeric value smaller than the new number of partition. " + + "Update of partitioned topic {} could cause conflict.", clientAppId(), exsitingTopicName, topicName); + throw new RestException(Status.PRECONDITION_FAILED, + "Already have non partition topic" + exsitingTopicName + " which contains partition suffix '-partition-' " + + "and end with numeric value and end with numeric value smaller than the new " + + "number of partition. Update of partitioned topic " + topicName + " could cause conflict."); + } + } catch (NumberFormatException e) { + // Do nothing, if value after partition suffix is not pure numeric value, + // as it can't conflict with internal created partitioned topic's name. + } + } + } + } + + /** + * Validate partitioned topic name. + * Validation will fail and throw RestException if + * 1) There's already a partitioned topic with same topic name and have some of its partition created. + * 2) There's already non partition topic with same name and contains partition suffix "-partition-" + * followed by numeric value. In this case internal created partition of partitioned topic could override + * the existing non partition topic. + * + * @param topicName + */ + private void validatePartitionTopicName(String topicName) { + List existingTopicList = internalGetList(); + String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX; + for (String existingTopicName : existingTopicList) { + if (existingTopicName.contains(prefix)) { + try { + Long.parseLong(existingTopicName.substring( + existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX) + + TopicName.PARTITIONED_TOPIC_SUFFIX.length())); + log.warn("[{}] Already have topic {} which contains partition " + + "suffix '-partition-' and end with numeric value. Creation of partitioned topic {}" + + "could cause conflict.", clientAppId(), existingTopicName, topicName); + throw new RestException(Status.PRECONDITION_FAILED, + "Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " + + "and end with numeric value, Creation of partitioned topic " + topicName + + " could cause conflict."); + } catch (NumberFormatException e) { + // Do nothing, if value after partition suffix is not pure numeric value, + // as it can't conflict with internal created partitioned topic's name. + } + } + } + } + + /** + * Validate non partition topic name, + * Validation will fail and throw RestException if + * 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition + * suffix is numeric value larger than the number of partition if there's already a partition topic with same + * name(the part before suffix "-partition-"). + * 2)Topic name contains partition suffix "-partition-" and the remaining part follow the partition + * suffix is numeric value but there isn't a partitioned topic with same name. + * + * @param topicName + */ + private void validateNonPartitionTopicName(String topicName) { + if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) { + try { + // First check if what's after suffix "-partition-" is number or not, if not number then can create. + int partitionIndex = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX); + long suffix = Long.parseLong(topicName.substring(partitionIndex + + TopicName.PARTITIONED_TOPIC_SUFFIX.length())); + TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName.substring(0, partitionIndex)); + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false); + + // Partition topic index is 0 to (number of partition - 1) + if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) { + log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" + + " a number smaller then number of partition of partitioned topic {}.", + clientAppId(), topicName, partitionTopicName.getLocalName()); + throw new RestException(Status.PRECONDITION_FAILED, + "Can't create topic " + topicName + " with \"-partition-\" followed by" + + " a number smaller then number of partition of partitioned topic " + + partitionTopicName.getLocalName()); + } else if (metadata.partitions == 0) { + log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" + + " numeric value if there isn't a partitioned topic {} created.", + clientAppId(), topicName, partitionTopicName.getLocalName()); + throw new RestException(Status.PRECONDITION_FAILED, + "Can't create topic " + topicName + " with \"-partition-\" followed by" + + " numeric value if there isn't a partitioned topic " + + partitionTopicName.getLocalName() + " created."); + } + // If there is a partitioned topic with the same name and numeric suffix is smaller than the + // number of partition for that partitioned topic, validation will pass. + } catch (NumberFormatException e) { + // Do nothing, if value after partition suffix is not pure numeric value, + // as it can't conflict if user want to create partitioned topic with same + // topic name prefix in the future. + } + } + } + protected MessageId internalGetLastMessageId(boolean authoritative) { validateAdminOperationOnTopic(authoritative); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index ed7829379b591..ea96cecf380f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -919,7 +919,9 @@ public void partitionedTopics(String topicName) throws Exception { try { admin.topics().createPartitionedTopic(partitionedTopicName, 32); fail("Should have failed as the partitioned topic already exists"); - } catch (ConflictException ce) { + } catch (PreconditionFailedException e) { + // Expecting PreconditionFailedException instead of ConflictException as it'll + // fail validation before actually try to create metadata in ZK. } producer = client.newProducer(Schema.BYTES) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index 3d8a29a6b5feb..300e3de173674 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -18,24 +18,29 @@ */ package org.apache.pulsar.broker.admin; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.pulsar.broker.admin.v2.NonPersistentTopics; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.broker.authentication.AuthenticationDataHttps; +import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; import org.apache.pulsar.broker.web.PulsarWebResource; import org.apache.pulsar.broker.web.RestException; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; import org.apache.zookeeper.KeeperException; import org.mockito.ArgumentCaptor; +import org.powermock.core.classloader.annotations.PrepareForTest; import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; @@ -48,14 +53,18 @@ import java.lang.reflect.Field; import java.util.List; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +@PrepareForTest(PersistentTopics.class) public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { private PersistentTopics persistentTopics; @@ -218,13 +227,58 @@ public void testNonPartitionedTopics() { @Test public void testCreateNonPartitionedTopic() { - final String topicName = "standard-topic"; + final String topicName = "standard-topic-partition-a"; persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata( testTenant, testNamespace, topicName, true, false); Assert.assertEquals(pMetadata.partitions, 0); } + @Test(expectedExceptions = RestException.class) + public void testCreateNonPartitionedTopicWithInvalidName() { + final String topicName = "standard-topic-partition-10"; + doAnswer(invocation -> { + TopicName partitionedTopicname = invocation.getArgument(0, TopicName.class); + assert(partitionedTopicname.getLocalName().equals("standard-topic")); + return new PartitionedTopicMetadata(10); + }).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean()); + persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); + } + + @Test(expectedExceptions = RestException.class) + public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException { + // Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation. + final String nonPartitionTopicName1 = "standard-topic"; + final String nonPartitionTopicName2 = "special-topic-partition-123"; + final String partitionedTopicName = "special-topic"; + LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class); + ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class); + doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService(); + doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache(); + doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString()); + persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5); + } + + @Test(expectedExceptions = RestException.class) + public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception { + // Already have non partition topic special-topic-partition-10, shouldn't able to update number of partitioned topic to more than 10. + final String nonPartitionTopicName2 = "special-topic-partition-10"; + final String partitionedTopicName = "special-topic"; + LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class); + ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class); + doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService(); + doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache(); + doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString()); + doAnswer(invocation -> { + persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace"); + persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname"); + return null; + }).when(persistentTopics).validatePartitionedTopicName(any(), any(), any()); + doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString()); + persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5); + persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10); + } + @Test public void testUnloadTopic() { final String topicName = "standard-topic-to-be-unload"; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java index 35974d43f35fd..24adb74a5a5e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest.java @@ -871,8 +871,10 @@ public void partitionedTopics(String topicName) throws Exception { try { admin.topics().createPartitionedTopic(partitionedTopicName, 32); - fail("Should have failed as the partitioned topic already exists"); - } catch (ConflictException ce) { + fail("Should have failed as the partitioned topic exists with its partition created"); + } catch (PreconditionFailedException e) { + // Expecting PreconditionFailedException instead of ConflictException as it'll + // fail validation before actually try to create metadata in ZK. } producer = client.newProducer(Schema.BYTES) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java index 9b173fab57230..0d693663b76fc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java @@ -686,16 +686,14 @@ public void testMultiTopicsMessageListener() throws Exception { consumer.close(); } - /** * Test topic partitions auto subscribed. * * Steps: * 1. Create a consumer with 2 topics, and each topic has 2 partitions: xx-partition-0, xx-partition-1. - * 2. produce message to xx-partition-2, and verify consumer could not receive message. - * 3. update topics to have 3 partitions. - * 4. trigger partitionsAutoUpdate. this should be done automatically, this is to save time to manually trigger. - * 5. produce message to xx-partition-2 again, and verify consumer could receive message. + * 2. update topics to have 3 partitions. + * 3. trigger partitionsAutoUpdate. this should be done automatically, this is to save time to manually trigger. + * 4. produce message to xx-partition-2 again, and verify consumer could receive message. * */ @Test(timeOut = 30000) @@ -715,52 +713,42 @@ public void testTopicAutoUpdatePartitions() throws Exception { // 1. Create a consumer Consumer consumer = pulsarClient.newConsumer() - .topics(topicNames) - .subscriptionName(subscriptionName) - .subscriptionType(SubscriptionType.Shared) - .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) - .receiverQueueSize(4) - .autoUpdatePartitions(true) - .subscribe(); + .topics(topicNames) + .subscriptionName(subscriptionName) + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS) + .receiverQueueSize(4) + .autoUpdatePartitions(true) + .subscribe(); assertTrue(consumer instanceof MultiTopicsConsumerImpl); MultiTopicsConsumerImpl topicsConsumer = (MultiTopicsConsumerImpl) consumer; - // 2. use partition-2 producer, - Producer producer1 = pulsarClient.newProducer().topic(topicName1 + "-partition-2") - .enableBatching(false) - .create(); - Producer producer2 = pulsarClient.newProducer().topic(topicName2 + "-partition-2") - .enableBatching(false) - .create(); - for (int i = 0; i < totalMessages; i++) { - producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes()); - producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes()); - log.info("produce message to partition-2. message index: {}", i); - } - // since partition-2 not subscribed, could not receive any message. - Message message = consumer.receive(200, TimeUnit.MILLISECONDS); - assertNull(message); - - // 3. update to 3 partitions + // 2. update to 3 partitions admin.topics().updatePartitionedTopic(topicName1, 3); admin.topics().updatePartitionedTopic(topicName2, 3); - // 4. trigger partitionsAutoUpdate. this should be done automatically in 1 minutes, + // 3. trigger partitionsAutoUpdate. this should be done automatically in 1 minutes, // this is to save time to manually trigger. log.info("trigger partitionsAutoUpdateTimerTask"); Timeout timeout = topicsConsumer.getPartitionsAutoUpdateTimeout(); timeout.task().run(timeout); Thread.sleep(200); - // 5. produce message to xx-partition-2 again, and verify consumer could receive message. + // 4. produce message to xx-partition-2, and verify consumer could receive message. + Producer producer1 = pulsarClient.newProducer().topic(topicName1 + "-partition-2") + .enableBatching(false) + .create(); + Producer producer2 = pulsarClient.newProducer().topic(topicName2 + "-partition-2") + .enableBatching(false) + .create(); for (int i = 0; i < totalMessages; i++) { producer1.send((messagePredicate + "topic1-partition-2 index:" + i).getBytes()); producer2.send((messagePredicate + "topic2-partition-2 index:" + i).getBytes()); log.info("produce message to partition-2 again. messageindex: {}", i); } int messageSet = 0; - message = consumer.receive(); + Message message = consumer.receive(); do { messageSet ++; consumer.acknowledge(message); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 06190d8d73a96..8f5bb2ded6cdd 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -2096,7 +2096,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { std::string topicName2 = "persistent://public/default/patternTopicsAutoConsumerPubSub2"; std::string topicName3 = "persistent://public/default/patternTopicsAutoConsumerPubSub3"; // This will not match pattern - std::string topicName4 = "persistent://public/default/patternMultiTopicsNotMatchPubSub4"; + std::string topicName4 = "persistent://public/default/notMatchPatternTopicsAutoConsumerPubSub4"; // call admin api to make topics partitioned std::string url1 = @@ -2106,7 +2106,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) { std::string url3 = adminUrl + "admin/v2/persistent/public/default/patternTopicsAutoConsumerPubSub3/partitions"; std::string url4 = - adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions"; + adminUrl + "admin/v2/persistent/public/default/notMatchPatternTopicsAutoConsumerPubSub4/partitions"; int res = makePutRequest(url1, "2"); ASSERT_FALSE(res != 204 && res != 409); diff --git a/site2/docs/admin-api-non-partitioned-topics.md b/site2/docs/admin-api-non-partitioned-topics.md index a59a68a225168..61f7409b47938 100644 --- a/site2/docs/admin-api-non-partitioned-topics.md +++ b/site2/docs/admin-api-non-partitioned-topics.md @@ -41,6 +41,12 @@ $ bin/pulsar-admin topics create \ persistent://my-tenant/my-namespace/my-topic ``` +> #### Note +> +> It's only allowed to create non partitioned topic of name contains suffix '-partition-' followed by numeric value like +> 'xyz-topic-partition-10', if there's already a partitioned topic with same name, in this case 'xyz-topic', and has +> number of partition larger then that numeric value in this case 11(partition index is start from 0). Else creation of such topic will fail. + #### REST API {@inject: endpoint|PUT|/admin/v2/persistent/:tenant/:namespace/:topic|operation/createNonPartitionedTopic} diff --git a/site2/docs/admin-api-partitioned-topics.md b/site2/docs/admin-api-partitioned-topics.md index ab3d9cd95e0c6..fe0ab9853d35a 100644 --- a/site2/docs/admin-api-partitioned-topics.md +++ b/site2/docs/admin-api-partitioned-topics.md @@ -34,6 +34,7 @@ need to provide a name for the topic as well as the desired number of partitions You can create partitioned topics using the [`create-partitioned-topic`](reference-pulsar-admin.md#create-partitioned-topic) command and specifying the topic name as an argument and the number of partitions using the `-p` or `--partitions` flag. + Here's an example: ```shell @@ -42,6 +43,13 @@ $ bin/pulsar-admin topics create-partitioned-topic \ --partitions 4 ``` +> #### Note +> +> If there already exists a non partitioned topic with suffix '-partition-' followed by numeric value like +> 'xyz-topic-partition-10', then you can not create partitioned topic with name 'xyz-topic' as the partitions +> of the partitioned topic could override the existing non partitioned topic. You have to delete that non +> partitioned topic first then create the partitioned topic. + #### REST API {@inject: endpoint|PUT|/admin/v2/persistent/:tenant/:namespace/:topic/partitions|operation/createPartitionedTopic}