Skip to content

Commit

Permalink
Change un-ack messages start tracking behavior (apache#3079)
Browse files Browse the repository at this point in the history
### Motivation
#### Expected behavior

User process the same message many times but failed, if user set a dead letter policy, when message process times exceed the max redelivery count in dead letter policy, message will send to the dead letter topic.

#### Actual behavior

When a consumer subscribe a topic, but wait a while then start receive messages, but messages already send to dead letter topic.

#### Steps to reproduce

Here is the code to reproduce

```java
public class RedeliveryIssue {

    public static void main(String[] args) throws PulsarClientException, InterruptedException {

        final String topic = "my-topic";

        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        Consumer<byte[]> consumer = client.newConsumer()
                .topic(topic)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(UUID.randomUUID().toString())
                .ackTimeout(3, TimeUnit.SECONDS)
                .deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(2).build())
                .subscribe();

        Producer<byte[]> producer = client.newProducer()
                .topic(topic)
                .create();

        producer.send(("a message").getBytes());

        // wait a while, message will send to dead letter topic
        Thread.sleep(10000L);

        do {
            // can't receive message
            Message<byte[]> msg = consumer.receive();
            System.out.println(new String(msg.getValue()));
        } while (true);
    }
}
```

#### System configuration
**Pulsar version**: 2.2.0

### Modifications

Remove un-ack message tracking on message received.
Add un-ack message tracking on consumer call receive

### Result

UT passed
  • Loading branch information
codelipenghui authored and sijie committed Dec 3, 2018
1 parent 8cd900c commit 5d06b1e
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class DeadLetterTopicTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -256,4 +257,32 @@ public void testDeadLetterTopicByCustomTopicName() throws Exception {
assertNull(checkMessage);
checkConsumer.close();
}

/**
* issue https://github.com/apache/pulsar/issues/3077
*/
@Test(timeOut = 200000)
public void testDeadLetterWithoutConsumerReceiveImmediately() throws PulsarClientException, InterruptedException {
final String topic = "persistent://my-property/my-ns/dead-letter-topic-without-consumer-receive-immediately";

Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("my-subscription")
.ackTimeout(1, TimeUnit.SECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(1).build())
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.create();

producer.send(("a message").getBytes());

// Wait a while, message should not be send to DLQ
Thread.sleep(5000L);

Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
assertNotNull(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,11 @@ public void testCheckUnAcknowledgedMessageTimer() throws PulsarClientException,

Thread.sleep((long) (ackTimeOutMillis * 1.1));

for (int i = 0; i < totalMessages - 1; i++) {
for (int i = 0; i < totalMessages; i++) {
Message<byte[]> msg = consumer.receive();
consumer.acknowledge(msg);
if (i != totalMessages - 1) {
consumer.acknowledge(msg);
}
}

assertEquals(consumer.getUnAckedMessageTracker().size(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,7 @@ protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
return interceptMsg;
Expand Down Expand Up @@ -325,6 +326,7 @@ protected CompletableFuture<Message<T>> internalReceiveAsync() {
if (message == null && conf.getReceiverQueueSize() == 0) {
sendFlowPermitsToBroker(cnx(), 1);
} else if (message != null) {
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
messageProcessed(interceptMsg);
result.complete(interceptMsg);
Expand Down Expand Up @@ -385,6 +387,7 @@ protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarCl
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
trackMessage(message);
Message<T> interceptMsg = beforeConsume(message);
if (interceptMsg != null) {
messageProcessed(interceptMsg);
Expand Down Expand Up @@ -829,11 +832,11 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
// Enqueue the message so that it can be retrieved when application calls receive()
// if the conf.getReceiverQueueSize() is 0 then discard message if no one is waiting for it.
// if asyncReceive is waiting then notify callback without adding to incomingMessages queue
unAckedMessageTracker.add((MessageIdImpl) message.getMessageId());
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl)message.getMessageId(), Collections.singletonList(message));
}
if (!pendingReceives.isEmpty()) {
trackMessage(message);
notifyPendingReceivedCallback(message, null);
} else if (conf.getReceiverQueueSize() != 0 || waitingOnReceiveForZeroQueueSize) {
incomingMessages.add(message);
Expand Down Expand Up @@ -957,7 +960,6 @@ void receiveIndividualMessagesFromBatch(MessageMetadata msgMetadata, int redeliv
MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
BatchMessageAcker acker = BatchMessageAcker.newAcker(batchSize);
unAckedMessageTracker.add(batchMessage);
List<MessageImpl<T>> possibleToDeadLetter = null;
if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleToDeadLetter = new ArrayList<>();
Expand Down Expand Up @@ -1068,6 +1070,20 @@ protected synchronized void messageProcessed(Message<?> msg) {
}
}

protected void trackMessage(Message<?> msg) {
if (msg != null) {
MessageId messageId = msg.getMessageId();
if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
MessageIdImpl id = (MessageIdImpl)messageId;
if (id instanceof BatchMessageIdImpl) {
// do not add each item in batch message into tracker
id = new MessageIdImpl(id.getLedgerId(), id.getEntryId(), getPartitionIndex());
}
unAckedMessageTracker.add(id);
}
}
}

void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}
Expand Down

0 comments on commit 5d06b1e

Please sign in to comment.