Skip to content

Commit

Permalink
[fix][flaky-test]AdminApi2Test.testDeleteNamespace (apache#17157)
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode authored Aug 19, 2022
1 parent 694aa13 commit de6948c
Showing 1 changed file with 49 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.admin;

import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.common.naming.SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -45,6 +47,7 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.NotAcceptableException;
import javax.ws.rs.core.Response.Status;
Expand Down Expand Up @@ -102,6 +105,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
Expand Down Expand Up @@ -1422,6 +1426,9 @@ public void testDeleteNamespace() throws Exception {
admin.topics().createPartitionedTopic(topic, 10);
assertFalse(admin.topics().getList(namespace).isEmpty());

// Wait for change event topic and compaction create finish.
awaitChangeEventTopicAndCompactionCreateFinish(namespace, String.format("persistent://%s", topic));

try {
admin.namespaces().deleteNamespace(namespace, false);
fail("should have failed due to namespace not empty");
Expand All @@ -1445,7 +1452,49 @@ public void testDeleteNamespace() throws Exception {

final String bundleDataPath = "/loadbalance/bundle-data/" + namespace;
assertFalse(pulsar.getLocalMetadataStore().exists(bundleDataPath).join());
}

private void awaitChangeEventTopicAndCompactionCreateFinish(String ns, String topic) throws Exception {
if (!pulsar.getConfiguration().isSystemTopicEnabled()){
return;
}
// Trigger change event topic create.
SubscribeRate subscribeRate = new SubscribeRate(-1, 60);
admin.topicPolicies().setSubscribeRate(topic, subscribeRate);
// Wait for change event topic and compaction create finish.
String allowAutoTopicCreationType = pulsar.getConfiguration().getAllowAutoTopicCreationType();
int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions();
ArrayList<String> expectChangeEventTopics = new ArrayList<>();
if ("non-partitioned".equals(allowAutoTopicCreationType)){
String t = String.format("persistent://%s/%s", ns, NAMESPACE_EVENTS_LOCAL_NAME);
expectChangeEventTopics.add(t);
} else {
for (int i = 0; i < defaultNumPartitions; i++){
String t = String.format("persistent://%s/%s-partition-%s", ns, NAMESPACE_EVENTS_LOCAL_NAME, i);
expectChangeEventTopics.add(t);
}
}
Awaitility.await().until(() -> {
boolean finished = true;
for (String changeEventTopicName : expectChangeEventTopics){
CompletableFuture<Optional<Topic>> completableFuture = pulsar.getBrokerService().getTopic(changeEventTopicName, false);
if (completableFuture == null){
finished = false;
}
Optional<Topic> optionalTopic = completableFuture.get();
if (!optionalTopic.isPresent()){
finished = false;
}
PersistentTopic changeEventTopic = (PersistentTopic) optionalTopic.get();
if (!changeEventTopic.isCompactionEnabled()){
continue;
}
if (!changeEventTopic.getSubscriptions().containsKey(COMPACTION_SUBSCRIPTION)){
finished = false;
}
}
return finished;
});
}

@Test
Expand Down

0 comments on commit de6948c

Please sign in to comment.