Skip to content

Commit

Permalink
GEODE-6423 availability checks sometimes immediately initiate removal
Browse files Browse the repository at this point in the history
Ensure that the availability check is performed for the contracted
member-timeout period.  This allows a suspect to survive the check if
it's having a momentary glitch like a brief garbage-collection, or if
there is short network outage.

This change caused some "reconnect" tests to fail due to short
auto-reconnect intervals letting disconnected nodes start reconnecting
before suspect processing completed on the force-disconnected nodes.
I've fixed this by reinitializing the UUID part of the membership ID in
JGroupsMessenger during reconnect attempts.
  • Loading branch information
bschuchardt committed Feb 22, 2019
1 parent d866bbd commit 8b29d9e
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.geode.internal.cache.backup.BackupOperation;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.VM;
import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase;
import org.apache.geode.test.dunit.rules.CacheRule;
Expand Down Expand Up @@ -73,6 +74,7 @@ public void setUp() throws Exception {
vm0 = getVM(0);
vm1 = getVM(1);
regionName = getClass().getSimpleName() + "-" + testName.getMethodName();
IgnoredException.addIgnoredException("Possible loss of quorum");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_TTL;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.apache.geode.test.awaitility.GeodeAwaitility.getTimeout;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -130,7 +131,7 @@ public void initMocks() throws UnknownHostException {
nonDefault.put(MCAST_TTL, "0");
nonDefault.put(LOG_FILE, "");
nonDefault.put(LOG_LEVEL, "fine");
nonDefault.put(MEMBER_TIMEOUT, "2000");
nonDefault.put(MEMBER_TIMEOUT, "" + memberTimeout);
nonDefault.put(LOCATORS, "localhost[10344]");
DistributionManager dm = mock(DistributionManager.class);
SocketCreatorFactory.setDistributionConfig(new DistributionConfigImpl(new Properties()));
Expand Down Expand Up @@ -784,6 +785,44 @@ public void testBeSickAndPlayDead() throws Exception {
gmsHealthMonitor.processMessage(smm);
}

@Test
public void testTcpCheckMemberTriesUntilTimeout() throws Exception {
ServerSocket mySocket = new ServerSocket(0);
Thread serverThread = new Thread() {
public void run() {
long giveupTime = System.currentTimeMillis() + (5 * memberTimeout);
while (System.currentTimeMillis() < giveupTime) {
try {
Socket acceptedSocket = mySocket.accept();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
acceptedSocket.close();
} catch (IOException e) {
if (!mySocket.isClosed()) {
System.err.println("Test failed with unexpected IOException");
e.printStackTrace(System.err);
}
return;
}
}
}
};
serverThread.setDaemon(true);
serverThread.start();
InternalDistributedMember otherMember =
createInternalDistributedMember(Version.CURRENT_ORDINAL, 0, 1, 1);
long startTime = System.currentTimeMillis();
gmsHealthMonitor.doTCPCheckMember(otherMember, mySocket.getLocalPort());
mySocket.close();
serverThread.interrupt();
serverThread.join(getTimeout().getValueInMS());
assertThat(System.currentTimeMillis()).isGreaterThanOrEqualTo(startTime + memberTimeout);
}

@Test
public void testDoTCPCheckMemberWithOkStatus() throws Exception {
executeTestDoTCPCheck(GMSHealthMonitor.OK, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,33 +513,50 @@ private boolean doCheckMember(InternalDistributedMember member, boolean waitForR
*/
boolean doTCPCheckMember(InternalDistributedMember suspectMember, int port) {
Socket clientSocket = null;
try {
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
suspectMember.getInetAddress(), port);
clientSocket =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
.connect(suspectMember.getInetAddress(), port, (int) memberTimeout,
new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
clientSocket.setTcpNoDelay(true);
return doTCPCheckMember(suspectMember, clientSocket);
} catch (IOException e) {
// this is expected if it is a connection-timeout or other failure
// to connect
} catch (IllegalStateException e) {
if (!isStopping) {
logger.trace("Unexpected exception", e);
// make sure we try to check on the member for the contracted memberTimeout period
// in case a timed socket.connect() returns immediately
long giveupTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(
services.getConfig().getMemberTimeout(), TimeUnit.MILLISECONDS);
boolean passed = false;
int iteration = 0;
do {
iteration++;
if (iteration > 1) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
} finally {
try {
if (clientSocket != null) {
clientSocket.setSoLinger(true, 0); // abort the connection
clientSocket.close();
}
logger.debug("Checking member {} with TCP socket connection {}:{}.", suspectMember,
suspectMember.getInetAddress(), port);
clientSocket =
SocketCreatorFactory.getSocketCreatorForComponent(SecurableCommunicationChannel.CLUSTER)
.connect(suspectMember.getInetAddress(), port, (int) memberTimeout,
new ConnectTimeoutTask(services.getTimer(), memberTimeout), false, -1, false);
clientSocket.setTcpNoDelay(true);
passed = doTCPCheckMember(suspectMember, clientSocket);
} catch (IOException e) {
// expected
// this is expected if it is a connection-timeout or other failure
// to connect
} catch (IllegalStateException | GemFireConfigException e) {
if (!isStopping) {
logger.trace("Unexpected exception", e);
}
} finally {
try {
if (clientSocket != null) {
clientSocket.setSoLinger(true, 0); // abort the connection
clientSocket.close();
}
} catch (IOException e) {
// expected
}
}
}
return false;
} while (!passed && !this.isShutdown() && System.nanoTime() < giveupTime);
return passed;
}

// Package protected for testing purposes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1036,6 +1036,7 @@ private void processViewMessage(final InstallViewMessage m) {
this.preparedView.getCreator().equals(view.getCreator())) {
// this can happen if we received two prepares during auto-reconnect
} else {
// send the conflicting view to the creator of this new view
services.getMessenger()
.send(new ViewAckMessage(view.getViewId(), m.getSender(), this.preparedView));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,16 @@ public void start() {
members.add(new UUID(0, 0));// TODO open a JGroups JIRA for GEODE-3034
View jgv = new View(vid, members);
this.myChannel.down(new Event(Event.VIEW_CHANGE, jgv));
UUID logicalAddress = (UUID) myChannel.getAddress();
if (logicalAddress instanceof JGAddress) {
((JGAddress) logicalAddress).setVmViewId(-1);
// attempt to establish a new UUID in the jgroups channel so the member address will be
// different
try {
Method setAddressMethod = JChannel.class.getDeclaredMethod("setAddress");
setAddressMethod.setAccessible(true);
setAddressMethod.invoke(myChannel);
} catch (SecurityException | NoSuchMethodException e) {
logger.warn("Unable to establish a new JGroups address. "
+ "My address will be exactly the same as last time. Exception={}",
e.getMessage());
}
reconnecting = true;
} else {
Expand Down Expand Up @@ -364,7 +371,7 @@ public void start() {
jgroupsReceiver = new JGroupsReceiver();
myChannel.setReceiver(jgroupsReceiver);
if (!reconnecting) {
myChannel.connect("AG"); // apache g***** (whatever we end up calling it)
myChannel.connect("AG"); // Apache Geode
}
} catch (Exception e) {
myChannel.close();
Expand Down Expand Up @@ -556,7 +563,7 @@ private void establishLocalAddress() {
gmsMember.setMemberWeight((byte) (services.getConfig().getMemberWeight() & 0xff));
gmsMember.setNetworkPartitionDetectionEnabled(
services.getConfig().getDistributionConfig().getEnableNetworkPartitionDetection());

logger.info("Established local address {}", localAddress);
services.setLocalAddress(localAddress);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public MemberVM startServerVM(int index, String version,
SerializableFunction<ServerStarterRule> ruleOperator) {
final String defaultName = "server-" + index;
VM serverVM = getVM(index, version);
Server server = serverVM.invoke(() -> {
Server server = serverVM.invoke("startServerVM", () -> {
memberStarter = new ServerStarterRule();
ServerStarterRule serverStarter = (ServerStarterRule) memberStarter;
if (logFile) {
Expand Down

0 comments on commit 8b29d9e

Please sign in to comment.