Skip to content

Commit

Permalink
[fix][txn] fix ack with txn compute ackedCount error (apache#17016)
Browse files Browse the repository at this point in the history
Co-authored-by: congbobo184 <[email protected]>
  • Loading branch information
congbobo184 and congbobo184 authored Sep 1, 2022
1 parent 50fd2a1 commit 176b0d6
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -531,25 +531,28 @@ private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) {
LongAdder totalAckCount = new LongAdder();
for (int i = 0; i < ack.getMessageIdsCount(); i++) {
MessageIdData msgId = ack.getMessageIdAt(i);
PositionImpl position;
PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
// acked count at least one
long ackedCount = 0;
long batchSize = getBatchSize(msgId);
long batchSize = 0;
if (msgId.hasBatchSize()) {
batchSize = msgId.getBatchSize();
// ack batch messages set ackeCount = batchSize
ackedCount = msgId.getBatchSize();
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
// ack no batch message set ackedCount = 1
ackedCount = 1;
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}
Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId());
if (msgId.getAckSetsCount() > 0) {
long[] ackSets = new long[msgId.getAckSetsCount()];
for (int j = 0; j < msgId.getAckSetsCount(); j++) {
ackSets[j] = msgId.getAckSetAt(j);
}
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets);
position.setAckSet(ackSets);
ackedCount = getAckedCountForTransactionAck(batchSize, ackSets);
} else {
position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId());
ackedCount = batchSize;
}
if (msgId.hasBatchSize()) {
positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize()));
} else {
positionsAcked.add(new MutablePair<>(position, (int) batchSize));
}

addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1172,6 +1172,52 @@ public void testSendTxnMessageTimeout() throws Exception {
}
}

@Test
public void testAckWithTransactionReduceUnackCountNotInPendingAcks() throws Exception {
final String topic = "persistent://" + NAMESPACE1 + "/testAckWithTransactionReduceUnackCountNotInPendingAcks";
final String subName = "test";
@Cleanup
ProducerImpl<byte[]> producer = (ProducerImpl<byte[]>) pulsarClient.newProducer()
.topic(topic)
.batchingMaxPublishDelay(1, TimeUnit.SECONDS)
.sendTimeout(1, TimeUnit.SECONDS)
.create();

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName(subName)
.subscribe();

// send 5 messages with one batch
for (int i = 0; i < 5; i++) {
producer.sendAsync((i + "").getBytes(UTF_8));
}

List<MessageId> messageIds = new ArrayList<>();

// receive the batch messages add to a list
for (int i = 0; i < 5; i++) {
messageIds.add(consumer.receive().getMessageId());
}

MessageIdImpl messageId = (MessageIdImpl) messageIds.get(0);


// remove the message from the pendingAcks, in fact redeliver will remove the messageId from the pendingAck
getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getPendingAcks()
.remove(messageId.ledgerId, messageId.entryId);

Transaction txn = getTxn();
consumer.acknowledgeAsync(messageIds.get(1), txn).get();

// ack one message, the unack count is 4
assertEquals(getPulsarServiceList().get(0).getBrokerService().getTopic(topic, false)
.get().get().getSubscription(subName).getConsumers().get(0).getUnackedMessages(), 4);
}

@Test
public void testSendTxnAckMessageToDLQ() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnAckMessageToDLQ";
Expand Down

0 comments on commit 176b0d6

Please sign in to comment.