Skip to content

Commit

Permalink
[Broker] Timeout opening managed ledger operation … (apache#7506)
Browse files Browse the repository at this point in the history
*Motivation*

Currently, broker has a timeout mechanism on loading topics. However, the underlying managed ledger library
doesn't provide a timeout mechanism. This will get into a situation that a TopicLoad operation times out
after 30 seconds. But the CompletableFuture of opening a managed ledger is still kept in the cache of managed ledger
factory. The completable future will never return. So any sub-sequent topic lookups will fail because any
attempts to load a topic will never attempt to re-open a managed ledger.

*Modification*

Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is not open within a given timeout
period, the CompletableFuture will be removed. This allows any subsequent attempts to load topics that can try to
open the managed ledger again.

*Tests*

This problem can be constantly reproduced in a chaos test in Kubernetes by killing k8s worker nodes. It can cause
producer stuck forever until the owner broker pod is restarted. The change has been verified in a chaos testing environment.
  • Loading branch information
sijie authored Jul 16, 2020
1 parent 4e358ef commit 14e3b7a
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@

import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback;
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
Expand Down Expand Up @@ -323,9 +324,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
// a new ledger and write the position into it
ledger.mbean.startCursorLedgerOpenOp();
long ledgerId = info.getCursorsLedgerId();
bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
OpenCallback openCallback = (rc, lh, ctx) -> {
if (log.isInfoEnabled()) {
log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc);
}
if (isBkErrorNotRecoverable(rc)) {
log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name,
Expand Down Expand Up @@ -399,7 +400,14 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
recoveredCursor(position, recoveredProperties, lh);
callback.operationComplete();
}, null);
}, null);
};
try {
bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null);
} catch (Throwable t) {
log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}",
ledger.getName(), ledgerId, name, t);
openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null);
}
}

private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
Expand Down Expand Up @@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
protected final ManagedLedgerFactoryMBeanImpl mbean;

protected final ConcurrentHashMap<String, CompletableFuture<ManagedLedgerImpl>> ledgers = new ConcurrentHashMap<>();
protected final ConcurrentHashMap<String, PendingInitializeManagedLedger> pendingInitializeLedgers =
new ConcurrentHashMap<>();
private final EntryCacheManager entryCacheManager;

private long lastStatTimestamp = System.nanoTime();
Expand All @@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {

private static final int StatsPeriodSeconds = 60;

private static class PendingInitializeManagedLedger {

private final ManagedLedgerImpl ledger;
private final long createTimeMs;

PendingInitializeManagedLedger(ManagedLedgerImpl ledger) {
this.ledger = ledger;
this.createTimeMs = System.currentTimeMillis();
}

}

public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception {
this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig());
}
Expand Down Expand Up @@ -320,18 +335,32 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final

// If the ledger state is bad, remove it from the map.
CompletableFuture<ManagedLedgerImpl> existingFuture = ledgers.get(name);
if (existingFuture != null && existingFuture.isDone()) {
try {
ManagedLedgerImpl l = existingFuture.get();
if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
// Managed ledger is in unusable state. Recreate it.
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
if (existingFuture != null) {
if (existingFuture.isDone()) {
try {
ManagedLedgerImpl l = existingFuture.get();
if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) {
// Managed ledger is in unusable state. Recreate it.
log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name,
l.getState());
ledgers.remove(name, existingFuture);
ledgers.remove(name, existingFuture);
}
} catch (Exception e) {
// Unable to get the future
log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
}
} catch (Exception e) {
// Unable to get the future
log.warn("[{}] Got exception while trying to retrieve ledger", name, e);
} else {
PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name);
if (null != pendingLedger) {
long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs;
if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) {
log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds,"
+ " remove it from cache to retry ...", name, pendingMs);
ledgers.remove(name, existingFuture);
pendingInitializeLedgers.remove(name, pendingLedger);
}
}

}
}

Expand All @@ -345,16 +374,37 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final
config.getBookKeeperEnsemblePlacementPolicyProperties())),
store, config, scheduledExecutor,
orderedExecutor, name, mlOwnershipChecker);
PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger);
pendingInitializeLedgers.put(name, pendingLedger);
newledger.initialize(new ManagedLedgerInitializeLedgerCallback() {
@Override
public void initializeComplete() {
log.info("[{}] Successfully initialize managed ledger", name);
pendingInitializeLedgers.remove(name, pendingLedger);
future.complete(newledger);
}

@Override
public void initializeFailed(ManagedLedgerException e) {
log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage());

// Clean the map if initialization fails
ledgers.remove(name, future);

if (pendingInitializeLedgers.remove(name, pendingLedger)) {
pendingLedger.ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
// no-op
}

@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to a pending initialization managed ledger", name, exception);
}
}, null);
}

future.completeExceptionally(e);
}
}, null);
Expand Down

0 comments on commit 14e3b7a

Please sign in to comment.