Skip to content

Commit

Permalink
[Transaction] TransactionBuffer Refactor (apache#8347)
Browse files Browse the repository at this point in the history
Fix https://github.com/streamnative/pulsar/issues/1575
Fix issue apache#8378 

### Motivation

![image](https://user-images.githubusercontent.com/15029908/96908159-d38d3f80-14ce-11eb-9e52-ee066434d960.png)

Use the above approach instead of the sidecar approach.

### Modifications

1. Produce transaction messages to the topic partition.
2. The commit marker needs to record the related message-id list of its transaction.
3. When the dispatcher read a transaction marker, get the messages of the transaction by message-id list in the marker and send them to the consumer.
2. TransactionBuffer doesn't maintain any index data.
  • Loading branch information
gaoran10 authored Oct 30, 2020
1 parent 9d74007 commit 9173174
Show file tree
Hide file tree
Showing 55 changed files with 1,213 additions and 2,362 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2891,13 +2891,5 @@ private int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

public void internalInitBatchDeletedIndex(PositionImpl position, int totalNumMessageInBatch) {
batchDeletedIndexes.computeIfAbsent(position, key -> {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, totalNumMessageInBatch);
return bitSetRecyclable;
});
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Class name for transaction buffer provider"
)
private String transactionBufferProviderClassName =
"org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBufferProvider";
"org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferProvider";

/**** --- KeyStore TLS config variables --- ****/
@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
import org.apache.pulsar.broker.stats.MetricsGenerator;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.PersistentTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl;
import org.apache.pulsar.broker.validator.MultipleListenerValidator;
import org.apache.pulsar.broker.web.WebService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.transaction.buffer.exceptions.UnsupportedTxnActionException;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TransactionBufferClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
Expand All @@ -40,10 +42,12 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class TransactionMetadataStoreService {

Expand Down Expand Up @@ -176,7 +180,7 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnId, TxnStatus newStatus,
return store.updateTxnStatus(txnId, newStatus, expectedStatus);
}

public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction, List<PulsarApi.MessageIdData> messageIdDataList) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
TxnStatus newStatus;
switch (txnAction) {
Expand All @@ -195,7 +199,7 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
}

completableFuture = updateTxnStatus(txnID, newStatus, TxnStatus.OPEN)
.thenCompose(ignored -> endToTB(txnID, txnAction));
.thenCompose(ignored -> endTxnInTransactionBuffer(txnID, txnAction, messageIdDataList));
if (TxnStatus.COMMITTING.equals(newStatus)) {
completableFuture = completableFuture
.thenCompose(ignored -> updateTxnStatus(txnID, TxnStatus.COMMITTED, TxnStatus.COMMITTING));
Expand All @@ -206,7 +210,8 @@ public CompletableFuture<Void> endTransaction(TxnID txnID, int txnAction) {
return completableFuture;
}

private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
private CompletableFuture<Void> endTxnInTransactionBuffer(TxnID txnID, int txnAction,
List<PulsarApi.MessageIdData> messageIdDataList) {
CompletableFuture<Void> resultFuture = new CompletableFuture<>();
List<CompletableFuture<TxnID>> completableFutureList = new ArrayList<>();
this.getTxnMeta(txnID).whenComplete((txnMeta, throwable) -> {
Expand All @@ -229,12 +234,25 @@ private CompletableFuture<Void> endToTB(TxnID txnID, int txnAction) {
completableFutureList.add(actionFuture);
});

List<MessageId> messageIdList = new ArrayList<>();
for (PulsarApi.MessageIdData messageIdData : messageIdDataList) {
messageIdList.add(new MessageIdImpl(
messageIdData.getLedgerId(), messageIdData.getEntryId(), messageIdData.getPartition()));
messageIdData.recycle();
}

txnMeta.producedPartitions().forEach(partition -> {
CompletableFuture<TxnID> actionFuture = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
actionFuture = tbClient.commitTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
TopicName.get(partition).getPartitionIndex()).collect(Collectors.toList()));
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits());
actionFuture = tbClient.abortTxnOnTopic(partition, txnID.getMostSigBits(), txnID.getLeastSigBits(),
messageIdList.stream().filter(
msg -> ((MessageIdImpl) msg).getPartitionIndex() ==
TopicName.get(partition).getPartitionIndex()).collect(Collectors.toList()));
} else {
actionFuture.completeExceptionally(new Throwable("Unsupported txnAction " + txnAction));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,22 @@

import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.service.persistent.TransactionReader;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageMetadata;
import org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.common.api.proto.PulsarMarkers.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
Expand Down Expand Up @@ -70,11 +73,13 @@ protected AbstractBaseDispatcher(Subscription subscription) {
*/
public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchSizes,
SendMessageInfo sendMessageInfo, EntryBatchIndexesAcks indexesAcks,
ManagedCursor cursor, TransactionReader transactionReader) {
ManagedCursor cursor, boolean isReplayRead) {
int totalMessages = 0;
long totalBytes = 0;
int totalChunkedMessages = 0;

boolean isAfterTxnCommitMarker = false;

for (int i = 0, entriesSize = entries.size(); i < entriesSize; i++) {
Entry entry = entries.get(i);
if (entry == null) {
Expand All @@ -86,9 +91,18 @@ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchS
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);

try {
if (Markers.isTxnCommitMarker(msgMetadata)) {
if (!isReplayRead && msgMetadata != null
&& msgMetadata.hasTxnidMostBits() && msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnCommitMarker(msgMetadata)) {
handleTxnCommitMarker(entry);
if (!isAfterTxnCommitMarker) {
isAfterTxnCommitMarker = true;
}
} else if (Markers.isTxnAbortMarker(msgMetadata)) {
handleTxnAbortMarker(entry);
}
entries.set(i, null);
transactionReader.addPendingTxn(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits());
entry.release();
continue;
} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
Expand All @@ -109,11 +123,11 @@ && trackDelayedDelivery(entry.getLedgerId(), entry.getEntryId(), msgMetadata)) {
entries.set(i, null);
entry.release();
continue;
}

if (entry instanceof TransactionEntryImpl) {
((TransactionEntryImpl) entry).setStartBatchIndex(
transactionReader.calculateStartBatchIndex(msgMetadata.getNumMessagesInBatch()));
} else if (isAfterTxnCommitMarker) {
addMessageToReplay(entry.getLedgerId(), entry.getEntryId());
entries.set(i, null);
entry.release();
continue;
}

int batchSize = msgMetadata.getNumMessagesInBatch();
Expand Down Expand Up @@ -172,4 +186,39 @@ protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
return key;
}

protected void addMessageToReplay(long ledgerId, long entryId) {
// No-op
}

private void handleTxnCommitMarker(Entry entry) {
ByteBuf byteBuf = entry.getDataBuffer();
Commands.skipMessageMetadata(byteBuf);
try {
PulsarMarkers.TxnCommitMarker commitMarker = Markers.parseCommitMarker(byteBuf);
for (PulsarMarkers.MessageIdData messageIdData : commitMarker.getMessageIdList()) {
addMessageToReplay(messageIdData.getLedgerId(), messageIdData.getEntryId());
}
} catch (IOException e) {
log.error("Failed to parse commit marker.", e);
}
}

private void handleTxnAbortMarker(Entry entry) {
((PersistentTopic) subscription.getTopic()).getBrokerService().getPulsar().getOrderedExecutor().execute(() -> {
ByteBuf byteBuf = entry.getDataBuffer();
Commands.skipMessageMetadata(byteBuf);
try {
List<Position> positionList = new ArrayList<>();
PulsarMarkers.TxnCommitMarker abortMarker = Markers.parseCommitMarker(byteBuf);
for (PulsarMarkers.MessageIdData messageIdData : abortMarker.getMessageIdList()) {
positionList.add(PositionImpl.get(messageIdData.getLedgerId(), messageIdData.getEntryId()));
}
subscription.acknowledgeMessage(
positionList, PulsarApi.CommandAck.AckType.Individual, Collections.emptyMap());
} catch (IOException e) {
log.error("Failed to parse abort marker.", e);
}
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
Expand Down Expand Up @@ -287,9 +286,6 @@ public ChannelPromise sendMessages(final List<Entry> entries, EntryBatchSizes ba
}

MessageIdData.Builder messageIdBuilder = MessageIdData.newBuilder();
if (entry instanceof TransactionEntryImpl) {
messageIdBuilder.setBatchIndex(((TransactionEntryImpl) entry).getStartBatchIndex());
}
MessageIdData messageId = messageIdBuilder
.setLedgerId(entry.getLedgerId())
.setEntryId(entry.getEntryId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,5 @@ default void cursorIsReset() {
default void acknowledgementWasProcessed() {
// No-op
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ public void checkEncryption() {
public void publishTxnMessage(TxnID txnID, long producerId, long sequenceId, long highSequenceId,
ByteBuf headersAndPayload, long batchSize, boolean isChunked) {
beforePublish(producerId, sequenceId, headersAndPayload, batchSize);
topic.publishTxnMessage(txnID, headersAndPayload, batchSize,
topic.publishTxnMessage(txnID, headersAndPayload,
MessagePublishContext.get(this, sequenceId, highSequenceId, msgIn,
headersAndPayload.readableBytes(), batchSize, isChunked, System.nanoTime()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1714,16 +1714,16 @@ protected void handleEndTxn(PulsarApi.CommandEndTxn command) {
final int txnAction = command.getTxnAction().getNumber();
TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());

service.pulsar().getTransactionMetadataStoreService().endTransaction(txnID, txnAction)
.thenRun(() -> {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}).exceptionally(throwable -> {
log.error("Send response error for end txn request.", throwable);
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
return null;
});
service.pulsar().getTransactionMetadataStoreService()
.endTransaction(txnID, txnAction, command.getMessageIdList())
.thenRun(() -> {
ctx.writeAndFlush(Commands.newEndTxnResponse(requestId,
txnID.getLeastSigBits(), txnID.getMostSigBits()));
}).exceptionally(throwable -> {
log.error("Send response error for end txn request.", throwable);
ctx.writeAndFlush(Commands.newEndTxnResponse(command.getRequestId(), txnID.getMostSigBits(),
BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage()));
return null; });
}

@Override
Expand All @@ -1739,7 +1739,7 @@ protected void handleEndTxnOnPartition(PulsarApi.CommandEndTxnOnPartition comman
"Topic " + command.getTopic() + " is not found."));
return;
}
topic.get().endTxn(txnID, txnAction)
topic.get().endTxn(txnID, txnAction, command.getMessageIdList())
.whenComplete((ignored, throwable) -> {
if (throwable != null) {
log.error("Handle endTxnOnPartition {} failed.", command.getTopic(), throwable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.service;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand All @@ -31,6 +32,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.policies.data.BacklogQuota;
Expand Down Expand Up @@ -239,10 +241,9 @@ default boolean isSystemTopic() {
*
* @param txnID Transaction Id
* @param headersAndPayload Message data
* @param batchSize messages number in a batch
* @param publishContext Publish context
*/
void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, long batchSize, PublishContext publishContext);
void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext);

/**
* End the transaction in this topic.
Expand All @@ -251,6 +252,6 @@ default boolean isSystemTopic() {
* @param txnAction Transaction action.
* @return
*/
CompletableFuture<Void> endTxn(TxnID txnID, int txnAction);
CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, List<MessageIdData> sendMessageIdList);

}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public void sendMessages(List<Entry> entries) {
if (consumer != null) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, null);
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
consumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public void sendMessages(List<Entry> entries) {
if (currentConsumer != null && currentConsumer.getAvailablePermits() > 0 && currentConsumer.isWritable()) {
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, null);
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false);
currentConsumer.sendMessages(entries, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void sendMessages(List<Entry> entries) {

SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, null);
filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false);
consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), getRedeliveryTracker());
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
Expand Down
Loading

0 comments on commit 9173174

Please sign in to comment.