Skip to content

Commit

Permalink
Issue apache#2584: unacked message is not redelivered on time (apache…
Browse files Browse the repository at this point in the history
…#2590)

### Motivation

unacked message is not redelivered after setting ackTimeout, but it is actually redelivered after 2*acktimeout.

The main reason is in UnAckedMessageTracker.
```
    public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ackTimeoutMillis) {
        this.stop();
        timeout = client.timer().newTimeout(new TimerTask() {
            @OverRide
            public void run(Timeout t) throws Exception {
                if (isAckTimeout()) {   < === first timeout, it is false, because oldOpenSet is empty.
                    log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
                    Set<MessageId> messageIds = new HashSet<>();
                    oldOpenSet.forEach(messageIds::add);
                    oldOpenSet.clear();
                    consumerBase.redeliverUnacknowledgedMessages(messageIds);
                }
                toggle();    < === toggle after timeout
                timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
            }
        }, ackTimeoutMillis, TimeUnit.MILLISECONDS);
    }
```
before first timeout, all messageId was added in CurrentSet, not in OldOpenSet, so isAckTimeout() is false, and `redeliverUnacknowledgedMessages` was not called at first timeout.

Related issue: apache#2584 

### Modifications

- move `toggle()` from behind if clause to before if clause.
- add ut

### Result

ut passed
  • Loading branch information
jiazhai authored and sijie committed Sep 20, 2018
1 parent 40ed8e4 commit 0ab2325
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ public void testAsyncProducerAndReceiveAsyncAndAsyncAck(int batchMessageDelayMs,
Set<String> messageSet = Sets.newHashSet();
for (int i = 0; i < numMessages; i++) {
future_msg = consumer.receiveAsync();
Thread.sleep(10);
msg = future_msg.get();
String receivedMessage = new String(msg.getData());
log.info("Received message: [{}]", receivedMessage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand Down Expand Up @@ -2662,4 +2663,50 @@ public void received(Consumer consumer, Message message)
assertEquals(latch.getCount(), 1);
consumer.close();
}

/**
* Ack timeout message is redelivered on time.
* Related github issue #2584
*/
@Test
public void testAckTimeoutRedeliver() throws Exception {
log.info("-- Starting {} test --", methodName);

// create consumer and producer
ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic("persistent://my-property/my-ns/ack-timeout-topic")
.subscriptionName("subscriber-1")
.ackTimeout(1, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Shared)
.acknowledgmentGroupTime(0, TimeUnit.SECONDS)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer()
.topic("persistent://my-property/my-ns/ack-timeout-topic")
.enableBatching(false)
.messageRoutingMode(MessageRoutingMode.SinglePartition)
.create();

// (1) Produced one Message
String content = "my-message-will-ack-timeout";
producer.send(content.getBytes());

// (2) consumer to receive messages, and not ack
Message<byte[]> message = consumer.receive();

// (3) should be re-delivered once ack-timeout.
Thread.sleep(1000);
message = consumer.receive(200, TimeUnit.MILLISECONDS);
assertNotNull(message);

Thread.sleep(1000);
message = consumer.receive(200, TimeUnit.MILLISECONDS);
assertNotNull(message);

assertEquals(content, new String(message.getData()));

producer.close();
consumer.close();
log.info("-- Exiting {} test --", methodName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,17 @@ public void testSharedSingleAckedPartitionedTopic() throws Exception {

private static int receiveAllMessage(Consumer<?> consumer, boolean ackMessages) throws Exception {
int messagesReceived = 0;
Message<?> msg = consumer.receive(1, TimeUnit.SECONDS);
Message<?> msg = consumer.receive(200, TimeUnit.MILLISECONDS);
while (msg != null) {
++messagesReceived;
log.info("Consumer received {}", new String(msg.getData()));
log.info("Consumer {} received {}", consumer.getConsumerName(), new String(msg.getData()));

if (ackMessages) {
consumer.acknowledge(msg);
log.info("Consumer {} acknowledged {}", consumer.getConsumerName(), new String(msg.getData()));
}

msg = consumer.receive(1, TimeUnit.SECONDS);
msg = consumer.receive(200, TimeUnit.MILLISECONDS);
}

return messagesReceived;
Expand Down Expand Up @@ -283,56 +284,31 @@ public void testFailoverSingleAckedPartitionedTopic() throws Exception {
}

// 4. Receive messages
Message<byte[]> message1 = consumer1.receive();
Message<byte[]> message2 = consumer2.receive();
int messageCount1 = 0;
int messageCount2 = 0;
int ackCount1 = 0;
int ackCount2 = 0;
do {
if (message1 != null) {
log.info("Consumer1 received " + new String(message1.getData()));
messageCount1 += 1;
}
if (message2 != null) {
log.info("Consumer2 received " + new String(message2.getData()));
messageCount2 += 1;
consumer2.acknowledge(message2);
ackCount2 += 1;
}
message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);

messageCount1 += receiveAllMessage(consumer1, false);
messageCount2 += receiveAllMessage(consumer2, true);

log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
log.info(key + " ackCount1 = " + ackCount1);
log.info(key + " ackCount2 = " + ackCount2);

assertEquals(messageCount1 + messageCount2, totalMessages);

Thread.sleep((int) (ackTimeOutMillis * 1.1));

// 5. Check if Messages redelivered again
// Since receive is a blocking call hoping that timeout will kick in
log.info(key + " Timeout should be triggered now");
message1 = consumer1.receive();
messageCount1 = 0;
do {
if (message1 != null) {
log.info("Consumer1 received " + new String(message1.getData()));
messageCount1 += 1;
consumer1.acknowledge(message1);
ackCount1 += 1;
}
if (message2 != null) {
log.info("Consumer2 received " + new String(message2.getData()));
messageCount2 += 1;
}
message1 = consumer1.receive(500, TimeUnit.MILLISECONDS);
message2 = consumer2.receive(500, TimeUnit.MILLISECONDS);
} while (message1 != null || message2 != null);

messageCount1 += receiveAllMessage(consumer1, true);
messageCount2 += receiveAllMessage(consumer2, false);

log.info(key + " messageCount1 = " + messageCount1);
log.info(key + " messageCount2 = " + messageCount2);
log.info(key + " ackCount1 = " + ackCount1);
log.info(key + " ackCount2 = " + ackCount2);
assertEquals(ackCount1 + messageCount2, totalMessages);

assertEquals(messageCount1 + messageCount2, totalMessages);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ public void start(PulsarClientImpl client, ConsumerBase<?> consumerBase, long ac
timeout = client.timer().newTimeout(new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
toggle();
if (isAckTimeout()) {
log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
Set<MessageId> messageIds = new HashSet<>();
oldOpenSet.forEach(messageIds::add);
oldOpenSet.clear();
consumerBase.redeliverUnacknowledgedMessages(messageIds);
}
toggle();
timeout = client.timer().newTimeout(this, ackTimeoutMillis, TimeUnit.MILLISECONDS);
}
}, ackTimeoutMillis, TimeUnit.MILLISECONDS);
Expand Down

0 comments on commit 0ab2325

Please sign in to comment.