Skip to content

Commit

Permalink
[fix][client] Messages with inconsistent consumer epochs are not filt…
Browse files Browse the repository at this point in the history
…ered when using batch receive and trigger timeout. (apache#17318)
  • Loading branch information
shibd authored Oct 11, 2022
1 parent c5ae308 commit 12e78b2
Show file tree
Hide file tree
Showing 6 changed files with 312 additions and 179 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
Expand Down Expand Up @@ -193,4 +194,42 @@ public void testShouldMaintainOrderForIndividualTopicInMultiTopicsConsumer()
}
Assert.assertEquals(numPartitions * numMessages, receivedCount);
}

@Test
public void testBatchReceiveAckTimeout()
throws PulsarAdminException, PulsarClientException {
String topicName = newTopicName();
int numPartitions = 2;
int numMessages = 100000;
admin.topics().createPartitionedTopic(topicName, numPartitions);

@Cleanup
Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
.topic(topicName)
.enableBatching(false)
.blockIfQueueFull(true)
.create();

@Cleanup
Consumer<Long> consumer = pulsarClient
.newConsumer(Schema.INT64)
.topic(topicName)
.receiverQueueSize(numMessages)
.batchReceivePolicy(
BatchReceivePolicy.builder().maxNumMessages(1).timeout(2, TimeUnit.SECONDS).build()
).ackTimeout(1000, TimeUnit.MILLISECONDS)
.subscriptionName(methodName)
.subscribe();

producer.newMessage()
.value(1l)
.send();

// first batch receive
Assert.assertEquals(consumer.batchReceive().size(), 1);
// Not ack, trigger redelivery this message.
Awaitility.await().untilAsserted(() -> {
Assert.assertEquals(consumer.batchReceive().size(), 1);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import java.lang.reflect.Field;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
Expand All @@ -33,17 +32,17 @@
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -226,13 +225,15 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
final String topic = "testDoNotRedeliveryMarkDeleteMessages";
final String subName = "my-sub";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Key_Shared)
.ackTimeout(1, TimeUnit.SECONDS)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
Expand Down Expand Up @@ -261,12 +262,15 @@ public void testDoNotRedeliveryMarkDeleteMessages() throws PulsarClientException
public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testRedeliveryAddEpoch";
final String subName = "my-sub";
ConsumerBase<String> consumer = ((ConsumerBase<String>) pulsarClient.newConsumer(Schema.STRING)

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand All @@ -275,14 +279,9 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";
producer.send(test1);

PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService().getTopics()
.get(TopicName.get("persistent://public/default/" + topic).toString()).get().get();
PersistentDispatcherSingleActiveConsumer persistentDispatcherSingleActiveConsumer =
(PersistentDispatcherSingleActiveConsumer) persistentTopic.getSubscription(subName).getDispatcher();

consumer.setConsumerEpoch(1);
producer.send(test1);
Message<String> message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
consumer.redeliverUnacknowledgedMessages();
Expand All @@ -309,39 +308,136 @@ public void testRedeliveryAddEpoch(boolean enableBatch) throws Exception{
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);

Field field = consumer.getClass().getDeclaredField("connectionHandler");
field.setAccessible(true);
ConnectionHandler connectionHandler = (ConnectionHandler) field.get(consumer);

field = connectionHandler.getClass().getDeclaredField("CLIENT_CNX_UPDATER");
field.setAccessible(true);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

((ConsumerImpl<String>) consumer).grabCnx();
consumer.grabCnx();

message = consumer.receive(3, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(message.getValue(), test3);
}

@Test(dataProvider = "enableBatch")
public void testRedeliveryAddEpochAndPermits(boolean enableBatch) throws Exception {
final String topic = "testRedeliveryAddEpochAndPermits";
final String subName = "my-sub";
// set receive queue size is 4, and first send 4 messages,
// then call redeliver messages, assert receive msg num.
int receiveQueueSize = 4;

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.receiverQueueSize(receiveQueueSize)
.autoScaledReceiverQueueSizeEnabled(false)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

consumer.setConsumerEpoch(1);
for (int i = 0; i < receiveQueueSize; i++) {
producer.send("pulsar" + i);
}
assertNull(consumer.receive(1, TimeUnit.SECONDS));

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < receiveQueueSize; i++) {
Message<String> msg = consumer.receive();
assertEquals("pulsar" + i, msg.getValue());
}
}

@Test(dataProvider = "enableBatch")
public void testBatchReceiveRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testBatchReceiveRedeliveryAddEpoch";
final String subName = "my-sub";

@Cleanup
ConsumerImpl<String> consumer = ((ConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(1000, TimeUnit.MILLISECONDS).build())
.subscriptionType(SubscriptionType.Failover)
.subscribe());

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

String test1 = "Pulsar1";
String test2 = "Pulsar2";
String test3 = "Pulsar3";

consumer.setConsumerEpoch(1);
producer.send(test1);

Messages<String> messages;
Message<String> message;

messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
consumer.acknowledgeCumulativeAsync(message).get();
assertEquals(message.getValue(), test1);

consumer.setConsumerEpoch(3);
producer.send(test2);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);
consumer.redeliverUnacknowledgedMessages();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test2);
consumer.acknowledgeCumulativeAsync(message).get();

consumer.setConsumerEpoch(6);
producer.send(test3);
messages = consumer.batchReceive();
assertEquals(messages.size(), 0);

ConnectionHandler connectionHandler = consumer.getConnectionHandler();
connectionHandler.cnx().channel().close();

consumer.grabCnx();
messages = consumer.batchReceive();
assertEquals(messages.size(), 1);
message = messages.iterator().next();
assertEquals(message.getValue(), test3);
}

@DataProvider(name = "enableBatch")
public static Object[][] enableBatch() {
return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
}


@Test(dataProvider = "enableBatch")
public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exception{
final String topic = "testMultiConsumerRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
Expand Down Expand Up @@ -382,4 +478,66 @@ public void testMultiConsumerRedeliveryAddEpoch(boolean enableBatch) throws Exce
message = consumer.receive(5, TimeUnit.SECONDS);
assertNull(message);
}

@Test(dataProvider = "enableBatch", invocationCount = 10)
public void testMultiConsumerBatchRedeliveryAddEpoch(boolean enableBatch) throws Exception{

final String topic = "testMultiConsumerBatchRedeliveryAddEpoch";
final String subName = "my-sub";
admin.topics().createPartitionedTopic(topic, 5);
final int messageNumber = 50;

@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
.batchReceivePolicy(BatchReceivePolicy.builder().timeout(2, TimeUnit.SECONDS).build())
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.subscribe();

@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(enableBatch)
.create();

for (int i = 0; i < messageNumber; i++) {
producer.send("" + i);
}

int receiveNum = 0;
while (receiveNum < messageNumber) {
receiveNum += consumer.batchReceive().size();
}

// redeliverUnacknowledgedMessages once
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 1);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);

// redeliverUnacknowledgedMessages twice
consumer.redeliverUnacknowledgedMessages();

receiveNum = 0;
while (receiveNum < messageNumber) {
Messages<String> messages = consumer.batchReceive();
receiveNum += messages.size();
for (Message<String> message : messages) {
assertEquals((((MessageImpl)((TopicMessageImpl) message).getMessage())).getConsumerEpoch(), 2);
}
}

// can't receive message again
assertEquals(consumer.batchReceive().size(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,7 @@ protected void notifyPendingBatchReceivedCallBack() {

reentrantLock.lock();
try {
notifyPendingBatchReceivedCallBack(opBatchReceive);
notifyPendingBatchReceivedCallBack(opBatchReceive.future);
} finally {
reentrantLock.unlock();
}
Expand Down Expand Up @@ -941,7 +941,7 @@ private OpBatchReceive<T> nextBatchReceive() {
return opBatchReceive;
}

protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatchReceive) {
protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Expand All @@ -953,8 +953,7 @@ protected final void notifyPendingBatchReceivedCallBack(OpBatchReceive<T> opBatc
}
msgPeeked = incomingMessages.peek();
}

completePendingBatchReceive(opBatchReceive.future, messages);
completePendingBatchReceive(batchReceiveFuture, messages);
}

protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
Expand Down Expand Up @@ -1182,7 +1181,7 @@ protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
|| getSubType() == CommandSubscribe.SubType.Exclusive)
&& message.getConsumerEpoch() != DEFAULT_CONSUMER_EPOCH
&& message.getConsumerEpoch() < CONSUMER_EPOCH.get(this)) {
log.warn("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
+ "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
message.release();
message.recycle();
Expand Down
Loading

0 comments on commit 12e78b2

Please sign in to comment.