Skip to content

Commit

Permalink
GEODE-8890: Catch ClassCastException in LocatorDiscovery.java (apache…
Browse files Browse the repository at this point in the history
…#6000)

- Handle ClassCastException the same way as ClassNotFoundException
- Added unit tests for LocatorDiscovery
- Modify log messages to be clearer
- Clean up LocatorDiscovery static analyzer warnings

Authored-by: Donal Evans <[email protected]>
  • Loading branch information
DonalEvans authored Feb 4, 2021
1 parent 7d6592e commit f4423bb
Show file tree
Hide file tree
Showing 2 changed files with 432 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
package org.apache.geode.cache.client.internal.locator.wan;


import static org.apache.geode.distributed.internal.WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.logging.log4j.Logger;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.distributed.internal.WanLocatorDiscoverer;
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
Expand All @@ -33,33 +36,37 @@
import org.apache.geode.util.internal.GeodeGlossary;

/**
* This class represent a runnable task which exchange the locator information with local
* locators(within the site) as well as remote locators (across the site)
* This class represents a runnable task which exchanges the locator information with local
* locators (within the site) as well as remote locators (across the site)
*
* @since GemFire 7.0
*/
public class LocatorDiscovery {

private static final Logger logger = LogService.getLogger();

private WanLocatorDiscoverer discoverer;
private final WanLocatorDiscoverer discoverer;

private DistributionLocatorId locatorId;
private final DistributionLocatorId locatorId;

private LocatorMembershipListener locatorListener;
private final LocatorMembershipListener locatorListener;

RemoteLocatorJoinRequest request;

TcpClient locatorClient;

/**
 * Number of attempts after which the exchange loops stop retrying on IOException. Read from the
 * system property {@code WANLocator.CONNECTION_RETRY_ATTEMPT}; 50000 when the property is unset.
 */
public static final int WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT =
Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000).intValue();
Integer.getInteger("WANLocator.CONNECTION_RETRY_ATTEMPT", 50000);

/**
 * Pause between connection retries, passed to Thread.sleep() and therefore in milliseconds. Read
 * from the system property {@code WANLocator.CONNECTION_INTERVAL}; 10000 when unset.
 */
public static final int WAN_LOCATOR_CONNECTION_INTERVAL =
Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000).intValue();
Integer.getInteger("WANLocator.CONNECTION_INTERVAL", 10000);

/**
 * Pause before each ping request to a remote locator, passed to Thread.sleep() and therefore in
 * milliseconds. Read from the system property {@code WANLocator.PING_INTERVAL}; 10000 when unset.
 */
public static final int WAN_LOCATOR_PING_INTERVAL =
Integer.getInteger("WANLocator.PING_INTERVAL", 10000).intValue();
Integer.getInteger("WANLocator.PING_INTERVAL", 10000);

// For testing. When true, Thread.sleep() is not called in exchangeLocalLocators() or
// exchangeRemoteLocators()
private final boolean skipWaiting;

public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener) {
Expand All @@ -72,14 +79,28 @@ public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId l
InternalDataSerializer.getDSFIDSerializer().getObjectSerializer(),
InternalDataSerializer.getDSFIDSerializer().getObjectDeserializer(),
TcpSocketFactory.DEFAULT);
this.skipWaiting = false;
}

/**
 * Test-only constructor: takes an already-built {@link TcpClient} rather than creating one, and
 * turns on {@code skipWaiting}.
 *
 * @param discoverer the discoverer whose stopped state ends the exchange loops
 * @param locator the locator this task sends its requests to
 * @param request the join request sent to that locator
 * @param locatorListener the listener that receives the exchanged locators
 * @param locatorClient the client used to send requests
 */
@VisibleForTesting
LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId locator,
    RemoteLocatorJoinRequest request, LocatorMembershipListener locatorListener,
    TcpClient locatorClient) {
  // Injected client first, then the remaining collaborators.
  this.locatorClient = locatorClient;
  this.locatorListener = locatorListener;
  this.request = request;
  this.locatorId = locator;
  this.discoverer = discoverer;
  // With skipWaiting set, the exchange loops do not call Thread.sleep().
  this.skipWaiting = true;
}

/**
* When a batch fails, this keeps the last time a failure was logged. We don't want to swamp the
* logs with retries caused by the same batch failure.
*/
private final ConcurrentHashMap<DistributionLocatorId, long[]> failureLogInterval =
new ConcurrentHashMap<DistributionLocatorId, long[]>();
new ConcurrentHashMap<>();

/**
* The maximum size of {@link #failureLogInterval} beyond which it will start logging all failure
Expand All @@ -96,10 +117,10 @@ public LocatorDiscovery(WanLocatorDiscoverer discoverer, DistributionLocatorId l

public boolean skipFailureLogging(DistributionLocatorId locatorId) {
boolean skipLogging = false;
if (this.failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
long[] logInterval = this.failureLogInterval.get(locatorId);
if (failureLogInterval.size() < FAILURE_MAP_MAXSIZE) {
long[] logInterval = failureLogInterval.get(locatorId);
if (logInterval == null) {
logInterval = this.failureLogInterval.putIfAbsent(locatorId,
logInterval = failureLogInterval.putIfAbsent(locatorId,
new long[] {System.currentTimeMillis(), 1000});
}
if (logInterval != null) {
Expand Down Expand Up @@ -132,76 +153,68 @@ public void run() {
}
}

/**
 * Returns the {@link WanLocatorDiscoverer} held by this task.
 */
private WanLocatorDiscoverer getDiscoverer() {
  return discoverer;
}

/**
 * Sends the join request to the host of {@code locatorId} until a non-null response arrives, the
 * discoverer reports that it is stopped, the retry limit is reached, or the response cannot be
 * deserialized or cast.
 *
 * On a non-null response the returned locators are passed on via addExchangedLocators and the
 * loop ends. An IOException is retried after WAN_LOCATOR_CONNECTION_INTERVAL ms, up to
 * WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT attempts. A null response neither sleeps nor increments
 * the retry counter; the loop simply sends the request again.
 */
private void exchangeLocalLocators() {
int retryAttempt = 1;
while (!getDiscoverer().isStopped()) {
while (!discoverer.isStopped()) {
try {
RemoteLocatorJoinResponse response =
(RemoteLocatorJoinResponse) locatorClient.requestToServer(locatorId.getHost(),
request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
.requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
// Success path: record the exchanged locators, log once, and leave the loop.
if (response != null) {
LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
new Object[] {request.getLocator(), locatorId, response.getLocators()});
addExchangedLocators(response);
logger.info(
"Locator discovery task for locator {} exchanged locator information with {}: {}.",
request.getLocator(), locatorId, response.getLocators());
break;
}
} catch (IOException ioe) {
// Retry budget exhausted: log at fatal level with the IOException as the cause and give up.
if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
ConnectionException coe =
new ConnectionException("Not able to connect to local locator after "
+ WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT + " retry attempts", ioe);
logger.fatal(String.format(
"Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
new Object[] {request.getLocator(), locatorId, retryAttempt}),
coe);
logger.fatal(
"Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
request.getLocator(), locatorId, retryAttempt, coe);
break;
}
// NOTE(review): the warning is emitted when skipFailureLogging() returns true; the helper's
// body is not fully visible here, so confirm that this return sense is the intended one.
if (skipFailureLogging(locatorId)) {
logger.warn(
"Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
new Object[] {request.getLocator(), locatorId, retryAttempt,
WAN_LOCATOR_CONNECTION_INTERVAL});
"Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
request.getLocator(), locatorId, retryAttempt, WAN_LOCATOR_CONNECTION_INTERVAL);
}
try {
Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
// skipWaiting lets tests run the retry path without the pause.
if (!skipWaiting) {
Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
}
} catch (InterruptedException e) {
// NOTE(review): the interrupt flag is restored but the loop keeps going, so later
// Thread.sleep() calls will throw immediately and retries proceed without a pause.
// Confirm that this is intended rather than leaving the loop on interrupt.
Thread.currentThread().interrupt();
}
retryAttempt++;
continue;
} catch (ClassNotFoundException classNotFoundException) {
logger.fatal("Locator discovery task encountred unexpected exception",
classNotFoundException);
// A response that cannot be deserialized or cast to RemoteLocatorJoinResponse is not retried.
} catch (ClassNotFoundException | ClassCastException ex) {
logger.fatal("Locator discovery task encountered unexpected exception", ex);
break;
}
}
}

public void exchangeRemoteLocators() {
int retryAttempt = 1;
DistributionLocatorId remoteLocator = this.locatorId;
while (!getDiscoverer().isStopped()) {
RemoteLocatorJoinResponse response;
while (!discoverer.isStopped()) {
try {
response =
(RemoteLocatorJoinResponse) locatorClient.requestToServer(
remoteLocator.getHost(),
request, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
RemoteLocatorJoinResponse response = (RemoteLocatorJoinResponse) locatorClient
.requestToServer(locatorId.getHost(), request, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (response != null) {
LocatorHelper.addExchangedLocators(response.getLocators(), this.locatorListener);
logger.info("Locator discovery task exchanged locator information {} with {}: {}.",
new Object[] {request.getLocator(), locatorId, response.getLocators()});
addExchangedLocators(response);
logger.info(
"Locator discovery task for locator {} exchanged locator information with {}: {}.",
request.getLocator(), locatorId, response.getLocators());
RemoteLocatorPingRequest pingRequest = new RemoteLocatorPingRequest("");
while (true) {
Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
RemoteLocatorPingResponse pingResponse =
(RemoteLocatorPingResponse) locatorClient.requestToServer(
new HostAndPort(remoteLocator.getHostName(), remoteLocator.getPort()),
pingRequest, WanLocatorDiscoverer.WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (!skipWaiting) {
Thread.sleep(WAN_LOCATOR_PING_INTERVAL);
}
RemoteLocatorPingResponse pingResponse = (RemoteLocatorPingResponse) locatorClient
.requestToServer(new HostAndPort(locatorId.getHostName(), locatorId.getPort()),
pingRequest, WAN_LOCATOR_CONNECTION_TIMEOUT, true);
if (pingResponse != null) {
continue;
}
Expand All @@ -210,33 +223,37 @@ public void exchangeRemoteLocators() {
}
} catch (IOException ioe) {
if (retryAttempt == WAN_LOCATOR_CONNECTION_RETRY_ATTEMPT) {
logger.fatal(String.format(
"Locator discovery task could not exchange locator information %s with %s after %s retry attempts.",
new Object[] {request.getLocator(), remoteLocator, retryAttempt}),
ioe);
logger.fatal(
"Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts.",
request.getLocator(), locatorId, retryAttempt, ioe);
break;
}
if (skipFailureLogging(remoteLocator)) {
if (skipFailureLogging(locatorId)) {
logger.warn(
"Locator discovery task could not exchange locator information {} with {} after {} retry attempts. Retrying in {} ms.",
new Object[] {request.getLocator(), remoteLocator, retryAttempt,
"Locator discovery task for locator {} could not exchange locator information with {} after {} retry attempts. Retrying in {} ms.",
new Object[] {request.getLocator(), locatorId, retryAttempt,
WAN_LOCATOR_CONNECTION_INTERVAL});
}
try {
Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
if (!skipWaiting) {
Thread.sleep(WAN_LOCATOR_CONNECTION_INTERVAL);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
retryAttempt++;
continue;
} catch (ClassNotFoundException classNotFoundException) {
logger.fatal("Locator discovery task encountred unexpected exception",
classNotFoundException);
} catch (ClassNotFoundException | ClassCastException ex) {
logger.fatal("Locator discovery task encountered unexpected exception", ex);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

/**
 * Passes the locators carried by {@code response}, together with this task's membership listener,
 * to LocatorHelper.addExchangedLocators.
 *
 * NOTE(review): package-private and marked VisibleForTesting, presumably so tests can stub or
 * verify this call; confirm against the unit tests.
 *
 * @param response the join response whose locators are forwarded; must not be null, since
 *        getLocators() is called on it directly
 */
@VisibleForTesting
void addExchangedLocators(RemoteLocatorJoinResponse response) {
LocatorHelper.addExchangedLocators(response.getLocators(), locatorListener);
}

}
Loading

0 comments on commit f4423bb

Please sign in to comment.