Skip to content

Commit

Permalink
[fix][txn]: fix transaction buffer recover throw cursor already close (
Browse files Browse the repository at this point in the history
…apache#14807)

### Motivation
When Transaction buffer recover fail throw CursorAlreadyClosedException, we should stop the recover op. the cursor was been closed, the transaction buffer was been closed, so we should stop the recover op, in order to release thread resources
like apache#14781
  • Loading branch information
congbobo184 authored Mar 25, 2022
1 parent 7a78b50 commit aef5f6d
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,8 @@ public Entry get() {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
if (recover.topic.getManagedLedger().getConfig().isAutoSkipNonRecoverableData()
&& exception instanceof ManagedLedgerException.NonRecoverableLedgerException
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException) {
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException
|| exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
isReadable = false;
} else {
outstandingReadsRequests.decrementAndGet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,18 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer2.getStats().state, "Ready"));
managedCursors.removeCursor("transaction-buffer-sub");

doAnswer(invocation -> {
AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1);
callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("test"), null);
return null;
}).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any());

managedCursors.add(managedCursor);
TransactionBuffer buffer3 = new TopicTransactionBuffer(persistentTopic);
Awaitility.await().atMost(30, TimeUnit.SECONDS).untilAsserted(() ->
assertEquals(buffer3.getStats().state, "Ready"));
managedCursors.removeCursor("transaction-buffer-sub");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
|| exception instanceof ManagedLedgerException.ManagedLedgerFencedException
|| exception instanceof ManagedLedgerException.CursorAlreadyClosedException) {
isReadable = false;
} else {
outstandingReadsRequests.decrementAndGet();
}
log.error("Transaction log init fail error!", exception);
}
Expand Down

0 comments on commit aef5f6d

Please sign in to comment.