From f2076b44764ffbbf39e01b3873a6735a90bc3d47 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Tue, 11 Apr 2023 22:50:04 +0800 Subject: [PATCH] [fix][test] Fix flaky testCreateTopicWithZombieReplicatorCursor (#20037) --- .../persistent/PersistentTopicTest.java | 29 ++++++++++++++----- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index c63be7aad01cd..412b8207e34c7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -38,7 +38,9 @@ import java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; +import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -52,6 +54,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.client.LedgerHandle; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; @@ -83,6 +86,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; +@Slf4j @Test(groups = "broker") public class PersistentTopicTest extends BrokerTestBase { @@ -558,10 +562,10 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) admin.tenants().updateTenant("prop", tenantInfo); if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Collections.singletonList(remoteCluster)); + admin.topics().setReplicationClusters(topicName, Arrays.asList("test", remoteCluster)); } else { admin.namespaces().setNamespaceReplicationClustersAsync( - namespace, Collections.singleton(remoteCluster)).get(); + namespace, Sets.newHashSet("test", remoteCluster)).get(); } final PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false) @@ -576,16 +580,27 @@ public void testCreateTopicWithZombieReplicatorCursor(boolean topicLevelPolicy) }; assertEquals(getCursors.get(), Collections.singleton(conf.getReplicatorPrefix() + "." + remoteCluster)); + // PersistentTopics#onPoliciesUpdate might happen in different threads, so there might be a race between two + // updates of the replication clusters. So here we sleep for a while to reduce the flakiness. + Thread.sleep(100); + + // Configure the local cluster to avoid the topic being deleted in PersistentTopics#checkReplication if (topicLevelPolicy) { - admin.topics().setReplicationClusters(topicName, Collections.emptyList()); + admin.topics().setReplicationClusters(topicName, Collections.singletonList("test")); } else { - admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.emptySet()).get(); + admin.namespaces().setNamespaceReplicationClustersAsync(namespace, Collections.singleton("test")).get(); } admin.clusters().deleteCluster(remoteCluster); // Now the cluster and its related policy has been removed but the replicator cursor still exists - topic.initialize().get(3, TimeUnit.SECONDS); - Awaitility.await().atMost(3, TimeUnit.SECONDS) - .until(() -> !topic.getManagedLedger().getCursors().iterator().hasNext()); + Awaitility.await().atMost(Duration.ofSeconds(10)).until(() -> { + log.info("Before initialize..."); + try { + topic.initialize().get(3, TimeUnit.SECONDS); + } catch (ExecutionException e) { + log.warn("Failed to initialize: {}", e.getCause().getMessage()); + } + return !topic.getManagedLedger().getCursors().iterator().hasNext(); + }); } }