Skip to content

Commit

Permalink
GEODE-8745: Closing the queue region when senders are stopped (apache…
Browse files Browse the repository at this point in the history
…#5770)

* cleanQueues are applicable only while using persistence
	* when we are closing the region, the disk files are not deleted.
	* hence the values will still be maintained on restart.
	* Advantage will be that we are creating the queue region and its cache listener together
	* previously the region was not closed and cache listener was attached using mutators.
	* this caused secondary events to be missed before the cache listener is activated
  • Loading branch information
nabarunnag authored Dec 1, 2020
1 parent 1bca728 commit 888e473
Show file tree
Hide file tree
Showing 7 changed files with 33 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void stop() {
InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);
this.eventProcessor = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ protected void processQueue() {

for (;;) {
if (stopped()) {
this.resetLastPeekedEvents = true;
break;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,9 @@ public SerialGatewaySenderEventProcessor(AbstractGatewaySender sender, String id
ThreadsMonitoring tMonitoring, boolean cleanQueues) {
super("Event Processor for GatewaySender_" + id, sender, tMonitoring);

initializeMessageQueue(id, cleanQueues);
this.unprocessedEvents = new LinkedHashMap<EventID, EventWrapper>();
this.unprocessedTokens = new LinkedHashMap<EventID, Long>();

initializeMessageQueue(id, cleanQueues);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.TimeoutException;
Expand Down Expand Up @@ -1163,7 +1164,14 @@ public boolean isRemovalThreadAlive() {

@Override
public void close() {
removeCacheListener();
Region r = getRegion();
if (r != null && !r.isDestroyed()) {
try {
r.close();
} catch (RegionDestroyedException e) {
}
}

}

private class BatchRemovalThread extends Thread {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ public void testStopSerialGatewaySender() throws Exception {
vm4.invoke(() -> validateSenderStoppedState("ln"));
vm5.invoke(() -> validateSenderStoppedState("ln"));

vm4.invoke(() -> validateQueueSizeStat("ln", 20));
vm5.invoke(() -> validateQueueSizeStat("ln", 20));
vm4.invoke(() -> validateQueueSizeStat("ln", 0));
vm5.invoke(() -> validateQueueSizeStat("ln", 0));
/*
* Should have no effect on GatewaySenderState
*/
Expand Down Expand Up @@ -390,6 +390,7 @@ public void testRestartSerialGatewaySendersWhilePutting() throws Exception {
vm4.invoke(() -> validateQueueSizeStat("ln", 0));
vm5.invoke(() -> validateQueueSizeStat("ln", 0));


// do a lot of puts while senders are restarting
AsyncInvocation doPutsInVm7 = vm7.invokeAsync(() -> doPuts(className + "_RR", 5000));

Expand Down Expand Up @@ -619,20 +620,20 @@ public void test_Bug44153_StopOneSender_StartAnotherSender_CheckQueueSize() {

vm5.invoke(() -> doPuts(className + "_RR", 10, 110));

vm5.invoke(() -> validateQueueContents("ln", 110));
vm5.invoke(() -> validateQueueContents("ln", 100));
vm5.invoke(() -> stopSender("ln"));
vm5.invoke(() -> validateSenderStoppedState("ln"));

vm4.invoke(() -> startSender("ln"));
vm4.invoke(() -> validateQueueContents("ln", 110));
vm4.invoke(() -> validateQueueContents("ln", 10));
vm4.invoke(() -> stopSender("ln"));

vm5.invoke(() -> startSender("ln"));
vm2.invoke(() -> createCache(nyPort));
vm2.invoke(() -> createReplicatedRegion(className + "_RR", null));
vm2.invoke(() -> createReceiver());

vm2.invoke(() -> validateRegionSize(className + "_RR", 110));
vm2.invoke(() -> validateRegionSize(className + "_RR", 100));
vm5.invoke(() -> stopSender("ln"));

vm4.invoke(() -> startSender("ln"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ public void testReplicatedRegionPersistentWanGateway_restartSenderWithCleanQueue
Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort));

createCacheInVMs(nyPort, vm2, vm3);
createReceiverInVMs(vm2, vm3);

createCacheInVMs(lnPort, vm4, vm5, vm6, vm7);

String firstDStore = (String) vm4.invoke(() -> WANTestBase.createSenderWithDiskStore("ln", 2,
Expand All @@ -603,27 +605,32 @@ public void testReplicatedRegionPersistentWanGateway_restartSenderWithCleanQueue
vm7.invoke(() -> WANTestBase.createPersistentReplicatedRegion(getTestMethodName() + "_RR", "ln",
isOffHeap()));

vm4.invoke(() -> WANTestBase.pauseSender("ln"));
vm5.invoke(() -> WANTestBase.pauseSender("ln"));

vm4.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_RR", 1000));

logger.info("Completed puts in the region");

vm4.invoke(() -> WANTestBase.stopSender("ln"));
vm5.invoke(() -> WANTestBase.stopSender("ln"));

logger.info("Stopped all the senders. ");

// Create receiver on remote site
createReceiverInVMs(vm2, vm3);
logger.info("Stopped all the senders. ");

vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_RR", 0));
AsyncInvocation inv1 = vm4.invokeAsync(() -> WANTestBase.startSenderwithCleanQueues("ln"));
logger.info("Started the sender in vm 4");

vm4.invoke(() -> WANTestBase.startSenderwithCleanQueues("ln"));
vm5.invoke(() -> WANTestBase.startSenderwithCleanQueues("ln"));
logger.info("Started the sender in vm 5");
try {
inv1.join();
} catch (InterruptedException e) {
fail("Got interrupted exception while waiting for startSender to finish.");
}

vm4.invoke(() -> waitForSenderRunningState("ln"));
vm5.invoke(() -> waitForSenderRunningState("ln"));
logger.info("Started all senders.");

vm4.invoke(() -> checkQueueSize("ln", 0));
vm5.invoke(() -> checkQueueSize("ln", 0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ public void stop() {
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);

// this.eventProcessor = null;
this.eventProcessor = null;
}

@Override
Expand Down

0 comments on commit 888e473

Please sign in to comment.