Skip to content

Commit

Permalink
[fix][txn] Transaction cumulative ack redeliver change (apache#14371)
Browse files Browse the repository at this point in the history
apache#10478

### Motivation
since apache#10478 merged, we should change the cumulative ack with transaction abort redeliver logic. We can't redeliver unCumulativeAck message by the server because the client will receive the new message and ack then will receive the old message they abort. 

in this case:
1. we have 5 message
2. cumulative ack 3 messages with the transaction
3. we abort this transaction
4. server redeliver message by the current consumer_epoch
5. the client will not filter the 4 or 5 messages, because in apache#10478 we don't change the client consumer epoch
6. client cumulative ack 4 5 with transaction and commit will lose the 1 2 3 messages and the consume message, not in order.
### Modifications
don't redeliver any cumulative ack messages, it will do by user self
  • Loading branch information
congbobo184 authored Jul 13, 2022
1 parent a19ed1a commit e6396bb
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.andAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.compareToWithAckSet;
import static org.apache.bookkeeper.mledger.util.PositionAckSetUtil.isAckSetOverlap;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -528,9 +527,10 @@ public CompletableFuture<Void> internalAbortTxn(TxnID txnId, Consumer consumer,
if (cumulativeAckOfTransaction.getKey().equals(txnId)) {
cumulativeAckOfTransaction = null;
}
//TODO: pendingAck handle next pr will fix
persistentSubscription.redeliverUnacknowledgedMessages(consumer, DEFAULT_CONSUMER_EPOCH);
abortFuture.complete(null);

// in cumulative ack with transaction, don't depend on server redeliver message,
// it will cause the messages to be out of order
}).exceptionally(e -> {
log.error("[{}] Transaction pending ack store abort txnId : [{}] fail!",
topicName, txnId, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import io.netty.channel.ChannelHandlerContext;
Expand Down Expand Up @@ -1066,6 +1067,74 @@ public void testTxnTimeOutInClient() throws Exception{
}
}

@Test
public void testCumulativeAckRedeliverMessages() throws Exception {
String topic = NAMESPACE1 + "/testCumulativeAckRedeliverMessages";

int count = 5;
int transactionCumulativeAck = 3;
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.create();

// send 5 messages
for (int i = 0; i < count; i++) {
producer.send((i + "").getBytes(UTF_8));
}

Transaction transaction = getTxn();
Transaction invalidTransaction = getTxn();

Message<byte[]> message = null;
for (int i = 0; i < transactionCumulativeAck; i++) {
message = consumer.receive();
}

// receive transaction in order
assertEquals(message.getValue(), (transactionCumulativeAck - 1 + "").getBytes(UTF_8));

// ack the last message
consumer.acknowledgeCumulativeAsync(message.getMessageId(), transaction).get();

// another ack will throw TransactionConflictException
try {
consumer.acknowledgeCumulativeAsync(message.getMessageId(), invalidTransaction).get();
fail();
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof PulsarClientException.TransactionConflictException);
// abort transaction then redeliver messages
transaction.abort().get();
// consumer redeliver messages
consumer.redeliverUnacknowledgedMessages();
}

// receive the rest of the message
for (int i = 0; i < count; i++) {
message = consumer.receive();
}

Transaction commitTransaction = getTxn();

// receive the first message
assertEquals(message.getValue(), (count - 1 + "").getBytes(UTF_8));
// ack the end of the message
consumer.acknowledgeCumulativeAsync(message.getMessageId(), commitTransaction).get();

commitTransaction.commit().get();

// then redeliver will not receive any message
message = consumer.receive(3, TimeUnit.SECONDS);
assertNull(message);
}

@Test
public void testSendTxnMessageTimeout() throws Exception {
String topic = NAMESPACE1 + "/testSendTxnMessageTimeout";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,14 +626,6 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageId
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && this instanceof ConsumerImpl) {

// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
if (ackType == AckType.Cumulative) {
txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
}

ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
// register the ackFuture as part of the transaction
Expand All @@ -649,13 +641,6 @@ protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckT
TransactionImpl txn) {
CompletableFuture<Void> ackFuture;
if (txn != null && (this instanceof ConsumerImpl)) {
// it is okay that we register acked topic after sending the acknowledgements. because
// the transactional ack will not be visiable for consumers until the transaction is
// committed
if (ackType == AckType.Cumulative) {
txn.registerCumulativeAckConsumer((ConsumerImpl<?>) this);
}

ackFuture = txn.registerAckedTopic(getTopic(), subscription)
.thenCompose(ignored -> doAcknowledge(messageId, ackType, properties, txn));
// register the ackFuture as part of the transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -37,7 +36,6 @@
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.InvalidTxnStatusException;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClientException.TransactionNotFoundException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.util.FutureUtil;

Expand All @@ -62,7 +60,6 @@ public class TransactionImpl implements Transaction , TimerTask {
private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<Pair<String, String>, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;

private final ArrayList<CompletableFuture<MessageId>> sendFutureList;
private final ArrayList<CompletableFuture<Void>> ackFutureList;
Expand Down Expand Up @@ -122,9 +119,8 @@ public CompletableFuture<Void> registerProducedTopic(String topic) {
}
});
}
} else {
return completableFuture;
}
return completableFuture;
}

public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
Expand All @@ -147,22 +143,14 @@ public CompletableFuture<Void> registerAckedTopic(String topic, String subscript
}
});
}
} else {
return completableFuture;
}
return completableFuture;
}

public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
ackFutureList.add(ackFuture);
}

public synchronized void registerCumulativeAckConsumer(ConsumerImpl<?> consumer) {
if (this.cumulativeAckConsumers == null) {
this.cumulativeAckConsumers = new HashMap<>();
}
cumulativeAckConsumers.put(consumer, 0);
}

@Override
public CompletableFuture<Void> commit() {
timeout.cancel();
Expand Down Expand Up @@ -202,16 +190,7 @@ public CompletableFuture<Void> abort() {
if (e != null) {
log.error(e.getMessage());
}
if (cumulativeAckConsumers != null) {
cumulativeAckConsumers.forEach((consumer, integer) ->
cumulativeAckConsumers
.putIfAbsent(consumer, consumer.clearIncomingMessagesAndGetMessageNumber()));
}
tcClient.abortAsync(new TxnID(txnIdMostBits, txnIdLeastBits)).whenComplete((vx, ex) -> {
if (cumulativeAckConsumers != null) {
cumulativeAckConsumers.forEach(ConsumerImpl::increaseAvailablePermits);
cumulativeAckConsumers.clear();
}

if (ex != null) {
if (ex instanceof TransactionNotFoundException
Expand Down

0 comments on commit e6396bb

Please sign in to comment.