Skip to content

Commit

Permalink
GEODE-6823 Hang in ElderInitProcessor.init()
Browse files Browse the repository at this point in the history
This corrects elder init processing to use the closeInProgress field to
check for shutdown.  A coding error during refactoring caused it to call the
isCloseInProgress() method, which did more than just return the value of
the closeInProgress field: it also returned true whenever the
DistributedSystem was disconnecting, and so was incorrectly reporting a
close in progress during startup operations.

I've renamed the old isCloseInProgress() method to
shouldInhibitMembershipWarnings() to avoid similar coding errors in the
future and added a new isCloseInProgress() implementation that merely
returns the value of the field, as you'd expect it to do.

While writing tests I found that the ClusterElderManagerTest was leaving
blocked threads behind because the waitForElder() method in
ClusterElderManager was not interruptible.  I've changed that method to
be interruptible.  We don't interrupt message-processing threads so this
should be a safe change.
  • Loading branch information
bschuchardt committed Jun 3, 2019
1 parent cd2eae3 commit ef18e4d
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ private void runUntilShutdown(Runnable r) {
throw err;
} catch (Throwable t) {
SystemFailure.checkFailure();
if (isCloseInProgress()) {
if (shouldInhibitMembershipWarnings()) {
logger.debug("Caught unusual exception during shutdown: {}", t.getMessage(), t);
} else {
logger.warn("Task failed with exception", t);
Expand Down Expand Up @@ -1975,18 +1975,18 @@ public void removeAllMembershipListener(MembershipListener l) {
}
}

/**
* Returns true if this DM or the DistributedSystem owned by it is closing or is closed.
*/
boolean isCloseInProgress() {
if (closeInProgress) {
private boolean shouldInhibitMembershipWarnings() {
if (isCloseInProgress()) {
return true;
}
InternalDistributedSystem ds = getSystem();
return ds != null && ds.isDisconnecting();
}

public boolean isShutdownStarted() {
/**
* Returns true if this distribution manager has initiated shutdown
*/
public boolean isCloseInProgress() {
return closeInProgress;
}

Expand Down Expand Up @@ -2051,7 +2051,7 @@ && isClosed()) {
membershipEventQueue.take();
handleMemberEvent(ev);
} catch (InterruptedException e) {
if (isCloseInProgress()) {
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: InterruptedException during shutdown");
}
Expand All @@ -2062,7 +2062,7 @@ && isClosed()) {
} catch (DistributedSystemDisconnectedException e) {
break;
} catch (CancelException e) {
if (isCloseInProgress()) {
if (shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: cancelled");
}
Expand Down Expand Up @@ -2653,7 +2653,7 @@ public void handleManagerDeparture(InternalDistributedMember theId, boolean p_cr
stats.incNodes(-1);
}
String msg;
if (p_crashed && !isCloseInProgress()) {
if (p_crashed && !shouldInhibitMembershipWarnings()) {
msg =
"Member at {} unexpectedly left the distributed cache: {}";
addMemberEvent(new MemberCrashedEvent(theId, p_reason));
Expand Down Expand Up @@ -2881,7 +2881,7 @@ public boolean isLoner() {
}

@Override
public ElderState getElderState(boolean waitToBecomeElder) {
public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
return clusterElderManager.getElderState(waitToBecomeElder);
}

Expand All @@ -2890,7 +2890,8 @@ public ElderState getElderState(boolean waitToBecomeElder) {
*
* @return true if newElder is the elder; false if it is no longer a member or we are the elder.
*/
public boolean waitForElder(final InternalDistributedMember desiredElder) {
public boolean waitForElder(final InternalDistributedMember desiredElder)
throws InterruptedException {

return clusterElderManager.waitForElder(desiredElder);
}
Expand Down Expand Up @@ -3476,7 +3477,7 @@ private void handleEvent(ClusterDistributionManager manager,
try {
handleEvent(manager, listener);
} catch (CancelException e) {
if (manager.isCloseInProgress()) {
if (manager.shouldInhibitMembershipWarnings()) {
if (logger.isTraceEnabled()) {
logger.trace("MemberEventInvoker: cancelled");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public boolean isElder() {
return clusterDistributionManager.getId().equals(getElderCandidate());
}

public ElderState getElderState(boolean waitToBecomeElder) {
public ElderState getElderState(boolean waitToBecomeElder) throws InterruptedException {
if (waitToBecomeElder) {
// This should always return true.
waitForElder(clusterDistributionManager.getId());
Expand Down Expand Up @@ -121,22 +121,16 @@ private ElderState initializeElderState() {
* @return true if desiredElder is the elder; false if it is no longer a member or the local
* member is the elder
*/
public boolean waitForElder(final InternalDistributedMember desiredElder) {
public boolean waitForElder(final InternalDistributedMember desiredElder)
throws InterruptedException {
MembershipChangeListener changeListener =
new MembershipChangeListener();

clusterDistributionManager.addMembershipListener(changeListener);

boolean interrupted = false;
InternalDistributedMember currentElder;

try {
if (logger.isDebugEnabled()) {
currentElder = getElderCandidate();
logger.debug("Waiting for Elder to change. Expecting Elder to be {}, is {}.",
desiredElder, currentElder);
}

while (true) {
if (clusterDistributionManager.isCloseInProgress()) {
return false;
Expand All @@ -145,6 +139,10 @@ public boolean waitForElder(final InternalDistributedMember desiredElder) {
if (desiredElder.equals(currentElder)) {
return true;
}
if (logger.isDebugEnabled()) {
logger.debug("Expecting Elder to be {} but it is {}.",
desiredElder, currentElder);
}
if (!clusterDistributionManager.isCurrentMember(desiredElder)) {
return false; // no longer present
}
Expand All @@ -155,18 +153,13 @@ public boolean waitForElder(final InternalDistributedMember desiredElder) {
return false;
}

try {
changeListener.waitForMembershipChange();
} catch (InterruptedException e) {
interrupted = true;
if (logger.isDebugEnabled()) {
logger.debug("Waiting for membership to change");
}
changeListener.waitForMembershipChange();
}
} finally {
clusterDistributionManager.removeMembershipListener(changeListener);

if (interrupted) {
Thread.currentThread().interrupt();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ Set<InternalDistributedMember> addMembershipListenerAndGetDistributionManagerIds
* @throws IllegalStateException if elder try lock fails
* @since GemFire 4.0
*/
ElderState getElderState(boolean force);
ElderState getElderState(boolean force) throws InterruptedException;

/**
* Returns the membership port of the underlying distribution manager used for communication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,21 +169,27 @@ protected void process(ClusterDistributionManager dm) {
ArrayList grantorVersions = new ArrayList(); // grantor versions
ArrayList grantorSerialNumbers = new ArrayList(); // serial numbers of grantor svcs
ArrayList nonGrantors = new ArrayList(); // svc names non-grantor for
if (dm.waitForElder(this.getSender())) {
GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null);
DLockService.recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
} else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) {
// Either we're alone (and received a message from an unknown member) or else we haven't
// yet processed a view. In either case, we clearly don't have any grantors,
// so we return empty lists.

logger.info(LogMarker.DLS_MARKER,
"{}: returning empty lists because I know of no other members.",
this);
reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
} else {
logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this);
try {
if (dm.waitForElder(this.getSender())) {
GrantorRequestProcessor.readyForElderRecovery(dm.getSystem(), this.getSender(), null);
DLockService
.recoverRmtElder(grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
} else if (dm.getOtherNormalDistributionManagerIds().isEmpty()) {
// Either we're alone (and received a message from an unknown member) or else we haven't
// yet processed a view. In either case, we clearly don't have any grantors,
// so we return empty lists.

logger.info(LogMarker.DLS_MARKER,
"{}: returning empty lists because I know of no other members.",
this);
reply(dm, grantors, grantorVersions, grantorSerialNumbers, nonGrantors);
} else {
logger.info(LogMarker.DLS_MARKER, "{}: disregarding request from departed member.", this);
}
} catch (InterruptedException e) {
// shutting down
logger.info("Elder initialization interrupted - will not send a reply");
}
}

Expand All @@ -207,7 +213,7 @@ public void toData(DataOutput out) throws IOException {
@Override
public String toString() {
StringBuffer buff = new StringBuffer();
buff.append("ElderInitMessage (processorId='").append(this.processorId).append(")");
buff.append("ElderInitMessage (processorId=").append(this.processorId).append(")");
return buff.toString();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ private static void elderSyncWait(InternalDistributedSystem sys,
* Sets currentElder to the memberId of the current elder if elder is remote; null if elder is in
* our vm.
*/
private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls) {
private static ElderState startElderCall(InternalDistributedSystem sys, DLockService dls)
throws InterruptedException {
InternalDistributedMember elder;
ElderState es = null;

Expand Down Expand Up @@ -328,7 +329,12 @@ private static GrantorInfo basicOp(long grantorVersion, String serviceName, DLoc
try {
do {
tryNewElder = false;
final ElderState es = startElderCall(system, service);
ElderState es = null;
try {
es = startElderCall(system, service);
} catch (InterruptedException e) {
interrupted = true;
}
dm.throwIfDistributionStopped();
try {
if (es != null) {
Expand Down Expand Up @@ -491,7 +497,13 @@ protected void process(ClusterDistributionManager dm) {

protected void basicProcess(final DistributionManager dm) {
// we should be in the elder
ElderState es = dm.getElderState(true);
final ElderState es;
try {
es = dm.getElderState(true);
} catch (InterruptedException e) {
logger.info("Interrupted while processing {}", this);
return;
}
switch (this.opCode) {
case GET_OP:
replyGrantorInfo(dm, es.getGrantor(this.serviceName, getSender(), this.dlsSerialNumber));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2620,7 +2620,7 @@ protected boolean removeEldestEntry(Map.Entry eldest) {
@Override
public boolean isShutdownStarted() {
ClusterDistributionManager dm = listener.getDM();
return shutdownInProgress || (dm != null && dm.isShutdownStarted());
return shutdownInProgress || (dm != null && dm.isCloseInProgress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,15 @@ public void isNotElderIfOldestMember() {
}

@Test
public void waitForElderReturnsTrueIfAnotherMemberIsElder() {
public void waitForElderReturnsTrueIfAnotherMemberIsElder() throws InterruptedException {
ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
when(clusterDistributionManager.getId()).thenReturn(member0);
when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
assertThat(clusterElderManager.waitForElder(member1)).isTrue();
}

@Test
public void waitForElderReturnsFalseIfWeAreElder() {
public void waitForElderReturnsFalseIfWeAreElder() throws InterruptedException {
ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
when(clusterDistributionManager.getId()).thenReturn(member0);
when(clusterDistributionManager.isCurrentMember(eq(member1))).thenReturn(true);
Expand All @@ -140,7 +140,8 @@ public void waitForElderReturnsFalseIfWeAreElder() {
}

@Test
public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember() {
public void waitForElderReturnsFalseIfDesiredElderIsNotACurrentMember()
throws InterruptedException {
ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
when(clusterDistributionManager.getId()).thenReturn(member0);
when(clusterDistributionManager.getViewMembers())
Expand All @@ -154,8 +155,24 @@ public void waitForElderWaits() {
when(clusterDistributionManager.getId()).thenReturn(member0);
when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);
when(clusterDistributionManager.isCloseInProgress()).thenReturn(false);

assertThatInterruptableRunnableWaits(() -> {
try {
clusterElderManager.waitForElder(member0);
} catch (InterruptedException e) {
}
});
}

assertThatRunnableWaits(() -> clusterElderManager.waitForElder(member0));
/**
 * Verifies that waitForElder() does not block once shutdown has begun: the mocked distribution
 * manager is stubbed so that isCloseInProgress() returns true, and the call is then expected to
 * return false.
 */
@Test
public void waitForElderDoesNotWaitIfShuttingDown() throws InterruptedException {
ClusterElderManager clusterElderManager = new ClusterElderManager(clusterDistributionManager);
when(clusterDistributionManager.getId()).thenReturn(member0);
when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);
// Simulate a distribution manager that has already initiated shutdown.
when(clusterDistributionManager.isCloseInProgress()).thenReturn(true);
assertThat(clusterElderManager.waitForElder(member0)).isFalse();
}

@Test
Expand Down Expand Up @@ -193,7 +210,7 @@ public void waitForElderStopsWaitingWhenUpdated() {
}

@Test
public void getElderStateAsElder() {
public void getElderStateAsElder() throws InterruptedException {
Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
ElderState elderState = mock(ElderState.class);
when(elderStateSupplier.get()).thenReturn(elderState);
Expand All @@ -207,7 +224,7 @@ public void getElderStateAsElder() {
}

@Test
public void getElderStateGetsBuiltOnceAsElder() {
public void getElderStateGetsBuiltOnceAsElder() throws InterruptedException {
Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
ElderState elderState = mock(ElderState.class);
when(elderStateSupplier.get()).thenReturn(elderState);
Expand Down Expand Up @@ -244,7 +261,7 @@ public void getElderStateFromMultipleThreadsAsElder() {
}

@Test
public void getElderStateNotAsElder() {
public void getElderStateNotAsElder() throws InterruptedException {
Supplier<ElderState> elderStateSupplier = mock(Supplier.class);
ClusterElderManager clusterElderManager =
new ClusterElderManager(clusterDistributionManager, elderStateSupplier);
Expand All @@ -264,12 +281,17 @@ public void getElderStateWaitsToBecomeElder() {
when(clusterDistributionManager.getViewMembers()).thenReturn(Arrays.asList(member1, member0));
when(clusterDistributionManager.isCurrentMember(eq(member0))).thenReturn(true);

assertThatRunnableWaits(() -> clusterElderManager.getElderState(true));
assertThatInterruptableRunnableWaits(() -> {
try {
clusterElderManager.getElderState(true);
} catch (InterruptedException e) {
}
});

verify(elderStateSupplier, times(0)).get();
}

private void assertThatRunnableWaits(Runnable runnable) {
private void assertThatInterruptableRunnableWaits(Runnable runnable) {
Thread waitThread = new Thread(runnable);

waitThread.start();
Expand All @@ -281,6 +303,7 @@ private void assertThatRunnableWaits(Runnable runnable) {
.until(() -> waitingStates.contains(waitThread.getState()));
} finally {
waitThread.interrupt();
await().until(() -> !waitThread.isAlive());
}
}
}

0 comments on commit ef18e4d

Please sign in to comment.