Skip to content

Commit

Permalink
GEODE-7760: NPE in Locator during auto-reconnect (apache#4668)
Browse files Browse the repository at this point in the history
* GEODE-7760: NPE in Locator during auto-reconnect

* empty commit

* fixes for stress-testing

* working on another stresstest failure

This test passes all the time outside of stress testing

* ignoring test that seems to have trouble with workingDir/configDir and temporary folders in stress testing

* remove use of static locator variable in InternalLocator.  added crash test.  unset shutdownHandled after locator restarts

* fixing stresstest issues
  • Loading branch information
bschuchardt authored Feb 7, 2020
1 parent bda6bdf commit af83072
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/
package org.apache.geode.distributed;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.geode.distributed.ConfigurationProperties.DISABLE_AUTO_RECONNECT;
Expand All @@ -22,6 +23,7 @@
import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
import static org.apache.geode.distributed.ConfigurationProperties.LOCATOR_WAIT_TIME;
import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL;
import static org.apache.geode.distributed.ConfigurationProperties.MAX_WAIT_TIME_RECONNECT;
import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
import static org.apache.geode.distributed.ConfigurationProperties.MEMBER_TIMEOUT;
import static org.apache.geode.distributed.ConfigurationProperties.NAME;
Expand Down Expand Up @@ -100,9 +102,11 @@
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.tcp.Connection;
import org.apache.geode.logging.internal.log4j.api.LogService;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.apache.geode.test.dunit.DUnitBlackboard;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.Invoke;
import org.apache.geode.test.dunit.NetworkUtils;
Expand Down Expand Up @@ -250,6 +254,53 @@ protected static void stopLocator() {
}
}

@Test
public void testCrashLocatorMultipleTimes() throws Exception {
port1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
DistributedTestUtils.deleteLocatorStateFile(port1);
File logFile = new File("");
File stateFile = new File("locator" + port1 + "state.dat");
VM vm = VM.getVM(0);
final Properties properties =
getBasicProperties(Host.getHost(0).getHostName() + "[" + port1 + "]");
int memberTimeoutMS = 3000;
properties.put(MEMBER_TIMEOUT, "" + memberTimeoutMS);
properties.put(MAX_WAIT_TIME_RECONNECT, "" + (3 * memberTimeoutMS));
// since we're restarting location services let's be a little forgiving about that service
// starting up so that stress-tests can pass
properties.put(LOCATOR_WAIT_TIME, "" + (3 * memberTimeoutMS));
addDSProps(properties);
if (stateFile.exists()) {
assertThat(stateFile.delete()).isTrue();
}

Locator locator = Locator.startLocatorAndDS(port1, logFile, properties);
system = (InternalDistributedSystem) locator.getDistributedSystem();

vm.invoke(() -> {
getConnectedDistributedSystem(properties);
return null;
});

try {
for (int i = 0; i < 4; i++) {
forceDisconnect();
system.waitUntilReconnected(GeodeAwaitility.getTimeout().getValueInMS(), MILLISECONDS);
assertThat(system.getReconnectedSystem()).isNotNull();
system = (InternalDistributedSystem) system.getReconnectedSystem();
}
assertEquals(2, ((InternalDistributedSystem) locator.getDistributedSystem()).getDM()
.getViewMembers().size());
} finally {
vm.invoke("disconnect", () -> {
getConnectedDistributedSystem(properties).disconnect();
return null;
});
locator.stop();
}

}

/**
* This tests that the locator can resume control as coordinator after all locators have been shut
* down and one is restarted. It's necessary to have a lock service start so elder failover is
Expand Down Expand Up @@ -1581,11 +1632,13 @@ Properties getBasicProperties(String locators) {
return props;
}

private Properties getClusterProperties(String locators, String s) {
private Properties getClusterProperties(String locators,
String enableNetworkPartitionDetectionString) {
Properties properties = getBasicProperties(locators);
properties.setProperty(DISABLE_AUTO_RECONNECT, "true");
properties.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false");
properties.setProperty(ENABLE_NETWORK_PARTITION_DETECTION, s);
properties.setProperty(ENABLE_NETWORK_PARTITION_DETECTION,
enableNetworkPartitionDetectionString);
properties.setProperty(LOCATOR_WAIT_TIME, "10"); // seconds
properties.setProperty(MEMBER_TIMEOUT, "2000");
properties.setProperty(USE_CLUSTER_CONFIGURATION, "false");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package org.apache.geode.distributed.internal;

import static org.apache.geode.internal.AvailablePortHelper.getRandomAvailableTCPPort;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
Expand All @@ -28,6 +27,7 @@

import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand All @@ -36,6 +36,7 @@
import org.mockito.junit.MockitoRule;
import org.mockito.quality.Strictness;

import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.logging.InternalLogWriter;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.LoggingSession;
Expand Down Expand Up @@ -68,12 +69,16 @@ public class InternalLocatorIntegrationTest {

@Before
public void setUp() throws IOException {
port = getRandomAvailableTCPPort();
port = 0;
hostnameForClients = "";
bindAddress = null;

logFile = temporaryFolder.newFile("logfile.log");
workingDirectory = temporaryFolder.getRoot().toPath();

if (Locator.hasLocator()) {
Locator.getLocator().stop();
}
}

@After
Expand All @@ -98,11 +103,26 @@ public void constructs() {
}).doesNotThrowAnyException();
}

@Test
public void restartingClusterConfigurationDoesNotThrowException() throws IOException {
internalLocator = InternalLocator.startLocator(port, logFile, logWriter,
securityLogWriter, bindAddress, true,
distributedSystemProperties, hostnameForClients, workingDirectory);
port = internalLocator.getPort();
internalLocator.stop(true, true, false);
assertThat(InternalLocator.getLocator()).isNull();
// try starting a cluster configuration service when a reconnected locator doesn't exist
assertThatCode(() -> {
internalLocator.startClusterManagementService();
}).doesNotThrowAnyException();
}

@Test
public void startedLocatorIsRunning() throws IOException {
internalLocator = InternalLocator.startLocator(port, logFile, logWriter,
securityLogWriter, bindAddress, true,
distributedSystemProperties, hostnameForClients, workingDirectory);
port = internalLocator.getPort();

assertThat(internalLocator.isStopped()).isFalse();
}
Expand All @@ -112,6 +132,7 @@ public void startedLocatorHasLocator() throws IOException {
internalLocator = InternalLocator.startLocator(port, logFile, logWriter,
securityLogWriter, bindAddress, true,
distributedSystemProperties, hostnameForClients, workingDirectory);
port = internalLocator.getPort();

assertThat(InternalLocator.hasLocator()).isTrue();
}
Expand All @@ -121,6 +142,7 @@ public void stoppedLocatorIsStopped() throws IOException {
internalLocator = InternalLocator.startLocator(port, logFile, logWriter,
securityLogWriter, bindAddress, true,
distributedSystemProperties, hostnameForClients, workingDirectory);
port = internalLocator.getPort();

internalLocator.stop();

Expand All @@ -132,13 +154,15 @@ public void stoppedLocatorDoesNotHaveLocator() throws IOException {
internalLocator = InternalLocator.startLocator(port, logFile, logWriter,
securityLogWriter, bindAddress, true,
distributedSystemProperties, hostnameForClients, workingDirectory);
port = internalLocator.getPort();

internalLocator.stop();

assertThat(InternalLocator.hasLocator()).isFalse();
}

@Test
@Ignore("GEODE-7762 this test fails repeatedly in stress tests")
public void startLocatorFail() throws Exception {
Properties properties = new Properties();
// use this property to induce a NPE when calling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1624,7 +1624,7 @@ protected void disconnect(boolean preparingForReconnect, String reason, boolean
dm.close();
// we close the locator after the DM so that when split-brain detection
// is enabled, loss of the locator doesn't cause the DM to croak
if (startedLocator != null) {
if (startedLocator != null && !isReconnectingDS) {
startedLocator.stop(forcedDisconnect, preparingForReconnect, false);
startedLocator = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -773,15 +773,18 @@ private void startCache(DistributedSystem system) throws IOException {
startClusterManagementService();
}

private void startClusterManagementService() throws IOException {
@VisibleForTesting
void startClusterManagementService() throws IOException {
startConfigurationPersistenceService();

if (internalCache == null) {
InternalCache myCache = this.internalCache;

if (myCache == null) {
return;
}

clusterManagementService = new LocatorClusterManagementService(locator.internalCache,
locator.configurationPersistenceService);
clusterManagementService = new LocatorClusterManagementService(myCache,
configurationPersistenceService);

// start management rest service
AgentUtil agentUtil = new AgentUtil(GemFireVersion.getGemFireVersion());
Expand All @@ -796,7 +799,7 @@ private void startClusterManagementService() throws IOException {

Map<String, Object> serviceAttributes = new HashMap<>();
serviceAttributes.put(HttpService.SECURITY_SERVICE_SERVLET_CONTEXT_PARAM,
internalCache.getSecurityService());
myCache.getSecurityService());
serviceAttributes.put(HttpService.CLUSTER_MANAGEMENT_SERVICE_CONTEXT_PARAM,
clusterManagementService);

Expand All @@ -807,7 +810,7 @@ private void startClusterManagementService() throws IOException {
serviceAttributes.put(HttpService.AUTH_TOKEN_ENABLED_PARAM, managementAuthTokenEnabled);

if (distributionConfig.getEnableManagementRestService()) {
internalCache.getOptionalService(HttpService.class).ifPresent(x -> {
myCache.getOptionalService(HttpService.class).ifPresent(x -> {
try {
logger.info("Geode Property {}=true Geode Management Rest Service is enabled.",
ConfigurationProperties.ENABLE_MANAGEMENT_REST_SERVICE);
Expand Down Expand Up @@ -948,6 +951,7 @@ public void stop(boolean forcedDisconnect, boolean stopForReconnect, boolean wai
locatorDiscoverer = null;
}

// stop the TCPServer
membershipLocator.stop();

removeLocator(this);
Expand Down Expand Up @@ -1060,12 +1064,13 @@ private void launchRestartThread() {
} catch (IOException e) {
logger.info("attempt to restart location services terminated", e);
} finally {
shutdownHandled.set(false);
if (!restarted) {
stoppedForReconnect = false;
}
reconnected = restarted;
restartThread = null;
}
restartThread = null;
});
restartThread.start();
}
Expand Down Expand Up @@ -1094,15 +1099,15 @@ private boolean attemptReconnect() throws InterruptedException, IOException {

while (system.getReconnectedSystem() == null && !system.isReconnectCancelled()) {
if (quorumChecker == null) {
quorumChecker = internalDistributedSystem.getQuorumChecker();
quorumChecker = system.getQuorumChecker();
if (quorumChecker != null) {
logger.info("The distributed system returned this quorum checker: {}", quorumChecker);
}
}

if (quorumChecker != null && !tcpServerStarted) {
boolean start = quorumChecker
.checkForQuorum(3L * internalDistributedSystem.getConfig().getMemberTimeout());
.checkForQuorum(3L * system.getConfig().getMemberTimeout());
if (start) {
// start up peer location. server location is started after the DS finishes reconnecting
logger.info("starting peer location");
Expand All @@ -1121,9 +1126,7 @@ private boolean attemptReconnect() throws InterruptedException, IOException {
try {
system.waitUntilReconnected(waitTime, TimeUnit.MILLISECONDS);
} catch (CancelException e) {
logger.info("Attempt to reconnect failed and further attempts have been terminated");
stoppedForReconnect = false;
return false;
continue; // DistributedSystem failed to restart - loop until it gives up
}
}

Expand Down Expand Up @@ -1370,14 +1373,14 @@ private void startConfigurationPersistenceService() throws IOException {
}


if (locator.configurationPersistenceService == null) {
if (configurationPersistenceService == null) {
// configurationPersistenceService will already be created in case of auto-reconnect
locator.configurationPersistenceService =
new InternalConfigurationPersistenceService(locator.internalCache, workingDirectory,
configurationPersistenceService =
new InternalConfigurationPersistenceService(internalCache, workingDirectory,
JAXBService.create());
}
locator.configurationPersistenceService
.initSharedConfiguration(locator.loadFromSharedConfigDir());
configurationPersistenceService
.initSharedConfiguration(loadFromSharedConfigDir());
logger.info(
"Cluster configuration service start up completed successfully and is now running ....");
isSharedConfigurationStarted = true;
Expand Down Expand Up @@ -1419,8 +1422,8 @@ public SharedConfigurationStatusResponse call() throws InterruptedException {
InternalLocator locator = InternalLocator.this;

SharedConfigurationStatusResponse response;
if (locator.configurationPersistenceService != null) {
response = locator.configurationPersistenceService.createStatusResponse();
if (configurationPersistenceService != null) {
response = configurationPersistenceService.createStatusResponse();
} else {
response = new SharedConfigurationStatusResponse();
response.setStatus(SharedConfigurationStatus.UNDETERMINED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import javax.xml.transform.sax.SAXSource;
import javax.xml.transform.stream.StreamResult;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.xml.sax.Attributes;
import org.xml.sax.ContentHandler;
import org.xml.sax.DTDHandler;
Expand All @@ -55,6 +56,7 @@
import org.xml.sax.XMLReader;
import org.xml.sax.helpers.AttributesImpl;

import org.apache.geode.CancelException;
import org.apache.geode.InternalGemFireException;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.AttributesFactory;
Expand Down Expand Up @@ -397,6 +399,9 @@ private void generate(PrintWriter pw) {
pw.flush();

} catch (Exception ex) {
if (ExceptionUtils.getRootCause(ex) instanceof CancelException) {
throw (CancelException) ExceptionUtils.getRootCause(ex);
}
throw new RuntimeException("An Exception was thrown while generating XML.", ex);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public synchronized boolean checkForQuorum(long timeout) throws InterruptedExcep
return true;
}

resume(); // make sure this quorum checker is the JGroups receiver

if (isInfoEnabled) {
logger.info("beginning quorum check with {}", this);
}
Expand Down

0 comments on commit af83072

Please sign in to comment.