diff --git a/pom.xml b/pom.xml
index 0274245138e6a..29a0ae5dbc83b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@ flexible messaging model and an intuitive client API.
8
8
-
+
**/Test*.java,**/*Test.java,**/*Tests.java,**/*TestCase.java
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index b1a142737c600..85c1bc65fcadd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -419,7 +419,8 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
}
// remove from owned namespace map and ephemeral node from ZK
- final List> futures = Lists.newArrayList();
+ final List> topicFutures = Lists.newArrayList();
+ final List> bundleFutures = Lists.newArrayList();
try {
// firstly remove all topics including system topics
if (!topics.isEmpty()) {
@@ -433,12 +434,12 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
// Distinguish partitioned topic to avoid duplicate deletion of the same schema
- futures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
+ topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
partitionedTopic, true, true));
partitionedTopics.add(partitionedTopic);
}
} else {
- futures.add(pulsar().getAdminClient().topics().deleteAsync(
+ topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
}
@@ -459,14 +460,35 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
+ "and non-partitioned-topics:{} in namespace:{}.",
partitionedTopics, nonPartitionedTopics, namespaceName);
}
+
+ final CompletableFuture topicFutureEx =
+ FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse
+ .resume(new RestException((PulsarAdminException) exception.getCause()));
+ } else {
+ log.error("[{}] Failed to remove forcefully owned namespace {}",
+ clientAppId(), namespaceName, exception);
+ asyncResponse.resume(new RestException(exception.getCause()));
+ }
+ return exception;
+ }
+
+ return null;
+ });
+ if (topicFutureEx.join() != null) {
+ return;
+ }
}
+
// forcefully delete namespace bundles
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle bundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then we do not need to delete the bundle
if (pulsar().getNamespaceService().getOwner(bundle).isPresent()) {
- futures.add(pulsar().getAdminClient().namespaces()
+ bundleFutures.add(pulsar().getAdminClient().namespaces()
.deleteNamespaceBundleAsync(namespaceName.toString(), bundle.getBundleRange(), true));
}
}
@@ -476,7 +498,7 @@ protected void internalDeleteNamespaceForcefully(AsyncResponse asyncResponse, bo
return;
}
- FutureUtil.waitForAll(futures).handle((result, exception) -> {
+ FutureUtil.waitForAll(bundleFutures).handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
@@ -1866,7 +1888,7 @@ protected List internalGetAntiAffinityNamespaces(String cluster, String
return namespaces.stream().filter(ns -> {
Optional policies;
try {
- policies = getLocalPolicies().getLocalPolicies(namespaceName);
+ policies = getLocalPolicies().getLocalPolicies(NamespaceName.get(ns));
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
similarity index 96%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 61ac01ad9ed4f..513398fb3e81e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -36,7 +36,6 @@
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -105,7 +104,7 @@
@Slf4j
@Test(groups = "broker-admin")
-public class AdminApiTest2 extends MockedPulsarServiceBaseTest {
+public class AdminApi2Test extends MockedPulsarServiceBaseTest {
private MockedPulsarService mockPulsarSetup;
@@ -1025,9 +1024,11 @@ public void brokerNamespaceIsolationPoliciesUpdateOnTime() throws Exception {
parameters1.put("min_limit", "1");
parameters1.put("usage_threshold", "100");
+ final List primaryList = new ArrayList<>();
+ primaryList.add(brokerName + ".*");
NamespaceIsolationData nsPolicyData1 = NamespaceIsolationData.builder()
.namespaces(Collections.singletonList(ns1Name))
- .primary(Collections.singletonList(brokerName + ".*"))
+ .primary(primaryList)
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
@@ -1576,60 +1577,6 @@ public void testForceDeleteNamespace() throws Exception {
}
}
- @Test
- public void testDistinguishTopicTypeWhenForceDeleteNamespace() throws Exception {
- conf.setForceDeleteNamespaceAllowed(true);
- final String ns = "prop-xyz/distinguish-topic-type-ns";
- final String exNs = "prop-xyz/ex-distinguish-topic-type-ns";
- admin.namespaces().createNamespace(ns, 2);
- admin.namespaces().createNamespace(exNs, 2);
-
- final String p1 = "persistent://" + ns + "/p1";
- final String p5 = "persistent://" + ns + "/p5";
- final String np = "persistent://" + ns + "/np";
-
- admin.topics().createPartitionedTopic(p1, 1);
- admin.topics().createPartitionedTopic(p5, 5);
- admin.topics().createNonPartitionedTopic(np);
-
- final String exNp = "persistent://" + exNs + "/np";
- admin.topics().createNonPartitionedTopic(exNp);
- // insert an invalid topic name
- pulsar.getLocalMetadataStore().put(
- "/managed-ledgers/" + exNs + "/persistent/", "".getBytes(), Optional.empty()).join();
-
- List topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
- List exTopics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(exNs)).get();
-
- // ensure that the topic list contains all the topics
- List allTopics = new ArrayList<>(Arrays.asList(np, TopicName.get(p1).getPartition(0).toString()));
- for (int i = 0; i < 5; i++) {
- allTopics.add(TopicName.get(p5).getPartition(i).toString());
- }
- Assert.assertEquals(allTopics.stream().filter(t -> !topics.contains(t)).count(), 0);
- Assert.assertTrue(exTopics.contains("persistent://" + exNs + "/"));
- // partition num = p1 + p5 + np
- Assert.assertEquals(topics.size(), 1 + 5 + 1);
- Assert.assertEquals(exTopics.size(), 1 + 1);
-
- admin.namespaces().deleteNamespace(ns, true);
- Arrays.asList(p1, p5, np).forEach(t -> {
- try {
- admin.schemas().getSchemaInfo(t);
- } catch (PulsarAdminException e) {
- // all the normal topics' schemas have been deleted
- Assert.assertEquals(e.getStatusCode(), 404);
- }
- });
-
- try {
- admin.namespaces().deleteNamespace(exNs, true);
- fail("Should fail due to invalid topic");
- } catch (Exception e) {
- //ok
- }
- }
-
@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
@@ -1721,11 +1668,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
- admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
- admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
- admin.topics().createPartitionedTopic(
- "persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
- + "-05c0ded5e9__transaction_pending_ack", 2);
+ admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
// check first create system topics, then normal topic, unlimited even setMaxTopicsPerNamespace
@@ -1735,11 +1678,7 @@ public void testMaxTopicsPerNamespace() throws Exception {
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(brokerUrl.toString()).build());
admin.tenants().createTenant("testTenant", tenantInfo);
admin.namespaces().createNamespace("testTenant/ns1", Sets.newHashSet("test"));
- admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 2);
- admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__transaction_buffer_snapshot", 2);
- admin.topics().createPartitionedTopic(
- "persistent://testTenant/ns1/__transaction_buffer_snapshot-multiTopicsReader"
- + "-05c0ded5e9__transaction_pending_ack", 2);
+ admin.topics().createPartitionedTopic("persistent://testTenant/ns1/__change_events", 6);
for (int i = 0; i < 5; ++i) {
admin.topics().createPartitionedTopic(topic + i, 1);
}
@@ -1910,6 +1849,7 @@ public void testMaxSubPerTopicApi() throws Exception {
@Test(timeOut = 30000)
public void testMaxSubPerTopic() throws Exception {
+ pulsar.getConfiguration().setMaxSubscriptionsPerTopic(0);
final String myNamespace = "prop-xyz/ns" + UUID.randomUUID();
admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
final String topic = "persistent://" + myNamespace + "/testMaxSubPerTopic";
@@ -1917,9 +1857,8 @@ public void testMaxSubPerTopic() throws Exception {
final int maxSub = 2;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, maxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
- Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
- field.setAccessible(true);
- Awaitility.await().until(() -> (int) field.get(persistentTopic) == maxSub);
+ Awaitility.await().until(() ->
+ persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == maxSub);
List> consumerList = new ArrayList<>(maxSub);
for (int i = 0; i < maxSub; i++) {
@@ -1936,7 +1875,8 @@ public void testMaxSubPerTopic() throws Exception {
}
//After removing the restriction, it should be able to create normally
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
- Awaitility.await().until(() -> field.get(persistentTopic) == null);
+ Awaitility.await().until(() ->
+ persistentTopic.getHierarchyTopicPolicies().getMaxSubscriptionsPerTopic().get() == 0);
Consumer> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
@@ -1981,16 +1921,16 @@ public void testMaxSubPerTopicPriority() throws Exception {
final int nsLevelMaxSub = 4;
admin.namespaces().setMaxSubscriptionsPerTopic(myNamespace, nsLevelMaxSub);
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get();
- Field field = PersistentTopic.class.getSuperclass().getDeclaredField("maxSubscriptionsPerTopic");
- field.setAccessible(true);
- Awaitility.await().until(() -> (int) field.get(persistentTopic) == nsLevelMaxSub);
+ Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
+ .getMaxSubscriptionsPerTopic().get() == nsLevelMaxSub);
Consumer> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString())
.subscribe();
consumerList.add(consumer);
assertEquals(consumerList.size(), 3);
//After removing the restriction, it should fail again
admin.namespaces().removeMaxSubscriptionsPerTopic(myNamespace);
- Awaitility.await().until(() -> field.get(persistentTopic) == null);
+ Awaitility.await().until(() -> persistentTopic.getHierarchyTopicPolicies()
+ .getMaxSubscriptionsPerTopic().get() == brokerLevelMaxSub);
try {
client.newConsumer().topic(topic).subscriptionName(UUID.randomUUID().toString()).subscribe();
fail("should fail");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
similarity index 99%
rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
index 9d814662bae26..c74d5ee773318 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v1/V1_AdminApi2Test.java
@@ -73,7 +73,7 @@
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
-public class V1_AdminApiTest2 extends MockedPulsarServiceBaseTest {
+public class V1_AdminApi2Test extends MockedPulsarServiceBaseTest {
private MockedPulsarService mockPulsarSetup;