Add config to lazily recover cursors when recovering a managed ledger. (apache#7858)

Fixes apache#7404

### Motivation
Currently, opening a managed ledger does not complete until all of its cursors have been recovered, which can make managed ledger recovery very slow. Pulsar is a messaging/streaming system, and write availability is critical to many core business applications, so topics should always be writable. We should therefore decouple loading cursors from loading the managed ledger, so that producers can move forward as soon as the data ledgers are ready.

### Modifications
Add config to lazily recover cursors when recovering a managed ledger.
Cursor recovery is still initiated when the managed ledger is opened, but each cursor being recovered is placed in the uninitializedCursors map instead of blocking the open.
This can speed up managed ledger recovery and improve write availability,
with the caveat that when the topic is recovered we do not yet know whether all old cursors can be recovered.
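
The idea described above can be illustrated with a minimal, standalone sketch. It is not the Pulsar implementation: `LazyCursorRecoverySketch`, its nested `Cursor` class, `persistedPositions`, `recoveryDelayMillis` and `finishRecovery` are hypothetical names chosen for illustration, and slow cursor recovery is only simulated with a delayed executor. The sketch assumes Java 9 or later. What it shows is that `open()` returns as soon as recovery has been started, while callers that need a cursor wait on the pending future held in an `uninitializedCursors` map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a managed ledger; not a Pulsar class.
class LazyCursorRecoverySketch {
    // Hypothetical stand-in for a recovered cursor.
    static final class Cursor {
        final String name;
        final long markDeletePosition;

        Cursor(String name, long markDeletePosition) {
            this.name = name;
            this.markDeletePosition = markDeletePosition;
        }
    }

    private final Map<String, Long> persistedPositions; // assumed stored cursor state
    private final long recoveryDelayMillis;             // simulated recovery latency
    private final Map<String, Cursor> cursors = new HashMap<>();
    private final Map<String, CompletableFuture<Cursor>> uninitializedCursors = new HashMap<>();

    LazyCursorRecoverySketch(Map<String, Long> persistedPositions, long recoveryDelayMillis) {
        this.persistedPositions = persistedPositions;
        this.recoveryDelayMillis = recoveryDelayMillis;
    }

    // Starts recovery for every cursor and returns without waiting for any of them.
    synchronized void open() {
        Executor slow = CompletableFuture.delayedExecutor(recoveryDelayMillis, TimeUnit.MILLISECONDS);
        for (Map.Entry<String, Long> entry : persistedPositions.entrySet()) {
            String name = entry.getKey();
            long position = entry.getValue();
            uninitializedCursors.put(name, new CompletableFuture<>());
            CompletableFuture.supplyAsync(() -> new Cursor(name, position), slow)
                    .whenComplete((cursor, error) -> finishRecovery(name, cursor, error));
        }
    }

    // Moves a cursor out of the pending map and completes (or fails) its future.
    private synchronized void finishRecovery(String name, Cursor cursor, Throwable error) {
        CompletableFuture<Cursor> pending = uninitializedCursors.remove(name);
        if (error != null) {
            pending.completeExceptionally(error);
        } else {
            cursors.put(name, cursor);
            pending.complete(cursor);
        }
    }

    // Returns a ready cursor, the pending recovery future, or a failed future if unknown.
    synchronized CompletableFuture<Cursor> openCursor(String name) {
        Cursor ready = cursors.get(name);
        if (ready != null) {
            return CompletableFuture.completedFuture(ready);
        }
        CompletableFuture<Cursor> pending = uninitializedCursors.get(name);
        if (pending != null) {
            return pending;
        }
        return CompletableFuture.failedFuture(new IllegalArgumentException("Unknown cursor " + name));
    }

    public static void main(String[] args) throws Exception {
        LazyCursorRecoverySketch ledger = new LazyCursorRecoverySketch(Map.of("testCursor", 2L), 300);
        ledger.open(); // returns before the simulated recovery finishes
        Cursor cursor = ledger.openCursor("testCursor").get(5, TimeUnit.SECONDS);
        System.out.println(cursor.name + " mark-delete position = " + cursor.markDeletePosition);
    }
}
```

The trade-off mirrors the caveat stated above: writers are unblocked immediately, but a recovery failure only surfaces later, when something waits on the cursor's future.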
MarvinCai authored Sep 9, 2020
1 parent fe621a6 commit 22b8923
Showing 8 changed files with 136 additions and 26 deletions.
6 changes: 6 additions & 0 deletions conf/broker.conf
@@ -811,6 +811,12 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# Whether to recover cursors lazily when recovering a managed ledger backing a persistent topic.
# This can improve the write availability of topics.
# The caveat is that when the recovered ledger is ready for writes, it is not yet known whether every
# old consumer's last mark-delete position can be recovered.
lazyCursorRecovery=false

# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

6 changes: 6 additions & 0 deletions deployment/terraform-ansible/templates/broker.conf
@@ -796,6 +796,12 @@ managedLedgerMaxUnackedRangesToPersistInZooKeeper=1000
# corrupted at bookkeeper and managed-cursor is stuck at that ledger.
autoSkipNonRecoverableData=false

# Whether to recover cursors lazily when recovering a managed ledger backing a persistent topic.
# This can improve the write availability of topics.
# The caveat is that when the recovered ledger is ready for writes, it is not yet known whether every
# old consumer's last mark-delete position can be recovered.
lazyCursorRecovery=false

# operation timeout while updating managed-ledger metadata.
managedLedgerMetadataOperationsTimeoutSeconds=60

@@ -61,6 +61,7 @@ public class ManagedLedgerConfig {
private long retentionTimeMs = 0;
private long retentionSizeInMB = 0;
private boolean autoSkipNonRecoverableData;
private boolean lazyCursorRecovery = false;
private long metadataOperationsTimeoutSeconds = 60;
private long readEntryTimeoutSeconds = 120;
private long addEntryTimeoutSeconds = 120;
@@ -82,6 +83,25 @@ public ManagedLedgerConfig setCreateIfMissing(boolean createIfMissing) {
return this;
}

/**
* @return the lazyCursorRecovery
*/
public boolean isLazyCursorRecovery() {
return lazyCursorRecovery;
}

/**
* Whether to recover cursors lazily when recovering a
* managed ledger backing a persistent topic. This can improve the write availability of topics.
* The caveat is that when the recovered ledger is ready for writes, it is not yet known whether every
* old consumer's last mark-delete position can be recovered.
* @param lazyCursorRecovery whether to enable lazy cursor recovery.
*/
public ManagedLedgerConfig setLazyCursorRecovery(boolean lazyCursorRecovery) {
this.lazyCursorRecovery = lazyCursorRecovery;
return this;
}

/**
* @return the maxEntriesPerLedger
*/
@@ -465,34 +465,70 @@ public void operationComplete(List<String> consumers, Stat s) {
return;
}

for (final String cursorName : consumers) {
if (log.isDebugEnabled()) {
log.debug("[{}] Loading cursor {}", name, cursorName);
}
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName);

cursor.recover(new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
cursors.add(cursor);

if (cursorCount.decrementAndGet() == 0) {
// The initialization is now completed, register the jmx mbean
callback.initializeComplete();
if (!ManagedLedgerImpl.this.config.isLazyCursorRecovery()) {
log.debug("[{}] Loading cursors", name);

for (final String cursorName : consumers) {
log.info("[{}] Loading cursor {}", name, cursorName);
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName);

cursor.recover(new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
cursors.add(cursor);

if (cursorCount.decrementAndGet() == 0) {
// The initialization is now completed, register the jmx mbean
callback.initializeComplete();
}
}
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Recovery for cursor {} failed", name, cursorName, exception);
cursorCount.set(-1);
callback.initializeFailed(exception);
@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Recovery for cursor {} failed", name, cursorName, exception);
cursorCount.set(-1);
callback.initializeFailed(exception);
}
});
}
} else {
// Lazily recover cursors by putting them into the uninitializedCursors map.
for (final String cursorName : consumers) {
if (log.isDebugEnabled()) {
log.debug("[{}] Recovering cursor {} lazily", name, cursorName);
}
});
final ManagedCursorImpl cursor;
cursor = new ManagedCursorImpl(bookKeeper, config, ManagedLedgerImpl.this, cursorName);
CompletableFuture<ManagedCursor> cursorRecoveryFuture = new CompletableFuture<>();
uninitializedCursors.put(cursorName, cursorRecoveryFuture);

cursor.recover(new VoidCallback() {
@Override
public void operationComplete() {
log.info("[{}] Lazy recovery for cursor {} completed. pos={} -- todo={}", name, cursorName,
cursor.getMarkDeletedPosition(), cursorCount.get() - 1);
cursor.setActive();
synchronized (ManagedLedgerImpl.this) {
cursors.add(cursor);
uninitializedCursors.remove(cursor.getName()).complete(cursor);
}
}

@Override
public void operationFailed(ManagedLedgerException exception) {
log.warn("[{}] Lazy recovery for cursor {} failed", name, cursorName, exception);
synchronized (ManagedLedgerImpl.this) {
uninitializedCursors.remove(cursor.getName()).completeExceptionally(exception);
}
}
});
}
// Complete ledger recovery.
callback.initializeComplete();
}
}

@@ -56,13 +56,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

import io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
@@ -2194,6 +2196,39 @@ public void testCursorRecoveryForEmptyLedgers() throws Exception {
assertEquals(c1.getMarkDeletedPosition(), ledger.lastConfirmedEntry);
}

@Test
public void testLazyRecoverCursor() throws Exception {
ManagedLedger ledger = factory.open("testLedger");
ManagedCursor cursor = ledger.openCursor("testCursor");

ledger.addEntry("entry-1".getBytes());
Position p1 = ledger.addEntry("entry-2".getBytes());
cursor.markDelete(p1);

// Re-opening from a different factory triggers recovery.
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(bkc, zkc);

// Simulate a time-consuming cursor recovery.
CompletableFuture<Void> future = bkc.promiseAfter(2);
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("lazyCursorRecovery"));
scheduledExecutorService.schedule(() -> {
future.complete(null);
}, 10, TimeUnit.SECONDS);

ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setLazyCursorRecovery(true);
long startLedgerRecovery = System.currentTimeMillis();

// Check that the ledger is recovered before the time-consuming cursor recovery completes.
ledger = factory2.open("testLedger", managedLedgerConfig);
assertTrue(System.currentTimeMillis() - startLedgerRecovery < 5000);

// Check that the cursor is recovered successfully.
cursor = ledger.openCursor("testCursor");
assertEquals(cursor.getMarkDeletedPosition(), p1);
factory2.shutdown();
}

@Test
public void testConcurrentOpenCursor() throws Exception {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testConcurrentOpenCursor");
@@ -719,6 +719,12 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int messagePublishBufferCheckIntervalInMillis = 100;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to recover cursors lazily when recovering a " +
"managed ledger backing a persistent topic. This can improve the write availability of topics.\n" +
"The caveat is that when the recovered ledger is ready for writes, it is not yet known whether every " +
"old consumer's last mark-delete position can be recovered.")
private boolean lazyCursorRecovery = false;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Check between intervals to see if consumed ledgers need to be trimmed"
@@ -1161,6 +1161,7 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES);
managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB());
managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData());
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
OffloadPolicies offloadPolicies = policies.map(p -> p.offload_policies).orElse(null);
if (topicLevelOffloadPolicies != null) {
try {
2 changes: 1 addition & 1 deletion site2/docs/reference-configuration.md
@@ -343,7 +343,7 @@ subscriptionExpirationTimeMinutes | How long to delete inactive subscriptions fr
|retentionCheckIntervalInSeconds|Check between intervals to see if consumed ledgers need to be trimmed. Use 0 or negative number to disable the check.|120|
| maxMessageSize | Set the maximum size of a message. | 5242880 |
| preciseTopicPublishRateLimiterEnable | Enable precise topic publish rate limiting. | false |

| lazyCursorRecovery | Whether to recover cursors lazily when recovering a managed ledger backing a persistent topic. This can improve the write availability of topics. The caveat is that when the recovered ledger is ready for writes, it is not yet known whether every old consumer's last mark-delete position (ack position) can be recovered. Users can accept this trade-off or add custom application logic to checkpoint consumer state. | false |


