From 8b50af517255c314c773c6fe0c2530e09fafa312 Mon Sep 17 00:00:00 2001 From: congbo <39078850+congbobo184@users.noreply.github.com> Date: Sat, 9 Oct 2021 11:25:09 +0800 Subject: [PATCH] [Transaction] Transaction pending ack lazy init. (#11091) ## Motivation now, in `broker.conf` `transactionCoordinatorEnabled=true` MLPendingAck will init manageLedger, some ack will not use transaction, so don't need to init manageLedger. When this sub use transaction, we can lazy init `PendingAckHandle`. ## implement When this sub use transaction, we can lazy init `PendingAckHandle`. --- .../mledger/ManagedLedgerFactory.java | 1 - .../persistent/PersistentSubscription.java | 8 +- .../pendingack/PendingAckHandle.java | 18 +- .../TransactionPendingAckStoreProvider.java | 9 + .../impl/InMemoryPendingAckStoreProvider.java | 5 + .../impl/MLPendingAckReplyCallBack.java | 19 +- .../impl/MLPendingAckStoreProvider.java | 9 + .../impl/PendingAckHandleDisabled.java | 12 +- .../pendingack/impl/PendingAckHandleImpl.java | 245 ++++++++++++++++-- .../impl/PendingAckHandleState.java | 17 +- .../admin/v3/AdminApiTransactionTest.java | 30 ++- .../PersistentSubscriptionTest.java | 5 + .../broker/stats/TransactionMetricsTest.java | 28 +- .../TopicTransactionBufferRecoverTest.java | 2 +- .../buffer/TransactionBufferClientTest.java | 8 - 15 files changed, 343 insertions(+), 73 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index 40fdfd723c39d..682ce9008f10b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -20,7 +20,6 @@ import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; - import org.apache.bookkeeper.common.annotation.InterfaceAudience; import org.apache.bookkeeper.common.annotation.InterfaceStability; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 212f61a68a5ec..acfd9ee192cb5 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -499,11 +499,11 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) { public CompletableFuture transactionIndividualAcknowledge( TxnID txnId, List> positions) { - return pendingAckHandle.individualAcknowledgeMessage(txnId, positions); + return pendingAckHandle.individualAcknowledgeMessage(txnId, positions, false); } public CompletableFuture transactionCumulativeAcknowledge(TxnID txnId, List positions) { - return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions); + return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions, false); } private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() { @@ -1173,14 +1173,14 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho public CompletableFuture endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) { TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); if (TxnAction.COMMIT.getValue() == txnAction) { - return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark); + return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark, false); } else if (TxnAction.ABORT.getValue() == txnAction) { Consumer redeliverConsumer = null; if (getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) { redeliverConsumer = ((PersistentDispatcherSingleActiveConsumer) getDispatcher()).getActiveConsumer(); } - return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark); + return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark, false); } else { return FutureUtil.failedFuture(new NotAllowedException("Unsupported txnAction " + txnAction)); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java index c65e51d86dfc9..3664c5d046f6d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckHandle.java @@ -48,15 +48,16 @@ public interface PendingAckHandle { * Client will not send batch size to server, we get the batch size from consumer pending ack. When we get the Batch * size, we can accurate batch ack of this position. * - * @param txnID {@link TxnID}TransactionID of an ongoing transaction trying to sck message. + * @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message. * @param positions {@link MutablePair} the pair of positions and these batch size. + * @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not. * @return the future of this operation. * @throws TransactionConflictException if the ack with transaction is conflict with pending ack. * @throws NotAllowedException if Use this method incorrectly eg. not use * PositionImpl or cumulative ack with a list of positions. */ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List> positions); + Integer>> positions, boolean isInCacheRequest); /** * Acknowledge message(s) for an ongoing transaction. @@ -73,14 +74,16 @@ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List cumulativeAcknowledgeMessage(TxnID txnID, List positions); + CompletableFuture cumulativeAcknowledgeMessage(TxnID txnID, List positions, + boolean isInCacheRequest); /** * Commit a transaction. @@ -89,9 +92,11 @@ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List commitTxn(TxnID txnID, Map properties, long lowWaterMark); + CompletableFuture commitTxn(TxnID txnID, Map properties, + long lowWaterMark, boolean isInCacheRequest); /** * Abort a transaction. @@ -99,9 +104,10 @@ CompletableFuture individualAcknowledgeMessage(TxnID txnID, List abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark); + CompletableFuture abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, boolean isInCacheRequest); /** * Sync the position ack set, in order to clean up the cache of this position for pending ack handle. diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java index 408be115a026a..27c8d20e0b3c7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/TransactionPendingAckStoreProvider.java @@ -59,4 +59,13 @@ static TransactionPendingAckStoreProvider newProvider(String providerClassName) */ CompletableFuture newPendingAckStore(PersistentSubscription subscription); + /** + * Check pending ack store has been initialized before. + * + * @param subscription {@link PersistentSubscription} + * @return a future represents the result of the operation. + * an instance of {@link Boolean} is returned + * if the operation succeeds. + */ + CompletableFuture checkInitializedBefore(PersistentSubscription subscription); } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStoreProvider.java index 1958236c05e53..bd7e31f80316d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/InMemoryPendingAckStoreProvider.java @@ -29,4 +29,9 @@ public class InMemoryPendingAckStoreProvider implements TransactionPendingAckSto public CompletableFuture newPendingAckStore(PersistentSubscription subscription) { return CompletableFuture.completedFuture(new InMemoryPendingAckStore()); } + + @Override + public CompletableFuture checkInitializedBefore(PersistentSubscription subscription) { + return CompletableFuture.completedFuture(true); + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java index dd7989d6da2ce..6bcc344ddf392 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckReplyCallBack.java @@ -44,17 +44,20 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl pendingAckHandle) { @Override public void replayComplete() { - log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!", - pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); - - if (pendingAckHandle.changeToReadyState()) { - pendingAckHandle.completeHandleFuture(); + synchronized (pendingAckHandle) { log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!", pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); - } else { - log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!", - pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); + + if (pendingAckHandle.changeToReadyState()) { + pendingAckHandle.completeHandleFuture(); + log.info("Topic name : [{}], SubName : [{}] pending ack handle cache request success!", + pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); + } else { + log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!", + pendingAckHandle.getTopicName(), pendingAckHandle.getSubName()); + } } + pendingAckHandle.handleCacheRequest(); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index 4dbee1922cdea..548dd8a434a90 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -112,4 +112,13 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { }); return pendingAckStoreFuture; } + + @Override + public CompletableFuture checkInitializedBefore(PersistentSubscription subscription) { + PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic(); + String pendingAckTopicName = MLPendingAckStore + .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName()); + return originPersistentTopic.getBrokerService().getManagedLedgerFactory() + .asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding()); + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java index 1aecaf470eba1..cf6b5c82366a7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleDisabled.java @@ -42,22 +42,26 @@ public class PendingAckHandleDisabled implements PendingAckHandle { @Override public CompletableFuture individualAcknowledgeMessage(TxnID txnID, - List> positions) { + List> positions, + boolean isInCacheRequest) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture cumulativeAcknowledgeMessage(TxnID txnID, List positions) { + public CompletableFuture cumulativeAcknowledgeMessage(TxnID txnID, List positions, + boolean isInCacheRequest) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture commitTxn(TxnID txnID, Map properties, long lowWaterMark) { + public CompletableFuture commitTxn(TxnID txnID, Map properties, long lowWaterMark, + boolean isInCacheRequest) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } @Override - public CompletableFuture abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) { + public CompletableFuture abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, + boolean isInCacheRequest) { return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled")); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java index bf6e5748883d2..78bab966b92e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleImpl.java @@ -26,8 +26,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.LinkedBlockingDeque; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.Position; @@ -101,36 +103,92 @@ public class PendingAckHandleImpl extends PendingAckHandleState implements Pendi private final PersistentSubscription persistentSubscription; - private final CompletableFuture pendingAckStoreFuture; + private CompletableFuture pendingAckStoreFuture; private final CompletableFuture pendingAckHandleCompletableFuture = new CompletableFuture<>(); + private final TransactionPendingAckStoreProvider pendingAckStoreProvider; + + private final BlockingQueue acceptQueue = new LinkedBlockingDeque<>(); + public PendingAckHandleImpl(PersistentSubscription persistentSubscription) { super(State.None); this.topicName = persistentSubscription.getTopicName(); this.subName = persistentSubscription.getName(); this.persistentSubscription = persistentSubscription; - TransactionPendingAckStoreProvider pendingAckStoreProvider = - ((PersistentTopic) this.persistentSubscription.getTopic()) + this.pendingAckStoreProvider = ((PersistentTopic) this.persistentSubscription.getTopic()) .getBrokerService().getPulsar().getTransactionPendingAckStoreProvider(); - this.pendingAckStoreFuture = - pendingAckStoreProvider.newPendingAckStore(persistentSubscription); - - this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { - changeToInitializingState(); - pendingAckStore.replayAsync(this, - ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() - .getPulsar().getTransactionReplayExecutor()); - }).exceptionally(e -> { - log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); - return null; + pendingAckStoreProvider.checkInitializedBefore(persistentSubscription).thenAccept(init -> { + if (init) { + initPendingAckStore(); + } else { + completeHandleFuture(); + } }); } + private void initPendingAckStore() { + if (changeToInitializingState()) { + synchronized (PendingAckHandleImpl.this) { + if (!checkIfClose()) { + this.pendingAckStoreFuture = + pendingAckStoreProvider.newPendingAckStore(persistentSubscription); + this.pendingAckStoreFuture.thenAccept(pendingAckStore -> { + pendingAckStore.replayAsync(this, + ((PersistentTopic) persistentSubscription.getTopic()).getBrokerService() + .getPulsar().getTransactionReplayExecutor()); + }).exceptionally(e -> { + acceptQueue.clear(); + changeToErrorState(); + log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e); + return null; + }); + } + } + } + } + + private void addIndividualAcknowledgeMessageRequest(TxnID txnID, + List> positions, + CompletableFuture completableFuture) { + acceptQueue.add(() -> individualAcknowledgeMessage(txnID, positions, true).thenAccept(v -> + completableFuture.complete(null)).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + })); + } + @Override public CompletableFuture individualAcknowledgeMessage(TxnID txnID, - List> positions) { + List> positions, + boolean isInCacheRequest) { + + if (!checkIfReady()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + synchronized (PendingAckHandleImpl.this) { + switch (state) { + case Initializing: + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); + return completableFuture; + case None: + addIndividualAcknowledgeMessageRequest(txnID, positions, completableFuture); + initPendingAckStore(); + return completableFuture; + case Error: + completableFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + return completableFuture; + case Close: + completableFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + return completableFuture; + default: + break; + } + } + } + if (txnID == null) { return FutureUtil.failedFuture(new NotAllowedException("TransactionID can not be null.")); } @@ -233,9 +291,45 @@ && isAckSetOverlap(individualAckPositions return completableFuture; } + private void addCumulativeAcknowledgeMessageRequest(TxnID txnID, + List positions, + CompletableFuture completableFuture) { + acceptQueue.add(() -> cumulativeAcknowledgeMessage(txnID, positions, true).thenAccept(v -> + completableFuture.complete(null)).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + })); + } + @Override public CompletableFuture cumulativeAcknowledgeMessage(TxnID txnID, - List positions) { + List positions, + boolean isInCacheRequest) { + if (!checkIfReady()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + synchronized (PendingAckHandleImpl.this) { + switch (state) { + case Initializing: + addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + return completableFuture; + case None: + addCumulativeAcknowledgeMessageRequest(txnID, positions, completableFuture); + initPendingAckStore(); + return completableFuture; + case Error: + completableFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + return completableFuture; + case Close: + completableFuture.completeExceptionally( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + return completableFuture; + default: + break; + + } + } + } if (txnID == null) { return FutureUtil.failedFuture(new NotAllowedException("TransactionID can not be null.")); @@ -300,11 +394,52 @@ public CompletableFuture cumulativeAcknowledgeMessage(TxnID txnID, return completableFuture; } + private void addCommitTxnRequest(TxnID txnId, Map properties, long lowWaterMark, + CompletableFuture completableFuture) { + acceptQueue.add(() -> commitTxn(txnId, properties, lowWaterMark, true).thenAccept(v -> + completableFuture.complete(null)).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + })); + } + @Override public synchronized CompletableFuture commitTxn(TxnID txnID, Map properties, - long lowWaterMark) { + long lowWaterMark, boolean isInCacheRequest) { if (!checkIfReady()) { - return FutureUtil.failedFuture(new ServiceUnitNotReadyException("PendingAckHandle not replay complete!")); + synchronized (PendingAckHandleImpl.this) { + if (state == State.Initializing) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); + return completableFuture; + } else if (state == State.None) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); + initPendingAckStore(); + return completableFuture; + } else if (checkIfReady()) { + + } else { + if (state == State.Error) { + return FutureUtil.failedFuture( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + } else { + return FutureUtil.failedFuture( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + } + + } + } + } + + if (!acceptQueue.isEmpty() && !isInCacheRequest) { + synchronized (PendingAckHandleImpl.this) { + if (!acceptQueue.isEmpty()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addCommitTxnRequest(txnID, properties, lowWaterMark, completableFuture); + return completableFuture; + } + } } CompletableFuture commitFuture = new CompletableFuture<>(); @@ -367,11 +502,54 @@ public synchronized CompletableFuture commitTxn(TxnID txnID, Map completableFuture) { + acceptQueue.add(() -> abortTxn(txnId, consumer, lowWaterMark, true).thenAccept(v -> + completableFuture.complete(null)).exceptionally(e -> { + completableFuture.completeExceptionally(e); + return null; + })); + } + @Override - public synchronized CompletableFuture abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) { + public synchronized CompletableFuture abortTxn(TxnID txnId, Consumer consumer, + long lowWaterMark, boolean isInCacheRequest) { if (!checkIfReady()) { - return FutureUtil.failedFuture(new ServiceUnitNotReadyException("PendingAckHandle not replay complete!")); + synchronized (PendingAckHandleImpl.this) { + if (state == State.Initializing) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); + return completableFuture; + } else if (state == State.None) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); + initPendingAckStore(); + return completableFuture; + } else if (checkIfReady()) { + + } else { + if (state == State.Error) { + return FutureUtil.failedFuture( + new ServiceUnitNotReadyException("PendingAckHandle not replay error!")); + } else { + return FutureUtil.failedFuture( + new ServiceUnitNotReadyException("PendingAckHandle have been closed!")); + } + } + } } + + + if (!acceptQueue.isEmpty() && !isInCacheRequest) { + synchronized (PendingAckHandleImpl.this) { + if (!acceptQueue.isEmpty()) { + CompletableFuture completableFuture = new CompletableFuture<>(); + addAbortTxnRequest(txnId, consumer, lowWaterMark, completableFuture); + return completableFuture; + } + } + } + CompletableFuture abortFuture = new CompletableFuture<>(); if (this.cumulativeAckOfTransaction != null) { pendingAckStoreFuture.thenAccept(pendingAckStore -> @@ -705,8 +883,10 @@ public TransactionPendingAckStats getStats() { return transactionPendingAckStats; } - public void completeHandleFuture() { - this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this); + public synchronized void completeHandleFuture() { + if (!this.pendingAckHandleCompletableFuture.isDone()) { + this.pendingAckHandleCompletableFuture.complete(PendingAckHandleImpl.this); + } } @Override @@ -732,7 +912,14 @@ public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) @Override public CompletableFuture close() { - return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync); + changeToCloseState(); + synchronized (PendingAckHandleImpl.this) { + if (this.pendingAckStoreFuture != null) { + return this.pendingAckStoreFuture.thenAccept(PendingAckStore::closeAsync); + } else { + return CompletableFuture.completedFuture(null); + } + } } public CompletableFuture getStoreManageLedger() { @@ -749,4 +936,16 @@ public CompletableFuture getStoreManageLedger() { return FutureUtil.failedFuture(new ServiceUnitNotReadyException("Pending ack have not init success!")); } } + + protected void handleCacheRequest() { + while (true) { + Runnable runnable = acceptQueue.poll(); + + if (runnable != null) { + runnable.run(); + } else { + break; + } + } + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleState.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleState.java index 352962c33fa84..278f9ff106420 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleState.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/PendingAckHandleState.java @@ -32,6 +32,7 @@ public enum State { None, Initializing, Ready, + Error, Close } @@ -39,7 +40,7 @@ public enum State { AtomicReferenceFieldUpdater.newUpdater(PendingAckHandleState.class, State.class, "state"); @SuppressWarnings("unused") - private volatile State state = null; + protected volatile State state = null; public PendingAckHandleState(State state) { STATE_UPDATER.set(this, state); @@ -54,16 +55,22 @@ protected boolean changeToInitializingState() { return STATE_UPDATER.compareAndSet(this, State.None, State.Initializing); } - protected boolean changeToCloseState() { - return (STATE_UPDATER.compareAndSet(this, State.Ready, State.Close) - || STATE_UPDATER.compareAndSet(this, State.None, State.Close) - || STATE_UPDATER.compareAndSet(this, State.Initializing, State.Close)); + protected void changeToCloseState() { + STATE_UPDATER.set(this, State.Close); + } + + protected void changeToErrorState() { + STATE_UPDATER.set(this, State.Error); } public boolean checkIfReady() { return STATE_UPDATER.get(this) == State.Ready; } + public boolean checkIfClose() { + return STATE_UPDATER.get(this) == State.Close; + } + public State getState() { return STATE_UPDATER.get(this); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 6f9a1890626d8..93a2d0e188e17 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -53,6 +53,7 @@ import org.awaitility.Awaitility; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -276,18 +277,39 @@ public void testGetTransactionBufferStats() throws Exception { assertTrue(transactionBufferStats.lastSnapshotTimestamps > currentTime); } - @Test(timeOut = 20000) - public void testGetPendingAckStats() throws Exception { + @DataProvider(name = "ackType") + public static Object[] ackType() { + return new Object[] { "cumulative", "individual"}; + } + + @Test(timeOut = 20000, dataProvider = "ackType") + public void testGetPendingAckStats(String ackType) throws Exception { initTransaction(2); final String topic = "persistent://public/default/testGetPendingAckStats"; final String subName = "test1"; admin.topics().createNonPartitionedTopic(topic); - pulsarClient.newConsumer(Schema.BYTES).topic(topic) - .subscriptionName(subName).subscribe(); + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .sendTimeout(0, TimeUnit.SECONDS).topic(topic).create(); + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES).topic(topic) + .subscriptionName(subName).subscribe(); TransactionPendingAckStats transactionPendingAckStats = admin.transactions(). getPendingAckStatsAsync(topic, subName).get(); + assertEquals(transactionPendingAckStats.state, "None"); + + producer.newMessage().value("Hello pulsar!".getBytes()).send(); + + TransactionImpl transaction = (TransactionImpl) getTransaction(); + if (ackType.equals("individual")) { + consumer.acknowledgeAsync(consumer.receive().getMessageId(), transaction); + } else { + consumer.acknowledgeCumulativeAsync(consumer.receive().getMessageId(), transaction); + } + transaction.commit().get(); + + transactionPendingAckStats = admin.transactions(). + getPendingAckStatsAsync(topic, subName).get(); assertEquals(transactionPendingAckStats.state, "Ready"); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java index c6859d73f1406..8be9b81a1c28c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentSubscriptionTest.java @@ -163,6 +163,11 @@ public CompletableFuture appendAbortMark(TxnID txnID, AckType ackType) { } }); } + + @Override + public CompletableFuture checkInitializedBefore(PersistentSubscription subscription) { + return CompletableFuture.completedFuture(true); + } }).when(pulsarMock).getTransactionPendingAckStoreProvider(); doReturn(svcConfig).when(pulsarMock).getConfiguration(); doReturn(mock(Compactor.class)).when(pulsarMock).getCompactor(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java index a209d897dd9b1..cb8e4305ddf1e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/TransactionMetricsTest.java @@ -30,6 +30,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.api.proto.TxnAction; import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; @@ -66,6 +68,7 @@ protected void setup() throws Exception { .allowedClusters(Sets.newHashSet("test")) .build()); admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString()); + admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); } @AfterMethod(alwaysRun = true) @@ -77,7 +80,6 @@ protected void cleanup() throws Exception { @Test public void testTransactionCoordinatorMetrics() throws Exception{ long timeout = 10000; - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 2); admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); TransactionCoordinatorID transactionCoordinatorIDTwo = TransactionCoordinatorID.get(1); @@ -109,27 +111,36 @@ public void testTransactionCoordinatorMetrics() throws Exception{ @Test public void testTransactionCoordinatorRateMetrics() throws Exception{ - long timeout = 10000; int txnCount = 120; String ns1 = "prop/ns-abc1"; admin.namespaces().createNamespace(ns1); String topic = "persistent://" + ns1 + "/test_coordinator_metrics"; String subName = "test_coordinator_metrics"; - - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); - admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); - TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(1); + TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); + admin.lookups().lookupPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne); admin.topics().createNonPartitionedTopic(topic); admin.topics().createSubscription(topic, subName, MessageId.earliest); Awaitility.await().atMost(2000, TimeUnit.MILLISECONDS).until(() -> pulsar.getTransactionMetadataStoreService().getStores().size() == 1); + + Consumer consumer = pulsarClient.newConsumer() + .subscriptionName(subName).topic(topic).subscribe(); + List list = new ArrayList<>(); + pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build(); for (int i = 0; i < txnCount; i++) { - TxnID txnID = pulsar.getTransactionMetadataStoreService().getStores() - .get(transactionCoordinatorIDOne).newTransaction(timeout).get(); + TransactionImpl transaction = + (TransactionImpl) pulsarClient.newTransaction() + .withTransactionTimeout(10, TimeUnit.SECONDS).build().get(); + TxnID txnID = new TxnID(transaction.getTxnIdMostBits(), transaction.getTxnIdLeastBits()); list.add(txnID); + if (i == 1) { + pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).enableTransaction(true).build(); + consumer.acknowledgeAsync(new MessageIdImpl(1000, 1000, -1), transaction).get(); + continue; + } if (i % 2 == 0) { pulsar.getTransactionMetadataStoreService().addProducedPartitionToTxn(list.get(i), Collections.singletonList(topic)).get(); @@ -206,7 +217,6 @@ public void testManagedLedgerMetrics() throws Exception{ String topic = "persistent://" + ns1 + "/test_managed_ledger_metrics"; String subName = "test_managed_ledger_metrics"; admin.topics().createNonPartitionedTopic(topic); - admin.topics().createPartitionedTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString(), 1); admin.lookups().lookupTopic(TopicName.TRANSACTION_COORDINATOR_ASSIGN.toString()); TransactionCoordinatorID transactionCoordinatorIDOne = TransactionCoordinatorID.get(0); pulsar.getTransactionMetadataStoreService().handleTcClientConnect(transactionCoordinatorIDOne).get(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java index 956b86e251d21..b16dace98a81e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TopicTransactionBufferRecoverTest.java @@ -186,7 +186,7 @@ private void recoverTest(String testTopic) throws Exception { // can't receive message message = consumer.receive(2, TimeUnit.SECONDS); assertNull(message); - admin.topics().unload(RECOVER_COMMIT); + admin.topics().unload(testTopic); Awaitility.await().until(() -> { for (int i = 0; i < getPulsarServiceList().size(); i++) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java index 08de910ad4b2e..0082d81f5fe41 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/buffer/TransactionBufferClientTest.java @@ -84,9 +84,6 @@ protected void setup() throws Exception { new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet(CLUSTER_NAME))); admin.namespaces().createNamespace(namespace, 10); admin.topics().createPartitionedTopic(partitionedTopicName.getPartitionedTopicName(), partitions); - pulsarClient.newConsumer() - .topic(partitionedTopicName.getPartitionedTopicName()) - .subscriptionName("test").subscribe(); tbClient = TransactionBufferClientImpl.create(pulsarClient, new HashedWheelTimer(new DefaultThreadFactory("transaction-buffer"))); } @@ -241,9 +238,6 @@ public void testTransactionBufferLookUp() throws Exception { admin.topics().createNonPartitionedTopic(commitTopic); admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); - waitPendingAckInit(abortTopic, subName); - waitPendingAckInit(commitTopic, subName); - tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get(); @@ -276,8 +270,6 @@ public void testTransactionBufferHandlerSemaphore() throws Exception { admin.topics().createNonPartitionedTopic(commitTopic); admin.topics().createSubscription(commitTopic, subName, MessageId.earliest); - waitPendingAckInit(abortTopic, subName); - waitPendingAckInit(commitTopic, subName); tbClient.abortTxnOnSubscription(abortTopic, "test", 1L, 1L, -1L).get(); tbClient.commitTxnOnSubscription(commitTopic, "test", 1L, 1L, -1L).get();