From 8e8ec93d516b76d4fb559ab91c0a5762708d1789 Mon Sep 17 00:00:00 2001 From: Barry Oglesby Date: Wed, 21 Mar 2018 12:39:45 -0700 Subject: [PATCH] GEODE-4451: Changed sender startup to retry when a remote security exception occurs * GEODE-4451: Changed sender startup to retry when a remote security exception occurs * GEODE-4451: Prevented sender from being created when members aren't all current version * GEODE-4451: Apply spotless * GEODE-4451: Refactored test to use ConfigurationProperties --- ...ntParallelGatewaySenderEventProcessor.java | 2 +- .../parallel/ParallelGatewaySenderQueue.java | 3 +- ...rentSerialGatewaySenderEventProcessor.java | 2 +- .../geode/internal/i18n/LocalizedStrings.java | 2 +- .../commands/CreateGatewaySenderCommand.java | 13 ++ .../internal/cli/i18n/CliStrings.java | 2 + .../CreateGatewaySenderCommandTest.java | 23 +++ .../GatewaySenderEventRemoteDispatcher.java | 105 ++++++------ .../cache/wan/WANRollingUpgradeDUnitTest.java | 150 +++++++++++++++--- .../geode/internal/cache/wan/WANTestBase.java | 46 ++---- .../misc/NewWanAuthenticationDUnitTest.java | 86 +++++++--- .../cache/wan/misc/WANSSLDUnitTest.java | 69 +++++--- ...anagerWithInvalidCredentials.security.json | 18 +++ 13 files changed, 367 insertions(+), 154 deletions(-) create mode 100644 geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java index c4b154675568..54b70347f1ee 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java @@ -189,7 +189,7 @@ private void waitForRunningStatus() { if (ex != null) { throw new GatewaySenderException( LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1 - .toLocalizedString(new Object[] {this.getId(), ex.getMessage()}), + .toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}), ex.getCause()); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java index 75ce63cbf103..3aa8534675e1 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java @@ -1577,8 +1577,7 @@ private class BatchRemovalThread extends Thread { * Constructor : Creates and initializes the thread */ public BatchRemovalThread(InternalCache c, ParallelGatewaySenderQueue queue) { - super("BatchRemovalThread"); - // TODO:REF: Name for this thread ? + super("BatchRemovalThread for GatewaySender_" + queue.sender.getId() + "_" + queue.index); this.setDaemon(true); this.cache = c; this.parallelQueue = queue; diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java index 6413e1cbd156..e7beb07c1965 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java @@ -221,7 +221,7 @@ private void waitForRunningStatus() { if (ex != null) { throw new GatewaySenderException( LocalizedStrings.Sender_COULD_NOT_START_GATEWAYSENDER_0_BECAUSE_OF_EXCEPTION_1 - .toLocalizedString(new Object[] {this.getId(), ex.getMessage()}), + .toLocalizedString(new Object[] {this.sender.getId(), ex.getMessage()}), ex.getCause()); } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java index 905086af5110..a81d5a59e89e 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java +++ b/geode-core/src/main/java/org/apache/geode/internal/i18n/LocalizedStrings.java @@ -2122,7 +2122,7 @@ public class LocalizedStrings { public static final StringId CacheClientProxy_EXCEPTION_OCCURRED_WHILE_TRYING_TO_CREATE_A_MESSAGE_QUEUE = new StringId(2297, "Exception occurred while trying to create a message queue."); public static final StringId GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1 = - new StringId(2298, "{0} : Could not connect. {1}"); + new StringId(2298, "{0} : Could not connect due to: {1}"); public static final StringId CacheCollector_UNABLE_TO_MIX_REGION_AND_ENTRY_SNAPSHOTS_IN_CACHECOLLECTOR = new StringId(2300, "Unable to mix region and entry snapshots in CacheCollector."); diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java index 8a86c09679a4..b132852d1030 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java @@ -23,6 +23,8 @@ import org.apache.geode.cache.wan.GatewaySender.OrderPolicy; import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Version; import org.apache.geode.management.cli.CliMetaData; import org.apache.geode.management.cli.ConverterHint; import org.apache.geode.management.cli.Result; @@ -116,6 +118,12 @@ public Result createGatewaySender(@CliOption(key = {CliStrings.GROUP, CliStrings Set membersToCreateGatewaySenderOn = getMembers(onGroups, onMember); + // Don't allow sender to be created if all members are not the current version. + if (!verifyAllCurrentVersion(membersToCreateGatewaySenderOn)) { + return ResultBuilder.createUserErrorResult( + CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS); + } + List gatewaySenderCreateResults = executeAndGetFunctionResult(GatewaySenderCreateFunction.INSTANCE, gatewaySenderFunctionArgs, membersToCreateGatewaySenderOn); @@ -139,6 +147,11 @@ public Result createGatewaySender(@CliOption(key = {CliStrings.GROUP, CliStrings return result; } + private boolean verifyAllCurrentVersion(Set members) { + return members.stream().allMatch( + member -> ((InternalDistributedMember) member).getVersionObject().equals(Version.CURRENT)); + } + public static class Interceptor extends AbstractCliAroundInterceptor { @Override public Result preExecution(GfshParseResult parseResult) { diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java index a1e9e3e9abcc..e7237d4db975 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/i18n/CliStrings.java @@ -2248,6 +2248,8 @@ public class CliStrings { "Could not instantiate class \"{0}\" specified for \"{1}\"."; public static final String CREATE_GATEWAYSENDER__MSG__COULD_NOT_ACCESS_CLASS_0_SPECIFIED_FOR_1 = "Could not access class \"{0}\" specified for \"{1}\"."; + public static final String CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS = + "Gateway Sender cannot be created until all members are the current version"; /* stop gateway-receiver */ public static final String START_GATEWAYSENDER = "start gateway-sender"; diff --git a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java index 122cf10428a0..b24792f7ea7a 100644 --- a/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java +++ b/geode-core/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; @@ -33,9 +34,13 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.InternalClusterConfigurationService; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.management.internal.cli.functions.CliFunctionResult; +import org.apache.geode.management.internal.cli.i18n.CliStrings; import org.apache.geode.management.internal.configuration.domain.XmlEntity; import org.apache.geode.test.junit.categories.UnitTest; import org.apache.geode.test.junit.rules.GfshParserRule; @@ -148,4 +153,22 @@ public void whenNoXml() { .hasNoFailToPersistError(); verify(ccService, never()).deleteXmlEntity(any(), any()); } + + @Test + public void whenMembersAreDifferentVersions() { + // Create a set of mixed version members + Set members = new HashSet<>(); + InternalDistributedMember currentVersionMember = mock(InternalDistributedMember.class); + doReturn(Version.CURRENT).when(currentVersionMember).getVersionObject(); + InternalDistributedMember oldVersionMember = mock(InternalDistributedMember.class); + doReturn(Version.GEODE_140).when(oldVersionMember).getVersionObject(); + members.add(currentVersionMember); + members.add(oldVersionMember); + doReturn(members).when(command).getMembers(any(), any()); + + // Verify executing the command fails + gfsh.executeAndAssertThat(command, + "create gateway-sender --id=1 --remote-distributed-system-id=1").statusIsError() + .containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS); + } } diff --git a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java index 782d7c0d2721..83576698f7b0 100644 --- a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java +++ b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java @@ -78,10 +78,7 @@ public GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor ev try { initializeConnection(); } catch (GatewaySenderException e) { - if (e.getCause() instanceof GemFireSecurityException) { - throw e; - } - + // It is ok to ignore this exception. It is logged in the initializeConnection call. } } @@ -168,7 +165,7 @@ public boolean dispatchBatch(List events, boolean removeFromQueueOnException, bo // if our pool is shutdown then just be silent } else if (t instanceof IOException || t instanceof ServerConnectivityException || t instanceof ConnectionDestroyedException || t instanceof MessageTooLargeException - || t instanceof IllegalStateException) { + || t instanceof IllegalStateException || t instanceof GemFireSecurityException) { this.processor.handleException(); // If the cause is an IOException or a ServerException, sleep and retry. // Sleep for a bit and recheck. @@ -431,58 +428,29 @@ private void initializeConnection() throws GatewaySenderException, GemFireSecuri } } } catch (ServerConnectivityException e) { - this.failedConnectCount++; - Throwable ex = null; + // Get the exception to throw + GatewaySenderException gse = getInitializeConnectionExceptionToThrow(e); - if (e.getCause() instanceof GemFireSecurityException) { - ex = e.getCause(); - if (logConnectionFailure()) { - // only log this message once; another msg is logged once we connect - logger.warn(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, - new Object[] {this.processor.getSender().getId(), ex.getMessage()})); - } - throw new GatewaySenderException(ex); - } - List servers = this.sender.getProxy().getCurrentServers(); - String ioMsg = null; - if (servers.size() == 0) { - ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS - .toLocalizedString(); - } else { - final StringBuilder buffer = new StringBuilder(); - for (ServerLocation server : servers) { - String endpointName = String.valueOf(server); - if (buffer.length() > 0) { - buffer.append(", "); - } - buffer.append(endpointName); - } - ioMsg = - LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0 - .toLocalizedString(buffer.toString()); - } - ex = new IOException(ioMsg); - // Set the serverLocation to null so that a new connection can be - // obtained in next attempt + // Set the serverLocation to null so that a new connection can be obtained in next attempt this.sender.setServerLocation(null); - if (this.failedConnectCount == 1) { + + // Log the exception if necessary + if (logConnectionFailure()) { // only log this message once; another msg is logged once we connect logger.warn(LocalizedMessage.create( - LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT, - this.processor.getSender().getId())); - + LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1, + new Object[] {this.processor.getSender().getId(), gse.getCause().getMessage()})); } - // Wrap the IOException in a GatewayException so it can be processed the - // same as the other exceptions that might occur in sendBatch. - throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT - .toLocalizedString(this.processor.getSender().getId()), - ex); + + // Increment failed connection count + this.failedConnectCount++; + + // Throw the exception + throw gse; } if (this.failedConnectCount > 0) { - Object[] logArgs = new Object[] {this.processor.getSender().getId(), con, - Integer.valueOf(this.failedConnectCount)}; + Object[] logArgs = + new Object[] {this.processor.getSender().getId(), con, this.failedConnectCount}; logger.info(LocalizedMessage.create( LocalizedStrings.GatewayEventRemoteDispatcher_0_USING_1_AFTER_2_FAILED_CONNECT_ATTEMPTS, logArgs)); @@ -496,14 +464,47 @@ private void initializeConnection() throws GatewaySenderException, GemFireSecuri this.processor.checkIfPdxNeedsResend(this.connection.getQueueStatus().getPdxSize()); } catch (ConnectionDestroyedException e) { throw new GatewaySenderException( - LocalizedStrings.GatewayEventRemoteDispatcher__0___COULD_NOT_CONNECT - .toLocalizedString(this.processor.getSender().getId()), + LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString( + new Object[] {this.processor.getSender().getId(), e.getMessage()}), e); } finally { this.connectionLifeCycleLock.writeLock().unlock(); } } + private GatewaySenderException getInitializeConnectionExceptionToThrow( + ServerConnectivityException e) { + GatewaySenderException gse = null; + if (e.getCause() instanceof GemFireSecurityException) { + gse = new GatewaySenderException(e.getCause()); + } else { + List servers = this.sender.getProxy().getCurrentServers(); + String ioMsg; + if (servers.size() == 0) { + ioMsg = LocalizedStrings.GatewayEventRemoteDispatcher_THERE_ARE_NO_ACTIVE_SERVERS + .toLocalizedString(); + } else { + final StringBuilder buffer = new StringBuilder(); + for (ServerLocation server : servers) { + String endpointName = String.valueOf(server); + if (buffer.length() > 0) { + buffer.append(", "); + } + buffer.append(endpointName); + } + ioMsg = + LocalizedStrings.GatewayEventRemoteDispatcher_NO_AVAILABLE_CONNECTION_WAS_FOUND_BUT_THE_FOLLOWING_ACTIVE_SERVERS_EXIST_0 + .toLocalizedString(buffer.toString()); + } + IOException ex = new IOException(ioMsg); + gse = new GatewaySenderException( + LocalizedStrings.GatewayEventRemoteDispatcher_0_COULD_NOT_CONNECT_1.toLocalizedString( + new Object[] {this.processor.getSender().getId(), ex.getMessage()}), + ex); + } + return gse; + } + protected boolean logConnectionFailure() { // always log the first failure if (logger.isDebugEnabled() || this.failedConnectCount == 0) { diff --git a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java index e0b7dfd959e9..cf7c7b17a236 100644 --- a/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/cache/wan/WANRollingUpgradeDUnitTest.java @@ -14,19 +14,27 @@ */ package org.apache.geode.cache.wan; +import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.distributed.ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.JMX_MANAGER_START; +import static org.apache.geode.distributed.ConfigurationProperties.LOCATORS; +import static org.apache.geode.distributed.ConfigurationProperties.LOG_LEVEL; +import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT; +import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.logging.log4j.Logger; import org.awaitility.Awaitility; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -43,8 +51,6 @@ import org.apache.geode.cache.server.CacheServer; import org.apache.geode.cache.util.CacheListenerAdapter; import org.apache.geode.distributed.Locator; -import org.apache.geode.distributed.internal.DistributionConfig; -import org.apache.geode.distributed.internal.DistributionConfigImpl; import org.apache.geode.distributed.internal.InternalLocator; import org.apache.geode.internal.AvailablePort; import org.apache.geode.internal.AvailablePortHelper; @@ -52,7 +58,8 @@ import org.apache.geode.internal.cache.wan.parallel.BatchRemovalThreadHelper; import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderQueue; -import org.apache.geode.internal.logging.LogService; +import org.apache.geode.management.internal.cli.i18n.CliStrings; +import org.apache.geode.management.internal.cli.util.CommandStringBuilder; import org.apache.geode.test.dunit.DistributedTestUtils; import org.apache.geode.test.dunit.Host; import org.apache.geode.test.dunit.IgnoredException; @@ -64,6 +71,7 @@ import org.apache.geode.test.junit.categories.BackwardCompatibilityTest; import org.apache.geode.test.junit.categories.DistributedTest; import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.test.junit.rules.GfshCommandRule; import org.apache.geode.test.junit.runners.CategoryWithParameterizedRunnerFactory; @SuppressWarnings("ConstantConditions") @@ -85,6 +93,9 @@ public static Collection data() { // the old version of Geode we're testing against private String oldVersion; + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + public WANRollingUpgradeDUnitTest(String version) { oldVersion = version; } @@ -500,7 +511,7 @@ public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { VM site1Server2 = host.getVM(oldVersion, 2); VM site1Client = host.getVM(oldVersion, 3); - // Get old site members + // Get current site members VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4); VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5); VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6); @@ -512,7 +523,7 @@ public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { final String site1Locators = hostName + "[" + site1LocatorPort + "]"; final int site1DistributedSystemId = 0; - // Get old site locator properties + // Get current site locator properties final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); final String site2Locators = hostName + "[" + site2LocatorPort + "]"; @@ -530,7 +541,7 @@ public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration() || InternalLocator.getLocator().isSharedConfigurationRunning()))); - // Start old site locator + // Start current site locator site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, site2Locators, site1Locators)); @@ -548,12 +559,12 @@ public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { rollStartAndConfigureServerToCurrent(site1Server2, site1Locators, site2DistributedSystemId, regionName, site1SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); - // Start and configure old site servers + // Start and configure old current servers String site2SenderId = getName() + "_gatewaysender_" + site1DistributedSystemId; startAndConfigureServers(site2Server1, site2Server2, site2Locators, site1DistributedSystemId, regionName, site2SenderId, ParallelGatewaySenderQueue.DEFAULT_MESSAGE_SYNC_INTERVAL); - // Do puts from mixed site client and verify events on old site + // Do puts from mixed site client and verify events on current site int numPuts = 100; doClientPutsAndVerifyEvents(site1Client, site1Server1, site1Server2, site2Server1, site2Server2, hostName, site1LocatorPort, regionName, numPuts, site1SenderId, false); @@ -561,15 +572,31 @@ public void testEventProcessingMixedSiteOneCurrentSiteTwo() throws Exception { private void startLocator(int port, int distributedSystemId, String locators, String remoteLocators) throws IOException { - Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); - props.setProperty(DistributionConfig.DISTRIBUTED_SYSTEM_ID_NAME, - String.valueOf(distributedSystemId)); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - props.setProperty(DistributionConfig.REMOTE_LOCATORS_NAME, remoteLocators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel); - props.setProperty(DistributionConfig.ENABLE_CLUSTER_CONFIGURATION_NAME, "false"); + Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators); + Locator.startLocatorAndDS(port, null, props); + } + + private int startLocatorWithJmxManager(int port, int distributedSystemId, String locators, + String remoteLocators) throws IOException { + Properties props = getLocatorProperties(distributedSystemId, locators, remoteLocators); + int jmxPort = AvailablePortHelper.getRandomAvailableTCPPort(); + props.put(JMX_MANAGER_PORT, String.valueOf(jmxPort)); + props.put(JMX_MANAGER, "true"); + props.put(JMX_MANAGER_START, "true"); Locator.startLocatorAndDS(port, null, props); + return jmxPort; + } + + private Properties getLocatorProperties(int distributedSystemId, String locators, + String remoteLocators) { + Properties props = new Properties(); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(DISTRIBUTED_SYSTEM_ID, String.valueOf(distributedSystemId)); + props.setProperty(LOCATORS, locators); + props.setProperty(REMOTE_LOCATORS, remoteLocators); + props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel); + props.setProperty(ENABLE_CLUSTER_CONFIGURATION, "false"); + return props; } private void stopLocator() throws Exception { @@ -707,11 +734,92 @@ public void testVerifyGatewayReceiverDoesNotSendRemoveCacheServerProfileToMember } } + @Test + public void testCreateGatewaySenderMixedSiteOneCurrentSiteTwo() throws Exception { + final Host host = Host.getHost(0); + + // Get mixed site members + VM site1Locator = host.getVM(oldVersion, 0); + VM site1Server1 = host.getVM(oldVersion, 1); + VM site1Server2 = host.getVM(oldVersion, 2); + + // Get current site members + VM site2Locator = host.getVM(VersionManager.CURRENT_VERSION, 4); + VM site2Server1 = host.getVM(VersionManager.CURRENT_VERSION, 5); + VM site2Server2 = host.getVM(VersionManager.CURRENT_VERSION, 6); + + // Get mixed site locator properties + String hostName = NetworkUtils.getServerHostName(host); + final int site1LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site1LocatorPort); + final String site1Locators = hostName + "[" + site1LocatorPort + "]"; + final int site1DistributedSystemId = 0; + + // Get current site locator properties + final int site2LocatorPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET); + DistributedTestUtils.deleteLocatorStateFile(site2LocatorPort); + final String site2Locators = hostName + "[" + site2LocatorPort + "]"; + final int site2DistributedSystemId = 1; + + // Start mixed site locator + site1Locator.invoke(() -> startLocator(site1LocatorPort, site1DistributedSystemId, + site1Locators, site2Locators)); + + // Locators before 1.4 handled configuration asynchronously. + // We must wait for configuration configuration to be ready, or confirm that it is disabled. + site1Locator.invoke( + () -> Awaitility.await().atMost(65, TimeUnit.SECONDS).pollInterval(1, TimeUnit.SECONDS) + .until(() -> assertTrue( + !InternalLocator.getLocator().getConfig().getEnableClusterConfiguration() + || InternalLocator.getLocator().isSharedConfigurationRunning()))); + + // Start current site locator + site2Locator.invoke(() -> startLocator(site2LocatorPort, site2DistributedSystemId, + site2Locators, site1Locators)); + + // Start current site servers with receivers + site2Server1.invoke(() -> createCache(site2Locators)); + site2Server1.invoke(() -> createGatewayReceiver()); + site2Server2.invoke(() -> createCache(site2Locators)); + site2Server2.invoke(() -> createGatewayReceiver()); + + // Start mixed site servers + site1Server1.invoke(() -> createCache(site1Locators)); + site1Server2.invoke(() -> createCache(site1Locators)); + + // Roll mixed site locator to current with jmx manager + site1Locator.invoke(() -> stopLocator()); + VM site1RolledLocator = host.getVM(VersionManager.CURRENT_VERSION, site1Locator.getId()); + int jmxManagerPort = + site1RolledLocator.invoke(() -> startLocatorWithJmxManager(site1LocatorPort, + site1DistributedSystemId, site1Locators, site2Locators)); + + // Roll one mixed site server to current + site1Server2.invoke(() -> closeCache()); + VM site1Server2RolledServer = host.getVM(VersionManager.CURRENT_VERSION, site1Server2.getId()); + site1Server2RolledServer.invoke(() -> createCache(site1Locators)); + + // Use gfsh to attempt to create a gateway sender in the mixed site servers + this.gfsh.connectAndVerify(jmxManagerPort, GfshCommandRule.PortType.jmxManager); + this.gfsh + .executeAndAssertThat(getCreateGatewaySenderCommand("toSite2", site2DistributedSystemId)) + .statusIsError() + .containsOutput(CliStrings.CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS); + } + + private String getCreateGatewaySenderCommand(String id, int remoteDsId) { + CommandStringBuilder csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER); + csb.addOption(CliStrings.CREATE_GATEWAYSENDER__ID, id); + csb.addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, + String.valueOf(remoteDsId)); + return csb.toString(); + } + private void createCache(String locators) { Properties props = new Properties(); - props.setProperty(DistributionConfig.MCAST_PORT_NAME, "0"); - props.setProperty(DistributionConfig.LOCATORS_NAME, locators); - props.setProperty(DistributionConfig.LOG_LEVEL_NAME, DUnitLauncher.logLevel); + props.setProperty(MCAST_PORT, "0"); + props.setProperty(LOCATORS, locators); + props.setProperty(LOG_LEVEL, DUnitLauncher.logLevel); getCache(props); } diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java index 993133cfc2d8..226595b0bef0 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -36,6 +36,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR; import static org.apache.geode.test.dunit.Host.getHost; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -3295,54 +3296,35 @@ public static void waitForConcurrentSerialSenderQueueToDrain(String senderId) { * @param senderId */ public static void verifySenderPausedState(String senderId) { - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } + GatewaySender sender = cache.getGatewaySender(senderId); assertTrue(sender.isPaused()); } public static void verifySenderResumedState(String senderId) { - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } + GatewaySender sender = cache.getGatewaySender(senderId); assertFalse(sender.isPaused()); assertTrue(sender.isRunning()); } public static void verifySenderStoppedState(String senderId) { - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } + GatewaySender sender = cache.getGatewaySender(senderId); assertFalse(sender.isRunning()); } public static void verifySenderRunningState(String senderId) { - Set senders = cache.getGatewaySenders(); - GatewaySender sender = null; - for (GatewaySender s : senders) { - if (s.getId().equals(senderId)) { - sender = s; - break; - } - } + GatewaySender sender = cache.getGatewaySender(senderId); assertTrue(sender.isRunning()); } + public static void verifySenderConnectedState(String senderId, boolean shouldBeConnected) { + AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); + if (shouldBeConnected) { + assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isTrue(); + } else { + assertThat(sender.getEventProcessor().getDispatcher().isConnectedToRemote()).isFalse(); + } + } + public static void verifyPool(String senderId, boolean poolShouldExist, int expectedPoolLocatorsSize) { AbstractGatewaySender sender = (AbstractGatewaySender) cache.getGatewaySender(senderId); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java index 6d77fbb037c6..4f183632830f 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.java @@ -223,15 +223,14 @@ public void testWanAuthInvalidCredentials() { () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap())); logger.info("Created RR in vm3"); - try { - vm2.invoke(() -> WANTestBase.startSender("ln")); - fail( - "Authentication Failed: While starting the sender, an exception should have been thrown"); - } catch (Exception e) { - if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { - fail("Authentication is not working as expected", e); - } - } + // Start sender + vm2.invoke(() -> WANTestBase.startSender("ln")); + + // Verify the sender is started + vm2.invoke(() -> verifySenderRunningState("ln")); + + // Verify the sender is not connected + vm2.invoke(() -> verifySenderConnectedState("ln", false)); } /** @@ -258,28 +257,61 @@ public void testWanSecurityManagerWithInvalidCredentials() { vm3.invoke(() -> NewWanAuthenticationDUnitTest.createSecuredCache(props2, null, nyPort)); logger.info("Created secured cache in vm3"); - vm2.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + String senderId = "ln"; + vm2.invoke( + () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true)); logger.info("Created sender in vm2"); vm3.invoke(() -> createReceiverInSecuredCache()); logger.info("Created receiver in vm3"); - vm2.invoke( - () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap())); + String regionName = getTestMethodName() + "_RR"; + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, senderId, isOffHeap())); logger.info("Created RR in vm2"); - vm3.invoke( - () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap())); + vm3.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); logger.info("Created RR in vm3"); - try { - vm2.invoke(() -> WANTestBase.startSender("ln")); - fail( - "Authentication Failed: While starting the sender, an exception should have been thrown"); - } catch (Exception e) { - if (!(e.getCause().getCause() instanceof AuthenticationFailedException)) { - fail("Authentication is not working as expected", e); - } - } + // Start sender + vm2.invoke(() -> WANTestBase.startSender(senderId)); + + // Verify the sender is started + vm2.invoke(() -> verifySenderRunningState(senderId)); + + // Verify the sender is not connected + vm2.invoke(() -> verifySenderConnectedState(senderId, false)); + + // Do some puts in the sender + int numPuts = 10; + vm2.invoke(() -> WANTestBase.doPuts(regionName, numPuts)); + + // Verify the sender is still started + vm2.invoke(() -> verifySenderRunningState(senderId)); + + // Verify the sender is still not connected + vm2.invoke(() -> verifySenderConnectedState(senderId, false)); + + // Verify the sender queue size + vm2.invoke(() -> testQueueSize(senderId, numPuts)); + + // Stop the receiver + vm3.invoke(() -> closeCache()); + + // Restart the receiver with a SecurityManager that accepts the existing sender's username and + // password. The + // NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json. + // file contains the admin user definition that the SecurityManager will accept. + String securityJsonRersource = "org/apache/geode/internal/cache/wan/misc/" + + getClass().getSimpleName() + "." + getTestMethodName() + ".security.json"; + Properties propsRestart = buildSecurityProperties("guest", "guest", securityJsonRersource); + vm3.invoke(() -> createSecuredCache(propsRestart, null, nyPort)); + vm3.invoke(() -> createReplicatedRegion(regionName, null, isOffHeap())); + vm3.invoke(() -> createReceiverInSecuredCache()); + + // Wait for the queue to drain + vm2.invoke(() -> checkQueueSize(senderId, 0)); + + // Verify region size on receiver + vm3.invoke(() -> validateRegionSize(regionName, numPuts)); } private static Properties buildProperties(String clientauthenticator, String clientAuthInit, @@ -304,9 +336,15 @@ private static Properties buildProperties(String clientauthenticator, String cli } private static Properties buildSecurityProperties(String username, String password) { + return buildSecurityProperties(username, password, + "org/apache/geode/security/templates/security.json"); + } + + private static Properties buildSecurityProperties(String username, String password, + String securityJsonResource) { Properties props = new Properties(); props.put(SECURITY_MANAGER, TestSecurityManager.class.getName()); - props.put("security-json", "org/apache/geode/security/templates/security.json"); + props.put("security-json", securityJsonResource); props.put(SECURITY_CLIENT_AUTH_INIT, UserPasswdAI.class.getName()); props.put("security-username", username); props.put("security-password", password); diff --git a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java index 8f3d53f30b91..a1976b16291f 100644 --- a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java +++ b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/misc/WANSSLDUnitTest.java @@ -21,13 +21,11 @@ import org.apache.geode.cache.Region; import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.internal.net.SocketCreatorFactory; import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.dunit.WaitCriterion; -import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase; -import org.apache.geode.test.dunit.internal.JUnit4DistributedTestCase; import org.apache.geode.test.junit.categories.DistributedTest; -import org.apache.geode.test.junit.categories.FlakyTest; import org.apache.geode.test.junit.categories.WanTest; @Category({DistributedTest.class, WanTest.class}) @@ -66,31 +64,58 @@ public void testSenderNoSSLReceiverSSL() { IgnoredException.addIgnoredException("Unexpected IOException"); IgnoredException.addIgnoredException("SSL Error"); IgnoredException.addIgnoredException("Unrecognized SSL message"); - try { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort)); + Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - vm4.invoke(() -> WANTestBase.createCache(lnPort)); + vm2.invoke(() -> WANTestBase.createReceiverWithSSL(nyPort)); - vm4.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createCache(lnPort)); - vm2.invoke( - () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", null, isOffHeap())); + String senderId = "ln"; + vm4.invoke( + () -> WANTestBase.createSender(senderId, 2, false, 100, 10, false, false, null, true)); - vm4.invoke(() -> WANTestBase.startSender("ln")); + String regionName = getTestMethodName() + "_RR"; + vm2.invoke(() -> WANTestBase.createReplicatedRegion(regionName, null, isOffHeap())); - vm4.invoke( - () -> WANTestBase.createReplicatedRegion(getTestMethodName() + "_RR", "ln", isOffHeap())); + vm4.invoke(() -> WANTestBase.startSender(senderId)); - vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000)); + // Verify the sender is started + vm4.invoke(() -> verifySenderRunningState(senderId)); - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 1000)); - fail("Expected exception as only Receiver is SSL enabled. Not Sender"); - } catch (Exception e) { - assertTrue(e.getCause().getMessage().contains("Server expecting SSL connection")); - } + // Verify the sender is not connected + vm4.invoke(() -> verifySenderConnectedState(senderId, false)); + + vm4.invoke(() -> WANTestBase.createReplicatedRegion(regionName, "ln", isOffHeap())); + + // Do some puts in the sender + int numPuts = 10; + vm4.invoke(() -> WANTestBase.doPuts(regionName, numPuts)); + + // Verify the sender is still started + vm4.invoke(() -> verifySenderRunningState(senderId)); + + // Verify the sender is still not connected + vm4.invoke(() -> verifySenderConnectedState(senderId, false)); + + // Verify the sender queue size + vm4.invoke(() -> testQueueSize(senderId, numPuts)); + + // Stop the receiver + vm2.invoke(() -> closeCache()); + vm2.invoke(() -> closeSocketCreatorFactory()); + + // Restart the receiver with SSL disabled + createCacheInVMs(nyPort, vm2); + vm2.invoke(() -> createReplicatedRegion(regionName, null, isOffHeap())); + vm2.invoke(() -> createReceiver()); + + // Wait for the queue to drain + vm4.invoke(() -> checkQueueSize(senderId, 0)); + + // Verify region size on receiver + vm2.invoke(() -> validateRegionSize(regionName, numPuts)); } @Test @@ -144,4 +169,8 @@ public String description() { } return false; } + + private void closeSocketCreatorFactory() { + SocketCreatorFactory.close(); + } } diff --git a/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json b/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json new file mode 100644 index 000000000000..8fa76d8226b7 --- /dev/null +++ b/geode-wan/src/test/resources/org/apache/geode/internal/cache/wan/misc/NewWanAuthenticationDUnitTest.testWanSecurityManagerWithInvalidCredentials.security.json @@ -0,0 +1,18 @@ +{ + "roles": [ + { + "name": "admin", + "operationsAllowed": [ + "CLUSTER:MANAGE", + "DATA:MANAGE" + ] + } + ], + "users": [ + { + "name": "admin", + "password": "wrongPswd", + "roles": ["admin"] + } + ] +}