Skip to content

Commit

Permalink
[Transaction] Txn ack abort process on subscription (apache#7979)
Browse files Browse the repository at this point in the history
Fixes https://github.com/streamnative/pulsar/issues/1314

### Motivation

Currently, the transaction ack abort process is not well.

### Modifications

Make some changes for the transaction ack abort process.
  • Loading branch information
gaoran10 authored Sep 8, 2020
1 parent 37a1d87 commit 8f2540f
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1748,8 +1748,9 @@ protected void handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription
final long txnidLeastBits = command.getTxnidLeastBits();
final String topic = command.getSubscription().getTopic();
final String subName = command.getSubscription().getSubscription();
final int txnAction = command.getTxnAction().getNumber();

service.getTopics().get(command.getSubscription().getTopic())
service.getTopics().get(TopicName.get(command.getSubscription().getTopic()).toString())
.thenAccept(optionalTopic -> {
if (!optionalTopic.isPresent()) {
log.error("The topic {} is not exist in broker.", command.getSubscription().getTopic());
Expand All @@ -1771,7 +1772,7 @@ protected void handleEndTxnOnSubscription(PulsarApi.CommandEndTxnOnSubscription
}

CompletableFuture<Void> completableFuture =
subscription.endTxn(txnidMostBits, txnidLeastBits, command.getTxnAction().getNumber());
subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction);
completableFuture.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle end txn on subscription failed for request {}", requestId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,12 @@ public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, i
if (PulsarApi.TxnAction.COMMIT.getNumber() == txnAction) {
completableFuture = commitTxn(txnID, Collections.emptyMap());
} else if (PulsarApi.TxnAction.ABORT.getNumber() == txnAction) {
completableFuture = abortTxn(txnID, null);
Consumer redeliverConsumer = null;
if (getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
redeliverConsumer = ((PersistentDispatcherSingleActiveConsumer)
getDispatcher()).getActiveConsumer();
}
completableFuture = abortTxn(txnID, redeliverConsumer);
} else {
completableFuture.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBuffer;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
Expand All @@ -48,7 +49,6 @@
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.impl.PartitionedProducerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
Expand All @@ -75,7 +75,10 @@ public class TransactionProduceTest extends TransactionTestBase {

private final static String TENANT = "tnx";
private final static String NAMESPACE1 = TENANT + "/ns1";
private final static String TOPIC_OUTPUT = NAMESPACE1 + "/output";
private final static String PRODUCE_COMMIT_TOPIC = NAMESPACE1 + "/produce-commit";
private final static String PRODUCE_ABORT_TOPIC = NAMESPACE1 + "/produce-abort";
private final static String ACK_COMMIT_TOPIC = NAMESPACE1 + "/ack-commit";
private final static String ACK_ABORT_TOPIC = NAMESPACE1 + "/ack-abort";

@BeforeMethod
protected void setup() throws Exception {
Expand All @@ -87,7 +90,10 @@ protected void setup() throws Exception {
admin.tenants().createTenant(TENANT,
new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
admin.namespaces().createNamespace(NAMESPACE1);
admin.topics().createPartitionedTopic(TOPIC_OUTPUT, 3);
admin.topics().createPartitionedTopic(PRODUCE_COMMIT_TOPIC, 3);
admin.topics().createPartitionedTopic(PRODUCE_ABORT_TOPIC, 3);
admin.topics().createPartitionedTopic(ACK_COMMIT_TOPIC, 3);
admin.topics().createPartitionedTopic(ACK_ABORT_TOPIC, 3);

admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME)));
Expand Down Expand Up @@ -123,7 +129,7 @@ public void produceAndCommitTest() throws Exception {
@Cleanup
PartitionedProducerImpl<byte[]> outProducer = (PartitionedProducerImpl<byte[]>) pulsarClientImpl
.newProducer()
.topic(TOPIC_OUTPUT)
.topic(PRODUCE_COMMIT_TOPIC)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
Expand All @@ -143,7 +149,7 @@ public void produceAndCommitTest() throws Exception {

// the target topic hasn't the commit marker before commit
for (int i = 0; i < TOPIC_PARTITION; i++) {
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(TOPIC_OUTPUT, i);
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(PRODUCE_COMMIT_TOPIC, i);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
originTopicCursor.close();
Expand All @@ -157,9 +163,11 @@ public void produceAndCommitTest() throws Exception {
// the messageId callback should be called after commit
checkMessageId(futureList, true);

Thread.sleep(1000);

for (int i = 0; i < TOPIC_PARTITION; i++) {
// the target topic partition received the commit marker
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(TOPIC_OUTPUT, i);
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(PRODUCE_COMMIT_TOPIC, i);
Assert.assertNotNull(originTopicCursor);
Assert.assertTrue(originTopicCursor.hasMoreEntries());
List<Entry> entries = originTopicCursor.readEntries((int) originTopicCursor.getNumberOfEntries());
Expand All @@ -171,7 +179,7 @@ public void produceAndCommitTest() throws Exception {

// the target topic transactionBuffer should receive the transaction messages,
// committing marker and commit marker
ReadOnlyCursor tbTopicCursor = getTBTopicCursor(TOPIC_OUTPUT, i);
ReadOnlyCursor tbTopicCursor = getTBTopicCursor(PRODUCE_COMMIT_TOPIC, i);
Assert.assertNotNull(tbTopicCursor);
Assert.assertTrue(tbTopicCursor.hasMoreEntries());
long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
Expand Down Expand Up @@ -209,21 +217,21 @@ public void produceAndCommitTest() throws Exception {

@Test
public void produceAndAbortTest() throws Exception {
String topic = NAMESPACE1 + "/produce-abort-test";
PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient;
TransactionImpl txn = (TransactionImpl) pulsarClientImpl.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();

@Cleanup
ProducerImpl<byte[]> outProducer = (ProducerImpl<byte[]>) pulsarClientImpl
PartitionedProducerImpl<byte[]> outProducer = (PartitionedProducerImpl<byte[]>) pulsarClientImpl
.newProducer()
.topic(topic)
.topic(PRODUCE_ABORT_TOPIC)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();

int messageCnt = 10;
int messageCntPerPartition = 3;
int messageCnt = TOPIC_PARTITION * messageCntPerPartition;
Set<String> messageSet = new HashSet<>();
List<CompletableFuture<MessageId>> futureList = new ArrayList<>();
for (int i = 0; i < messageCnt; i++) {
Expand All @@ -235,10 +243,12 @@ public void produceAndAbortTest() throws Exception {
}

// the target topic hasn't the abort marker before commit
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(topic, -1);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
originTopicCursor.close();
for (int i = 0; i < TOPIC_PARTITION; i++) {
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(PRODUCE_ABORT_TOPIC, i);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
originTopicCursor.close();
}

// the messageId callback can't be called before commit
checkMessageId(futureList, false);
Expand All @@ -249,35 +259,39 @@ public void produceAndAbortTest() throws Exception {
checkMessageId(futureList, true);

// the target topic partition doesn't have any entries
originTopicCursor = getOriginTopicCursor(topic, -1);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
for (int i = 0; i < TOPIC_PARTITION; i++) {
ReadOnlyCursor originTopicCursor = getOriginTopicCursor(PRODUCE_ABORT_TOPIC, i);
Assert.assertNotNull(originTopicCursor);
Assert.assertFalse(originTopicCursor.hasMoreEntries());
}

// the target topic transactionBuffer should receive the transaction messages,
// committing marker and commit marker
ReadOnlyCursor tbTopicCursor = getTBTopicCursor(topic, -1);
Assert.assertNotNull(tbTopicCursor);
Assert.assertTrue(tbTopicCursor.hasMoreEntries());
long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
log.info("transaction buffer entries count: {}", tbEntriesCnt);
Assert.assertEquals(tbEntriesCnt, messageCnt + 1);

PulsarApi.MessageMetadata messageMetadata;
List<Entry> entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
// check the messages
for (int i = 0; i < messageCnt; i++) {
messageMetadata = Commands.parseMessageMetadata(entries.get(i).getDataBuffer());
Assert.assertEquals(messageMetadata.getTxnidMostBits(), txn.getTxnIdMostBits());
Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txn.getTxnIdLeastBits());
for (int i = 0; i < TOPIC_PARTITION; i++) {
ReadOnlyCursor tbTopicCursor = getTBTopicCursor(PRODUCE_ABORT_TOPIC, i);
Assert.assertNotNull(tbTopicCursor);
Assert.assertTrue(tbTopicCursor.hasMoreEntries());
long tbEntriesCnt = tbTopicCursor.getNumberOfEntries();
log.info("transaction buffer entries count: {}", tbEntriesCnt);
Assert.assertEquals(tbEntriesCnt, messageCntPerPartition + 1);

byte[] bytes = new byte[entries.get(i).getDataBuffer().readableBytes()];
entries.get(i).getDataBuffer().readBytes(bytes);
Assert.assertTrue(messageSet.remove(new String(bytes)));
}
PulsarApi.MessageMetadata messageMetadata;
List<Entry> entries = tbTopicCursor.readEntries((int) tbEntriesCnt);
// check the messages
for (int j = 0; j < messageCntPerPartition; j++) {
messageMetadata = Commands.parseMessageMetadata(entries.get(j).getDataBuffer());
Assert.assertEquals(messageMetadata.getTxnidMostBits(), txn.getTxnIdMostBits());
Assert.assertEquals(messageMetadata.getTxnidLeastBits(), txn.getTxnIdLeastBits());

// check abort marker
messageMetadata = Commands.parseMessageMetadata(entries.get(messageCnt).getDataBuffer());
Assert.assertEquals(PulsarMarkers.MarkerType.TXN_ABORT_VALUE, messageMetadata.getMarkerType());
byte[] bytes = new byte[entries.get(j).getDataBuffer().readableBytes()];
entries.get(j).getDataBuffer().readBytes(bytes);
Assert.assertTrue(messageSet.remove(new String(bytes)));
}

// check abort marker
messageMetadata = Commands.parseMessageMetadata(entries.get(messageCntPerPartition).getDataBuffer());
Assert.assertEquals(PulsarMarkers.MarkerType.TXN_ABORT_VALUE, messageMetadata.getMarkerType());
}

Assert.assertEquals(0, messageSet.size());
log.info("finish test produceAndAbortTest.");
Expand All @@ -289,7 +303,7 @@ private void checkMessageId(List<CompletableFuture<MessageId>> futureList, boole
MessageId messageId = messageIdFuture.get(1, TimeUnit.SECONDS);
if (isFinished) {
Assert.assertNotNull(messageId);
log.info("Tnx commit success! messageId: {}", messageId);
log.info("Tnx finished success! messageId: {}", messageId);
} else {
Assert.fail("MessageId shouldn't be get before txn abort.");
}
Expand Down Expand Up @@ -350,7 +364,7 @@ public void ackCommitTest() throws Exception {
log.info("init transaction {}.", txn);

Producer<byte[]> incomingProducer = pulsarClient.newProducer()
.topic(TOPIC_OUTPUT)
.topic(ACK_COMMIT_TOPIC)
.batchingMaxMessages(1)
.roundRobinRouterBatchingPartitionSwitchFrequency(1)
.create();
Expand All @@ -361,7 +375,7 @@ public void ackCommitTest() throws Exception {
log.info("prepare incoming messages finished.");

MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(TOPIC_OUTPUT)
.topic(ACK_COMMIT_TOPIC)
.subscriptionName(subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
Expand All @@ -377,7 +391,7 @@ public void ackCommitTest() throws Exception {
Thread.sleep(1000);

// The pending messages count should be the incomingMessageCnt
Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), incomingMessageCnt);

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < incomingMessageCnt; i++) {
Expand All @@ -386,14 +400,14 @@ public void ackCommitTest() throws Exception {
}

// The pending messages count should be the incomingMessageCnt
Assert.assertEquals(getPendingAckCount(subscriptionName), incomingMessageCnt);
Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), incomingMessageCnt);

txn.commit().get();

Thread.sleep(1000);

// After commit, the pending messages count should be 0
Assert.assertEquals(getPendingAckCount(subscriptionName), 0);
Assert.assertEquals(getPendingAckCount(ACK_COMMIT_TOPIC, subscriptionName), 0);

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < incomingMessageCnt; i++) {
Expand All @@ -404,15 +418,80 @@ public void ackCommitTest() throws Exception {
log.info("finish test ackCommitTest");
}

private int getPendingAckCount(String subscriptionName) throws Exception {
@Test
public void ackAbortTest() throws Exception {
final String subscriptionName = "ackAbortTest";
Transaction txn = ((PulsarClientImpl) pulsarClient)
.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
log.info("init transaction {}.", txn);

Producer<byte[]> incomingProducer = pulsarClient.newProducer()
.topic(ACK_ABORT_TOPIC)
.batchingMaxMessages(1)
.roundRobinRouterBatchingPartitionSwitchFrequency(1)
.create();
int incomingMessageCnt = 10;
for (int i = 0; i < incomingMessageCnt; i++) {
incomingProducer.newMessage().value("Hello Txn.".getBytes()).sendAsync();
}
log.info("prepare incoming messages finished.");

MultiTopicsConsumerImpl<byte[]> consumer = (MultiTopicsConsumerImpl<byte[]>) pulsarClient.newConsumer()
.topic(ACK_ABORT_TOPIC)
.subscriptionName(subscriptionName)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.enableBatchIndexAcknowledgment(true)
.subscriptionType(SubscriptionType.Shared)
.subscribe();

for (int i = 0; i < incomingMessageCnt; i++) {
Message<byte[]> message = consumer.receive();
log.info("receive messageId: {}", message.getMessageId());
consumer.acknowledgeAsync(message.getMessageId(), txn);
}

Thread.sleep(1000);

// The pending messages count should be the incomingMessageCnt
Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), incomingMessageCnt);

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < incomingMessageCnt; i++) {
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNull(message);
}

// The pending messages count should be the incomingMessageCnt
Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), incomingMessageCnt);

txn.abort().get();

Thread.sleep(1000);

// After commit, the pending messages count should be 0
Assert.assertEquals(getPendingAckCount(ACK_ABORT_TOPIC, subscriptionName), 0);

consumer.redeliverUnacknowledgedMessages();
for (int i = 0; i < incomingMessageCnt; i++) {
Message<byte[]> message = consumer.receive(2, TimeUnit.SECONDS);
Assert.assertNotNull(message);
log.info("second receive messageId: {}", message.getMessageId());
}

log.info("finish test ackAbortTest");
}

private int getPendingAckCount(String topic, String subscriptionName) throws Exception {
Class<PersistentSubscription> clazz = PersistentSubscription.class;
Field field = clazz.getDeclaredField("pendingAckMessages");
field.setAccessible(true);

int pendingAckCount = 0;
for (PulsarService pulsarService : getPulsarServiceList()) {
for (String key : pulsarService.getBrokerService().getTopics().keys()) {
if (key.startsWith("persistent://" + TOPIC_OUTPUT)) {
if (key.contains(topic)) {
PersistentSubscription subscription =
(PersistentSubscription) pulsarService.getBrokerService()
.getTopics().get(key).get().get().getSubscription(subscriptionName);
Expand All @@ -423,8 +502,9 @@ private int getPendingAckCount(String subscriptionName) throws Exception {
}
}
}
log.info("pendingAckCount: {}", pendingAckCount);
log.info("subscriptionName: {}, pendingAckCount: {}", subscriptionName, pendingAckCount);
return pendingAckCount;
}


}
Loading

0 comments on commit 8f2540f

Please sign in to comment.