Skip to content

Commit

Permalink
Add explicit test case for NonDurable Subscriptions and Broker restart (
Browse files Browse the repository at this point in the history
apache#10723)

Add new tests cases that cover broker restart and Non Durable Subscriptions:
- test all subscription types with NonDurable subscription mode
- test the case of broker restart
  • Loading branch information
eolivelli authored Jun 1, 2021
1 parent 3e21d05 commit ebc8740
Showing 1 changed file with 64 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import java.util.concurrent.TimeUnit;

import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentSubscription;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.assertNotNull;
Expand All @@ -37,6 +39,7 @@
import static org.testng.AssertJUnit.assertTrue;

@Test(groups = "broker-api")
@Slf4j
public class NonDurableSubscriptionTest extends ProducerConsumerBase {

@BeforeMethod
Expand Down Expand Up @@ -75,7 +78,7 @@ public void testNonDurableSubscription() throws Exception {
}
// 3 receive the first 5 messages
for (int i = 0; i < 5; i++) {
Message<String> message = consumer.receive(1, TimeUnit.SECONDS);
Message<String> message = consumer.receive();
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
consumer.acknowledge(message);
Expand All @@ -84,7 +87,7 @@ public void testNonDurableSubscription() throws Exception {
((ConsumerImpl)consumer).getClientCnx().close();
// 5 for non-durable we are going to restart from the next entry
for (int i = 5; i < messageNum; i++) {
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
Message<String> message = consumer.receive();
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
}
Expand Down Expand Up @@ -126,4 +129,63 @@ public void testDeleteInactiveNonPersistentSubscription() throws Exception {
assertNull(nonPersistentSubscription);

}

@DataProvider(name = "subscriptionTypes")
public static Object[][] subscriptionTypes() {
Object[][] result = new Object[SubscriptionType.values().length][];
int i = 0;
for (SubscriptionType type : SubscriptionType.values()) {
result[i++] = new Object[] {type};
}
return result;
}

@Test(dataProvider = "subscriptionTypes")
public void testNonDurableSubscriptionRecovery(SubscriptionType subscriptionType) throws Exception {
log.info("testing {}", subscriptionType);
String topicName = "persistent://my-property/my-ns/nonDurable-sub-recorvery-"+subscriptionType;
// 1 setup producer、consumer
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName)
.create();
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topicName)
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionType(subscriptionType)
.subscriptionName("my-nonDurable-subscriber")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
// 2 send messages
int messageNum = 15;
for (int i = 0; i < messageNum; i++) {
producer.send("message" + i);
}
// 3 receive the first 5 messages
for (int i = 0; i < 5; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
consumer.acknowledge(message);
}
// 4 trigger reconnect
((ConsumerImpl)consumer).getClientCnx().close();

// 5 for non-durable we are going to restart from the next entry
for (int i = 5; i < 10; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
}

// 6 restart broker
restartBroker();

// 7 for non-durable we are going to restart from the next entry
for (int i = 10; i < messageNum; i++) {
Message<String> message = consumer.receive();
assertNotNull(message);
Assert.assertEquals(message.getValue(), "message" + i);
}

}
}

0 comments on commit ebc8740

Please sign in to comment.