Skip to content

Commit

Permalink
maxTopicsPerNamespace check should exclude system topic. (apache#10850)
Browse files Browse the repository at this point in the history
### Motivation

The current maxTopicsPerNamespace check logic contains system topics, but it should be excluded. As the even if we turn off `allowAutoTopicCreation`, the systemtopic will auto creation too.

### Modifications

1. Exclude the system topics for the maxTopicsPerNamespace  logic.
2. SystemTopic add the missed logic, such as  `__transaction_buffer_snapshot` 、start with the `__transaction_buffer_snapshot` end with the `__transaction_pending_ack`.

In addition, why is mytopic-partition--1 is a partitioned topic(with --)? Which version of us uses this logic?

```java
        // NOTE: Following behavior is not right actually, but for the backward compatibility, it shouldn't be changed
        assertEquals(TopicName.getPartitionIndex("mytopic-partition--1"), 1);
        assertEquals(TopicName.getPartitionIndex("mytopic-partition-00"), 0);
        assertEquals(TopicName.getPartitionIndex("mytopic-partition-012"), 12);
```
this testcase added with apache@dd8be83#diff-445b0cfa56ca0c784df78e73d9294f2a37f079ca3c15c345b03c09d56f81ebffR204 by @BewareMyPower
  • Loading branch information
yangl authored Jun 22, 2021
1 parent cf2ec14 commit dbcaa98
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.internal.TopicsImpl;
Expand Down Expand Up @@ -640,9 +641,14 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
maxTopicsPerNamespace = pulsar().getConfig().getMaxTopicsPerNamespace();
}

if (maxTopicsPerNamespace > 0) {
// new create check
if (maxTopicsPerNamespace > 0 && !SystemTopicClient.isSystemTopic(topicName)) {
List<String> partitionedTopics = getTopicPartitionList(TopicDomain.persistent);
if (partitionedTopics.size() + numPartitions > maxTopicsPerNamespace) {
// exclude created system topic
long topicsCount =
partitionedTopics.stream().filter(t -> !SystemTopicClient.isSystemTopic(TopicName.get(t)))
.count();
if (topicsCount + numPartitions > maxTopicsPerNamespace) {
log.error("[{}] Failed to create partitioned topic {}, "
+ "exceed maximum number of topics in namespace", clientAppId(), topicName);
resumeAsyncResponseExceptionally(asyncResponse, new RestException(Status.PRECONDITION_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2618,12 +2618,16 @@ private <T> boolean checkMaxTopicsPerNamespace(TopicName topicName, int numParti
maxTopicsPerNamespace = pulsar.getConfig().getMaxTopicsPerNamespace();
}

if (maxTopicsPerNamespace > 0) {
// new create check
if (maxTopicsPerNamespace > 0 && !SystemTopicClient.isSystemTopic(topicName)) {
String partitionedTopicPath = PulsarWebResource.joinPath(MANAGED_LEDGER_PATH_ZNODE,
topicName.getNamespace(), topicName.getDomain().value());
List<String> topics = pulsar().getLocalZkCache().getZooKeeper()
.getChildren(partitionedTopicPath, false);
if (topics.size() + numPartitions > maxTopicsPerNamespace) {
// exclude created system topic
long topicsCount =
topics.stream().filter(t -> !SystemTopicClient.isSystemTopic(TopicName.get(t))).count();
if (topicsCount + numPartitions > maxTopicsPerNamespace) {
log.error("Failed to create persistent topic {}, "
+ "exceed maximum number of topics in namespace", topicName);
topicFuture.completeExceptionally(new RestException(Response.Status.PRECONDITION_FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
Expand Down Expand Up @@ -168,12 +170,20 @@ interface Reader<T> {
}

static boolean isSystemTopic(TopicName topicName) {
if (topicName.isPartitioned()) {
return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME
.equals(TopicName.get(topicName.getPartitionedTopicName()).getLocalName());
TopicName nonePartitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());

// event topic
if (EventsTopicNames.checkTopicIsEventsNames(nonePartitionedTopicName)) {
return true;
}

String localName = nonePartitionedTopicName.getLocalName();
// transaction pending ack topic
if (StringUtils.endsWith(localName, MLPendingAckStore.PENDING_ACK_STORE_SUFFIX)) {
return true;
}

return EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME.equals(topicName.getLocalName());
return false;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pulsar.broker.admin;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -80,17 +79,13 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyData;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyDataImpl;
import org.apache.pulsar.common.policies.data.AutoFailoverPolicyType;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationData;
import org.apache.pulsar.common.policies.data.BrokerNamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.FailureDomainImpl;
import org.apache.pulsar.common.policies.data.FailureDomain;
import org.apache.pulsar.common.policies.data.NamespaceIsolationData;
import org.apache.pulsar.common.policies.data.NamespaceIsolationDataImpl;
import org.apache.pulsar.common.policies.data.PartitionedTopicStats;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
Expand Down Expand Up @@ -1557,6 +1552,39 @@ public void testMaxTopicsPerNamespace() throws Exception {
admin.topics().createNonPartitionedTopic(topic + i + i);
}

// check first create normal topic, then system topics, unlimited even setMaxTopicsPerNamespace
super.internalCleanup();
conf.setMaxTopicsPerNamespace(5);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);


// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
super.internalCleanup();
conf.setMaxTopicsPerNamespace(5);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
admin.topics().createPartitionedTopic(
"persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
+ "-05c0ded5e9__transaction_pending_ack", 2);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}

// check producer/consumer auto create partitioned topic
super.internalCleanup();
conf.setMaxTopicsPerNamespace(10);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.systopic;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.pulsar.common.naming.TopicName;
import org.junit.Test;

public class SystemTopicClientTest {

@Test
public void testIsSystemTopic() {
assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("test")));
assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("public/default/test")));
assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("persistent://public/default/test")));
assertFalse(SystemTopicClient.isSystemTopic(TopicName.get("non-persistent://public/default/test")));

assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events-partition-0")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__change_events-partition-1")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-0")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName.get("__transaction_buffer_snapshot-partition-1")));
assertTrue(SystemTopicClient.isSystemTopic(TopicName
.get("topicxxx-partition-0-multiTopicsReader-f433329d68__transaction_pending_ack")));
assertTrue(SystemTopicClient.isSystemTopic(
TopicName.get("topicxxx-multiTopicsReader-f433329d68__transaction_pending_ack")));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
*/
package org.apache.pulsar.common.events;

import org.apache.pulsar.common.naming.TopicName;

import java.util.Arrays;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.pulsar.common.naming.TopicName;

/**
* System topic names for each {@link EventType}.
Expand All @@ -36,18 +34,23 @@ public class EventsTopicNames {
public static final String NAMESPACE_EVENTS_LOCAL_NAME = "__change_events";

/**
* Local topic name for the namespace events.
* Local topic name for the transaction buffer snapshot.
*/
public static final String TRANSACTION_BUFFER_SNAPSHOT = "__transaction_buffer_snapshot";

/**
* The set of all local topic names declared above.
*/
public static final Set<String> EVENTS_TOPIC_NAMES =
Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT)));
Collections.unmodifiableSet(Sets.newHashSet(NAMESPACE_EVENTS_LOCAL_NAME, TRANSACTION_BUFFER_SNAPSHOT));

public static boolean checkTopicIsEventsNames(TopicName topicName) {
return EVENTS_TOPIC_NAMES.contains(topicName.getLocalName());
String name;
if (topicName.isPartitioned()) {
name = TopicName.get(topicName.getPartitionedTopicName()).getLocalName();
} else {
name = topicName.getLocalName();
}
return EVENTS_TOPIC_NAMES.contains(name);
}
}

0 comments on commit dbcaa98

Please sign in to comment.