Skip to content

Commit

Permalink
GEODE-4451: Changed sender startup to retry when a remote security ex…
Browse files Browse the repository at this point in the history
…ception occurs

* GEODE-4451: Changed sender startup to retry when a remote security exception occurs

* GEODE-4451: Prevented sender from being created when members aren't all current version

* GEODE-4451: Apply spotless

* GEODE-4451: Refactored test to use ConfigurationProperties
  • Loading branch information
boglesby authored Mar 21, 2018
1 parent c689dfd commit 8e8ec93
Show file tree
Hide file tree
Showing 13 changed files with 367 additions and 154 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private void waitForRunningStatus() {
if (ex != null) {
throw new GatewaySenderException(
LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1
.toLocalizedString(new Object[] {this.getId(), ex.getMessage()}),
.toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}),
ex.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1577,8 +1577,7 @@ private class BatchRemovalThread extends Thread {
* Constructor : Creates and initializes the thread
*/
public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) {
super("BatchRemovalThread");
// TODO:REF: Name for this thread ?
super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index);
this.setDaemon(true);
this.cache = c;
this.parallelQueue = queue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ private void waitForRunningStatus() {
if (ex != null) {
throw new GatewaySenderException(
LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1
.toLocalizedString(new Object[] {this.getId(), ex.getMessage()}),
.toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}),
ex.getCause());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2122,7 +2122,7 @@ public class LocalizedStrings {
public static final StringId CacheClientProxy_EXCEPTION_OCCURRED_WHILE_TRYING_TO_CREATE_A_MESSAGE_QUEUE =
new StringId(2297, "Exception occurred while trying to create a message queue.");
public static final StringId GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1 =
new StringId(2298, "{0} : Could not connect. {1}");
new StringId(2298, "{0} : Could not connect due to: {1}");

public static final StringId CacheCollector_UNABLE_TO_MIX_REGION_AND_ENTRY_SNAPSHOTS_IN_CACHECOLLECTOR =
new StringId(2300, "Unable to mix region and entry snapshots in CacheCollector.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

import org.apache.geode.cache.wan.GatewaySender.OrderPolicy;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;
import org.apache.geode.management.cli.CliMetaData;
import org.apache.geode.management.cli.ConverterHint;
import org.apache.geode.management.cli.Result;
Expand Down Expand Up @@ -116,6 +118,12 @@ public Result createGatewaySender(@CliOption(key = {CliStrings.GROUP, CliStrings

Set<DistributedMember> membersToCreateGatewaySenderOn = getMembers(onGroups, onMember);

// Don't allow sender to be created if all members are not the current version.
if (!verifyAllCurrentVersion(membersToCreateGatewaySenderOn)) {
return ResultBuilder.createUserErrorResult(
CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
}

List<CliFunctionResult> gatewaySenderCreateResults =
executeAndGetFunctionResult(GatewaySenderCreateFunction.INSTANCE, gatewaySenderFunctionArgs,
membersToCreateGatewaySenderOn);
Expand All @@ -139,6 +147,11 @@ public Result createGatewaySender(@CliOption(key = {CliStrings.GROUP, CliStrings
return result;
}

private boolean verifyAllCurrentVersion(Set<DistributedMember> members) {
return members.stream().allMatch(
member -> ((InternalDistributedMember) member).getVersionObject().equals(Version.CURRENT));
}

public static class Interceptor extends AbstractCliAroundInterceptor {
@Override
public Result preExecution(GfshParseResult parseResult) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2248,6 +2248,8 @@ public class CliStrings {
"Could not instantiate class \"{0}\" specified for \"{1}\".";
public static final String CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 =
"Could not access class \"{0}\" specified for \"{1}\".";
public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
"Gateway Sender cannot be created until all members are the current version";

/* stop gateway-receiver */
public static final String START_GATEWAYSENDER = "start gateway-sender";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

Expand All @@ -33,9 +34,13 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.InternalClusterConfigurationService;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Version;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.management.internal.cli.functions.CliFunctionResult;
import org.apache.geode.management.internal.cli.i18n.CliStrings;
import org.apache.geode.management.internal.configuration.domain.XmlEntity;
import org.apache.geode.test.junit.categories.UnitTest;
import org.apache.geode.test.junit.rules.GfshParserRule;
Expand Down Expand Up @@ -148,4 +153,22 @@ public void whenNoXml() {
.hasNoFailToPersistError();
verify(ccService, never()).deleteXmlEntity(any(), any());
}

@Test
public void whenMembersAreDifferentVersions() {
// Create a set of mixed version members
Set<DistributedMember> members = new HashSet<>();
InternalDistributedMember currentVersionMember = mock(InternalDistributedMember.class);
doReturn(Version.CURRENT).when(currentVersionMember).getVersionObject();
InternalDistributedMember oldVersionMember = mock(InternalDistributedMember.class);
doReturn(Version.GEODE_140).when(oldVersionMember).getVersionObject();
members.add(currentVersionMember);
members.add(oldVersionMember);
doReturn(members).when(command).getMembers(any(), any());

// Verify executing the command fails
gfsh.executeAndAssertThat(command,
"create gateway-sender --id=1 --remote-distributed-system-id=1").statusIsError()
.containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,7 @@ public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor ev
try {
initializeConnection();
} catch (GatewaySenderException e) {
if (e.getCause() instanceof GemFireSecurityException) {
throw e;
}

// It is ok to ignore this exception. It is logged in the initializeConnection call.
}
}

Expand Down Expand Up @@ -168,7 +165,7 @@ public boolean dispatchBatch(List events, boolean removeFromQueueOnException, bo
// if our pool is shutdown then just be silent
} else if (t instanceof IOException || t instanceof ServerConnectivityException
|| t instanceof ConnectionDestroyedException || t instanceof MessageTooLargeException
|| t instanceof IllegalStateException) {
|| t instanceof IllegalStateException || t instanceof GemFireSecurityException) {
this.processor.handleException();
// If the cause is an IOException or a ServerException, sleep and retry.
// Sleep for a bit and recheck.
Expand Down Expand Up @@ -431,58 +428,29 @@ private void initializeConnection() throws GatewaySenderException, GemFireSecuri
}
}
} catch (ServerConnectivityException e) {
this.failedConnectCount++;
Throwable ex = null;
// Get the exception to throw
GatewaySenderException gse = getInitializeConnectionExceptionToThrow(e);

if (e.getCause() instanceof GemFireSecurityException) {
ex = e.getCause();
if (logConnectionFailure()) {
// only log this message once; another msg is logged once we connect
logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
new Object[] {this.processor.getSender().getId(), ex.getMessage()}));
}
throw new GatewaySenderException(ex);
}
List<ServerLocation> servers = this.sender.getProxy().getCurrentServers();
String ioMsg = null;
if (servers.size() == 0) {
ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
.toLocalizedString();
} else {
final StringBuilder buffer = new StringBuilder();
for (ServerLocation server : servers) {
String endpointName = String.valueOf(server);
if (buffer.length() > 0) {
buffer.append(", ");
}
buffer.append(endpointName);
}
ioMsg =
LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
.toLocalizedString(buffer.toString());
}
ex = new IOException(ioMsg);
// Set the serverLocation to null so that a new connection can be
// obtained in next attempt
// Set the serverLocation to null so that a new connection can be obtained in next attempt
this.sender.setServerLocation(null);
if (this.failedConnectCount == 1) {

// Log the exception if necessary
if (logConnectionFailure()) {
// only log this message once; another msg is logged once we connect
logger.warn(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT,
this.processor.getSender().getId()));

LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1,
new Object[] {this.processor.getSender().getId(), gse.getCause().getMessage()}));
}
// Wrap the IOException in a GatewayException so it can be processed the
// same as the other exceptions that might occur in sendBatch.
throw new GatewaySenderException(
LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
.toLocalizedString(this.processor.getSender().getId()),
ex);

// Increment failed connection count
this.failedConnectCount++;

// Throw the exception
throw gse;
}
if (this.failedConnectCount > 0) {
Object[] logArgs = new Object[] {this.processor.getSender().getId(), con,
Integer.valueOf(this.failedConnectCount)};
Object[] logArgs =
new Object[] {this.processor.getSender().getId(), con, this.failedConnectCount};
logger.info(LocalizedMessage.create(
LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS,
logArgs));
Expand All @@ -496,14 +464,47 @@ private void initializeConnection() throws GatewaySenderException, GemFireSecuri
this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize());
} catch (ConnectionDestroyedException e) {
throw new GatewaySenderException(
LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT
.toLocalizedString(this.processor.getSender().getId()),
LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString(
new Object[] {this.processor.getSender().getId(), e.getMessage()}),
e);
} finally {
this.connectionLifeCycleLock.writeLock().unlock();
}
}

private GatewaySenderException getInitializeConnectionExceptionToThrow(
ServerConnectivityException e) {
GatewaySenderException gse = null;
if (e.getCause() instanceof GemFireSecurityException) {
gse = new GatewaySenderException(e.getCause());
} else {
List<ServerLocation> servers = this.sender.getProxy().getCurrentServers();
String ioMsg;
if (servers.size() == 0) {
ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS
.toLocalizedString();
} else {
final StringBuilder buffer = new StringBuilder();
for (ServerLocation server : servers) {
String endpointName = String.valueOf(server);
if (buffer.length() > 0) {
buffer.append(", ");
}
buffer.append(endpointName);
}
ioMsg =
LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0
.toLocalizedString(buffer.toString());
}
IOException ex = new IOException(ioMsg);
gse = new GatewaySenderException(
LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString(
new Object[] {this.processor.getSender().getId(), ex.getMessage()}),
ex);
}
return gse;
}

protected boolean logConnectionFailure() {
// always log the first failure
if (logger.isDebugEnabled() || this.failedConnectCount == 0) {
Expand Down
Loading

0 comments on commit 8e8ec93

Please sign in to comment.