Skip to content

Commit

Permalink
Set the dedup cursor as "inactive" after recovery (apache#3612)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored and massakam committed Mar 8, 2019
1 parent c5f4a69 commit a337e04
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,12 @@ Set<? extends Position> asyncReplayEntries(
*/
void setInactive();

/**
* A cursor that is set as always-inactive will never trigger the caching of
* entries.
*/
void setAlwaysInactive();

/**
* Checks if cursor is active or not.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ public class ManagedCursorImpl implements ManagedCursor {

private RateLimiter markDeleteLimiter;

private boolean alwaysInactive = false;

class MarkDeleteEntry {
final PositionImpl newPosition;
final MarkDeleteCallback callback;
Expand Down Expand Up @@ -785,7 +787,9 @@ public void asyncFindNewestMatching(FindPositionConstraint constraint, Predicate

@Override
public void setActive() {
ledger.activateCursor(this);
if (!alwaysInactive) {
ledger.activateCursor(this);
}
}

@Override
Expand All @@ -798,6 +802,12 @@ public void setInactive() {
ledger.deactivateCursor(this);
}

@Override
public void setAlwaysInactive() {
setInactive();
this.alwaysInactive = true;
}

@Override
public Position getFirstPosition() {
Long firstLedgerId = ledger.getLedgersInfo().firstKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
final Map<String, CompletableFuture<ManagedCursor>> uninitializedCursors;

final EntryCache entryCache;

private ScheduledFuture<?> timeoutTask;

/**
Expand Down Expand Up @@ -334,7 +334,7 @@ public void operationFailed(MetaStoreException e) {
}
}
});

scheduleTimeoutTask();
}

Expand Down Expand Up @@ -1174,7 +1174,7 @@ public synchronized void asyncClose(final CloseCallback callback, final Object c

closeAllCursors(callback, ctx);
}, null);

if (this.timeoutTask != null) {
this.timeoutTask.cancel(false);
}
Expand Down Expand Up @@ -1698,7 +1698,7 @@ private boolean checkCallbackCompleted(Object ctx) {
// if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
// assigned to different request
boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
// consider callback is completed if: Callback is already recycled or read-complete flag is true
// consider callback is completed if: Callback is already recycled or read-complete flag is true
return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
}

Expand Down Expand Up @@ -3010,7 +3010,7 @@ public static ManagedLedgerException createManagedLedgerException(Throwable t) {
/**
* Create ledger async and schedule a timeout task to check ledger-creation is complete else it fails the callback
* with TimeoutException.
*
*
* @param bookKeeper
* @param config
* @param digestType
Expand Down Expand Up @@ -3038,7 +3038,7 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf

/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
*
*
* @param rc
* @param lh
* @param ctx
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,10 @@ public Position getFirstPosition() {
return null;
}

@Override
public void setAlwaysInactive() {
}

@Override
public List<Entry> replayEntries(Set<? extends Position> positions)
throws InterruptedException, ManagedLedgerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2831,5 +2831,20 @@ public void operationFailed(ManagedLedgerException exception) {
});
}

@Test
void testAlwaysInactive() throws Exception {
ManagedLedger ml = factory.open("testAlwaysInactive");
ManagedCursor cursor = ml.openCursor("c1");

assertTrue(cursor.isActive());

cursor.setAlwaysInactive();

assertFalse(cursor.isActive());

cursor.setActive();
assertFalse(cursor.isActive());
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ protected void internalCreatePartitionedTopic(int numPartitions, boolean authori

protected void internalCreateNonPartitionedTopic(boolean authoritative) {
validateAdminAccessForTenant(topicName.getTenant());

try {
getOrCreateTopic(topicName);
log.info("[{}] Successfully created non-partitioned topic {}", clientAppId(), topicName);
Expand Down Expand Up @@ -705,7 +705,7 @@ protected PartitionedTopicInternalStats internalGetPartitionedStatsInternal(bool
}
return stats;
}

protected void internalDeleteSubscription(String subName, boolean authoritative) {
if (topicName.isGlobal()) {
validateGlobalNamespaceOwnership(namespaceName);
Expand Down Expand Up @@ -967,6 +967,8 @@ protected void internalCreateSubscription(String subscriptionName, MessageIdImpl

PersistentSubscription subscription = (PersistentSubscription) topic
.createSubscription(subscriptionName, InitialPosition.Latest).get();
// Mark the cursor as "inactive" as it was created without a real consumer connected
subscription.deactivateCursor();
subscription.resetCursor(PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId())).get();
log.info("[{}][{}] Successfully created subscription {} at message id {}", clientAppId(), topicName,
subscriptionName, messageId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ public CompactorSubscription(PersistentTopic topic, CompactedTopic compactedTopi
checkArgument(subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION));
this.compactedTopic = compactedTopic;

// Avoid compactor cursor to cause entries to be cached
this.cursor.setAlwaysInactive();

Map<String, Long> properties = cursor.getProperties();
if (properties.containsKey(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY)) {
long compactedLedgerId = properties.get(Compactor.COMPACTED_TOPIC_LEDGER_PROPERTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,26 +166,6 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}, null);
}

public CompletableFuture<Void> initialize() {
// Check whether the dedup cursor was already present
for (ManagedCursor cursor : managedLedger.getCursors()) {
if (cursor.getName().equals(PersistentTopic.DEDUPLICATION_CURSOR_NAME)) {
// Deduplication was enabled before
this.status = Status.Recovering;
this.managedCursor = cursor;
break;
}
}

if (status == Status.Recovering) {
// Recover the current cursor and then check the configuration
return recoverSequenceIdsMap().thenCompose(v -> checkStatus());
} else {
// No-op
return CompletableFuture.completedFuture(null);
}
}

public Status getStatus() {
return status;
}
Expand Down Expand Up @@ -238,7 +218,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
// We don't want to retain cache for this cursor
cursor.setInactive();
cursor.setAlwaysInactive();
managedCursor = cursor;
recoverSequenceIdsMap().thenRun(() -> {
status = Status.Enabled;
Expand Down

0 comments on commit a337e04

Please sign in to comment.