From a962137f530cd2d6c2315749270a1d2cae8b1cc2 Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Fri, 17 Dec 2021 22:28:29 +0800 Subject: [PATCH] [Transaction]stop TP replaying with Exception (#12700) ### Motivation While MLPendingAckStore is replaying, if a ledger has been deleted from BookKeeper or the ManagedLedger has been fenced, MLPendingAckStore does not stop recovering and keeps reporting the exception. ### Modifications End replaying when there is no ledger to read or the ManagedLedger is fenced. ### Verifying this change Add a unit test. --- .../bookkeeper/mledger/ManagedCursor.java | 6 ++ .../mledger/impl/ManagedCursorImpl.java | 1 + .../impl/ManagedCursorContainerTest.java | 5 ++ .../pendingack/impl/MLPendingAckStore.java | 25 +++++--- .../broker/transaction/TransactionTest.java | 61 +++++++++++++++++++ .../impl/MLTransactionLogImpl.java | 2 +- 6 files changed, 89 insertions(+), 11 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index 72ee1a1f9c3ea..d1fb90a3ca60e 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -710,4 +710,10 @@ Set asyncReplayEntries( * @return if read position changed */ boolean checkAndUpdateReadPositionChanged(); + + /** + * Checks if the cursor is closed. + * @return whether this cursor is closed. 
+ */ + public boolean isClosed(); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index d2c0e6f7fc514..0b445cbefd2e7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -837,6 +837,7 @@ private void checkForNewEntries(OpReadEntry op, ReadEntriesCallback callback, Ob } } + @Override public boolean isClosed() { return state == State.Closed || state == State.Closing; } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 57e1964a2c332..af30c9c372136 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -385,6 +385,11 @@ public List readEntriesOrWait(int maxEntries, long maxSizeBytes) public boolean checkAndUpdateReadPositionChanged() { return false; } + + @Override + public boolean isClosed() { + return false; + } } @Test diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index fb88878d8c826..1592318cf616a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -34,7 +34,6 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; 
-import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; @@ -303,13 +302,12 @@ class PendingAckReplay implements Runnable { @Override public void run() { try { - while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0) { - if (((ManagedCursorImpl) cursor).isClosed()) { - log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.", - cursor.getManagedLedger().getName()); - return; - } - fillEntryQueueCallback.fillQueue(); + if (cursor.isClosed()) { + log.warn("[{}] MLPendingAckStore cursor have been closed, close replay thread.", + cursor.getManagedLedger().getName()); + return; + } + while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) { Entry entry = entryQueue.poll(); if (entry != null) { ByteBuf buffer = entry.getDataBuffer(); @@ -361,15 +359,17 @@ public void run() { class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { + private volatile boolean isReadable = true; private final AtomicLong outstandingReadsRequests = new AtomicLong(0); - void fillQueue() { + boolean fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) { if (cursor.hasMoreEntries()) { outstandingReadsRequests.incrementAndGet(); readAsync(100, this); } } + return isReadable; } @Override @@ -389,7 +389,12 @@ public Entry get() { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - log.error("MLPendingAckStore stat reply fail!", exception); + if (managedLedger.getConfig().isAutoSkipNonRecoverableData() + && exception instanceof ManagedLedgerException.NonRecoverableLedgerException + || exception instanceof ManagedLedgerException.ManagedLedgerFencedException) { + isReadable = false; + } + log.error("MLPendingAckStore of topic [{}] 
stat reply fail!", managedLedger.getName(), exception); outstandingReadsRequests.decrementAndGet(); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 5d5157347071f..0010a947e23d5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -50,6 +50,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; +import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; @@ -62,8 +63,10 @@ import org.apache.pulsar.broker.transaction.buffer.impl.TopicTransactionBufferState; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; import org.apache.pulsar.broker.transaction.buffer.matadata.TransactionBufferSnapshot; +import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; +import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -78,6 +81,7 @@ import org.apache.pulsar.client.api.transaction.Transaction; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.client.impl.MessageIdImpl; +import org.apache.pulsar.common.api.proto.CommandSubscribe; import 
org.apache.pulsar.client.impl.transaction.TransactionImpl; import org.apache.pulsar.common.events.EventsTopicNames; import org.apache.pulsar.common.naming.NamespaceName; @@ -551,6 +555,63 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ assertEquals(buffer2.getStats().state, "Ready")); } + @Test + public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{ + String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable"; + admin.topics().createNonPartitionedTopic(topic); + @Cleanup + Producer producer = pulsarClient.newProducer(Schema.STRING) + .producerName("test") + .enableBatching(false) + .sendTimeout(0, TimeUnit.SECONDS) + .topic(topic) + .create(); + producer.newMessage().send(); + + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(topic, false).get().get(); + persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData(true); + PersistentSubscription persistentSubscription = (PersistentSubscription) persistentTopic + .createSubscription("test", + CommandSubscribe.InitialPosition.Earliest, false).get(); + + ManagedCursorImpl managedCursor = mock(ManagedCursorImpl.class); + doReturn(true).when(managedCursor).hasMoreEntries(); + doReturn(false).when(managedCursor).isClosed(); + doReturn(new PositionImpl(-1, -1)).when(managedCursor).getMarkDeletedPosition(); + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.NonRecoverableLedgerException("No ledger exist"), + null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); + doReturn(CompletableFuture.completedFuture( + new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null))) + 
.when(pendingAckStoreProvider).newPendingAckStore(any()); + doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any()); + + Class pulsarServiceClass = PulsarService.class; + Field field = pulsarServiceClass.getDeclaredField("transactionPendingAckStoreProvider"); + field.setAccessible(true); + field.set(getPulsarServiceList().get(0), pendingAckStoreProvider); + + PendingAckHandleImpl pendingAckHandle1 = new PendingAckHandleImpl(persistentSubscription); + Awaitility.await().untilAsserted(() -> + assertEquals(pendingAckHandle1.getStats().state, "Ready")); + + doAnswer(invocation -> { + AsyncCallbacks.ReadEntriesCallback callback = invocation.getArgument(1); + callback.readEntriesFailed(new ManagedLedgerException.ManagedLedgerFencedException(), null); + return null; + }).when(managedCursor).asyncReadEntries(anyInt(), any(), any(), any()); + + PendingAckHandleImpl pendingAckHandle2 = new PendingAckHandleImpl(persistentSubscription); + Awaitility.await().untilAsserted(() -> + assertEquals(pendingAckHandle2.getStats().state, "Ready")); + } + @Test public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{ String topic = NAMESPACE1 + "/testEndTBRecoveringWhenManagerLedgerDisReadable"; diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java index e154bb840d7c6..8bf2ebf4b911a 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImpl.java @@ -240,7 +240,7 @@ public void start() { class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { private final AtomicLong outstandingReadsRequests = new AtomicLong(0); 
- private boolean isReadable = true; + private volatile boolean isReadable = true; boolean fillQueue() { if (entryQueue.size() < entryQueue.capacity() && outstandingReadsRequests.get() == 0) {