Skip to content

Commit

Permalink
[Fix][Txn] Fix transaction PendingAck lowWaterMark (apache#15530)
Browse files Browse the repository at this point in the history
### Motivation
Now, PendingAckHandle use the ending transaction ID to append abort mark, but it is wrong. We should append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
### Modification
Append abort mark for the first transaction in the  individualAckOfTransaction after judgment.
  • Loading branch information
liangyepianzhou authored May 13, 2022
1 parent 0d6e82a commit 498cde9
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -600,21 +600,13 @@ private void handleLowWaterMark(TxnID txnID, long lowWaterMark) {

if (firstTxn.getMostSigBits() == txnID.getMostSigBits()
&& firstTxn.getLeastSigBits() <= lowWaterMark) {
this.pendingAckStoreFuture.whenComplete((pendingAckStore, throwable) -> {
if (throwable == null) {
pendingAckStore.appendAbortMark(txnID, AckType.Individual).thenAccept(v -> {
synchronized (PendingAckHandleImpl.this) {
log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+ "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
individualAckOfTransaction.remove(firstTxn);
handleLowWaterMark(txnID, lowWaterMark);
}
}).exceptionally(e -> {
log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+ "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
return null;
});
}
abortTxn(firstTxn, null, lowWaterMark).thenRun(() -> {
log.warn("[{}] Transaction pending ack handle low water mark success! txnId : [{}], "
+ "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
}).exceptionally(e -> {
log.warn("[{}] Transaction pending ack handle low water mark fail! txnId : [{}], "
+ "lowWaterMark : [{}]", topicName, txnID, lowWaterMark);
return null;
});
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import static org.testng.Assert.fail;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand All @@ -32,6 +34,7 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.collections4.map.LinkedMap;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
Expand All @@ -44,6 +47,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
Expand Down Expand Up @@ -345,4 +350,116 @@ private void testDeleteTopicThenDeletePendingAckManagedLedger() throws Exception
assertFalse(topics.contains(MLPendingAckStore.getTransactionPendingAckStoreSuffix(topic, subName2)));
assertFalse(topics.contains(topic));
}

@Test
public void testPendingAckLowWaterMarkRemoveFirstTxn() throws Exception {
String topic = TopicName.get(TopicDomain.persistent.toString(),
NamespaceName.get(NAMESPACE1), "test").toString();

String subName = "subName";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Failover)
.enableBatchIndexAcknowledgment(true)
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();

for (int i = 0; i < 5; i++) {
producer.newMessage().send();
}

Transaction transaction1 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();

Message<byte[]> message1 = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message1.getMessageId(), transaction1);
transaction1.commit().get();


Transaction transaction2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
while (transaction1.getTxnID().getMostSigBits() != transaction2.getTxnID().getMostSigBits()) {
transaction2 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
}

Transaction transaction3 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
while (transaction1.getTxnID().getMostSigBits() != transaction3.getTxnID().getMostSigBits()) {
transaction3 = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build()
.get();
}

Message<byte[]> message3 = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message3.getMessageId(), transaction2);
transaction2.commit().get();

Message<byte[]> message2 = consumer.receive(5, TimeUnit.SECONDS);

Field field = TransactionImpl.class.getDeclaredField("state");
field.setAccessible(true);
field.set(transaction1, TransactionImpl.State.OPEN);

consumer.acknowledgeAsync(message2.getMessageId(), transaction1).get();
Message<byte[]> message4 = consumer.receive(5, TimeUnit.SECONDS);
field.set(transaction2, TransactionImpl.State.OPEN);
consumer.acknowledgeAsync(message4.getMessageId(), transaction2).get();

Message<byte[]> message5 = consumer.receive(5, TimeUnit.SECONDS);
consumer.acknowledgeAsync(message5.getMessageId(), transaction3);
transaction3.commit().get();


PersistentTopic persistentTopic =
(PersistentTopic) getPulsarServiceList()
.get(0)
.getBrokerService()
.getTopic(topic, false)
.get()
.get();

PersistentSubscription persistentSubscription = persistentTopic.getSubscription(subName);
PendingAckHandleImpl pendingAckHandle = new PendingAckHandleImpl(persistentSubscription);

Method method = PendingAckHandleImpl.class.getDeclaredMethod("initPendingAckStore");
method.setAccessible(true);
method.invoke(pendingAckHandle);

Field field1 = PendingAckHandleImpl.class.getDeclaredField("pendingAckStoreFuture");
field1.setAccessible(true);
CompletableFuture<PendingAckStore> completableFuture =
(CompletableFuture<PendingAckStore>) field1.get(pendingAckHandle);

Awaitility.await().until(() -> {
completableFuture.get();
return true;
});

Field field2 = PendingAckHandleImpl.class.getDeclaredField("individualAckOfTransaction");
field2.setAccessible(true);
LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>> individualAckOfTransaction =
(LinkedMap<TxnID, HashMap<PositionImpl, PositionImpl>>) field2.get(pendingAckHandle);

assertFalse(individualAckOfTransaction.containsKey(transaction1.getTxnID()));
assertFalse(individualAckOfTransaction.containsKey(transaction2.getTxnID()));

}
}

0 comments on commit 498cde9

Please sign in to comment.