From a337e0482e4f20a3b2b0ff21cf614741ce295777 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 8 Mar 2019 03:05:35 -0800 Subject: [PATCH] Set the dedup cursor as "inactive" after recovery (#3612) --- .../bookkeeper/mledger/ManagedCursor.java | 6 +++++ .../mledger/impl/ManagedCursorImpl.java | 12 +++++++++- .../mledger/impl/ManagedLedgerImpl.java | 12 +++++----- .../impl/ManagedCursorContainerTest.java | 4 ++++ .../mledger/impl/ManagedCursorTest.java | 15 +++++++++++++ .../admin/impl/PersistentTopicsBase.java | 6 +++-- .../persistent/CompactorSubscription.java | 3 +++ .../persistent/MessageDeduplication.java | 22 +------------------ 8 files changed, 50 insertions(+), 30 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java index a1187b82556d6..e1bcec7feee23 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java @@ -518,6 +518,12 @@ Set asyncReplayEntries( */ void setInactive(); + /** + * A cursor that is set as always-inactive will never trigger the caching of + * entries. + */ + void setAlwaysInactive(); + /** * Checks if cursor is active or not. * diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 1d5fe670cd398..86de8096786e6 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -132,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor { private RateLimiter markDeleteLimiter; + private boolean alwaysInactive = false; + class MarkDeleteEntry { final PositionImpl newPosition; final MarkDeleteCallback callback; @@ -785,7 +787,9 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate @Override public void setActive() { - ledger.activateCursor(this); + if (!alwaysInactive) { + ledger.activateCursor(this); + } } @Override @@ -798,6 +802,12 @@ public void setInactive() { ledger.deactivateCursor(this); } + @Override + public void setAlwaysInactive() { + setInactive(); + this.alwaysInactive = true; + } + @Override public Position getFirstPosition() { Long firstLedgerId = ledger.getLedgersInfo().firstKey(); diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index e1376f9b63d0f..fa82bdfd017c2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -169,7 +169,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { final Map> uninitializedCursors; final EntryCache entryCache; - + private ScheduledFuture timeoutTask; /** @@ -334,7 +334,7 @@ public void operationFailed(MetaStoreException e) { } } }); - + scheduleTimeoutTask(); } @@ -1174,7 +1174,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c closeAllCursors(callback, ctx); }, null); - + if (this.timeoutTask != null) { this.timeoutTask.cancel(false); } @@ -1698,7 +1698,7 @@ private boolean checkCallbackCompleted(Object ctx) { // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and // assigned to different request boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount; - // consider callback is completed if: Callback is already recycled or read-complete flag is true + // consider callback is completed if: Callback is already recycled or read-complete flag is true return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE); } @@ -3010,7 +3010,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) { /** * Create ledger async and schedule a timeout task to check ledger-creation is complete else it fails the callback * with TimeoutException. - * + * * @param bookKeeper * @param config * @param digestType @@ -3038,7 +3038,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf /** * check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger - * + * * @param rc * @param lh * @param ctx diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java index 3e404d9be0900..458ca4c0f8270 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainerTest.java @@ -222,6 +222,10 @@ public Position getFirstPosition() { return null; } + @Override + public void setAlwaysInactive() { + } + @Override public List replayEntries(Set positions) throws InterruptedException, ManagedLedgerException { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 5b64468be6e74..ef9bfb01077a8 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2831,5 +2831,20 @@ public void operationFailed(ManagedLedgerException exception) { }); } + @Test + void testAlwaysInactive() throws Exception { + ManagedLedger ml = factory.open("testAlwaysInactive"); + ManagedCursor cursor = ml.openCursor("c1"); + + assertTrue(cursor.isActive()); + + cursor.setAlwaysInactive(); + + assertFalse(cursor.isActive()); + + cursor.setActive(); + assertFalse(cursor.isActive()); + } + private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 2a117ec1df5db..2890b0d63cf39 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -400,7 +400,7 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori protected void internalCreateNonPartitionedTopic(boolean authoritative) { validateAdminAccessForTenant(topicName.getTenant()); - + try { getOrCreateTopic(topicName); log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName); @@ -705,7 +705,7 @@ protected PartitionedTopicInternalStats internalGetPartitionedStatsInternal(bool } return stats; } - + protected void internalDeleteSubscription(String subName, boolean authoritative) { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); @@ -967,6 +967,8 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl PersistentSubscription subscription = (PersistentSubscription) topic .createSubscription(subscriptionName, InitialPosition.Latest).get(); + // Mark the cursor as "inactive" as it was created without a real consumer connected + subscription.deactivateCursor(); subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get(); log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName, subscriptionName, messageId); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java index 6fe74bfa93916..316ebc593c523 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/CompactorSubscription.java @@ -45,6 +45,9 @@ public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopi checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)); this.compactedTopic = compactedTopic; + // Avoid compactor cursor to cause entries to be cached + this.cursor.setAlwaysInactive(); + Map properties = cursor.getProperties(); if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) { long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java index 443d29bde7e90..daf9646c9d3aa 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java @@ -166,26 +166,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { }, null); } - public CompletableFuture initialize() { - // Check whether the dedup cursor was already present - for (ManagedCursor cursor : managedLedger.getCursors()) { - if (cursor.getName().equals(PersistentTopic.DEDUPLICATION_CURSOR_NAME)) { - // Deduplication was enabled before - this.status = Status.Recovering; - this.managedCursor = cursor; - break; - } - } - - if (status == Status.Recovering) { - // Recover the current cursor and then check the configuration - return recoverSequenceIdsMap().thenCompose(v -> checkStatus()); - } else { - // No-op - return CompletableFuture.completedFuture(null); - } - } - public Status getStatus() { return status; } @@ -238,7 +218,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) { @Override public void openCursorComplete(ManagedCursor cursor, Object ctx) { // We don't want to retain cache for this cursor - cursor.setInactive(); + cursor.setAlwaysInactive(); managedCursor = cursor; recoverSequenceIdsMap().thenRun(() -> { status = Status.Enabled;