Skip to content

Commit

Permalink
add current time into topic name for ReplicatorTest (apache#7092)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiazhai authored May 29, 2020
1 parent ff594b8 commit cd5351f
Showing 1 changed file with 33 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void testConfigChange() throws Exception {
// Run a set of producer tasks to create the topics
List<Future<Void>> results = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/topic-%d", i));
final TopicName dest = TopicName.get(String
.format("persistent://pulsar/ns/topic-%d-%d", System.currentTimeMillis(), i));

results.add(executor.submit(new Callable<Void>() {
@Override
Expand Down Expand Up @@ -206,26 +207,31 @@ public void testConcurrentReplicator() throws Exception {
final String namespace = "pulsar/concurrent";
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2"));
final TopicName topicName = TopicName.get(String.format("persistent://" + namespace + "/topic-%d", 0));
final TopicName topicName = TopicName
.get(String.format("persistent://" + namespace + "/topic-%d-%d", System.currentTimeMillis(), 0));

@Cleanup
PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
PulsarClient client1 = PulsarClient.builder()
.serviceUrl(url1.toString()).statsInterval(0, TimeUnit.SECONDS)
.build();
Producer<byte[]> producer = client1.newProducer().topic(topicName.toString())
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();
producer.close();

PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService().getOrCreateTopic(topicName.toString()).get();
PersistentTopic topic = (PersistentTopic) pulsar1.getBrokerService()
.getOrCreateTopic(topicName.toString()).get();

PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService().getReplicationClient("r3"));
PulsarClientImpl pulsarClient = spy((PulsarClientImpl) pulsar1.getBrokerService()
.getReplicationClient("r3"));
final Method startRepl = PersistentTopic.class.getDeclaredMethod("startReplicator", String.class);
startRepl.setAccessible(true);

Field replClientField = BrokerService.class.getDeclaredField("replicationClients");
replClientField.setAccessible(true);
ConcurrentOpenHashMap<String, PulsarClient> replicationClients = (ConcurrentOpenHashMap<String, PulsarClient>) replClientField
ConcurrentOpenHashMap<String, PulsarClient> replicationClients =
(ConcurrentOpenHashMap<String, PulsarClient>) replClientField
.get(pulsar1.getBrokerService());
replicationClients.put("r3", pulsarClient);

Expand All @@ -242,8 +248,10 @@ public void testConcurrentReplicator() throws Exception {
}
Thread.sleep(3000);

Mockito.verify(pulsarClient, Mockito.times(1)).createProducerAsync(Mockito.any(ProducerConfigurationData.class),
Mockito.any(Schema.class), eq(null));
Mockito.verify(pulsarClient, Mockito.times(1))
.createProducerAsync(
Mockito.any(ProducerConfigurationData.class),
Mockito.any(Schema.class), eq(null));

executor.shutdown();
}
Expand Down Expand Up @@ -400,7 +408,8 @@ public void testFailures() throws Exception {
try {
// 1. Create a consumer using the reserved consumer id prefix "pulsar.repl."

final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/res-cons-id"));
final TopicName dest = TopicName
.get(String.format("persistent://pulsar/ns/res-cons-id-%d", System.currentTimeMillis()));

// Create another consumer using replication prefix as sub id
MessageConsumer consumer = new MessageConsumer(url2, dest, "pulsar.repl.");
Expand All @@ -415,7 +424,8 @@ public void testFailures() throws Exception {
@Test(timeOut = 30000)
public void testReplicatePeekAndSkip() throws Exception {

final TopicName dest = TopicName.get("persistent://pulsar/ns/peekAndSeekTopic");
final TopicName dest = TopicName.get(
String.format("persistent://pulsar/ns/peekAndSeekTopic-%d", System.currentTimeMillis()));

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
Expand All @@ -440,7 +450,8 @@ public void testReplicatorClearBacklog() throws Exception {
// This test is to verify that reset cursor fails on global topic
SortedSet<String> testDests = new TreeSet<String>();

final TopicName dest = TopicName.get("persistent://pulsar/ns/clearBacklogTopic");
final TopicName dest = TopicName
.get(String.format("persistent://pulsar/ns/clearBacklogTopic-%d", System.currentTimeMillis()));
testDests.add(dest.toString());

@Cleanup
Expand Down Expand Up @@ -493,7 +504,8 @@ public void testReplicationForBatchMessages() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");

// Run a set of producer tasks to create the topics
final TopicName dest = TopicName.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime()));
final TopicName dest = TopicName
.get(String.format("persistent://pulsar/ns/repltopicbatch-%d", System.nanoTime()));

@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest, true);
Expand Down Expand Up @@ -547,7 +559,7 @@ public void testReplicationForBatchMessages() throws Exception {
@Test(timeOut = 30000)
public void testDeleteReplicatorFailure() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/ns/repltopicbatch";
final String topicName = "persistent://pulsar/ns/repltopicbatch-" + System.currentTimeMillis() + "-";
final TopicName dest = TopicName.get(topicName);

@Cleanup
Expand Down Expand Up @@ -588,7 +600,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
@Test(priority = 5, timeOut = 30000)
public void testReplicatorProducerClosing() throws Exception {
log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
final String topicName = "persistent://pulsar/ns/repltopicbatch";
final String topicName = "persistent://pulsar/ns/repltopicbatch-" + System.currentTimeMillis() + "-";
final TopicName dest = TopicName.get(topicName);

@Cleanup
Expand Down Expand Up @@ -686,7 +698,7 @@ public void testResumptionAfterBacklogRelaxed() throws Exception {
*/
@Test(timeOut = 15000)
public void testCloseReplicatorStartProducer() throws Exception {
TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor");
TopicName dest = TopicName.get("persistent://pulsar/ns1/closeCursor-" + System.currentTimeMillis() + "-");
// Producer on r1
@Cleanup
MessageProducer producer1 = new MessageProducer(url1, dest);
Expand Down Expand Up @@ -732,7 +744,7 @@ public void testCloseReplicatorStartProducer() throws Exception {

@Test(timeOut = 30000)
public void verifyChecksumAfterReplication() throws Exception {
final String topicName = "persistent://pulsar/ns/checksumAfterReplication";
final String topicName = "persistent://pulsar/ns/checksumAfterReplication-" + System.currentTimeMillis() + "-";

PulsarClient c1 = PulsarClient.builder().serviceUrl(url1.toString()).build();
Producer<byte[]> p1 = c1.newProducer().topic(topicName)
Expand Down Expand Up @@ -771,8 +783,10 @@ public void testReplicatorOnPartitionedTopic(boolean isPartitionedTopic) throws
log.info("--- Starting ReplicatorTest::{} --- ", methodName);

final String namespace = "pulsar/partitionedNs-" + isPartitionedTopic;
final String persistentTopicName = "persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
final String nonPersistentTopicName = "non-persistent://" + namespace + "/partTopic-" + isPartitionedTopic;
final String persistentTopicName =
"persistent://" + namespace + "/partTopic-" + System.currentTimeMillis() + "-" + isPartitionedTopic;
final String nonPersistentTopicName =
"non-persistent://" + namespace + "/partTopic-" + System.currentTimeMillis() + "-"+ isPartitionedTopic;
BrokerService brokerService = pulsar1.getBrokerService();

admin1.namespaces().createNamespace(namespace);
Expand Down Expand Up @@ -824,7 +838,7 @@ public void testReplicatedCluster() throws Exception {
log.info("--- Starting ReplicatorTest::testReplicatedCluster ---");

final String namespace = "pulsar/global/repl";
final String topicName = String.format("persistent://%s/topic1", namespace);
final String topicName = String.format("persistent://%s/topic1-%d", namespace, System.currentTimeMillis());
admin1.namespaces().createNamespace(namespace);
admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2", "r3"));
admin1.topics().createPartitionedTopic(topicName, 4);
Expand Down

0 comments on commit cd5351f

Please sign in to comment.