Skip to content

Commit

Permalink
GEODE-10267: fix creating gw sender with non-existent disk store (apa…
Browse files Browse the repository at this point in the history
…che#7643)

* GEODE-10267: fix creating gw sender with non-existent disk store
  • Loading branch information
mkevo authored Jun 13, 2022
1 parent a10c4f8 commit d7ff22c
Show file tree
Hide file tree
Showing 5 changed files with 92 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ public void testGatewayReceiver() throws Exception {
public void testParallelGatewaySender() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.create("LNSender");

GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
gatewaySenderFactory.setParallel(true);
Expand Down Expand Up @@ -163,6 +165,9 @@ public void testParallelGatewaySender() throws Exception {
public void testSerialGatewaySender() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.create("LNSender");

GatewaySenderFactory gatewaySenderFactory = cache.createGatewaySenderFactory();
gatewaySenderFactory.setParallel(false);
gatewaySenderFactory.setManualStart(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public void testAsyncEventQueueWithSubstitutionFilter() throws Exception {
public void testGatewaySenderWithSubstitutionFilter() throws Exception {
getSystem();
CacheCreation cache = new CacheCreation();
DiskStoreFactory dsf = cache.createDiskStoreFactory();
dsf.create(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);

// Create a GatewaySender with GatewayEventSubstitutionFilter.
// Don't start the sender to avoid 'Locators must be configured before starting gateway-sender'
Expand All @@ -113,6 +115,7 @@ public void testGatewaySenderWithSubstitutionFilter() throws Exception {
GatewaySenderFactory factory = cache.createGatewaySenderFactory();
factory.setManualStart(true);
factory.setGatewayEventSubstitutionFilter(new MyGatewayEventSubstitutionFilter());
factory.setDiskStoreName(DiskStoreFactory.DEFAULT_DISK_STORE_NAME);
GatewaySender sender = factory.create(id, 2);

// Verify the GatewayEventSubstitutionFilter is set on the GatewaySender.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderAttributes;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderDoesNotExist;
import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
import static org.apache.geode.test.dunit.IgnoredException.addIgnoredException;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.Serializable;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class CreateDestroyGatewaySenderCommandDUnitTest implements Serializable
private static MemberVM server2;
private static MemberVM server3;

String nonExistingDiskStore = "nonExistingDiskStore";

@BeforeClass
public static void beforeClass() {
Properties props = new Properties();
Expand Down Expand Up @@ -436,4 +439,41 @@ public void testCreateDestroyParallelGatewaySenderWithDispatcherThreads() {
server3);
}

@Test
public void testCreateParallelGatewaySenderWithNonExistingDiskStore() {
addIgnoredException(IllegalStateException.class);

String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID
+ "=ny" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --"
+ CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=true" + " --"
+ CliStrings.CREATE_GATEWAYSENDER__DISKSTORENAME + "=" + nonExistingDiskStore;

gfsh.executeAndAssertThat(command).statusIsError()
.hasTableSection().hasRowSize(3).hasColumn("Message").contains(
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found");

gfsh.executeAndAssertThat("list gateways").statusIsSuccess().containsOutput(
"GatewaySenders or GatewayReceivers are not available in cluster");
}

@Test
public void testCreateSerialGatewaySenderWithNonExistingDiskStore() {
addIgnoredException(IllegalStateException.class);

String command = CliStrings.CREATE_GATEWAYSENDER + " --" + CliStrings.CREATE_GATEWAYSENDER__ID
+ "=ny" + " --" + CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID + "=2" + " --"
+ CliStrings.CREATE_GATEWAYSENDER__PARALLEL + "=false" + " --"
+ CliStrings.CREATE_GATEWAYSENDER__DISKSTORENAME + "=" + nonExistingDiskStore;

gfsh.executeAndAssertThat(command).statusIsError()
.hasTableSection().hasRowSize(3).hasColumn("Message").contains(
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found",
" java.lang.IllegalStateException: Disk store " + nonExistingDiskStore + " not found");

gfsh.executeAndAssertThat("list gateways").statusIsSuccess().containsOutput(
"GatewaySenders or GatewayReceivers are not available in cluster");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,43 @@ public void test_GatewaySender_Serial_ZERO_DispatcherThread() {
}
}

@Test
public void test_GatewaySender_Serial_NonExistingDiskStore() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setManualStart(true);
fact.setDiskStoreName("FORNY");
try {
GatewaySender sender1 = fact.create("NYSender", 2);
fail("Expected IllegalStateException but not thrown");
} catch (Exception e) {
if (e instanceof IllegalStateException
&& e.getMessage().contains("Disk store FORNY not found")) {
} else {
fail("Expected IllegalStateException but received :" + e);
}
}
}

@Test
public void test_GatewaySender_Parallel_NonExistingDiskStore() {
cache = new CacheFactory().set(MCAST_PORT, "0").create();
GatewaySenderFactory fact = cache.createGatewaySenderFactory();
fact.setManualStart(true);
fact.setParallel(true);
fact.setDiskStoreName("FORNY");
try {
GatewaySender sender1 = fact.create("NYSender", 2);
fail("Expected IllegalStateException but not thrown");
} catch (Exception e) {
if (e instanceof IllegalStateException
&& e.getMessage().contains("Disk store FORNY not found")) {
} else {
fail("Expected IllegalStateException but received :" + e);
}
}
}

/**
* Test to validate the gateway receiver attributes are correctly set
*/
Expand Down Expand Up @@ -303,7 +340,6 @@ public void test_ValidateSerialGatewaySenderAttributes() {
fact.setBatchSize(200);
fact.setBatchTimeInterval(300);
fact.setPersistenceEnabled(false);
fact.setDiskStoreName("FORNY");
fact.setMaximumQueueMemory(200);
fact.setAlertThreshold(1200);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
Expand All @@ -327,7 +363,6 @@ public void test_ValidateSerialGatewaySenderAttributes() {
assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
assertEquals(sender1.getGatewayEventFilters().size(),
Expand All @@ -350,7 +385,6 @@ public void test_ValidateParallelGatewaySenderAttributes() {
fact.setBatchSize(200);
fact.setBatchTimeInterval(300);
fact.setPersistenceEnabled(false);
fact.setDiskStoreName("FORNY");
fact.setMaximumQueueMemory(200);
fact.setAlertThreshold(1200);
GatewayEventFilter myEventFilter1 = new MyGatewayEventFilter1();
Expand All @@ -374,7 +408,6 @@ public void test_ValidateParallelGatewaySenderAttributes() {
assertEquals(sender1.getBatchSize(), gatewaySender.getBatchSize());
assertEquals(sender1.getBatchTimeInterval(), gatewaySender.getBatchTimeInterval());
assertEquals(sender1.isPersistenceEnabled(), gatewaySender.isPersistenceEnabled());
assertEquals(sender1.getDiskStoreName(), gatewaySender.getDiskStoreName());
assertEquals(sender1.getMaximumQueueMemory(), gatewaySender.getMaximumQueueMemory());
assertEquals(sender1.getAlertThreshold(), gatewaySender.getAlertThreshold());
assertEquals(sender1.getGatewayEventFilters().size(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ public GatewaySender create(String id, int remoteDSId) {
String.format("GatewaySender %s can not be created with dispatcher threads less than 1",
id));
}

// Verify socket read timeout if a proper logger is available
if (cache instanceof GemFireCacheImpl) {
// If socket read timeout is less than the minimum, log a warning.
Expand All @@ -254,6 +253,13 @@ public GatewaySender create(String id, int remoteDSId) {
attrs.setSocketReadTimeout(GatewaySender.MINIMUM_SOCKET_READ_TIMEOUT);
}

if (attrs.getDiskStoreName() != null
&& cache.findDiskStore(attrs.getDiskStoreName()) == null) {
throw new IllegalStateException(
String.format("Disk store %s not found",
attrs.getDiskStoreName()));
}

// Log a warning if the old system property is set.
if (GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY_CHECKED.compareAndSet(false, true)) {
if (System.getProperty(GatewaySender.GATEWAY_CONNECTION_READ_TIMEOUT_PROPERTY) != null) {
Expand Down

0 comments on commit d7ff22c

Please sign in to comment.