diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 5c7646921fbb4..20f3d3f74d819 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -474,7 +474,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckNormal(CommandAck ack, Map producer = pulsarClient + .newProducer() + .topic(topicName) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .enableBatching(true) + .create(); + + @Cleanup + Consumer consumer1 = pulsarClient + .newConsumer() + .topic(topicName) + .consumerName("consumer-1") + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .isAckReceiptEnabled(true) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Consumer consumer2 = pulsarClient + .newConsumer() + .topic(topicName) + .consumerName("consumer-2") + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .isAckReceiptEnabled(true) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + for (int i = 0; i < 5; i++) { + producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync(); + } + + // consume-1 receive 5 batch messages + List list = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + list.add(consumer1.receive().getMessageId()); + } + + // consumer-1 redeliver the batch messages + consumer1.negativeAcknowledge(list.get(0)); + + // consumer-2 will receive the messages that the consumer-1 redelivered + for (int i = 0; i < 5; i++) { + consumer2.receive().getMessageId(); + } + + // consumer1 ack two messages in the batch message + consumer1.acknowledge(list.get(1)); + consumer1.acknowledge(list.get(2)); + + // consumer-2 redeliver the rest of the messages + consumer2.negativeAcknowledge(list.get(1)); + + // consume-1 close will redeliver the rest messages to consumer-2 + consumer1.close(); + + // consumer-2 can receive the rest of 3 messages + for (int i = 0; i < 3; i++) { + consumer2.acknowledge(consumer2.receive().getMessageId()); + } + + // consumer-2 can't receive any messages, all the messages in batch has been acked + Message message = consumer2.receive(1, TimeUnit.SECONDS); + assertNull(message); + + // the number of consumer-2's unacked messages is 0 + Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false) + .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0); + } }