diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java new file mode 100644 index 0000000000000..ff03e425ccbcd --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import java.util.UUID; + +/** + * Holds util methods used in test. + */ +public class BrokerTestUtil { + // Generate unique name for different test run. + public static String newUniqueName(String prefix) { + return prefix + "-" + UUID.randomUUID(); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 826116ca74ed6..8c8ddff895f9b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -60,6 +60,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedgerInfo; import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ConfigHelper; import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.PulsarService; @@ -2621,7 +2622,7 @@ public void testCreateAndDeleteNamespaceWithBundles() throws Exception { Sets.newHashSet("test", "usw")); admin.tenants().updateTenant("prop-xyz", tenantInfo); - String ns = "prop-xyz/ns-" + System.nanoTime(); + String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns"); admin.namespaces().createNamespace(ns, 24); admin.namespaces().deleteNamespace(ns); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java index d3f5d7ba134a3..5c08fd58277d4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/IncrementPartitionsTest.java @@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.admin.AdminApiTest.MockedPulsarService; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; import org.apache.pulsar.client.api.Consumer; @@ -107,7 +108,8 @@ public void testIncrementPartitionsOfTopic() throws Exception { @Test public void testIncrementPartitionsWithNoSubscriptions() throws Exception { - final String partitionedTopicName = "persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime(); + final String partitionedTopicName = + BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic"); admin.topics().createPartitionedTopic(partitionedTopicName, 1); assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, 1); @@ -133,7 +135,8 @@ public void testIncrementPartitionsWithNoSubscriptions() throws Exception { @Test public void testIncrementPartitionsWithReaders() throws Exception { - TopicName partitionedTopicName = TopicName.get("persistent://prop-xyz/use/ns1/test-topic-" + System.nanoTime()); + TopicName partitionedTopicName = TopicName.get( + BrokerTestUtil.newUniqueName("persistent://prop-xyz/use/ns1/test-topic")); admin.topics().createPartitionedTopic(partitionedTopicName.toString(), 1); assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName.toString()).partitions, 1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index b7e3e522850c9..ce31fd1edd2dc 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -20,6 +20,7 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.PulsarClientException; import static org.mockito.ArgumentMatchers.any; @@ -1140,7 +1141,7 @@ public void testDeleteNamespace() throws Exception { */ @Test public void testForceDeleteNamespace() throws Exception { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); String topic = namespace + "/topic"; admin.namespaces().createNamespace(namespace, 100); @@ -1360,7 +1361,7 @@ private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws @Test public void testDeleteNonPartitionedTopicMultipleTimes() throws Exception { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); String topic = namespace + "/topic"; admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster)); @@ -1388,7 +1389,7 @@ public void testDeleteNonPartitionedTopicMultipleTimes() throws Exception { @Test public void testDeletePartitionedTopicMultipleTimes() throws Exception { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); String topic = namespace + "/topic"; admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster)); @@ -1417,7 +1418,7 @@ public void testDeletePartitionedTopicMultipleTimes() throws Exception { @Test public void testRetentionPolicyValidation() throws Exception { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); admin.namespaces().createNamespace(namespace, Sets.newHashSet(testLocalCluster)); @@ -1596,7 +1597,7 @@ public void testRetentionPolicyValidationAsPartOfAllPolicies() throws Exception public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarClientException { pulsar.getConfiguration().setAuthorizationEnabled(false); pulsar.getConfiguration().setTopicLevelPoliciesEnabled(false); - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); String topic = namespace + "/test-subscription-enabled"; admin.namespaces().createNamespace(namespace); Set subscriptionTypes = new HashSet<>(); @@ -1666,7 +1667,7 @@ public void testSubscriptionTypesEnabled() throws PulsarAdminException, PulsarCl private void assertValidRetentionPolicyAsPartOfAllPolicies(Policies policies, int retentionTimeInMinutes, int retentionSizeInMB) throws PulsarAdminException { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); RetentionPolicies retention = new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB); policies.retention_policies = retention; admin.namespaces().createNamespace(namespace, policies); @@ -1675,7 +1676,7 @@ private void assertValidRetentionPolicyAsPartOfAllPolicies(Policies policies, in private void assertInvalidRetentionPolicyAsPartOfAllPolicies(Policies policies, int retentionTimeInMinutes, int retentionSizeInMB) { - String namespace = this.testTenant + "/namespace-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName(this.testTenant + "/namespace"); try { RetentionPolicies retention = new RetentionPolicies(retentionTimeInMinutes, retentionSizeInMB); policies.retention_policies = retention; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index ba9ccb13894be..3894f7c742908 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ -40,6 +40,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo; import org.apache.bookkeeper.util.StringUtils; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -309,10 +310,10 @@ public void testTopicWithWildCardChar() throws Exception { @Test public void testDeleteTopicWithMissingData() throws Exception { - String namespace = "prop/usc-" + System.nanoTime(); + String namespace = BrokerTestUtil.newUniqueName("prop/usc"); admin.namespaces().createNamespace(namespace); - String topic = namespace + "/my-topic-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName(namespace + "/my-topic"); @Cleanup PulsarClient client = PulsarClient.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java index f1a3017e9f6e7..f230c713f7877 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PeerReplicatorTest.java @@ -28,6 +28,7 @@ import java.util.LinkedHashSet; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Producer; @@ -196,7 +197,7 @@ public void testPeerClusterInReplicationClusterListChange() throws Exception { admin1.clusters().updatePeerClusterNames("r3", null); final String serviceUrl = pulsar3.getBrokerServiceUrl(); - final String namespace1 = "pulsar/global/peer-change-repl-ns-" + System.nanoTime(); + final String namespace1 = BrokerTestUtil.newUniqueName("pulsar/global/peer-change-repl-ns"); admin1.namespaces().createNamespace(namespace1); // add replication cluster admin1.namespaces().setNamespaceReplicationClusters(namespace1, Sets.newHashSet("r1")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java index dd900174ed01f..d1595fa3f2fb5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentFailoverE2ETest.java @@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -277,7 +278,7 @@ public void testSimpleConsumerEventsWithPartition() throws Exception { int numPartitions = 4; - final String topicName = "persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition-" + System.nanoTime(); + final String topicName = BrokerTestUtil.newUniqueName("persistent://prop/use/ns-abc/testSimpleConsumerEventsWithPartition"); final TopicName destName = TopicName.get(topicName); final String subName = "sub1"; final int numMsgs = 100; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java index 9cccdd56c5056..143625beebb80 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java @@ -52,6 +52,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.CursorAlreadyClosedException; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; import org.apache.pulsar.broker.service.persistent.PersistentReplicator; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -127,7 +128,7 @@ public void testConfigChange() throws Exception { List> results = Lists.newArrayList(); for (int i = 0; i < 10; i++) { final TopicName dest = TopicName.get(String - .format("persistent://pulsar/ns/topic-%d-%d", System.currentTimeMillis(), i)); + .format(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/topic-" + i))); results.add(executor.submit(new Callable() { @Override @@ -225,11 +226,11 @@ public void testConcurrentReplicator() throws Exception { log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---"); - final String namespace = "pulsar/concurrent"; + final String namespace = BrokerTestUtil.newUniqueName("pulsar/concurrent"); admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); final TopicName topicName = TopicName - .get(String.format("persistent://" + namespace + "/topic-%d-%d", System.currentTimeMillis(), 0)); + .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic")); @Cleanup PulsarClient client1 = PulsarClient.builder() @@ -290,7 +291,7 @@ public void testReplication(String namespace) throws Exception { // runtime. // Run a set of producer tasks to create the topics final TopicName dest = TopicName - .get(String.format("persistent://%s/repltopic-%d", namespace, System.nanoTime())); + .get(BrokerTestUtil.newUniqueName("persistent://" + namespace + "/repltopic")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -369,7 +370,7 @@ public void testReplicationOverrides() throws Exception { // Run a set of producer tasks to create the topics for (int i = 0; i < 10; i++) { final TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns/repltopic-%d", System.nanoTime())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopic")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -430,7 +431,7 @@ public void testFailures() throws Exception { // 1. Create a consumer using the reserved consumer id prefix "pulsar.repl." final TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns/res-cons-id-%d", System.currentTimeMillis())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/res-cons-id-")); // Create another consumer using replication prefix as sub id MessageConsumer consumer = new MessageConsumer(url2, dest, "pulsar.repl."); @@ -446,7 +447,7 @@ public void testFailures() throws Exception { public void testReplicatePeekAndSkip() throws Exception { final TopicName dest = TopicName.get( - String.format("persistent://pulsar/ns/peekAndSeekTopic-%d", System.currentTimeMillis())); + BrokerTestUtil.newUniqueName("persistent://pulsar/ns/peekAndSeekTopic")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -472,7 +473,7 @@ public void testReplicatorClearBacklog() throws Exception { SortedSet testDests = new TreeSet(); final TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns/clearBacklogTopic-%d", System.currentTimeMillis())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/clearBacklogTopic")); testDests.add(dest.toString()); @Cleanup @@ -502,7 +503,7 @@ public void testResetCursorNotFail() throws Exception { // This test is to verify that reset cursor fails on global topic final TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns/resetrepltopic-%d", System.nanoTime())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/resetrepltopic")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -526,7 +527,7 @@ public void testReplicationForBatchMessages() throws Exception { // Run a set of producer tasks to create the topics final TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch")); @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest, true); @@ -580,7 +581,7 @@ public void testReplicationForBatchMessages() throws Exception { @Test(timeOut = 30000) public void testDeleteReplicatorFailure() throws Exception { log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); - final String topicName = "persistent://pulsar/ns/repltopicbatch-" + System.currentTimeMillis() + "-"; + final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"); final TopicName dest = TopicName.get(topicName); @Cleanup @@ -621,7 +622,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { @Test(priority = 5, timeOut = 30000) public void testReplicatorProducerClosing() throws Exception { log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---"); - final String topicName = "persistent://pulsar/ns/repltopicbatch-" + System.currentTimeMillis() + "-"; + final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/repltopicbatch"); final TopicName dest = TopicName.get(topicName); @Cleanup @@ -662,7 +663,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { Thread.sleep(200); TopicName dest = TopicName - .get(String.format("persistent://pulsar/ns1/%s-%d", policy, System.currentTimeMillis())); + .get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/%s-" + policy)); // Producer on r1 @Cleanup @@ -721,7 +722,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception { */ @Test(timeOut = 15000) public void testCloseReplicatorStartProducer() throws Exception { - TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor-" + System.currentTimeMillis() + "-"); + TopicName dest = TopicName.get(BrokerTestUtil.newUniqueName("persistent://pulsar/ns1/closeCursor")); // Producer on r1 @Cleanup MessageProducer producer1 = new MessageProducer(url1, dest); @@ -767,7 +768,7 @@ public void testCloseReplicatorStartProducer() throws Exception { @Test(timeOut = 30000) public void verifyChecksumAfterReplication() throws Exception { - final String topicName = "persistent://pulsar/ns/checksumAfterReplication-" + System.currentTimeMillis() + "-"; + final String topicName = BrokerTestUtil.newUniqueName("persistent://pulsar/ns/checksumAfterReplication"); PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build(); Producer p1 = c1.newProducer().topic(topicName) @@ -805,11 +806,9 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws log.info("--- Starting ReplicatorTest::{} --- ", methodName); - final String namespace = "pulsar/partitionedNs-" + isPartitionedTopic; - final String persistentTopicName = - "persistent://" + namespace + "/partTopic-" + System.currentTimeMillis() + "-" + isPartitionedTopic; - final String nonPersistentTopicName = - "non-persistent://" + namespace + "/partTopic-" + System.currentTimeMillis() + "-"+ isPartitionedTopic; + final String namespace = BrokerTestUtil.newUniqueName("pulsar/partitionedNs-" + isPartitionedTopic); + final String persistentTopicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/partTopic-" + isPartitionedTopic); + final String nonPersistentTopicName = BrokerTestUtil.newUniqueName("non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic); BrokerService brokerService = pulsar1.getBrokerService(); admin1.namespaces().createNamespace(namespace); @@ -852,7 +851,7 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws } assertTrue(e.getCause() instanceof NamingException); } - + } @Test @@ -861,7 +860,7 @@ public void testReplicatedCluster() throws Exception { log.info("--- Starting ReplicatorTest::testReplicatedCluster ---"); final String namespace = "pulsar/global/repl"; - final String topicName = String.format("persistent://%s/topic1-%d", namespace, System.currentTimeMillis()); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1"); admin1.namespaces().createNamespace(namespace); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); admin1.topics().createPartitionedTopic(topicName, 4); @@ -908,8 +907,8 @@ public void testUpdateGlobalTopicPartition() throws Exception { final String cluster1 = pulsar1.getConfig().getClusterName(); final String cluster2 = pulsar2.getConfig().getClusterName(); - final String namespace = "pulsar/ns-" + System.nanoTime(); - final String topicName = "persistent://" + namespace + "/topic1"; + final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/topic1"); int startPartitions = 4; int newPartitions = 8; final String subscriberName = "sub1"; @@ -960,9 +959,9 @@ public void testTopicReplicatedAndProducerCreate(String topicPrefix, String topi final String cluster1 = pulsar1.getConfig().getClusterName(); final String cluster2 = pulsar2.getConfig().getClusterName(); - final String namespace = "pulsar/ns-" + System.nanoTime(); - final String partitionedTopicName = topicPrefix + namespace + topicName + "-partitioned"; - final String nonPartitionedTopicName = topicPrefix + namespace + topicName + "-non-partitioned"; + final String namespace = BrokerTestUtil.newUniqueName("pulsar/ns"); + final String partitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-partitioned"); + final String nonPartitionedTopicName = BrokerTestUtil.newUniqueName(topicPrefix + namespace + topicName + "-non-partitioned"); final int startPartitions = 4; admin1.namespaces().createNamespace(namespace, Sets.newHashSet(cluster1, cluster2)); admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3")); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java index 799d7f1123612..804dfbbb3833a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/DelayedDeliveryTest.java @@ -34,6 +34,7 @@ import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; @@ -71,7 +72,7 @@ public void cleanup() throws Exception { @Test public void testDelayedDelivery() throws Exception { - String topic = "testNegativeAcks-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("testNegativeAcks"); @Cleanup Consumer failoverConsumer = pulsarClient.newConsumer(Schema.STRING) @@ -126,7 +127,7 @@ public void testDelayedDelivery() @Test public void testInterleavedMessages() throws Exception { - String topic = "testInterleavedMessages-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("testInterleavedMessages"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) @@ -178,7 +179,7 @@ public void testInterleavedMessages() @Test public void testEverythingFilteredInMultipleReads() throws Exception { - String topic = "testEverythingFilteredInMultipleReads-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("testEverythingFilteredInMultipleReads"); @Cleanup Consumer sharedConsumer = pulsarClient.newConsumer(Schema.STRING) @@ -227,7 +228,7 @@ public void testEverythingFilteredInMultipleReads() @Test public void testDelayedDeliveryWithMultipleConcurrentReadEntries() throws Exception { - String topic = "persistent://public/default/testDelayedDelivery-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testDelayedDelivery"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) @@ -289,7 +290,7 @@ public void testDelayedDeliveryWithMultipleConcurrentReadEntries() @Test public void testOrderingDispatch() throws PulsarClientException { - String topic = "persistent://public/default/testOrderingDispatch-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("persistent://public/default/testOrderingDispatch"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java index d284254b82305..192f8fe730e32 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionConfigTest.java @@ -23,6 +23,7 @@ import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.PulsarClientException; @@ -49,7 +50,7 @@ public void cleanup() throws Exception { @Test public void createReplicatedSubscription() throws Exception { this.conf.setEnableReplicatedSubscriptions(true); - String topic = "createReplicatedSubscription-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("createReplicatedSubscription"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) @@ -71,7 +72,7 @@ public void createReplicatedSubscription() throws Exception { @Test public void upgradeToReplicatedSubscription() throws Exception { this.conf.setEnableReplicatedSubscriptions(true); - String topic = "upgradeToReplicatedSubscription-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscription"); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -97,7 +98,7 @@ public void upgradeToReplicatedSubscription() throws Exception { @Test public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { this.conf.setEnableReplicatedSubscriptions(true); - String topic = "upgradeToReplicatedSubscriptionAfterRestart-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("upgradeToReplicatedSubscriptionAfterRestart"); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -125,7 +126,7 @@ public void upgradeToReplicatedSubscriptionAfterRestart() throws Exception { @Test public void testDisableReplicatedSubscriptions() throws Exception { this.conf.setEnableReplicatedSubscriptions(false); - String topic = "disableReplicatedSubscriptions-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("disableReplicatedSubscriptions"); Consumer consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) .subscriptionName("sub") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java index a1c9a001c549d..c3a7324943ca2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleTypedProducerConsumerTest.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.schema.SchemaRegistry; import org.apache.pulsar.broker.service.schema.exceptions.InvalidSchemaDataException; import org.apache.pulsar.client.api.schema.GenericRecord; @@ -601,7 +602,7 @@ public void testAutoBytesProducer() throws Exception { @Test public void testMessageBuilderLoadConf() throws Exception { - String topic = "my-topic-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("my-topic"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java index 2b634118abde5..1bec4d235674b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.service.Dispatcher; import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; @@ -222,7 +223,7 @@ public void testBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionT log.info("-- Starting {} test --", methodName); final String namespace = "my-property/throttling_ns"; - final String topicName = "persistent://" + namespace + "/throttlingAll-" + System.nanoTime(); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll"); final String subName = "my-subscriber-name-" + subscription; final int byteRate = 100; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java index a81f6a1f5235b..af266305c4c26 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ConsumerDedupPermitsUpdate.java @@ -25,6 +25,7 @@ import lombok.Cleanup; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -65,7 +66,7 @@ public Object[][] combinations() { @Test(timeOut = 30000, dataProvider = "combinations") public void testConsumerDedup(boolean batchingEnabled, int receiverQueueSize) throws Exception { - String topic = "persistent://my-property/my-ns/my-topic-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java index 6c9de8b64eda3..366695a3848a2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/NegativeAcksTest.java @@ -28,6 +28,7 @@ import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; @@ -103,7 +104,7 @@ public void testNegativeAcks(boolean batching, boolean usePartitions, Subscripti throws Exception { log.info("Test negative acks batching={} partitions={} subType={} negAckDelayMs={}", batching, usePartitions, subscriptionType, negAcksDelayMillis); - String topic = "testNegativeAcks-" + System.nanoTime(); + String topic = BrokerTestUtil.newUniqueName("testNegativeAcks"); @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.STRING) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java index 4845c2d8e1f2b..8f433d9328479 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/websocket/proxy/ProxyPublishConsumeTest.java @@ -50,6 +50,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerAccessMode; import org.apache.pulsar.client.api.ProducerConsumerBase; @@ -266,7 +267,7 @@ public void socketTestEndOfTopic() throws Exception { public void unsubscribeTest() throws Exception { final String namespace = "my-property/my-ns"; final String topic = namespace + "/" + "my-topic7"; - final String topicName = "persistent://" + topic + System.nanoTime(); + final String topicName = BrokerTestUtil.newUniqueName("persistent://" + topic); admin.topics().createPartitionedTopic(topicName, 3); final String subscription = "my-sub";