Skip to content

Commit

Permalink
[Transaction] Support consume transaction messages. (apache#7781)
Browse files Browse the repository at this point in the history
Master Issue: apache#2664 

Fix https://github.com/streamnative/pulsar/issues/1304

### Motivation

Currently, the consumer can't receive transaction messages.

### Modifications

Support process the commit marker in the topic partition and fetch transaction messages from TransactionBuffer.
  • Loading branch information
gaoran10 authored Aug 14, 2020
1 parent 4f7c9ab commit 6e7d1a8
Show file tree
Hide file tree
Showing 17 changed files with 497 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@

package org.apache.pulsar.broker.service;

import com.google.common.collect.Queues;
import io.netty.buffer.ByteBuf;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -32,6 +34,7 @@
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
Expand All @@ -45,9 +48,11 @@
public abstract class AbstractBaseDispatcher implements Dispatcher {

protected final Subscription subscription;
protected final ConcurrentLinkedQueue<TxnID> pendingTxnQueue;

protected AbstractBaseDispatcher(Subscription subscription) {
this.subscription = subscription;
this.pendingTxnQueue = Queues.newConcurrentLinkedQueue();
}

/**
Expand Down Expand Up @@ -87,7 +92,11 @@ public void filterEntriesForConsumer(List<Entry> entries, EntryBatchSizes batchS
MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1);

try {
if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
if (Markers.isTxnCommitMarker(msgMetadata)) {
entries.set(i, null);
pendingTxnQueue.add(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()));
continue;
} else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

Expand Down Expand Up @@ -163,4 +172,16 @@ protected byte[] peekStickyKey(ByteBuf metadataAndPayload) {
metadata.recycle();
return key;
}

public boolean havePendingTxnToRead() {
return pendingTxnQueue.size() > 0;
}

public Subscription getSubscription() {
return this.subscription;
}

public ConcurrentLinkedQueue<TxnID> getPendingTxnQueue() {
return this.pendingTxnQueue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
protected final ServiceConfiguration serviceConfig;
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

private TransactionReader transactionReader;

enum ReadType {
Normal, Replay
}
Expand All @@ -120,6 +122,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
this.transactionReader = new TransactionReader(this);
}

@Override
Expand Down Expand Up @@ -351,6 +354,8 @@ public void readMoreEntries() {
} else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name,
totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription());
} else if (havePendingTxnToRead()) {
transactionReader.read(messagesToRead, ReadType.Normal, this);
} else if (!havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp

private final RedeliveryTracker redeliveryTracker;

private TransactionReader transactionReader;

public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
Expand All @@ -84,6 +86,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
this.transactionReader = new TransactionReader(this);
}

protected void scheduleReadOnActiveConsumer() {
Expand Down Expand Up @@ -453,7 +456,9 @@ protected void readMoreEntries(Consumer consumer) {
}
havePendingRead = true;

if (consumer.readCompacted()) {
if (havePendingTxnToRead()) {
transactionReader.read(messagesToRead, consumer, this);
} else if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.service.AbstractBaseDispatcher;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.client.api.transaction.TxnID;

/**
* Used to read transaction messages for dispatcher.
*/
@Slf4j
public class TransactionReader {

private final AbstractBaseDispatcher dispatcher;
private volatile TransactionBuffer transactionBuffer;
private volatile long startSequenceId = 0;
private volatile CompletableFuture<TransactionBufferReader> transactionBufferReader;

public TransactionReader(AbstractBaseDispatcher abstractBaseDispatcher) {
this.dispatcher = abstractBaseDispatcher;
}

/**
* Get ${@link TransactionBuffer} lazily and read transaction messages.
*
* @param readMessageNum messages num to read
* @param ctx context object
* @param readEntriesCallback ReadEntriesCallback
*/
public void read(int readMessageNum, Object ctx, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
if (transactionBuffer == null) {
dispatcher.getSubscription().getTopic()
.getTransactionBuffer(false).whenComplete((tb, throwable) -> {
if (throwable != null) {
log.error("Get transactionBuffer failed.", throwable);
readEntriesCallback.readEntriesFailed(
ManagedLedgerException.getManagedLedgerException(throwable), ctx);
return;
}
transactionBuffer = tb;
internalRead(readMessageNum, ctx, readEntriesCallback);
});
} else {
internalRead(readMessageNum, ctx, readEntriesCallback);
}
}

/**
* Read specify number transaction messages by ${@link TransactionBufferReader}.
*
* @param readMessageNum messages num to read
* @param ctx context object
* @param readEntriesCallback ReadEntriesCallback
*/
private void internalRead(int readMessageNum, Object ctx, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) {
final TxnID txnID = getValidTxn();
if (txnID == null) {
log.error("No valid txn to read.");
readEntriesCallback.readEntriesFailed(
ManagedLedgerException.getManagedLedgerException(new Exception("No valid txn to read.")), ctx);
return;
}
if (transactionBufferReader == null) {
transactionBufferReader = transactionBuffer.openTransactionBufferReader(txnID, startSequenceId);
}
transactionBufferReader.thenAccept(reader -> {
reader.readNext(readMessageNum).whenComplete((transactionEntries, throwable) -> {
if (throwable != null) {
log.error("Read transaction messages failed.", throwable);
readEntriesCallback.readEntriesFailed(
ManagedLedgerException.getManagedLedgerException(throwable), ctx);
return;
}
if (transactionEntries == null || transactionEntries.size() < readMessageNum) {
startSequenceId = 0;
dispatcher.getPendingTxnQueue().remove(txnID);
transactionBufferReader = null;
reader.close();
}
List<Entry> entryList = new ArrayList<>(transactionEntries.size());
for (int i = 0; i < transactionEntries.size(); i++) {
if (i == (transactionEntries.size() -1)) {
startSequenceId = transactionEntries.get(i).sequenceId();
}
entryList.add(transactionEntries.get(i).getEntry());
}
readEntriesCallback.readEntriesComplete(entryList, ctx);
});
}).exceptionally(throwable -> {
log.error("Open transactionBufferReader failed.", throwable);
readEntriesCallback.readEntriesFailed(
ManagedLedgerException.getManagedLedgerException(throwable), ctx);
return null;
});
}

private TxnID getValidTxn() {
TxnID txnID;
do {
txnID = dispatcher.getPendingTxnQueue().peek();
if (txnID == null) {
if (log.isDebugEnabled()) {
log.debug("Peek null txnID from dispatcher pendingTxnQueue.");
}
dispatcher.getPendingTxnQueue().poll();
if (dispatcher.getPendingTxnQueue().size() <= 0) {
break;
}
}
} while (txnID == null);
return txnID;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.common.annotations.Beta;
import io.netty.buffer.ByteBuf;

import org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.client.api.transaction.TxnID;

/**
Expand Down Expand Up @@ -58,11 +59,11 @@ public interface TransactionEntry extends AutoCloseable {
long committedAtEntryId();

/**
* Returns the entry buffer.
* Returns the entry saved in {@link TransactionBuffer}.
*
* @return the entry buffer.
* @return the {@link Entry}
*/
ByteBuf getEntryBuffer();
Entry getEntry();

/**
* Close the entry to release the resource that it holds.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.CompletableFuture;

import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException;
Expand Down Expand Up @@ -67,7 +69,7 @@ public synchronized CompletableFuture<List<TransactionEntry>> readNext(int numEn
TransactionEntry txnEntry = new TransactionEntryImpl(
txnId,
entry.getKey(),
entry.getValue(),
EntryImpl.create(-1L, -1L, entry.getValue()),
committedAtLedgerId,
committedAtEntryId
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ private CompletableFuture<List<TransactionEntry>> readEntry(SortedMap<Long, Posi
tmpFuture.completeExceptionally(throwable);
} else {
TransactionEntry txnEntry = new TransactionEntryImpl(meta.id(), longPositionEntry.getKey(),
entry.getDataBuffer(),
meta.committedAtLedgerId(),
meta.committedAtEntryId());
entry, meta.committedAtLedgerId(), meta.committedAtEntryId());
synchronized (txnEntries) {
txnEntries.add(txnEntry);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.transaction.buffer.impl;

import io.netty.buffer.ByteBuf;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.pulsar.broker.transaction.buffer.TransactionEntry;
import org.apache.pulsar.client.api.transaction.TxnID;

Expand All @@ -31,16 +33,16 @@ public class TransactionEntryImpl implements TransactionEntry {
private final long sequenceId;
private final long committedAtLedgerId;
private final long committedAtEntryId;
private final ByteBuf entryBuf;
private final Entry entry;

public TransactionEntryImpl(TxnID txnId,
long sequenceId,
ByteBuf entryBuf,
Entry entry,
long committedAtLedgerId,
long committedAtEntryId) {
this.txnId = txnId;
this.sequenceId = sequenceId;
this.entryBuf = entryBuf;
this.entry = entry;
this.committedAtLedgerId = committedAtLedgerId;
this.committedAtEntryId = committedAtEntryId;
}
Expand All @@ -66,14 +68,15 @@ public long committedAtEntryId() {
}

@Override
public ByteBuf getEntryBuffer() {
return entryBuf;
public Entry getEntry() {
return entry;
}

@Override
public void close() {
if (null != entryBuf) {
entryBuf.release();
if (null != entry) {
entry.getDataBuffer().release();
entry.release();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public CompletableFuture<SortedMap<Long, Position>> readEntries(int num, long st

SortedMap<Long, Position> readEntries = entries;
if (startSequenceId != PersistentTransactionBufferReader.DEFAULT_START_SEQUENCE_ID) {
readEntries = entries.tailMap(startSequenceId);
readEntries = entries.tailMap(startSequenceId + 1);
}

if (readEntries.isEmpty()) {
Expand Down
Loading

0 comments on commit 6e7d1a8

Please sign in to comment.