Skip to content

Commit

Permalink
Validate topic name before creating partition/non partition topic via…
Browse files Browse the repository at this point in the history
… admin cli. (apache#5148)

fix apache#4994
For non partition topic,

Topic name contains partition suffix "-partition-" and the remaining part follow the partition
suffix is numeric value larger than the number of partition if there's already a partition topic with same
name(the part before suffix "-partition-").
2)Topic name contains partition suffix "-partition-" and the remaining part follow the partition
suffix is numeric value but there isn't a partitioned topic with same name.
For partition topic,
Validation will fail and throw RestException if

There's already a partitioned topic with same topic name and have some of its partition created.
There's already non partition topic with same name and contains partition suffix "-partition-"
followed by numeric value. In this case internal created partition of partitioned topic could override
the existing non partition topic.
This is for non partition topic created before we enforce the check as we will prevent creation of non partition topic with such name which could lost of confusion.
For update partition topic,
Validation will fail if there's already non partition topic with same name and contains partition suffix "-partition-" followed by numeric value X then the new number of partition of that partitioned topic can not be greater than that X else that non partition topic will essentially be overwritten and cause unexpected consequence.

Also removed TopicsConsumerImplTest#testTopicAutoUpdatePartitions in flavor of PartitionedProducerConsumerTest#testAutoUpdatePartitionsForProducerConsumer, as they're both testing auto partition update to consumer/producer. But TopicsConsumerImplTest#testTopicAutoUpdatePartitions was trying to manually create non partitioned topic with name like "xyz-topic-partition-5" by creating new producer of that topic, while 5 is larger than a existing partitioned topic xyz-topic's actual number of partition. Which make the validation fails and shouldn't be encouraged.
  • Loading branch information
MarvinCai authored and jiazhai committed Nov 6, 2019
1 parent 5c771d2 commit e0ea6b5
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) {
if (numPartitions <= 0) {
throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0");
}
validatePartitionTopicName(topicName.getLocalName());
try {
boolean topicExist = pulsar().getNamespaceService()
.getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL)
Expand Down Expand Up @@ -411,7 +412,7 @@ protected void internalCreatePartitionedTopic(int numPartitions) {

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());

validateNonPartitionTopicName(topicName.getLocalName());
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
}
Expand Down Expand Up @@ -439,6 +440,10 @@ protected void internalCreateNonPartitionedTopic(boolean authoritative) {
*/
protected void internalUpdatePartitionedTopic(int numPartitions, boolean updateLocalTopicOnly) {
validateAdminAccessForTenant(topicName.getTenant());
// Only do the validation if it's the first hop.
if (!updateLocalTopicOnly) {
validatePartitionTopicUpdate(topicName.getLocalName(), numPartitions);
}

if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) {
Set<String> clusters = getNamespaceReplicatedClusters(topicName.getNamespaceObject());
Expand Down Expand Up @@ -1903,6 +1908,127 @@ private void validateClientVersion() {
return;
}

/**
* Validate update of number of partition for partitioned topic.
* If there's already non partition topic with same name and contains partition suffix "-partition-"
* followed by numeric value X then the new number of partition of that partitioned topic can not be greater
* than that X else that non partition topic will essentially be overwritten and cause unexpected consequence.
*
* @param topicName
*/
private void validatePartitionTopicUpdate(String topicName, int numberOfPartition) {
List<String> existingTopicList = internalGetList();
TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName);
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);
int oldPartition = metadata.partitions;
String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String exsitingTopicName : existingTopicList) {
if (exsitingTopicName.contains(prefix)) {
try {
long suffix = Long.parseLong(exsitingTopicName.substring(
exsitingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
// Skip partition of partitioned topic by making sure the numeric suffix greater than old partition number.
if (suffix >= oldPartition && suffix <= (long) numberOfPartition) {
log.warn("[{}] Already have non partition topic {} which contains partition " +
"suffix '-partition-' and end with numeric value smaller than the new number of partition. " +
"Update of partitioned topic {} could cause conflict.", clientAppId(), exsitingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have non partition topic" + exsitingTopicName + " which contains partition suffix '-partition-' " +
"and end with numeric value and end with numeric value smaller than the new " +
"number of partition. Update of partitioned topic " + topicName + " could cause conflict.");
}
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
}

/**
* Validate partitioned topic name.
* Validation will fail and throw RestException if
* 1) There's already a partitioned topic with same topic name and have some of its partition created.
* 2) There's already non partition topic with same name and contains partition suffix "-partition-"
* followed by numeric value. In this case internal created partition of partitioned topic could override
* the existing non partition topic.
*
* @param topicName
*/
private void validatePartitionTopicName(String topicName) {
List<String> existingTopicList = internalGetList();
String prefix = topicName + TopicName.PARTITIONED_TOPIC_SUFFIX;
for (String existingTopicName : existingTopicList) {
if (existingTopicName.contains(prefix)) {
try {
Long.parseLong(existingTopicName.substring(
existingTopicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX)
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
log.warn("[{}] Already have topic {} which contains partition " +
"suffix '-partition-' and end with numeric value. Creation of partitioned topic {}"
+ "could cause conflict.", clientAppId(), existingTopicName, topicName);
throw new RestException(Status.PRECONDITION_FAILED,
"Already have topic " + existingTopicName + " which contains partition suffix '-partition-' " +
"and end with numeric value, Creation of partitioned topic " + topicName +
" could cause conflict.");
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict with internal created partitioned topic's name.
}
}
}
}

/**
* Validate non partition topic name,
* Validation will fail and throw RestException if
* 1) Topic name contains partition suffix "-partition-" and the remaining part follow the partition
* suffix is numeric value larger than the number of partition if there's already a partition topic with same
* name(the part before suffix "-partition-").
* 2)Topic name contains partition suffix "-partition-" and the remaining part follow the partition
* suffix is numeric value but there isn't a partitioned topic with same name.
*
* @param topicName
*/
private void validateNonPartitionTopicName(String topicName) {
if (topicName.contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
try {
// First check if what's after suffix "-partition-" is number or not, if not number then can create.
int partitionIndex = topicName.indexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
long suffix = Long.parseLong(topicName.substring(partitionIndex
+ TopicName.PARTITIONED_TOPIC_SUFFIX.length()));
TopicName partitionTopicName = TopicName.get(domain(), namespaceName, topicName.substring(0, partitionIndex));
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(partitionTopicName, false, false);

// Partition topic index is 0 to (number of partition - 1)
if (metadata.partitions > 0 && suffix >= (long) metadata.partitions) {
log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
" a number smaller then number of partition of partitioned topic {}.",
clientAppId(), topicName, partitionTopicName.getLocalName());
throw new RestException(Status.PRECONDITION_FAILED,
"Can't create topic " + topicName + " with \"-partition-\" followed by" +
" a number smaller then number of partition of partitioned topic " +
partitionTopicName.getLocalName());
} else if (metadata.partitions == 0) {
log.warn("[{}] Can't create topic {} with \"-partition-\" followed by" +
" numeric value if there isn't a partitioned topic {} created.",
clientAppId(), topicName, partitionTopicName.getLocalName());
throw new RestException(Status.PRECONDITION_FAILED,
"Can't create topic " + topicName + " with \"-partition-\" followed by" +
" numeric value if there isn't a partitioned topic " +
partitionTopicName.getLocalName() + " created.");
}
// If there is a partitioned topic with the same name and numeric suffix is smaller than the
// number of partition for that partitioned topic, validation will pass.
} catch (NumberFormatException e) {
// Do nothing, if value after partition suffix is not pure numeric value,
// as it can't conflict if user want to create partitioned topic with same
// topic name prefix in the future.
}
}
}

protected MessageId internalGetLastMessageId(boolean authoritative) {
validateAdminOperationOnTopic(authoritative);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -919,7 +919,9 @@ public void partitionedTopics(String topicName) throws Exception {
try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
fail("Should have failed as the partitioned topic already exists");
} catch (ConflictException ce) {
} catch (PreconditionFailedException e) {
// Expecting PreconditionFailedException instead of ConflictException as it'll
// fail validation before actually try to create metadata in ZK.
}

producer = client.newProducer(Schema.BYTES)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,29 @@
*/
package org.apache.pulsar.broker.admin;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.pulsar.broker.admin.v2.NonPersistentTopics;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
import org.apache.zookeeper.KeeperException;
import org.mockito.ArgumentCaptor;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
Expand All @@ -48,14 +53,18 @@
import java.lang.reflect.Field;
import java.util.List;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

@PrepareForTest(PersistentTopics.class)
public class PersistentTopicsTest extends MockedPulsarServiceBaseTest {

private PersistentTopics persistentTopics;
Expand Down Expand Up @@ -218,13 +227,58 @@ public void testNonPartitionedTopics() {

@Test
public void testCreateNonPartitionedTopic() {
final String topicName = "standard-topic";
final String topicName = "standard-topic-partition-a";
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata(
testTenant, testNamespace, topicName, true, false);
Assert.assertEquals(pMetadata.partitions, 0);
}

@Test(expectedExceptions = RestException.class)
public void testCreateNonPartitionedTopicWithInvalidName() {
final String topicName = "standard-topic-partition-10";
doAnswer(invocation -> {
TopicName partitionedTopicname = invocation.getArgument(0, TopicName.class);
assert(partitionedTopicname.getLocalName().equals("standard-topic"));
return new PartitionedTopicMetadata(10);
}).when(persistentTopics).getPartitionedTopicMetadata(any(), anyBoolean(), anyBoolean());
persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true);
}

@Test(expectedExceptions = RestException.class)
public void testCreatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws KeeperException, InterruptedException {
// Test the case in which user already has topic like topic-name-partition-123 created before we enforce the validation.
final String nonPartitionTopicName1 = "standard-topic";
final String nonPartitionTopicName2 = "special-topic-partition-123";
final String partitionedTopicName = "special-topic";
LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName1, nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
}

@Test(expectedExceptions = RestException.class)
public void testUpdatePartitionedTopicHavingNonPartitionTopicWithPartitionSuffix() throws Exception {
// Already have non partition topic special-topic-partition-10, shouldn't able to update number of partitioned topic to more than 10.
final String nonPartitionTopicName2 = "special-topic-partition-10";
final String partitionedTopicName = "special-topic";
LocalZooKeeperCacheService mockLocalZooKeeperCacheService = mock(LocalZooKeeperCacheService.class);
ZooKeeperChildrenCache mockZooKeeperChildrenCache = mock(ZooKeeperChildrenCache.class);
doReturn(mockLocalZooKeeperCacheService).when(pulsar).getLocalZkCacheService();
doReturn(mockZooKeeperChildrenCache).when(mockLocalZooKeeperCacheService).managedLedgerListCache();
doReturn(ImmutableSet.of(nonPartitionTopicName2)).when(mockZooKeeperChildrenCache).get(anyString());
doAnswer(invocation -> {
persistentTopics.namespaceName = NamespaceName.get("tenant", "namespace");
persistentTopics.topicName = TopicName.get("persistent", "tenant", "cluster", "namespace", "topicname");
return null;
}).when(persistentTopics).validatePartitionedTopicName(any(), any(), any());
doNothing().when(persistentTopics).validateAdminAccessForTenant(anyString());
persistentTopics.createPartitionedTopic(testTenant, testNamespace, partitionedTopicName, 5);
persistentTopics.updatePartitionedTopic(testTenant, testNamespace, partitionedTopicName, true, 10);
}

@Test
public void testUnloadTopic() {
final String topicName = "standard-topic-to-be-unload";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,8 +871,10 @@ public void partitionedTopics(String topicName) throws Exception {

try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
fail("Should have failed as the partitioned topic already exists");
} catch (ConflictException ce) {
fail("Should have failed as the partitioned topic exists with its partition created");
} catch (PreconditionFailedException e) {
// Expecting PreconditionFailedException instead of ConflictException as it'll
// fail validation before actually try to create metadata in ZK.
}

producer = client.newProducer(Schema.BYTES)
Expand Down
Loading

0 comments on commit e0ea6b5

Please sign in to comment.