From 14e3b7ae05e84ca13eefa16026288a384a961e45 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Thu, 16 Jul 2020 06:20:16 -0700 Subject: [PATCH] =?UTF-8?q?[Broker]=20Timeout=20opening=20managed=20ledger?= =?UTF-8?q?=20operation=20=E2=80=A6=20(#7506)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit *Motivation* Currently, broker has a timeout mechanism on loading topics. However, the underlying managed ledger library doesn't provide a timeout mechanism. This will get into a situation that a TopicLoad operation times out after 30 seconds. But the CompletableFuture of opening a managed ledger is still kept in the cache of managed ledger factory. The completable future will never return. So any sub-sequent topic lookups will fail because any attempts to load a topic will never attempt to re-open a managed ledger. *Modification* Introduce a timeout mechanism in the managed ledger factory. If a managed ledger is not open within a given timeout period, the CompletableFuture will be removed. This allows any subsequent attempts to load topics that can try to open the managed ledger again. *Tests* This problem can be constantly reproduced in a chaos test in Kubernetes by killing k8s worker nodes. It can cause producer stuck forever until the owner broker pod is restarted. The change has been verified in a chaos testing environment. --- .../mledger/impl/ManagedCursorImpl.java | 16 +++-- .../impl/ManagedLedgerFactoryImpl.java | 70 ++++++++++++++++--- 2 files changed, 72 insertions(+), 14 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 3e0d5839b1aa5..26f1bf3f97d93 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -64,6 +64,7 @@ import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.DeleteCallback; +import org.apache.bookkeeper.client.AsyncCallback.OpenCallback; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.client.LedgerEntry; @@ -323,9 +324,9 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac // a new ledger and write the position into it ledger.mbean.startCursorLedgerOpenOp(); long ledgerId = info.getCursorsLedgerId(); - bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), (rc, lh, ctx) -> { - if (log.isDebugEnabled()) { - log.debug("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); + OpenCallback openCallback = (rc, lh, ctx) -> { + if (log.isInfoEnabled()) { + log.info("[{}] Opened ledger {} for consumer {}. rc={}", ledger.getName(), ledgerId, name, rc); } if (isBkErrorNotRecoverable(rc)) { log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", ledger.getName(), ledgerId, name, @@ -399,7 +400,14 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac recoveredCursor(position, recoveredProperties, lh); callback.operationComplete(); }, null); - }, null); + }; + try { + bookkeeper.asyncOpenLedger(ledgerId, digestType, config.getPassword(), openCallback, null); + } catch (Throwable t) { + log.error("[{}] Encountered error on opening cursor ledger {} for cursor {}", + ledger.getName(), ledgerId, name, t); + openCallback.openComplete(BKException.Code.UnexpectedConditionException, null, null); + } } private void recoverIndividualDeletedMessages(List individualDeletedMessagesList) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 517c62cdf7ff0..8b470c21f9056 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -50,6 +50,7 @@ import org.apache.bookkeeper.common.util.OrderedScheduler; import org.apache.bookkeeper.conf.ClientConfiguration; import org.apache.bookkeeper.mledger.AsyncCallbacks; +import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; @@ -101,6 +102,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { protected final ManagedLedgerFactoryMBeanImpl mbean; protected final ConcurrentHashMap> ledgers = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap pendingInitializeLedgers = + new ConcurrentHashMap<>(); private final EntryCacheManager entryCacheManager; private long lastStatTimestamp = System.nanoTime(); @@ -111,6 +114,18 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private static final int StatsPeriodSeconds = 60; + private static class PendingInitializeManagedLedger { + + private final ManagedLedgerImpl ledger; + private final long createTimeMs; + + PendingInitializeManagedLedger(ManagedLedgerImpl ledger) { + this.ledger = ledger; + this.createTimeMs = System.currentTimeMillis(); + } + + } + public ManagedLedgerFactoryImpl(ClientConfiguration bkClientConfiguration, String zkConnection) throws Exception { this(bkClientConfiguration, zkConnection, new ManagedLedgerFactoryConfig()); } @@ -320,18 +335,32 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final // If the ledger state is bad, remove it from the map. CompletableFuture existingFuture = ledgers.get(name); - if (existingFuture != null && existingFuture.isDone()) { - try { - ManagedLedgerImpl l = existingFuture.get(); - if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) { - // Managed ledger is in unusable state. Recreate it. - log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name, + if (existingFuture != null) { + if (existingFuture.isDone()) { + try { + ManagedLedgerImpl l = existingFuture.get(); + if (l.getState().equals(State.Fenced.toString()) || l.getState().equals(State.Closed.toString())) { + // Managed ledger is in unusable state. Recreate it. + log.warn("[{}] Attempted to open ledger in {} state. Removing from the map to recreate it", name, l.getState()); - ledgers.remove(name, existingFuture); + ledgers.remove(name, existingFuture); + } + } catch (Exception e) { + // Unable to get the future + log.warn("[{}] Got exception while trying to retrieve ledger", name, e); } - } catch (Exception e) { - // Unable to get the future - log.warn("[{}] Got exception while trying to retrieve ledger", name, e); + } else { + PendingInitializeManagedLedger pendingLedger = pendingInitializeLedgers.get(name); + if (null != pendingLedger) { + long pendingMs = System.currentTimeMillis() - pendingLedger.createTimeMs; + if (pendingMs > TimeUnit.SECONDS.toMillis(config.getMetadataOperationsTimeoutSeconds())) { + log.warn("[{}] Managed ledger has been pending in initialize state more than {} milliseconds," + + " remove it from cache to retry ...", name, pendingMs); + ledgers.remove(name, existingFuture); + pendingInitializeLedgers.remove(name, pendingLedger); + } + } + } } @@ -345,16 +374,37 @@ public void asyncOpen(final String name, final ManagedLedgerConfig config, final config.getBookKeeperEnsemblePlacementPolicyProperties())), store, config, scheduledExecutor, orderedExecutor, name, mlOwnershipChecker); + PendingInitializeManagedLedger pendingLedger = new PendingInitializeManagedLedger(newledger); + pendingInitializeLedgers.put(name, pendingLedger); newledger.initialize(new ManagedLedgerInitializeLedgerCallback() { @Override public void initializeComplete() { + log.info("[{}] Successfully initialize managed ledger", name); + pendingInitializeLedgers.remove(name, pendingLedger); future.complete(newledger); } @Override public void initializeFailed(ManagedLedgerException e) { + log.error("[{}] Failed to initialize managed ledger: {}", name, e.getMessage()); + // Clean the map if initialization fails ledgers.remove(name, future); + + if (pendingInitializeLedgers.remove(name, pendingLedger)) { + pendingLedger.ledger.asyncClose(new CloseCallback() { + @Override + public void closeComplete(Object ctx) { + // no-op + } + + @Override + public void closeFailed(ManagedLedgerException exception, Object ctx) { + log.warn("[{}] Failed to a pending initialization managed ledger", name, exception); + } + }, null); + } + future.completeExceptionally(e); } }, null);