Skip to content

Commit

Permalink
[Transaction] Fix transaction messages order error and deduplication …
Browse files Browse the repository at this point in the history
…error (apache#9024)

### Motivation

Currently, the transaction messages would be produced in the wrong order, and the deduplication check is not work well.

### Modifications

*Describe the modifications you've done.*

### Verifying this change

This change added tests and can be verified as follows:

- *org.apache.pulsar.client.impl.TransactionEndToEndTest#produceTxnMessageOrderTest*
  • Loading branch information
gaoran10 authored Dec 23, 2020
1 parent 816da8d commit a5c9c15
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
Expand Down Expand Up @@ -54,14 +55,18 @@ public CompletableFuture<TransactionMeta> getTransactionMeta(TxnID txnID) {
@Override
public CompletableFuture<Position> appendBufferToTxn(TxnID txnId, long sequenceId, ByteBuf buffer) {
CompletableFuture<Position> completableFuture = new CompletableFuture<>();
topic.publishMessage(buffer, (e, ledgerId, entryId) -> {
if (e != null) {
log.error("Failed to append buffer to txn {}", txnId, e);
completableFuture.completeExceptionally(e);
return;
topic.getManagedLedger().asyncAddEntry(buffer, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
completableFuture.complete(position);
}
completableFuture.complete(PositionImpl.get(ledgerId, entryId));
});

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Failed to append buffer to txn {}", txnId, exception);
completableFuture.completeExceptionally(exception);
}
}, null);
return completableFuture;
}

Expand All @@ -78,14 +83,19 @@ public CompletableFuture<Void> commitTxn(TxnID txnID, List<MessageIdData> sendMe

ByteBuf commitMarker = Markers.newTxnCommitMarker(-1L, txnID.getMostSigBits(),
txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
topic.publishMessage(commitMarker, (e, ledgerId, entryId) -> {
if (e != null) {
log.error("Failed to commit for txn {}", txnID, e);
completableFuture.completeExceptionally(e);
return;

topic.getManagedLedger().asyncAddEntry(commitMarker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
completableFuture.complete(null);
}
completableFuture.complete(null);
});

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Failed to commit for txn {}", txnID, exception);
completableFuture.completeExceptionally(exception);
}
}, null);
return completableFuture;
}

Expand All @@ -98,14 +108,18 @@ public CompletableFuture<Void> abortTxn(TxnID txnID, List<MessageIdData> sendMes

ByteBuf abortMarker = Markers.newTxnAbortMarker(
-1L, txnID.getMostSigBits(), txnID.getLeastSigBits(), getMessageIdDataList(sendMessageIdList));
topic.publishMessage(abortMarker, (e, ledgerId, entryId) -> {
if (e != null) {
log.error("Failed to abort for txn {}", txnID, e);
completableFuture.completeExceptionally(e);
return;
topic.getManagedLedger().asyncAddEntry(abortMarker, new AsyncCallbacks.AddEntryCallback() {
@Override
public void addComplete(Position position, Object ctx) {
completableFuture.complete(null);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
log.error("Failed to abort for txn {}", txnID, exception);
completableFuture.completeExceptionally(exception);
}
completableFuture.complete(null);
});
}, null);
return completableFuture;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ protected void startBroker() throws Exception {
conf.setWebServicePort(Optional.of(0));
conf.setWebServicePortTls(Optional.of(0));
conf.setTransactionCoordinatorEnabled(true);
conf.setBrokerDeduplicationEnabled(true);
serviceConfigurationList.add(conf);

PulsarService pulsar = spy(new PulsarService(conf));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,7 @@ private void markDeletePositionCheck(String topic, String subName, boolean equal
@Test
public void txnMetadataHandlerRecoverTest() throws Exception {
String topic = NAMESPACE1 + "/tc-metadata-handler-recover";
@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
Expand All @@ -592,6 +593,7 @@ public void txnMetadataHandlerRecoverTest() throws Exception {
}

pulsarClient.close();
@Cleanup
PulsarClientImpl recoverPulsarClient = (PulsarClientImpl) PulsarClient.builder()
.serviceUrl(getPulsarServiceList().get(0).getBrokerServiceUrl())
.statsInterval(0, TimeUnit.SECONDS)
Expand All @@ -603,6 +605,7 @@ public void txnMetadataHandlerRecoverTest() throws Exception {
tcClient.commit(entry.getKey(), entry.getValue());
}

@Cleanup
Consumer<byte[]> consumer = recoverPulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
Expand All @@ -615,4 +618,40 @@ public void txnMetadataHandlerRecoverTest() throws Exception {
}
}

@Test
public void produceTxnMessageOrderTest() throws Exception {
String topic = NAMESPACE1 + "/txn-produce-order";

@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionName("test")
.subscribe();

@Cleanup
Producer<byte[]> producer = pulsarClient.newProducer()
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.producerName("txn-publish-order")
.create();

for (int ti = 0; ti < 10; ti++) {
Transaction txn = pulsarClient
.newTransaction()
.withTransactionTimeout(2, TimeUnit.SECONDS)
.build().get();

for (int i = 0; i < 1000; i++) {
producer.newMessage(txn).value(("" + i).getBytes()).sendAsync();
}
txn.commit().get();

for (int i = 0; i < 1000; i++) {
Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
Assert.assertEquals(Integer.valueOf(new String(message.getData())), new Integer(i));
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ private long beforeSend() {
}
msgMetadataBuilder.setTxnidLeastBits(txn.getTxnIdLeastBits());
msgMetadataBuilder.setTxnidMostBits(txn.getTxnIdMostBits());
long sequenceId = txn.nextSequenceId();
msgMetadataBuilder.setSequenceId(sequenceId);
return sequenceId;
return -1L;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.collect.Lists;
Expand Down Expand Up @@ -53,10 +55,9 @@ public class TransactionImpl implements Transaction {
private final long transactionTimeoutMs;
private final long txnIdLeastBits;
private final long txnIdMostBits;
private final AtomicLong sequenceId = new AtomicLong(0L);

private final Set<String> producedTopics;
private final Set<String> ackedTopics;
private final Map<String, CompletableFuture<Void>> registerPartitionMap;
private final Map<String, CompletableFuture<Void>> registerSubscriptionMap;
private final TransactionCoordinatorClientImpl tcClient;
private Map<ConsumerImpl<?>, Integer> cumulativeAckConsumers;

Expand All @@ -72,29 +73,27 @@ public class TransactionImpl implements Transaction {
this.txnIdLeastBits = txnIdLeastBits;
this.txnIdMostBits = txnIdMostBits;

this.producedTopics = new HashSet<>();
this.ackedTopics = new HashSet<>();
this.registerPartitionMap = new ConcurrentHashMap<>();
this.registerSubscriptionMap = new ConcurrentHashMap<>();
this.tcClient = client.getTcClient();

this.sendFutureList = new ArrayList<>();
this.ackFutureList = new ArrayList<>();
}

public long nextSequenceId() {
return sequenceId.getAndIncrement();
}

// register the topics that will be modified by this transaction
public synchronized CompletableFuture<Void> registerProducedTopic(String topic) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (producedTopics.add(topic)) {
// we need to issue the request to TC to register the produced topic
completableFuture = tcClient.addPublishPartitionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic));
} else {
completableFuture.complete(null);
}
return completableFuture;
// we need to issue the request to TC to register the produced topic
return registerPartitionMap.compute(topic, (key, future) -> {
if (future != null) {
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
} else {
return tcClient.addPublishPartitionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), Lists.newArrayList(topic))
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
}
});
}

public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture) {
Expand All @@ -104,14 +103,16 @@ public synchronized void registerSendOp(CompletableFuture<MessageId> sendFuture)
// register the topics that will be modified by this transaction
public synchronized CompletableFuture<Void> registerAckedTopic(String topic, String subscription) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
if (ackedTopics.add(topic)) {
// we need to issue the request to TC to register the acked topic
completableFuture = tcClient.addSubscriptionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription);
} else {
completableFuture.complete(null);
}
return completableFuture;
// we need to issue the request to TC to register the acked topic
return registerSubscriptionMap.compute(topic, (key, future) -> {
if (future != null) {
return future.thenCompose(ignored -> CompletableFuture.completedFuture(null));
} else {
return tcClient.addSubscriptionToTxnAsync(
new TxnID(txnIdMostBits, txnIdLeastBits), topic, subscription)
.thenCompose(ignored -> CompletableFuture.completedFuture(null));
}
});
}

public synchronized void registerAckOp(CompletableFuture<Void> ackFuture) {
Expand Down

0 comments on commit a5c9c15

Please sign in to comment.