Skip to content

Commit

Permalink
fix no response to client when handleSubscribe because PendingAckHand…
Browse files Browse the repository at this point in the history
…leImpl init fail (apache#13655)

Fixes apache#13654 

### Modifications

When the initialization of `PendingAckHandleImpl` fails, `pendingAckHandleCompletableFuture` will not be exception or complete, then `org.apache.pulsar.broker.service.persistent.PersistentSubscription#addConsumer` will not return any response to the client.

```
public CompletableFuture<Void> addConsumer(Consumer consumer) {
        return pendingAckHandle.pendingAckHandleFuture().thenCompose(future -> ...)
}
```
  • Loading branch information
wenbingshen authored Jan 14, 2022
1 parent a9f810c commit 528c972
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,10 @@ public interface PendingAckReplyCallBack {
* @param pendingAckMetadataEntry {@link PendingAckMetadataEntry} the metadata entry of pending ack
*/
void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry);

/**
* Pending ack replay failed callback for pending ack store.
*/
void replayFailed(Throwable t);

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ public void replayComplete() {
pendingAckHandle.handleCacheRequest();
}

@Override
public void replayFailed(Throwable t) {
synchronized (pendingAckHandle) {
pendingAckHandle.exceptionHandleFuture(t);
}
}

@Override
public void handleMetadataEntry(PendingAckMetadataEntry pendingAckMetadataEntry) {
TxnID txnID = new TxnID(pendingAckMetadataEntry.getTxnidMostBits(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,8 @@ class PendingAckReplay implements Runnable {
public void run() {
try {
if (cursor.isClosed()) {
pendingAckReplyCallBack.replayFailed(new ManagedLedgerException
.CursorAlreadyClosedException("MLPendingAckStore cursor have been closed."));
log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.",
cursor.getManagedLedger().getName());
return;
Expand Down Expand Up @@ -350,6 +352,7 @@ public void run() {
}
}
} catch (Exception e) {
pendingAckReplyCallBack.replayFailed(e);
log.error("[{}] Pending ack recover fail!", subManagedCursor.getManagedLedger().getName(), e);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ private void initPendingAckStore() {
}).exceptionally(e -> {
acceptQueue.clear();
changeToErrorState();
exceptionHandleFuture(e.getCause());
log.error("PendingAckHandleImpl init fail! TopicName : {}, SubName: {}", topicName, subName, e);
return null;
});
Expand Down Expand Up @@ -889,6 +890,12 @@ public synchronized void completeHandleFuture() {
}
}

public synchronized void exceptionHandleFuture(Throwable t) {
if (!this.pendingAckHandleCompletableFuture.isDone()) {
this.pendingAckHandleCompletableFuture.completeExceptionally(t);
}
}

@Override
public TransactionInPendingAckStats getTransactionInPendingAckStats(TxnID txnID) {
TransactionInPendingAckStats transactionInPendingAckStats = new TransactionInPendingAckStats();
Expand Down

0 comments on commit 528c972

Please sign in to comment.