Skip to content

Commit

Permalink
Fix PersistentAcknowledgmentsGroupingTracker set bitSet issue. (apach…
Browse files Browse the repository at this point in the history
…e#14260)

When consumers set `enableBatchIndexAcknowledgment=true`, client will execute PersistentAcknowledgmentsGroupingTracker#doIndividualBatchAckAsync :
https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PersistentAcknowledgmentsGroupingTracker.java#L357-L372

There is an error in line 367, it should be
`value.set(0, batchMessageId.getBatchSize()); `

But batchMessageId.getBatchSize() always return acker.getBatchSize():
https://github.com/apache/pulsar/blob/8928c3496a61c588b50461d6adaab089dd421619/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageIdImpl.java#L137-L139

If line 362 is false, BatchMessageIdImpl only has acker with BatchMessageAckerDisabled which batch is always 0.
So I have added `getOriginalBatchSize` to return the user-specified batch size.

Then, when print logs in line 556, `pendingIndividualBatchIndexAcks` is always empty. Should replace with `entriesToAck`

(cherry picked from commit 816eaed)
  • Loading branch information
Technoboy- authored and michaeljmarshall committed Feb 23, 2022
1 parent 0c3139f commit a482307
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ public int getBatchSize() {
return acker.getBatchSize();
}

public int getOriginalBatchSize() {
return this.batchSize;
}

public MessageIdImpl prevBatchMessageId() {
return new MessageIdImpl(
ledgerId, entryId - 1, partitionIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ private void doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, batchMessageId.getBatchIndex());
value.set(0, batchMessageId.getOriginalBatchSize());
}
return value;
});
Expand Down Expand Up @@ -546,8 +546,9 @@ private void flushAsync(ClientCnx cnx) {

if (shouldFlush) {
if (log.isDebugEnabled()) {
log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {} -- individual-batch-index-acks: {}",
consumer, lastCumulativeAck, pendingIndividualAcks, pendingIndividualBatchIndexAcks);
log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}"
+ " -- individual-batch-index-acks: {}",
consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck);
}
cnx.ctx().flush();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,27 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -381,6 +386,36 @@ public void testBatchAckTrackerMultiAck(boolean isNeedReceipt) throws Exception
tracker.close();
}

@Test
public void testDoIndividualBatchAckAsync() throws Exception{
ConsumerConfigurationData<?> conf = new ConsumerConfigurationData<>();
AcknowledgmentsGroupingTracker tracker = new PersistentAcknowledgmentsGroupingTracker(consumer, conf, eventLoopGroup);
MessageId messageId1 = new BatchMessageIdImpl(5, 1, 0, 3, 10, BatchMessageAckerDisabled.INSTANCE);
BitSet bitSet = new BitSet(20);
for(int i = 0; i < 20; i ++) {
bitSet.set(i, true);
}
MessageId messageId2 = new BatchMessageIdImpl(3, 2, 0, 5, 20, BatchMessageAcker.newAcker(bitSet));
Method doIndividualBatchAckAsync = PersistentAcknowledgmentsGroupingTracker.class
.getDeclaredMethod("doIndividualBatchAckAsync", BatchMessageIdImpl.class);
doIndividualBatchAckAsync.setAccessible(true);
doIndividualBatchAckAsync.invoke(tracker, messageId1);
doIndividualBatchAckAsync.invoke(tracker, messageId2);
Field pendingIndividualBatchIndexAcks = PersistentAcknowledgmentsGroupingTracker.class.getDeclaredField("pendingIndividualBatchIndexAcks");
pendingIndividualBatchIndexAcks.setAccessible(true);
ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> batchIndexAcks =
(ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable>) pendingIndividualBatchIndexAcks.get(tracker);
MessageIdImpl position1 = new MessageIdImpl(5, 1, 0);
MessageIdImpl position2 = new MessageIdImpl(3, 2, 0);
assertTrue(batchIndexAcks.containsKey(position1));
assertNotNull(batchIndexAcks.get(position1));
assertEquals(batchIndexAcks.get(position1).cardinality(), 9);
assertTrue(batchIndexAcks.containsKey(position2));
assertNotNull(batchIndexAcks.get(position2));
assertEquals(batchIndexAcks.get(position2).cardinality(), 19);
tracker.close();
}

public class ClientCnxTest extends ClientCnx {

public ClientCnxTest(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
Expand Down

0 comments on commit a482307

Please sign in to comment.