From 5262e6c8b4d2a98ac7f73a94a30f001630b2be28 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Sat, 13 Aug 2022 10:29:26 +0800 Subject: [PATCH] [fix][broker] fix broker unackmessages number reduce error (#17003) --- .../pulsar/broker/service/Consumer.java | 13 +-- .../BatchMessageWithBatchIndexLevelTest.java | 82 +++++++++++++++++++ 2 files changed, 89 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java index 5c7646921fbb4..20f3d3f74d819 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java @@ -474,7 +474,7 @@ private CompletableFuture individualAckNormal(CommandAck ack, Map individualAckNormal(CommandAck ack, Map producer = pulsarClient + .newProducer() + .topic(topicName) + .batchingMaxPublishDelay(1, TimeUnit.SECONDS) + .enableBatching(true) + .create(); + + @Cleanup + Consumer consumer1 = pulsarClient + .newConsumer() + .topic(topicName) + .consumerName("consumer-1") + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .isAckReceiptEnabled(true) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + @Cleanup + Consumer consumer2 = pulsarClient + .newConsumer() + .topic(topicName) + .consumerName("consumer-2") + .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS) + .isAckReceiptEnabled(true) + .subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared) + .enableBatchIndexAcknowledgment(true) + .subscribe(); + + for (int i = 0; i < 5; i++) { + producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync(); + } + + // consume-1 receive 5 batch messages + List list = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + list.add(consumer1.receive().getMessageId()); + } + + // consumer-1 redeliver the batch messages + consumer1.negativeAcknowledge(list.get(0)); + + // consumer-2 will receive the messages that the consumer-1 redelivered + for (int i = 0; i < 5; i++) { + consumer2.receive().getMessageId(); + } + + // consumer1 ack two messages in the batch message + consumer1.acknowledge(list.get(1)); + consumer1.acknowledge(list.get(2)); + + // consumer-2 redeliver the rest of the messages + consumer2.negativeAcknowledge(list.get(1)); + + // consume-1 close will redeliver the rest messages to consumer-2 + consumer1.close(); + + // consumer-2 can receive the rest of 3 messages + for (int i = 0; i < 3; i++) { + consumer2.acknowledge(consumer2.receive().getMessageId()); + } + + // consumer-2 can't receive any messages, all the messages in batch has been acked + Message message = consumer2.receive(1, TimeUnit.SECONDS); + assertNull(message); + + // the number of consumer-2's unacked messages is 0 + Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false) + .get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0); + } }