From 6e7d1a83c3c2737610f01cb372f61e2b830a62f7 Mon Sep 17 00:00:00 2001 From: ran Date: Fri, 14 Aug 2020 09:21:44 +0800 Subject: [PATCH] [Transaction] Support consume transaction messages. (#7781) Master Issue: #2664 Fix https://github.com/streamnative/pulsar/issues/1304 ### Motivation Currently, the consumer can't receive transaction messages. ### Modifications Support process the commit marker in the topic partition and fetch transaction messages from TransactionBuffer. --- .../service/AbstractBaseDispatcher.java | 23 ++- ...PersistentDispatcherMultipleConsumers.java | 5 + ...sistentDispatcherSingleActiveConsumer.java | 7 +- .../service/persistent/TransactionReader.java | 140 +++++++++++++++ .../transaction/buffer/TransactionEntry.java | 7 +- .../impl/InMemTransactionBufferReader.java | 4 +- .../PersistentTransactionBufferReader.java | 4 +- .../buffer/impl/TransactionEntryImpl.java | 17 +- .../buffer/impl/TransactionMetaImpl.java | 2 +- .../transaction/TransactionConsumeTest.java | 163 ++++++++++++++++++ ...nTest.java => TransactionProduceTest.java} | 4 +- .../InMemTransactionBufferReaderTest.java | 2 +- .../PersistentTransactionBufferTest.java | 7 +- .../buffer/TransactionBufferTest.java | 2 +- .../buffer/TransactionEntryImplTest.java | 5 +- .../TransactionMetaStoreTestBase.java | 2 +- .../client/transaction/EndToEndTest.java | 130 ++++++++++++++ 17 files changed, 497 insertions(+), 27 deletions(-) create mode 100644 pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java rename pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/{PulsarClientTransactionTest.java => TransactionProduceTest.java} (98%) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java index 7cf9793c676d3..4ee66495445b1 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java @@ -19,11 +19,13 @@ package org.apache.pulsar.broker.service; +import com.google.common.collect.Queues; import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import lombok.extern.slf4j.Slf4j; @@ -32,6 +34,7 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.compression.CompressionCodec; import org.apache.pulsar.common.compression.CompressionCodecProvider; @@ -45,9 +48,11 @@ public abstract class AbstractBaseDispatcher implements Dispatcher { protected final Subscription subscription; + protected final ConcurrentLinkedQueue pendingTxnQueue; protected AbstractBaseDispatcher(Subscription subscription) { this.subscription = subscription; + this.pendingTxnQueue = Queues.newConcurrentLinkedQueue(); } /** @@ -87,7 +92,11 @@ public void filterEntriesForConsumer(List entries, EntryBatchSizes batchS MessageMetadata msgMetadata = Commands.peekMessageMetadata(metadataAndPayload, subscription.toString(), -1); try { - if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) { + if (Markers.isTxnCommitMarker(msgMetadata)) { + entries.set(i, null); + pendingTxnQueue.add(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits())); + continue; + } else if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) { PositionImpl pos = (PositionImpl) entry.getPosition(); // Message metadata was corrupted or the messages was a server-only marker @@ -163,4 +172,16 @@ protected byte[] peekStickyKey(ByteBuf metadataAndPayload) { metadata.recycle(); return key; } + + public boolean havePendingTxnToRead() { + return pendingTxnQueue.size() > 0; + } + + public Subscription getSubscription() { + return this.subscription; + } + + public ConcurrentLinkedQueue getPendingTxnQueue() { + return this.pendingTxnQueue; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index a6d6582e8019f..37f34361271ff 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -104,6 +104,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul protected final ServiceConfiguration serviceConfig; protected Optional dispatchRateLimiter = Optional.empty(); + private TransactionReader transactionReader; + enum ReadType { Normal, Replay } @@ -120,6 +122,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); + this.transactionReader = new TransactionReader(this); } @Override @@ -351,6 +354,8 @@ public void readMoreEntries() { } else if (BLOCKED_DISPATCHER_ON_UNACKMSG_UPDATER.get(this) == TRUE) { log.warn("[{}] Dispatcher read is blocked due to unackMessages {} reached to max {}", name, totalUnackedMessages, topic.getMaxUnackedMessagesOnSubscription()); + } else if (havePendingTxnToRead()) { + transactionReader.read(messagesToRead, ReadType.Normal, this); } else if (!havePendingRead) { if (log.isDebugEnabled()) { log.debug("[{}] Schedule read of {} messages for {} consumers", name, messagesToRead, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 58a63bd044e7e..893025dc3fb0d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -73,6 +73,8 @@ public final class PersistentDispatcherSingleActiveConsumer extends AbstractDisp private final RedeliveryTracker redeliveryTracker; + private TransactionReader transactionReader; + public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex, PersistentTopic topic, Subscription subscription) { super(subscriptionType, partitionIndex, topic.getName(), subscription); @@ -84,6 +86,7 @@ public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType su this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.initializeDispatchRateLimiterIfNeeded(Optional.empty()); + this.transactionReader = new TransactionReader(this); } protected void scheduleReadOnActiveConsumer() { @@ -453,7 +456,9 @@ protected void readMoreEntries(Consumer consumer) { } havePendingRead = true; - if (consumer.readCompacted()) { + if (havePendingTxnToRead()) { + transactionReader.read(messagesToRead, consumer, this); + } else if (consumer.readCompacted()) { topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer); } else { cursor.asyncReadEntriesOrWait(messagesToRead, serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java new file mode 100644 index 0000000000000..93f56a9e4b8a9 --- /dev/null +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/TransactionReader.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.service.persistent; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; + +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.Entry; +import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.broker.service.AbstractBaseDispatcher; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; +import org.apache.pulsar.client.api.transaction.TxnID; + +/** + * Used to read transaction messages for dispatcher. + */ +@Slf4j +public class TransactionReader { + + private final AbstractBaseDispatcher dispatcher; + private volatile TransactionBuffer transactionBuffer; + private volatile long startSequenceId = 0; + private volatile CompletableFuture transactionBufferReader; + + public TransactionReader(AbstractBaseDispatcher abstractBaseDispatcher) { + this.dispatcher = abstractBaseDispatcher; + } + + /** + * Get ${@link TransactionBuffer} lazily and read transaction messages. + * + * @param readMessageNum messages num to read + * @param ctx context object + * @param readEntriesCallback ReadEntriesCallback + */ + public void read(int readMessageNum, Object ctx, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) { + if (transactionBuffer == null) { + dispatcher.getSubscription().getTopic() + .getTransactionBuffer(false).whenComplete((tb, throwable) -> { + if (throwable != null) { + log.error("Get transactionBuffer failed.", throwable); + readEntriesCallback.readEntriesFailed( + ManagedLedgerException.getManagedLedgerException(throwable), ctx); + return; + } + transactionBuffer = tb; + internalRead(readMessageNum, ctx, readEntriesCallback); + }); + } else { + internalRead(readMessageNum, ctx, readEntriesCallback); + } + } + + /** + * Read specify number transaction messages by ${@link TransactionBufferReader}. + * + * @param readMessageNum messages num to read + * @param ctx context object + * @param readEntriesCallback ReadEntriesCallback + */ + private void internalRead(int readMessageNum, Object ctx, AsyncCallbacks.ReadEntriesCallback readEntriesCallback) { + final TxnID txnID = getValidTxn(); + if (txnID == null) { + log.error("No valid txn to read."); + readEntriesCallback.readEntriesFailed( + ManagedLedgerException.getManagedLedgerException(new Exception("No valid txn to read.")), ctx); + return; + } + if (transactionBufferReader == null) { + transactionBufferReader = transactionBuffer.openTransactionBufferReader(txnID, startSequenceId); + } + transactionBufferReader.thenAccept(reader -> { + reader.readNext(readMessageNum).whenComplete((transactionEntries, throwable) -> { + if (throwable != null) { + log.error("Read transaction messages failed.", throwable); + readEntriesCallback.readEntriesFailed( + ManagedLedgerException.getManagedLedgerException(throwable), ctx); + return; + } + if (transactionEntries == null || transactionEntries.size() < readMessageNum) { + startSequenceId = 0; + dispatcher.getPendingTxnQueue().remove(txnID); + transactionBufferReader = null; + reader.close(); + } + List entryList = new ArrayList<>(transactionEntries.size()); + for (int i = 0; i < transactionEntries.size(); i++) { + if (i == (transactionEntries.size() -1)) { + startSequenceId = transactionEntries.get(i).sequenceId(); + } + entryList.add(transactionEntries.get(i).getEntry()); + } + readEntriesCallback.readEntriesComplete(entryList, ctx); + }); + }).exceptionally(throwable -> { + log.error("Open transactionBufferReader failed.", throwable); + readEntriesCallback.readEntriesFailed( + ManagedLedgerException.getManagedLedgerException(throwable), ctx); + return null; + }); + } + + private TxnID getValidTxn() { + TxnID txnID; + do { + txnID = dispatcher.getPendingTxnQueue().peek(); + if (txnID == null) { + if (log.isDebugEnabled()) { + log.debug("Peek null txnID from dispatcher pendingTxnQueue."); + } + dispatcher.getPendingTxnQueue().poll(); + if (dispatcher.getPendingTxnQueue().size() <= 0) { + break; + } + } + } while (txnID == null); + return txnID; + } + +} diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntry.java index 168b47832108c..8c9630bf0429c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntry.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntry.java @@ -21,6 +21,7 @@ import com.google.common.annotations.Beta; import io.netty.buffer.ByteBuf; +import org.apache.bookkeeper.mledger.Entry; import org.apache.pulsar.client.api.transaction.TxnID; /** @@ -58,11 +59,11 @@ public interface TransactionEntry extends AutoCloseable { long committedAtEntryId(); /** - * Returns the entry buffer. + * Returns the entry saved in {@link TransactionBuffer}. * - * @return the entry buffer. + * @return the {@link Entry} */ - ByteBuf getEntryBuffer(); + Entry getEntry(); /** * Close the entry to release the resource that it holds. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java index 8a989eb0014fe..d4926e39c678a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/InMemTransactionBufferReader.java @@ -24,6 +24,8 @@ import java.util.List; import java.util.Map.Entry; import java.util.concurrent.CompletableFuture; + +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.broker.transaction.buffer.TransactionBufferReader; import org.apache.pulsar.broker.transaction.buffer.TransactionEntry; import org.apache.pulsar.broker.transaction.buffer.exceptions.EndOfTransactionException; @@ -67,7 +69,7 @@ public synchronized CompletableFuture> readNext(int numEn TransactionEntry txnEntry = new TransactionEntryImpl( txnId, entry.getKey(), - entry.getValue(), + EntryImpl.create(-1L, -1L, entry.getValue()), committedAtLedgerId, committedAtEntryId ); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java index f3250cab5863e..45aff7918134d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/PersistentTransactionBufferReader.java @@ -83,9 +83,7 @@ private CompletableFuture> readEntry(SortedMap> readEntries(int num, long st SortedMap readEntries = entries; if (startSequenceId != PersistentTransactionBufferReader.DEFAULT_START_SEQUENCE_ID) { - readEntries = entries.tailMap(startSequenceId); + readEntries = entries.tailMap(startSequenceId + 1); } if (readEntries.isEmpty()) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java new file mode 100644 index 0000000000000..9734f18bb85e8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionConsumeTest.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.transaction; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.concurrent.TimeUnit; + +import com.google.common.collect.Sets; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer; +import org.apache.pulsar.broker.transaction.coordinator.TransactionMetaStoreTestBase; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.PulsarApi; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.apache.pulsar.common.protocol.Commands; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Test for consuming transaction messages. + */ +@Slf4j +public class TransactionConsumeTest extends TransactionMetaStoreTestBase { + + private final static String CONSUME_TOPIC = "persistent://public/txn/txn-consume-test"; + private final static String NORMAL_MSG_CONTENT = "Normal - "; + private final static String TXN_MSG_CONTENT = "Txn - "; + + @BeforeClass + public void init() throws Exception { + BROKER_COUNT = 1; + super.setup(); + + pulsarAdmins[0].clusters().createCluster("my-cluster", new ClusterData(pulsarServices[0].getWebServiceAddress())); + pulsarAdmins[0].tenants().createTenant("public", new TenantInfo(Sets.newHashSet(), Sets.newHashSet("my-cluster"))); + pulsarAdmins[0].namespaces().createNamespace("public/txn", 10); + pulsarAdmins[0].topics().createNonPartitionedTopic(CONSUME_TOPIC); + } + + @Test + public void noSortedTest() throws Exception { + int messageCntBeforeTxn = 10; + int transactionMessageCnt = 10; + int messageCntAfterTxn = 10; + int totalMsgCnt = messageCntBeforeTxn + transactionMessageCnt + messageCntAfterTxn; + + Producer producer = pulsarClient.newProducer() + .topic(CONSUME_TOPIC) + .create(); + + Consumer exclusiveConsumer = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("exclusive-test") + .subscribe(); + + Consumer sharedConsumer = pulsarClient.newConsumer() + .topic(CONSUME_TOPIC) + .subscriptionName("shared-test") + .subscriptionType(SubscriptionType.Shared) + .subscribe(); + + long mostSigBits = 2L; + long leastSigBits = 5L; + TxnID txnID = new TxnID(mostSigBits, leastSigBits); + + PersistentTopic persistentTopic = (PersistentTopic) pulsarServices[0].getBrokerService() + .getTopic(CONSUME_TOPIC, false).get().get(); + TransactionBuffer transactionBuffer = persistentTopic.getTransactionBuffer(true).get(); + log.info("transactionBuffer init finish."); + + sendNormalMessages(producer, 0, messageCntBeforeTxn); + // append messages to TB + appendTransactionMessages(txnID, transactionBuffer, transactionMessageCnt); + sendNormalMessages(producer, messageCntBeforeTxn, messageCntAfterTxn); + + for (int i = 0; i < totalMsgCnt; i++) { + if (i < (messageCntBeforeTxn + messageCntAfterTxn)) { + // receive normal messages successfully + Message message = exclusiveConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive exclusive normal msg: {}" + new String(message.getData(), UTF_8)); + message = sharedConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive shared normal msg: {}" + new String(message.getData(), UTF_8)); + } else { + // can't receive transaction messages before commit + Message message = exclusiveConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNull(message); + message = sharedConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNull(message); + log.info("Can't receive message before commit."); + } + } + + transactionBuffer.endTxnOnPartition(txnID, PulsarApi.TxnAction.COMMIT.getNumber()); + Thread.sleep(1000); + log.info("Commit txn."); + + for (int i = 0; i < transactionMessageCnt; i++) { + Message message = exclusiveConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive txn exclusive msg: {}", new String(message.getData())); + message = sharedConsumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + log.info("Receive txn shared msg: {}", new String(message.getData(), UTF_8)); + } + + exclusiveConsumer.close(); + sharedConsumer.close(); + } + + private void sendNormalMessages(Producer producer, int startMsgCnt, int messageCnt) + throws PulsarClientException { + for (int i = 0; i < messageCnt; i++) { + producer.newMessage().value((NORMAL_MSG_CONTENT + (startMsgCnt + i)).getBytes(UTF_8)).send(); + } + } + + private void appendTransactionMessages(TxnID txnID, TransactionBuffer tb, int transactionMsgCnt) { + for (int i = 0; i < transactionMsgCnt; i++) { + PulsarApi.MessageMetadata.Builder builder = PulsarApi.MessageMetadata.newBuilder(); + builder.setProducerName("producerName"); + builder.setSequenceId(10L); + builder.setTxnidMostBits(txnID.getMostSigBits()); + builder.setTxnidLeastBits(txnID.getLeastSigBits()); + builder.setPublishTime(System.currentTimeMillis()); + + ByteBuf headerAndPayload = Commands.serializeMetadataAndPayload( + Commands.ChecksumType.Crc32c, builder.build(), + Unpooled.copiedBuffer((TXN_MSG_CONTENT + i).getBytes(UTF_8))); + tb.appendBufferToTxn(txnID, i, headerAndPayload); + } + log.info("append messages to TB finish."); + } + +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java similarity index 98% rename from pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java rename to pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java index 58e46c78e48bb..0843161beda46 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/PulsarClientTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java @@ -58,7 +58,7 @@ * Pulsar client transaction test. */ @Slf4j -public class PulsarClientTransactionTest extends TransactionTestBase { +public class TransactionProduceTest extends TransactionTestBase { private final static int TOPIC_PARTITION = 3; @@ -105,7 +105,7 @@ protected void cleanup() throws Exception { } @Test - public void produceCommitTest() throws Exception { + public void produceAndCommitTest() throws Exception { PulsarClientImpl pulsarClientImpl = (PulsarClientImpl) pulsarClient; Transaction tnx = pulsarClientImpl.newTransaction() .withTransactionTimeout(5, TimeUnit.SECONDS) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java index e9577757c0b7a..345f914636a78 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/InMemTransactionBufferReaderTest.java @@ -132,7 +132,7 @@ private void verifyAndReleaseEntries(List txnEntries, assertEquals(txnEntry.txnId(), txnID); assertEquals(txnEntry.sequenceId(), startSequenceId + i); assertEquals(new String( - ByteBufUtil.getBytes(txnEntry.getEntryBuffer()), + ByteBufUtil.getBytes(txnEntry.getEntry().getDataBuffer()), UTF_8 ), "message-" + i); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java index b0f59574251dc..26c84a625fcd3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/PersistentTransactionBufferTest.java @@ -105,6 +105,7 @@ import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -412,9 +413,9 @@ public void testOpenReaderOnCommittedTxn() throws Exception { verifyAndReleaseEntries(entries, txnID, 0L, numEntries); reader.readNext(1).get(); - + Assert.fail("Should cause the exception `EndOfTransactionException`."); } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof EndOfTransactionException); + assertTrue(ee.getCause() instanceof EndOfTransactionException); } } @@ -754,7 +755,7 @@ private void verifyAndReleaseEntries(List txnEntries, assertEquals(txnEntry.txnId(), txnID); assertEquals(txnEntry.sequenceId(), startSequenceId + i); assertEquals(new String( - ByteBufUtil.getBytes(txnEntry.getEntryBuffer()), + ByteBufUtil.getBytes(txnEntry.getEntry().getDataBuffer()), UTF_8 ), "message-" + i); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java index 952e79ece7a8d..6356668202d20 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferTest.java @@ -258,7 +258,7 @@ private void verifyAndReleaseEntries(List txnEntries, assertEquals(txnEntry.txnId(), txnID); assertEquals(txnEntry.sequenceId(), startSequenceId + i); assertEquals(new String( - ByteBufUtil.getBytes(txnEntry.getEntryBuffer()), + ByteBufUtil.getBytes(txnEntry.getEntry().getDataBuffer()), UTF_8 ), "message-" + i); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntryImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntryImplTest.java index fae4480d9cc6c..4d9f452a7161b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntryImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionEntryImplTest.java @@ -23,6 +23,7 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; +import org.apache.bookkeeper.mledger.impl.EntryImpl; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionEntryImpl; import org.apache.pulsar.client.api.transaction.TxnID; import org.testng.annotations.Test; @@ -38,11 +39,11 @@ public void testCloseShouldReleaseBuffer() { TransactionEntryImpl entry = new TransactionEntryImpl( new TxnID(1234L, 3456L), 0L, - buffer, + EntryImpl.create(12L, 23L, buffer), 33L, 44L ); - assertEquals(buffer.refCnt(), 1); + assertEquals(buffer.refCnt(), 2); entry.close(); assertEquals(buffer.refCnt(), 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java index b8df885b01d12..67091b344d7e4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/coordinator/TransactionMetaStoreTestBase.java @@ -40,7 +40,7 @@ public class TransactionMetaStoreTestBase { LocalBookkeeperEnsemble bkEnsemble; protected PulsarAdmin[] pulsarAdmins = new PulsarAdmin[BROKER_COUNT]; protected PulsarClient pulsarClient; - protected static final int BROKER_COUNT = 5; + protected static int BROKER_COUNT = 5; protected ServiceConfiguration[] configurations = new ServiceConfiguration[BROKER_COUNT]; protected PulsarService[] pulsarServices = new PulsarService[BROKER_COUNT]; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java new file mode 100644 index 0000000000000..f0fa3b61405ca --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/transaction/EndToEndTest.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.transaction; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.collect.Sets; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.transaction.Transaction; +import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; +import org.apache.pulsar.client.impl.PartitionedProducerImpl; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.TenantInfo; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * End to end transaction test. + */ +@Slf4j +public class EndToEndTest extends TransactionTestBase { + + + private final static int TOPIC_PARTITION = 3; + + private final static String CLUSTER_NAME = "test"; + private final static String TENANT = "tnx"; + private final static String NAMESPACE1 = TENANT + "/ns1"; + private final static String TOPIC_OUTPUT = NAMESPACE1 + "/output"; + + @BeforeMethod + protected void setup() throws Exception { + internalSetup(); + + int webServicePort = getServiceConfigurationList().get(0).getWebServicePort().get(); + admin.clusters().createCluster(CLUSTER_NAME, new ClusterData("http://localhost:" + webServicePort)); + admin.tenants().createTenant(TENANT, + new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NAMESPACE1); + admin.topics().createPartitionedTopic(TOPIC_OUTPUT, 3); + + admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(), + new TenantInfo(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); + admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 16); + + int brokerPort = getServiceConfigurationList().get(0).getBrokerServicePort().get(); + pulsarClient = PulsarClient.builder() + .serviceUrl("pulsar://localhost:" + brokerPort) + .statsInterval(0, TimeUnit.SECONDS) + .enableTransaction(true) + .build(); + + Thread.sleep(1000 * 3); + } + + @Test + public void test() throws Exception { + Transaction txn = ((PulsarClientImpl) pulsarClient) + .newTransaction() + .withTransactionTimeout(2, TimeUnit.SECONDS) + .build() + .get(); + + @Cleanup + PartitionedProducerImpl producer = (PartitionedProducerImpl) pulsarClient + .newProducer() + .topic(TOPIC_OUTPUT) + .sendTimeout(0, TimeUnit.SECONDS) + .enableBatching(false) + .create(); + + int messageCnt = 10; + for (int i = 0; i < messageCnt; i++) { + producer.newMessage(txn).value(("Hello Txn - " + i).getBytes(UTF_8)).sendAsync(); + } + + @Cleanup + MultiTopicsConsumerImpl consumer = (MultiTopicsConsumerImpl) pulsarClient + .newConsumer() + .topic(TOPIC_OUTPUT) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscriptionName("test") + .subscribe(); + + Message message = consumer.receive(5, TimeUnit.SECONDS); + // Can't receive transaction messages before commit. + Assert.assertNull(message); + + txn.commit().get(); + + Thread.sleep(2000); + + int receiveCnt = 0; + for (int i = 0; i < messageCnt; i++) { + message = consumer.receive(2, TimeUnit.SECONDS); + Assert.assertNotNull(message); + receiveCnt ++; + } + Assert.assertEquals(messageCnt, receiveCnt); + log.info("receive transaction messages count: {}", receiveCnt); + } + +}