From ef18e4d6c93455518023ca71ed481a70f1771fde Mon Sep 17 00:00:00 2001 From: Bruce Schuchardt Date: Mon, 3 Jun 2019 08:57:31 -0700 Subject: [PATCH] GEODE-6823 Hang in ElderInitProcessor.init() This corrects elder init processing to use the isCloseInProgress to check for shutdown. A coding error during refactoring caused it to check the isCloseInProgress() method, which did more than just return the value of the isCloseInProgress variable and was incorrectly reporting a close in progress during startup operations. I've renamed the old isCloseInProgress() method to avoid similar coding errors in the future and added a new implementation that merely returns the value of the field, as you'd expect it to do. While writing tests I found that the ClusterElderManagerTest was leaving blocked threads behind because the waitForElder() method in ClusterElderManager was not interruptable. I've changed that method to be interruptable. We don't interrupt message-processing threads so this should be a safe change. --- .../internal/ClusterDistributionManager.java | 27 ++++++------ .../internal/ClusterElderManager.java | 27 +++++------- .../internal/DistributionManager.java | 2 +- .../internal/locks/ElderInitProcessor.java | 38 +++++++++-------- .../locks/GrantorRequestProcessor.java | 18 ++++++-- .../gms/mgr/GMSMembershipManager.java | 2 +- .../internal/ClusterElderManagerTest.java | 41 +++++++++++++++---- 7 files changed, 95 insertions(+), 60 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java index 316d0991d57a..0b7baed30d8a 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterDistributionManager.java @@ -968,7 +968,7 @@ private void runUntilShutdown(Runnable r) { throw err; } catch (Throwable t) { SystemFailure.checkFailure(); - if (isCloseInProgress()) { + if (shouldInhibitMembershipWarnings()) { logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t); } else { logger.warn("Task failed with exception", t); @@ -1975,18 +1975,18 @@ public void removeAllMembershipListener(MembershipListener l) { } } - /** - * Returns true if this DM or the DistributedSystem owned by it is closing or is closed. - */ - boolean isCloseInProgress() { - if (closeInProgress) { + private boolean shouldInhibitMembershipWarnings() { + if (isCloseInProgress()) { return true; } InternalDistributedSystem ds = getSystem(); return ds != null && ds.isDisconnecting(); } - public boolean isShutdownStarted() { + /** + * Returns true if this distribution manager has initiated shutdown + */ + public boolean isCloseInProgress() { return closeInProgress; } @@ -2051,7 +2051,7 @@ && isClosed()) { membershipEventQueue.take(); handleMemberEvent(ev); } catch (InterruptedException e) { - if (isCloseInProgress()) { + if (shouldInhibitMembershipWarnings()) { if (logger.isTraceEnabled()) { logger.trace("MemberEventInvoker: InterruptedException during shutdown"); } @@ -2062,7 +2062,7 @@ && isClosed()) { } catch (DistributedSystemDisconnectedException e) { break; } catch (CancelException e) { - if (isCloseInProgress()) { + if (shouldInhibitMembershipWarnings()) { if (logger.isTraceEnabled()) { logger.trace("MemberEventInvoker: cancelled"); } @@ -2653,7 +2653,7 @@ public void handleManagerDeparture(InternalDistributedMember theId, boolean p_cr stats.incNodes(-1); } String msg; - if (p_crashed && !isCloseInProgress()) { + if (p_crashed && !shouldInhibitMembershipWarnings()) { msg = "Member at {} unexpectedly left the distributed cache: {}"; addMemberEvent(new MemberCrashedEvent(theId, p_reason)); @@ -2881,7 +2881,7 @@ public boolean isLoner() { } @Override - public ElderState getElderState(boolean waitToBecomeElder) { + public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException { return clusterElderManager.getElderState(waitToBecomeElder); } @@ -2890,7 +2890,8 @@ public ElderState getElderState(boolean waitToBecomeElder) { * * @return true if newElder is the elder; false if it is no longer a member or we are the elder. */ - public boolean waitForElder(final InternalDistributedMember desiredElder) { + public boolean waitForElder(final InternalDistributedMember desiredElder) + throws InterruptedException { return clusterElderManager.waitForElder(desiredElder); } @@ -3476,7 +3477,7 @@ private void handleEvent(ClusterDistributionManager manager, try { handleEvent(manager, listener); } catch (CancelException e) { - if (manager.isCloseInProgress()) { + if (manager.shouldInhibitMembershipWarnings()) { if (logger.isTraceEnabled()) { logger.trace("MemberEventInvoker: cancelled"); } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java index f18ea4584768..debe3e188e22 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/ClusterElderManager.java @@ -82,7 +82,7 @@ public boolean isElder() { return clusterDistributionManager.getId().equals(getElderCandidate()); } - public ElderState getElderState(boolean waitToBecomeElder) { + public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException { if (waitToBecomeElder) { // This should always return true. waitForElder(clusterDistributionManager.getId()); @@ -121,22 +121,16 @@ private ElderState initializeElderState() { * @return true if desiredElder is the elder; false if it is no longer a member or the local * member is the elder */ - public boolean waitForElder(final InternalDistributedMember desiredElder) { + public boolean waitForElder(final InternalDistributedMember desiredElder) + throws InterruptedException { MembershipChangeListener changeListener = new MembershipChangeListener(); clusterDistributionManager.addMembershipListener(changeListener); - boolean interrupted = false; InternalDistributedMember currentElder; try { - if (logger.isDebugEnabled()) { - currentElder = getElderCandidate(); - logger.debug("Waiting for Elder to change. Expecting Elder to be {}, is {}.", - desiredElder, currentElder); - } - while (true) { if (clusterDistributionManager.isCloseInProgress()) { return false; @@ -145,6 +139,10 @@ public boolean waitForElder(final InternalDistributedMember desiredElder) { if (desiredElder.equals(currentElder)) { return true; } + if (logger.isDebugEnabled()) { + logger.debug("Expecting Elder to be {} but it is {}.", + desiredElder, currentElder); + } if (!clusterDistributionManager.isCurrentMember(desiredElder)) { return false; // no longer present } @@ -155,18 +153,13 @@ public boolean waitForElder(final InternalDistributedMember desiredElder) { return false; } - try { - changeListener.waitForMembershipChange(); - } catch (InterruptedException e) { - interrupted = true; + if (logger.isDebugEnabled()) { + logger.debug("Waiting for membership to change"); } + changeListener.waitForMembershipChange(); } } finally { clusterDistributionManager.removeMembershipListener(changeListener); - - if (interrupted) { - Thread.currentThread().interrupt(); - } } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java index 0dd633007c58..2e52900ab553 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/DistributionManager.java @@ -166,7 +166,7 @@ Set addMembershipListenerAndGetDistributionManagerIds * @throws IllegalStateException if elder try lock fails * @since GemFire 4.0 */ - ElderState getElderState(boolean force); + ElderState getElderState(boolean force) throws InterruptedException; /** * Returns the membership port of the underlying distribution manager used for communication. diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java index a693f9f562dc..8c56e2fbcf54 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/ElderInitProcessor.java @@ -169,21 +169,27 @@ protected void process(ClusterDistributionManager dm) { ArrayList grantorVersions = new ArrayList(); // grantor versions ArrayList grantorSerialNumbers = new ArrayList(); // serial numbers of grantor svcs ArrayList nonGrantors = new ArrayList(); // svc names non-grantor for - if (dm.waitForElder(this.getSender())) { - GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null); - DLockService.recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors); - reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors); - } else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) { - // Either we're alone (and received a message from an unknown member) or else we haven't - // yet processed a view. In either case, we clearly don't have any grantors, - // so we return empty lists. - - logger.info(LogMarker.DLS_MARKER, - "{}: returning empty lists because I know of no other members.", - this); - reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors); - } else { - logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this); + try { + if (dm.waitForElder(this.getSender())) { + GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null); + DLockService + .recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors); + reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors); + } else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) { + // Either we're alone (and received a message from an unknown member) or else we haven't + // yet processed a view. In either case, we clearly don't have any grantors, + // so we return empty lists. + + logger.info(LogMarker.DLS_MARKER, + "{}: returning empty lists because I know of no other members.", + this); + reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors); + } else { + logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this); + } + } catch (InterruptedException e) { + // shutting down + logger.info("Elder initialization interrupted - will not send a reply"); } } @@ -207,7 +213,7 @@ public void toData(DataOutput out) throws IOException { @Override public String toString() { StringBuffer buff = new StringBuffer(); - buff.append("ElderInitMessage (processorId='").append(this.processorId).append(")"); + buff.append("ElderInitMessage (processorId=").append(this.processorId).append(")"); return buff.toString(); } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java index cc8dbee62a28..0edf3388b451 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/locks/GrantorRequestProcessor.java @@ -209,7 +209,8 @@ private static void elderSyncWait(InternalDistributedSystem sys, * Sets currentElder to the memberId of the current elder if elder is remote; null if elder is in * our vm. */ - private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls) { + private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls) + throws InterruptedException { InternalDistributedMember elder; ElderState es = null; @@ -328,7 +329,12 @@ private static GrantorInfo basicOp(long grantorVersion, String serviceName, DLoc try { do { tryNewElder = false; - final ElderState es = startElderCall(system, service); + ElderState es = null; + try { + es = startElderCall(system, service); + } catch (InterruptedException e) { + interrupted = true; + } dm.throwIfDistributionStopped(); try { if (es != null) { @@ -491,7 +497,13 @@ protected void process(ClusterDistributionManager dm) { protected void basicProcess(final DistributionManager dm) { // we should be in the elder - ElderState es = dm.getElderState(true); + final ElderState es; + try { + es = dm.getElderState(true); + } catch (InterruptedException e) { + logger.info("Interrupted while processing {}", this); + return; + } switch (this.opCode) { case GET_OP: replyGrantorInfo(dm, es.getGrantor(this.serviceName, getSender(), this.dlsSerialNumber)); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java index f9352eacff37..46110e25824b 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/mgr/GMSMembershipManager.java @@ -2620,7 +2620,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) { @Override public boolean isShutdownStarted() { ClusterDistributionManager dm = listener.getDM(); - return shutdownInProgress || (dm != null && dm.isShutdownStarted()); + return shutdownInProgress || (dm != null && dm.isCloseInProgress()); } @Override diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java index 8f9cad9d29c5..48b0a1fae1c9 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/ClusterElderManagerTest.java @@ -123,7 +123,7 @@ public void isNotElderIfOldestMember() { } @Test - public void waitForElderReturnsTrueIfAnotherMemberIsElder() { + public void waitForElderReturnsTrueIfAnotherMemberIsElder() throws InterruptedException { ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager); when(clusterDistributionManager.getId()).thenReturn(member0); when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0)); @@ -131,7 +131,7 @@ public void waitForElderReturnsTrueIfAnotherMemberIsElder() { } @Test - public void waitForElderReturnsFalseIfWeAreElder() { + public void waitForElderReturnsFalseIfWeAreElder() throws InterruptedException { ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager); when(clusterDistributionManager.getId()).thenReturn(member0); when(clusterDistributionManager.isCurrentMember(eq(member1))).thenReturn(true); @@ -140,7 +140,8 @@ public void waitForElderReturnsFalseIfWeAreElder() { } @Test - public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember() { + public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember() + throws InterruptedException { ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager); when(clusterDistributionManager.getId()).thenReturn(member0); when(clusterDistributionManager.getViewMembers()) @@ -154,8 +155,24 @@ public void waitForElderWaits() { when(clusterDistributionManager.getId()).thenReturn(member0); when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0)); when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true); + when(clusterDistributionManager.isCloseInProgress()).thenReturn(false); + + assertThatInterruptableRunnableWaits(() -> { + try { + clusterElderManager.waitForElder(member0); + } catch (InterruptedException e) { + } + }); + } - assertThatRunnableWaits(() -> clusterElderManager.waitForElder(member0)); + @Test + public void waitForElderDoesNotWaitIfShuttingDown() throws InterruptedException { + ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager); + when(clusterDistributionManager.getId()).thenReturn(member0); + when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0)); + when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true); + when(clusterDistributionManager.isCloseInProgress()).thenReturn(true); + assertThat(clusterElderManager.waitForElder(member0)).isFalse(); } @Test @@ -193,7 +210,7 @@ public void waitForElderStopsWaitingWhenUpdated() { } @Test - public void getElderStateAsElder() { + public void getElderStateAsElder() throws InterruptedException { Supplier elderStateSupplier = mock(Supplier.class); ElderState elderState = mock(ElderState.class); when(elderStateSupplier.get()).thenReturn(elderState); @@ -207,7 +224,7 @@ public void getElderStateAsElder() { } @Test - public void getElderStateGetsBuiltOnceAsElder() { + public void getElderStateGetsBuiltOnceAsElder() throws InterruptedException { Supplier elderStateSupplier = mock(Supplier.class); ElderState elderState = mock(ElderState.class); when(elderStateSupplier.get()).thenReturn(elderState); @@ -244,7 +261,7 @@ public void getElderStateFromMultipleThreadsAsElder() { } @Test - public void getElderStateNotAsElder() { + public void getElderStateNotAsElder() throws InterruptedException { Supplier elderStateSupplier = mock(Supplier.class); ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager, elderStateSupplier); @@ -264,12 +281,17 @@ public void getElderStateWaitsToBecomeElder() { when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0)); when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true); - assertThatRunnableWaits(() -> clusterElderManager.getElderState(true)); + assertThatInterruptableRunnableWaits(() -> { + try { + clusterElderManager.getElderState(true); + } catch (InterruptedException e) { + } + }); verify(elderStateSupplier, times(0)).get(); } - private void assertThatRunnableWaits(Runnable runnable) { + private void assertThatInterruptableRunnableWaits(Runnable runnable) { Thread waitThread = new Thread(runnable); waitThread.start(); @@ -281,6 +303,7 @@ private void assertThatRunnableWaits(Runnable runnable) { .until(() -> waitingStates.contains(waitThread.getState())); } finally { waitThread.interrupt(); + await().until(() -> !waitThread.isAlive()); } } }