Skip to content

Commit

Permalink
[fix][broker] fix broker unackmessages number reduce error (apache#17003
Browse files Browse the repository at this point in the history
)
  • Loading branch information
congbobo184 authored Aug 13, 2022
1 parent 16adb61 commit 5262e6c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets);
ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer);
if (isTransactionEnabled()) {
//sync the batch position bit set point, in order to delete the position in pending acks
if (Subscription.isIndividualAckMode(subType)) {
Expand All @@ -484,7 +484,7 @@ private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String,
}
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position);
ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer);
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
Expand Down Expand Up @@ -592,20 +592,21 @@ private long getBatchSize(MessageIdData msgId) {
return batchSize;
}

private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position) {
private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) {
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET);
return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer);
}
}
return batchSize;
}

private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets) {
private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets,
Consumer consumer) {
long ackedCount = 0;
if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)
&& pendingAcks.get(position.getLedgerId(), position.getEntryId()) != null) {
&& consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) {
long[] cursorAckSet = getCursorAckSet(position);
if (cursorAckSet != null) {
BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,13 @@
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNull;

@Test(groups = "broker")
public class BatchMessageWithBatchIndexLevelTest extends BatchMessageTest {
Expand Down Expand Up @@ -195,4 +197,84 @@ public void testBatchMessageMultiNegtiveAck() throws Exception{
assertEquals(unackedMessages, 10);
});
}

@Test
public void testAckMessageWithNotOwnerConsumerUnAckMessageCount() throws Exception {
final String subName = "test";
final String topicName = "persistent://prop/ns-abc/testAckMessageWithNotOwnerConsumerUnAckMessageCount-"
+ UUID.randomUUID();

@Cleanup
Producer<byte[]> producer = pulsarClient
.newProducer()
.topic(topicName)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.enableBatching(true)
.create();

@Cleanup
Consumer<byte[]> consumer1 = pulsarClient
.newConsumer()
.topic(topicName)
.consumerName("consumer-1")
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
.isAckReceiptEnabled(true)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Consumer<byte[]> consumer2 = pulsarClient
.newConsumer()
.topic(topicName)
.consumerName("consumer-2")
.negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
.isAckReceiptEnabled(true)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.subscribe();

for (int i = 0; i < 5; i++) {
producer.newMessage().value(("Hello Pulsar - " + i).getBytes()).sendAsync();
}

// consume-1 receive 5 batch messages
List<MessageId> list = new ArrayList<>();
for (int i = 0; i < 5; i++) {
list.add(consumer1.receive().getMessageId());
}

// consumer-1 redeliver the batch messages
consumer1.negativeAcknowledge(list.get(0));

// consumer-2 will receive the messages that the consumer-1 redelivered
for (int i = 0; i < 5; i++) {
consumer2.receive().getMessageId();
}

// consumer1 ack two messages in the batch message
consumer1.acknowledge(list.get(1));
consumer1.acknowledge(list.get(2));

// consumer-2 redeliver the rest of the messages
consumer2.negativeAcknowledge(list.get(1));

// consume-1 close will redeliver the rest messages to consumer-2
consumer1.close();

// consumer-2 can receive the rest of 3 messages
for (int i = 0; i < 3; i++) {
consumer2.acknowledge(consumer2.receive().getMessageId());
}

// consumer-2 can't receive any messages, all the messages in batch has been acked
Message<byte[]> message = consumer2.receive(1, TimeUnit.SECONDS);
assertNull(message);

// the number of consumer-2's unacked messages is 0
Awaitility.await().until(() -> getPulsar().getBrokerService().getTopic(topicName, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages() == 0);
}
}

0 comments on commit 5262e6c

Please sign in to comment.