diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java index e5465b2c44f7a..80174b74bfce2 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ZeroQueueSizeTest.java @@ -20,10 +20,13 @@ import static org.testng.Assert.assertEquals; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.collect.Lists; @@ -376,4 +379,40 @@ public void testZeroQueueSizeMessageRedeliveryForListener() throws Exception { consumer.close(); producer.close(); } + + @Test + public void testZeroQueueSizeMessageRedeliveryForAsyncReceive() throws PulsarClientException, ExecutionException, InterruptedException { + final String topic = "persistent://prop/ns-abc/testZeroQueueSizeMessageRedeliveryForAsyncReceive"; + Consumer consumer = pulsarClient.newConsumer(Schema.INT32) + .topic(topic) + .receiverQueueSize(0) + .subscriptionName("sub") + .subscriptionType(SubscriptionType.Shared) + .ackTimeout(1, TimeUnit.SECONDS) + .subscribe(); + + final int messages = 10; + Producer producer = pulsarClient.newProducer(Schema.INT32) + .topic(topic) + .enableBatching(false) + .create(); + + for (int i = 0; i < messages; i++) { + producer.send(i); + } + + Set receivedMessages = new HashSet<>(); + List>> futures = new ArrayList<>(20); + for (int i = 0; i < messages * 2; i++) { + futures.add(consumer.receiveAsync()); + } + for (CompletableFuture> future : futures) { + receivedMessages.add(future.get().getValue()); + } + + Assert.assertEquals(receivedMessages.size(), messages); + + consumer.close(); + producer.close(); + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index c7211d2f2bd28..33f05192aee33 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -977,6 +977,7 @@ void notifyPendingReceivedCallback(final Message message, Exception exception if (conf.getReceiverQueueSize() == 0) { // call interceptor and complete received callback + trackMessage(message); interceptAndComplete(message, receivedFuture); return; }