Skip to content

Commit

Permalink
Fix getting partition metadata of a nonexistent topic returns 0 (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower authored May 18, 2021
1 parent 3bdc776 commit 403b57a
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.Constants;
import org.apache.pulsar.common.naming.NamespaceBundle;
Expand Down Expand Up @@ -296,26 +295,6 @@ protected void validatePartitionedTopicMetadata(String tenant, String namespace,
}
}

protected void validateTopicExistedAndCheckAllowAutoCreation(String tenant, String namespace,
String encodedTopic, boolean checkAllowAutoCreation) {
try {
PartitionedTopicMetadata partitionedTopicMetadata =
pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).get();
if (partitionedTopicMetadata.partitions < 1) {
if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()
&& checkAllowAutoCreation
&& !pulsar().getBrokerService().isAllowAutoTopicCreation(topicName)) {
throw new RestException(Status.NOT_FOUND,
new PulsarClientException.NotFoundException("Topic not exist"));
}
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to validate topic existed {}://{}/{}/{}",
domain(), tenant, namespace, topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Check topic partition meta failed.");
}
}

@Deprecated
protected void validateTopicName(String property, String cluster, String namespace, String encodedTopic) {
String topic = Codec.decode(encodedTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,21 @@ protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean author
boolean checkAllowAutoCreation) {
PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName,
authoritative, checkAllowAutoCreation);
if (metadata.partitions == 0 && !checkAllowAutoCreation) {
// The topic may be a non-partitioned topic, so check if it exists here.
// However, when checkAllowAutoCreation is true, the client will create the topic if it doesn't exist.
// In this case, `partitions == 0` means the automatically created topic is a non-partitioned topic so we
// shouldn't check if the topic exists.
try {
if (!pulsar().getNamespaceService().checkTopicExists(topicName).get()) {
throw new RestException(Status.NOT_FOUND,
new PulsarClientException.NotFoundException("Topic not exist"));
}
} catch (InterruptedException | ExecutionException e) {
log.error("Failed to check if topic '{}' exists", topicName, e);
throw new RestException(Status.INTERNAL_SERVER_ERROR, "Failed to get topic metadata");
}
}
if (metadata.partitions > 1) {
validateClientVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ public PartitionedTopicMetadata getPartitionedMetadata(
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation);
return super.getPartitionedMetadata(tenant, namespace, encodedTopic, authoritative, checkAllowAutoCreation);
}

@GET
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,6 @@ public PartitionedTopicMetadata getPartitionedMetadata(
@ApiParam(value = "Is check configuration required to automatically create topic")
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicExistedAndCheckAllowAutoCreation(tenant, namespace, encodedTopic, checkAllowAutoCreation);
return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ConfigHelper;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -219,6 +218,17 @@ public Object[][] topicTypeProvider() {
return new Object[][] { { TopicDomain.persistent.value() }, { TopicDomain.non_persistent.value() } };
}

@DataProvider(name = "topicNamesForAllTypes")
public Object[][] topicNamesForAllTypesProvider() {
final List<Object[]> topicNames = new ArrayList<>();
for (int i = 0; i < topicTypeProvider().length; i++) {
for (int j = 0; j < topicNamesProvider().length; j++) {
topicNames.add(new Object[]{ topicTypeProvider()[i][0], topicNamesProvider()[j][0] });
}
}
return topicNames.toArray(new Object[topicNamesProvider().length * topicTypeProvider().length][]);
}

@Test
public void clusters() throws Exception {
admin.clusters().createCluster("usw",
Expand Down Expand Up @@ -869,21 +879,41 @@ public void persistentTopics(String topicName) throws Exception {
assertEquals(admin.topics().getList("prop-xyz/ns1"), Lists.newArrayList());
}

@Test(dataProvider = "topicName")
public void partitionedTopics(String topicName) throws Exception {
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"), Lists.newArrayList());
final String partitionedTopicName = "persistent://prop-xyz/ns1/" + topicName;
@Test(dataProvider = "topicNamesForAllTypes")
public void partitionedTopics(String topicType, String topicName) throws Exception {
final String namespace = "prop-xyz/ns1";
final String partitionedTopicName = topicType + "://" + namespace + "/" + topicName;
final String anotherTopic = topicType + "://" + namespace + "/ds2";
// TODO: there're some gaps between non-persistent topics and persistent topics, so some checks will be skipped
// for non-persistent topics. After the gaps were filled, we can remove this check.
final boolean isPersistent = topicType.equals(TopicDomain.persistent.value());

assertEquals(admin.topics().getPartitionedTopicList(namespace), Lists.newArrayList());

try {
admin.topics().getPartitionedTopicMetadata(partitionedTopicName);
fail("getPartitionedTopicMetadata of " + partitionedTopicName + " should not succeed");
} catch (NotFoundException expected) {
}

admin.topics().createPartitionedTopic(partitionedTopicName, 4);
assertEquals(admin.topics().getPartitionedTopicList("prop-xyz/ns1"),
assertEquals(admin.topics().getPartitionedTopicList(namespace),
Lists.newArrayList(partitionedTopicName));

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 4);

List<String> topics = admin.topics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4);
List<String> topics;
if (isPersistent) {
// TODO: for non-persistent topics getList will return 0
topics = admin.topics().getList(namespace);
assertEquals(topics.size(), 4);
}

assertEquals(admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/ns1/ds2").partitions,
0);
try {
admin.topics().getPartitionedTopicMetadata(anotherTopic);
fail("getPartitionedTopicMetadata of " + anotherTopic + " should not succeed");
} catch (NotFoundException expected) {
}
// check the getPartitionedStats for PartitionedTopic returns only partitions metadata, and no partitions info
assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions,
admin.topics().getPartitionedStats(partitionedTopicName,false).metadata.partitions);
Expand All @@ -906,8 +936,12 @@ public void partitionedTopics(String topicName) throws Exception {
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));

try {
admin.topics().deleteSubscription(partitionedTopicName, "my-sub");
fail("should have failed");
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub");
// TODO: for non-persistent topics, deleteSubscription won't fail
fail("should have failed");
}
} catch (PulsarAdminException.PreconditionFailedException e) {
// ok
} catch (Exception e) {
Expand All @@ -917,12 +951,19 @@ public void partitionedTopics(String topicName) throws Exception {
Consumer<byte[]> consumer1 = client.newConsumer().topic(partitionedTopicName).subscriptionName("my-sub-1")
.subscribe();

assertEquals(Sets.newHashSet(admin.topics().getSubscriptions(partitionedTopicName)),
Sets.newHashSet("my-sub", "my-sub-1"));
if (isPersistent) {
// TODO: for non-persistent topics, getSubscriptions will return a empty set
assertEquals(Sets.newHashSet(admin.topics().getSubscriptions(partitionedTopicName)),
Sets.newHashSet("my-sub", "my-sub-1"));
}

consumer1.close();
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub-1");
// TODO: for non-persistent topics, getSubscriptions will return a empty set
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList("my-sub"));
}

Producer<byte[]> producer = client.newProducer(Schema.BYTES)
.topic(partitionedTopicName)
Expand All @@ -935,15 +976,18 @@ public void partitionedTopics(String topicName) throws Exception {
producer.send(message.getBytes());
}

assertEquals(Sets.newHashSet(admin.topics().getList("prop-xyz/ns1")),
assertEquals(Sets.newHashSet(admin.topics().getList(namespace)),
Sets.newHashSet(partitionedTopicName + "-partition-0", partitionedTopicName + "-partition-1",
partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));

// test cumulative stats for partitioned topic
PartitionedTopicStats topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10);
if (isPersistent) {
// TODO: for non-persistent topics, the subscription doesn't exist
assertEquals(topicStats.subscriptions.keySet(), Sets.newTreeSet(Lists.newArrayList("my-sub")));
assertEquals(topicStats.subscriptions.get("my-sub").consumers.size(), 1);
assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 10);
}
assertEquals(topicStats.publishers.size(), 1);
assertEquals(topicStats.partitions, Maps.newHashMap());

Expand All @@ -955,8 +999,11 @@ public void partitionedTopics(String topicName) throws Exception {
partitionedTopicName + "-partition-2", partitionedTopicName + "-partition-3"));
TopicStats partitionStats = topicStats.partitions.get(partitionedTopicName + "-partition-0");
assertEquals(partitionStats.publishers.size(), 1);
assertEquals(partitionStats.subscriptions.get("my-sub").consumers.size(), 1);
assertEquals(partitionStats.subscriptions.get("my-sub").msgBacklog, 3, 1);
if (isPersistent) {
// TODO: for non-persistent topics, the subscription doesn't exist
assertEquals(partitionStats.subscriptions.get("my-sub").consumers.size(), 1);
assertEquals(partitionStats.subscriptions.get("my-sub").msgBacklog, 3, 1);
}

try {
admin.topics().skipMessages(partitionedTopicName, "my-sub", 5);
Expand All @@ -965,16 +1012,21 @@ public void partitionedTopics(String topicName) throws Exception {
// ok
}

admin.topics().skipAllMessages(partitionedTopicName, "my-sub");
topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0);
if (isPersistent) {
// TODO: for non-persistent topics, skilAllMessages will cause 500 internal error
admin.topics().skipAllMessages(partitionedTopicName, "my-sub");
topicStats = admin.topics().getPartitionedStats(partitionedTopicName, false);
assertEquals(topicStats.subscriptions.get("my-sub").msgBacklog, 0);
}

producer.close();
consumer.close();

admin.topics().deleteSubscription(partitionedTopicName, "my-sub");

assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList());
if (isPersistent) {
// TODO: for non-persistent topics, deleteSubscription might throw NotFoundException
admin.topics().deleteSubscription(partitionedTopicName, "my-sub");
assertEquals(admin.topics().getSubscriptions(partitionedTopicName), Lists.newArrayList());
}

try {
admin.topics().createPartitionedTopic(partitionedTopicName, 32);
Expand All @@ -988,8 +1040,11 @@ public void partitionedTopics(String topicName) throws Exception {
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

topics = admin.topics().getList("prop-xyz/ns1");
assertEquals(topics.size(), 4);
if (isPersistent) {
// TODO: for non-persistent topics getList will return 0
topics = admin.topics().getList(namespace);
assertEquals(topics.size(), 4);
}

try {
admin.topics().deletePartitionedTopic(partitionedTopicName);
Expand All @@ -1003,14 +1058,18 @@ public void partitionedTopics(String topicName) throws Exception {

admin.topics().deletePartitionedTopic(partitionedTopicName);

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
try {
admin.topics().getPartitionedTopicMetadata(partitionedTopicName);
fail("getPartitionedTopicMetadata of " + partitionedTopicName + " should not succeed");
} catch (NotFoundException expected) {
}

admin.topics().createPartitionedTopic(partitionedTopicName, 32);

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 32);

try {
admin.topics().deletePartitionedTopic("persistent://prop-xyz/ns1/ds2");
admin.topics().deletePartitionedTopic(anotherTopic);
fail("Should have failed as the partitioned topic was not created");
} catch (NotFoundException nfe) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -792,9 +792,11 @@ public void partitionedTopics(String topicName) throws Exception {
List<String> topics = admin.topics().getList("prop-xyz/use/ns1");
assertEquals(topics.size(), 4);

assertEquals(
admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2").partitions,
0);
try {
admin.topics().getPartitionedTopicMetadata("persistent://prop-xyz/use/ns1/ds2");
fail("getPartitionedTopicMetadata of persistent://prop-xyz/use/ns1/ds2 should not succeed");
} catch (NotFoundException expected) {
}

// create consumer and subscription
@Cleanup
Expand Down Expand Up @@ -908,7 +910,11 @@ public void partitionedTopics(String topicName) throws Exception {

admin.topics().deletePartitionedTopic(partitionedTopicName);

assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 0);
try {
admin.topics().getPartitionedTopicMetadata(partitionedTopicName);
fail("getPartitionedTopicMetadata of " + partitionedTopicName + " should not succeed");
} catch (NotFoundException expected) {
}

admin.topics().createPartitionedTopic(partitionedTopicName, 32);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
*/
package org.apache.pulsar.broker.service;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -154,8 +154,10 @@ public void testGetPartitionedMetadataWithoutCheckAllowAutoCreation() throws Exc
pulsar.getConfiguration().setDefaultNumPartitions(3);

final String topicString = "persistent://prop/ns-abc/test-topic-3";
int partitions = admin.topics().getPartitionedTopicMetadata(topicString).partitions;
assertEquals(partitions, 0);
try {
admin.topics().getPartitionedTopicMetadata(topicString);
} catch (PulsarAdminException.NotFoundException expected) {
}
assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicString));
}

Expand Down

0 comments on commit 403b57a

Please sign in to comment.