Skip to content

Commit

Permalink
[fix][client] Fix reach redeliverCount client can't send batch messag…
Browse files Browse the repository at this point in the history
…es to DLQ (apache#17317)
  • Loading branch information
congbobo184 authored Aug 28, 2022
1 parent 3422ab4 commit 0909853
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1180,6 +1180,7 @@ public void testSendTxnAckMessageToDLQ() throws Exception {
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.enableBatching(false)
.sendTimeout(1, TimeUnit.SECONDS)
.create();

Expand Down Expand Up @@ -1224,4 +1225,69 @@ public void testSendTxnAckMessageToDLQ() throws Exception {

assertEquals(value, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}

@Test
public void testSendTxnAckBatchMessageToDLQ() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnAckBatchMessageToDLQ";
String subName = "test";
String value1 = "test1";
String value2 = "test2";
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.sendTimeout(1, TimeUnit.SECONDS)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
// consumer can't receive the same message three times
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
.subscriptionName(subName)
.subscribe();

@Cleanup
Consumer<byte[]> deadLetterConsumer = pulsarClient.newConsumer()
.topic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
topic, subName))
.subscriptionType(SubscriptionType.Shared)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
.subscriptionName("test")
.subscribe();

producer.sendAsync(value1.getBytes());
producer.sendAsync(value2.getBytes());
Transaction transaction = pulsarClient.newTransaction().withTransactionTimeout(1, TimeUnit.MINUTES)
.build().get();

Message<byte[]> message = consumer.receive();
assertEquals(value1, new String(message.getValue()));
// consumer receive the batch message one the first time, redeliverCount = 0
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();

transaction.abort().get();

// consumer will receive the batch message two and then receive
// the message one and message two again, redeliverCount = 1
for (int i = 0; i < 3; i ++) {
message = consumer.receive();
}

transaction = pulsarClient.newTransaction().withTransactionTimeout(5, TimeUnit.MINUTES)
.build().get();

assertEquals(value2, new String(message.getValue()));
// consumer receive the batch message two the second time, redeliverCount = 1, also can be received
consumer.acknowledgeAsync(message.getMessageId(), transaction).get();

transaction.abort().get();

// consumer receive the batch message the third time, redeliverCount = 2,
// the message will be sent to DLQ, can't receive
assertNull(consumer.receive(3, TimeUnit.SECONDS));

assertEquals(value1, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
assertEquals(value2, new String(deadLetterConsumer.receive(3, TimeUnit.SECONDS).getValue()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1570,8 +1570,15 @@ void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata,
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
}

if (possibleToDeadLetter != null && possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.put(batchMessage, possibleToDeadLetter);
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put(batchMessage,
possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
return;
}
}
}

if (log.isDebugEnabled()) {
Expand Down

0 comments on commit 0909853

Please sign in to comment.