Skip to content

Commit

Permalink
[Transaction] Transaction pending ack lazy init. (apache#11091)
Browse files Browse the repository at this point in the history
## Motivation
now, in `broker.conf` `transactionCoordinatorEnabled=true` MLPendingAck will init manageLedger, some ack will not use transaction, so don't need to init manageLedger. When this sub use transaction, we can lazy init `PendingAckHandle`.

## implement
When this sub use transaction, we can lazy init `PendingAckHandle`.
  • Loading branch information
congbobo184 authored Oct 9, 2021
1 parent e29f720 commit 8b50af5
Show file tree
Hide file tree
Showing 15 changed files with 343 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

import org.apache.bookkeeper.common.annotation.InterfaceAudience;
import org.apache.bookkeeper.common.annotation.InterfaceStability;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,11 +499,11 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
public CompletableFuture<Void> transactionIndividualAcknowledge(
TxnID txnId,
List<MutablePair<PositionImpl, Integer>> positions) {
return pendingAckHandle.individualAcknowledgeMessage(txnId, positions);
return pendingAckHandle.individualAcknowledgeMessage(txnId, positions, false);
}

public CompletableFuture<Void> transactionCumulativeAcknowledge(TxnID txnId, List<PositionImpl> positions) {
return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions);
return pendingAckHandle.cumulativeAcknowledgeMessage(txnId, positions, false);
}

private final MarkDeleteCallback markDeleteCallback = new MarkDeleteCallback() {
Expand Down Expand Up @@ -1173,14 +1173,14 @@ public void processReplicatedSubscriptionSnapshot(ReplicatedSubscriptionsSnapsho
public CompletableFuture<Void> endTxn(long txnidMostBits, long txnidLeastBits, int txnAction, long lowWaterMark) {
TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
if (TxnAction.COMMIT.getValue() == txnAction) {
return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark);
return pendingAckHandle.commitTxn(txnID, Collections.emptyMap(), lowWaterMark, false);
} else if (TxnAction.ABORT.getValue() == txnAction) {
Consumer redeliverConsumer = null;
if (getDispatcher() instanceof PersistentDispatcherSingleActiveConsumer) {
redeliverConsumer = ((PersistentDispatcherSingleActiveConsumer)
getDispatcher()).getActiveConsumer();
}
return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark);
return pendingAckHandle.abortTxn(txnID, redeliverConsumer, lowWaterMark, false);
} else {
return FutureUtil.failedFuture(new NotAllowedException("Unsupported txnAction " + txnAction));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,16 @@ public interface PendingAckHandle {
* Client will not send batch size to server, we get the batch size from consumer pending ack. When we get the Batch
* size, we can accurate batch ack of this position.
*
* @param txnID {@link TxnID}TransactionID of an ongoing transaction trying to sck message.
* @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message.
* @param positions {@link MutablePair} the pair of positions and these batch size.
* @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not.
* @return the future of this operation.
* @throws TransactionConflictException if the ack with transaction is conflict with pending ack.
* @throws NotAllowedException if Use this method incorrectly eg. not use
* PositionImpl or cumulative ack with a list of positions.
*/
CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePair<PositionImpl,
Integer>> positions);
Integer>> positions, boolean isInCacheRequest);

/**
* Acknowledge message(s) for an ongoing transaction.
Expand All @@ -73,14 +74,16 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
* If an ongoing transaction cumulative acked a message and then try to ack single message which is
* greater than that one it cumulative acked, it'll succeed.
*
* @param txnID {@link TxnID}TransactionID of an ongoing transaction trying to sck message.
* @param txnID {@link TxnID} TransactionID of an ongoing transaction trying to sck message.
* @param positions {@link MutablePair} the pair of positions and these batch size.
* @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not.
* @return the future of this operation.
* @throws TransactionConflictException if the ack with transaction is conflict with pending ack.
* @throws NotAllowedException if Use this method incorrectly eg. not use
* PositionImpl or cumulative ack with a list of positions.
*/
CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions);
CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions,
boolean isInCacheRequest);

/**
* Commit a transaction.
Expand All @@ -89,19 +92,22 @@ CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID, List<MutablePa
* @param properties Additional user-defined properties that can be
* associated with a particular cursor position.
* @param lowWaterMark the low water mark of this transaction
* @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not.
* @return the future of this operation.
*/
CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark);
CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties,
long lowWaterMark, boolean isInCacheRequest);

/**
* Abort a transaction.
*
* @param txnId {@link TxnID} to identify the transaction.
* @param consumer {@link Consumer} which aborting transaction.
* @param lowWaterMark the low water mark of this transaction
* @param isInCacheRequest {@link Boolean} the boolean of the request in cache whether or not.
* @return the future of this operation.
*/
CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark);
CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark, boolean isInCacheRequest);

/**
* Sync the position ack set, in order to clean up the cache of this position for pending ack handle.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,13 @@ static TransactionPendingAckStoreProvider newProvider(String providerClassName)
*/
CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription);

/**
* Check pending ack store has been initialized before.
*
* @param subscription {@link PersistentSubscription}
* @return a future represents the result of the operation.
* an instance of {@link Boolean} is returned
* if the operation succeeds.
*/
CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription);
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,9 @@ public class InMemoryPendingAckStoreProvider implements TransactionPendingAckSto
public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
return CompletableFuture.completedFuture(new InMemoryPendingAckStore());
}

@Override
public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
return CompletableFuture.completedFuture(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@ public MLPendingAckReplyCallBack(PendingAckHandleImpl pendingAckHandle) {

@Override
public void replayComplete() {
log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());

if (pendingAckHandle.changeToReadyState()) {
pendingAckHandle.completeHandleFuture();
synchronized (pendingAckHandle) {
log.info("Topic name : [{}], SubName : [{}] pending ack state reply success!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
} else {
log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());

if (pendingAckHandle.changeToReadyState()) {
pendingAckHandle.completeHandleFuture();
log.info("Topic name : [{}], SubName : [{}] pending ack handle cache request success!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
} else {
log.error("Topic name : [{}], SubName : [{}] pending ack state reply fail!",
pendingAckHandle.getTopicName(), pendingAckHandle.getSubName());
}
}
pendingAckHandle.handleCacheRequest();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,13 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
});
return pendingAckStoreFuture;
}

@Override
public CompletableFuture<Boolean> checkInitializedBefore(PersistentSubscription subscription) {
PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
return originPersistentTopic.getBrokerService().getManagedLedgerFactory()
.asyncExists(TopicName.get(pendingAckTopicName).getPersistenceNamingEncoding());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,26 @@ public class PendingAckHandleDisabled implements PendingAckHandle {

@Override
public CompletableFuture<Void> individualAcknowledgeMessage(TxnID txnID,
List<MutablePair<PositionImpl, Integer>> positions) {
List<MutablePair<PositionImpl, Integer>> positions,
boolean isInCacheRequest) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions) {
public CompletableFuture<Void> cumulativeAcknowledgeMessage(TxnID txnID, List<PositionImpl> positions,
boolean isInCacheRequest) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark) {
public CompletableFuture<Void> commitTxn(TxnID txnID, Map<String, Long> properties, long lowWaterMark,
boolean isInCacheRequest) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

@Override
public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark) {
public CompletableFuture<Void> abortTxn(TxnID txnId, Consumer consumer, long lowWaterMark,
boolean isInCacheRequest) {
return FutureUtil.failedFuture(new NotAllowedException("The transaction is disabled"));
}

Expand Down
Loading

0 comments on commit 8b50af5

Please sign in to comment.