Skip to content

Commit

Permalink
[fix][broker] Reject auto create partitioned topic when topic name co…
Browse files Browse the repository at this point in the history
…ntains ``-partition-`` (apache#14920)
  • Loading branch information
mattisonchao authored Apr 8, 2022
1 parent 8a67a38 commit 8add5db
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,10 @@ public NotFoundException(Throwable t) {
super(t);
}
}
public static class InvalidTopicNameException extends PulsarServerException {

public InvalidTopicNameException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.BrokerServiceException;
Expand Down Expand Up @@ -504,6 +505,9 @@ protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllo
if (e.getCause() instanceof RestException) {
throw (RestException) e.getCause();
}
if (e.getCause() instanceof PulsarServerException.InvalidTopicNameException) {
throw new RestException(Status.PRECONDITION_FAILED, e.getCause().getMessage());
}
throw new RestException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2647,6 +2647,11 @@ public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadata

@SuppressWarnings("deprecation")
private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
if (topicName.getLocalName().contains(TopicName.PARTITIONED_TOPIC_SUFFIX)) {
return FutureUtil.failedFuture(new PulsarServerException.
InvalidTopicNameException(
String.format("Invalid topic name: %s , should not contain -partition-", topicName)));
}
final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,28 +29,31 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;

import java.lang.reflect.Field;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Sets;
import lombok.Cleanup;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -271,4 +274,38 @@ public void testPersistentPartitionedTopicUnload() throws Exception {
producer.close();
}
}

@Test
public void testAutoCreatePartitionedTopicThatNameIncludePartition() throws Exception {
final String topicName = "persistent://prop/autoNs/failedcreate-partition-abcde";
final String ns = "prop/autoNs";
admin.namespaces().createNamespace(ns);
pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned");
try {
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName)
.create();
Assert.fail("unexpected operation");
} catch (PulsarClientException ex) {
Assert.assertTrue(ex.getMessage()
.contains("Invalid topic name"));
}
Assert.assertEquals(admin.topics().getList(ns).size(), 0);
URI tcpLookupUrl = new URI(pulsar.getBrokerServiceUrl());
PulsarClient client = PulsarClient.builder()
.serviceUrl(tcpLookupUrl.toString())
.build();
try {
@Cleanup
Producer<byte[]> producer = client.newProducer()
.topic(topicName)
.create();
Assert.fail("unexpected operation");
} catch (PulsarClientException ex) {
Assert.assertTrue(ex.getMessage()
.contains("Invalid topic name"));
}
Assert.assertEquals(admin.topics().getList(ns).size(), 0);
pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned");
}
}

0 comments on commit 8add5db

Please sign in to comment.