From 169ca6add2887c6560f30a5ad12ceb088411e973 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Tue, 29 Sep 2020 08:07:41 +0200 Subject: [PATCH] =?UTF-8?q?GEODE-8491:=20Do=20not=20store=20dropped=20even?= =?UTF-8?q?ts=20in=20stopped=20primary=20gateway=20se=E2=80=A6=20(#5509)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * GEODE-8491: Do not store dropped events in stopped primary gateway sender when possible Instead of storing dropped events in tmpDroppedEvents to later send batch removal messages when the primary gateway sender is not started, try to send the batch removal message when the event to be dropped is received. That way, when the sender is stopped for a long time and there are events coming, the memory of the AbstractGatewaySender will not grow with entries in the tmpDroppedEvents member. --- .../cache/wan/AbstractGatewaySender.java | 18 +- .../geode/internal/cache/wan/WANTestBase.java | 14 + ...rallelWANPropagationLoopBackDUnitTest.java | 310 +++++++++++++-- ...SerialWANPropagationsFeatureDUnitTest.java | 359 +++++++++++++++++- 4 files changed, 641 insertions(+), 60 deletions(-) diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java index 396edca30cd3..071974d22ec0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java @@ -1034,10 +1034,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, // If this gateway is not running, return if (!isRunning()) { if (this.isPrimary()) { - tmpDroppedEvents.add(clonedEvent); - if (isDebugEnabled) { - logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent); - } + recordDroppedEvent(clonedEvent); } if (isDebugEnabled) { logger.debug("Returning back without putting into the gateway sender queue:" + event); @@ -1118,6 +1115,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event, } } + private void recordDroppedEvent(EntryEventImpl event) { + if (this.eventProcessor != null) { + this.eventProcessor.registerEventDroppedInPrimaryQueue(event); + } else { + tmpDroppedEvents.add(event); + if (logger.isDebugEnabled()) { + logger.debug("added to tmpDroppedEvents event: {}", event); + } + } + } + @VisibleForTesting int getTmpDroppedEventSize() { return tmpDroppedEvents.size(); @@ -1138,7 +1146,7 @@ int getTmpDroppedEventSize() { public void enqueueTempEvents() { if (this.eventProcessor != null) {// Fix for defect #47308 // process tmpDroppedEvents - EntryEventImpl droppedEvent = null; + EntryEventImpl droppedEvent; while ((droppedEvent = tmpDroppedEvents.poll()) != null) { this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent); } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java index e0e439ae70cf..883b31398eac 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java @@ -1646,6 +1646,20 @@ public static void resumeSender(String senderId) { } } + public static void stopSenderInVMsAsync(String senderId, VM... vms) { + List> tasks = new LinkedList<>(); + for (VM vm : vms) { + tasks.add(vm.invokeAsync(() -> stopSender(senderId))); + } + for (AsyncInvocation invocation : tasks) { + try { + invocation.await(); + } catch (InterruptedException e) { + fail("Stopping senders was interrupted"); + } + } + } + public static void stopSender(String senderId) { final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect"); IgnoredException exp = diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java index bd25e8792b2c..62c57a301e82 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java @@ -18,6 +18,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.dunit.Wait; import org.apache.geode.test.junit.categories.WanTest; @@ -36,8 +37,8 @@ public ParallelWANPropagationLoopBackDUnitTest() { */ @Test public void testParallelPropagationLoopBack() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); // create receiver on site1 and site2 createCacheInVMs(lnPort, vm2, vm4, vm5); @@ -137,9 +138,9 @@ public void testParallelPropagationLoopBack() { @Test public void testParallelPropagationLoopBack3Sites() { // Create locators - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - Integer tkPort = (Integer) vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer tkPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); // create cache and receivers on all the 3 sites createCacheInVMs(lnPort, vm3, vm6); @@ -226,9 +227,9 @@ public void testParallelPropagationLoopBack3Sites() { */ @Test public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); - Integer tkPort = (Integer) vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer tkPort = vm2.invoke(() -> WANTestBase.createFirstRemoteLocator(3, lnPort)); createCacheInVMs(lnPort, vm3, vm6); createCacheInVMs(nyPort, vm4, vm7); @@ -369,9 +370,9 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() { * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally */ @Test - public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); // create receiver on site-ln and site-ny createCacheInVMs(lnPort, vm2, vm4); @@ -379,60 +380,301 @@ public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() throws Exc createCacheInVMs(nyPort, vm3, vm5); createReceiverInVMs(vm3, vm5); - // create senders on site-ln, Note: sender-id is its destination, i.e. ny + // create senders on site-ny, Note: sender-id is its destination, i.e. ny vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); - // create senders on site-ny, Note: sender-id is its destination, i.e. ln + // create senders on site-ln, Note: sender-id is its destination, i.e. ln vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); - // create PR on site-ln + // create PR on site-ny vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, isOffHeap())); - // create PR on site-ny + // create PR on site-ln vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, isOffHeap())); - // start sender on site-ln + // start sender on site-ny startSenderInVMs("ny", vm2, vm4); - // Do 100 puts on site-ln - vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 100)); - // verify site-ny received the 100 events + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - // verify tmpDroppedEvents should be 0 at site-ny - vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); - - // do next 100 puts on site-ny - vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 200)); - - // verify site-ny have 200 entries - vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 200)); - - // verify tmpDroppedEvents should be 100 at site-ny, because the sender is not started yet + // verify tmpDroppedEvents should be 100 at site-ln, because the sender is not started yet vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 100)); // verify site-ln has not received the events from site-ny yet - vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); - vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); - // start sender on site-ny + // start sender on site-ln startSenderInVMsAsync("ln", vm3, vm5); // verify tmpDroppedEvents should be 0 now at site-ny vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.await(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); } + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.await(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + stopSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped primary gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop one instance of the sender in NY simultaneously + * Start the stopped instance of the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedPrimarySenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, true, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, true, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + stopSenderInVMsAsync("ny", vm2); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 50)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 50)); + + startSenderInVMsAsync("ny", vm2); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 950)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 950)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } } diff --git a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java index c1c76a704ecf..e928ea5cf800 100644 --- a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java +++ b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/serial/SerialWANPropagationsFeatureDUnitTest.java @@ -20,6 +20,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.internal.cache.wan.WANTestBase; +import org.apache.geode.test.dunit.AsyncInvocation; import org.apache.geode.test.junit.categories.WanTest; @@ -34,8 +35,8 @@ public SerialWANPropagationsFeatureDUnitTest() { @Test public void testSerialReplicatedWanWithOverflow() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createCacheInVMs(lnPort, vm4, vm5, vm6, vm7); @@ -73,8 +74,8 @@ public void testSerialReplicatedWanWithOverflow() { @Test public void testSerialReplicatedWanWithPersistence() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -110,10 +111,10 @@ public void testSerialReplicatedWanWithPersistence() { } @Test - public void testReplicatedSerialPropagationWithConflation() throws Exception { + public void testReplicatedSerialPropagationWithConflation() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -148,10 +149,10 @@ public void testReplicatedSerialPropagationWithConflation() throws Exception { } @Test - public void testReplicatedSerialPropagationWithParallelThreads() throws Exception { + public void testReplicatedSerialPropagationWithParallelThreads() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -186,10 +187,10 @@ public void testReplicatedSerialPropagationWithParallelThreads() throws Exceptio } @Test - public void testSerialPropagationWithFilter() throws Exception { + public void testSerialPropagationWithFilter() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -225,10 +226,10 @@ public void testSerialPropagationWithFilter() throws Exception { } @Test - public void testReplicatedSerialPropagationWithFilter() throws Exception { + public void testReplicatedSerialPropagationWithFilter() { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm2, vm3); createReceiverInVMs(vm2, vm3); @@ -259,9 +260,9 @@ public void testReplicatedSerialPropagationWithFilter() throws Exception { } @Test - public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exception { - Integer lnPort = (Integer) vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); - Integer nyPort = (Integer) vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); + public void testReplicatedSerialPropagationWithFilter_AfterAck() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(1)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(2, lnPort)); createCacheInVMs(nyPort, vm6, vm7); createReceiverInVMs(vm6, vm7); @@ -287,8 +288,8 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); vm5.invoke(() -> WANTestBase.validateQueueContents("ln", 0)); - Integer vm4Acks = (Integer) vm4.invoke(() -> WANTestBase.validateAfterAck("ln")); - Integer vm5Acks = (Integer) vm5.invoke(() -> WANTestBase.validateAfterAck("ln")); + Integer vm4Acks = vm4.invoke(() -> WANTestBase.validateAfterAck("ln")); + Integer vm5Acks = vm5.invoke(() -> WANTestBase.validateAfterAck("ln")); assertEquals(2000, (vm4Acks + vm5Acks)); @@ -297,4 +298,320 @@ public void testReplicatedSerialPropagationWithFilter_AfterAck() throws Exceptio vm4.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); vm5.invoke(() -> WANTestBase.waitForConcurrentSerialSenderQueueToDrain("ln")); } + + /** + * Test unstarted sender + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * + * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents + * while normal events put from NY site can still be added to tmpDroppedEvents + * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally + */ + @Test + public void unstartedSenderShouldNotAddReceivedEventsIntoTmpDropped() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + // create receiver on site-ln and site-ny + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + // create senders on site-ny, Note: sender-id is its destination, i.e. ny + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + // create senders on site-ln, Note: sender-id is its destination, i.e. ln + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + // create PR on site-ny + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + // create PR on site-ln + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + // start sender on site-ny + startSenderInVMs("ny", vm2, vm4); + + // do 100 puts on site-ln + vm3.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify site-ny have 100 entries + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + // verify site-ln has not received the events from site-ny yet + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + // start sender on site-ln + startSenderInVMsAsync("ln", vm3, vm5); + + // verify tmpDroppedEvents should be 0 now at site-ny + vm3.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + vm5.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ln", 0)); + + vm3.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + vm5.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ln")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was starting but was not + * started yet, after the primary finishes starting. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=true + * LN site's sender's manual-start=true + * + * put some events from LN and start the sender in NY simultaneously + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStartingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, true)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, true)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + startSenderInVMsAsync("ny", vm2, vm4); + inv.await(); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that gateway sender's secondary queues do not keep dropped events + * by the primary gateway sender received while it was stopping after it is started again. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void startedSenderReceivingEventsWhileStoppingShouldDrainQueues() + throws Exception { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + AsyncInvocation inv = + vm2.invokeAsync(() -> WANTestBase.doPuts(getTestMethodName() + "_PR", 1000)); + stopSenderInVMsAsync("ny", vm2, vm4); + inv.await(); + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop the sender in NY simultaneously + * Start the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedSenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + stopSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 0)); + + + startSenderInVMsAsync("ny", vm2, vm4); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 900)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + + /** + * Test that a stopped primary gateway sender receiving events + * does not store them in tmpDroppedEvents but after started + * does not leave any event in the + * gateway sender's secondary queues. + * Site-LN: dsid=2: senderId="ny": vm2, vm4 + * Site-NY: dsid=1: senderId="ln": vm3, vm6 + * NY site's sender's manual-start=false + * LN site's sender's manual-start=false + * + * put some events from LN and stop one instance of the sender in NY simultaneously + * Start the stopped instance of the sender in NY. + * Make sure there are no events in tmpDroppedEvents and the queues are drained. + */ + @Test + public void stoppedPrimarySenderShouldNotAddEventsToTmpDroppedEventsButStillDrainQueuesWhenStarted() { + Integer lnPort = vm0.invoke(() -> WANTestBase.createFirstLocatorWithDSId(2)); + Integer nyPort = vm1.invoke(() -> WANTestBase.createFirstRemoteLocator(1, lnPort)); + + createCacheInVMs(lnPort, vm2, vm4); + createReceiverInVMs(vm2, vm4); + createCacheInVMs(nyPort, vm3, vm5); + createReceiverInVMs(vm3, vm5); + + vm2.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + vm4.invoke(() -> WANTestBase.createSender("ny", 1, false, 100, 10, false, false, null, false)); + + vm3.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + vm5.invoke(() -> WANTestBase.createSender("ln", 2, false, 100, 10, false, false, null, false)); + + vm2.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + vm4.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ny", 1, 100, + isOffHeap())); + + vm3.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + vm5.invoke(() -> WANTestBase.createPartitionedRegion(getTestMethodName() + "_PR", "ln", 1, 100, + isOffHeap())); + + stopSenderInVMsAsync("ny", vm2); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 0, 100)); + + // verify tmpDroppedEvents is 0 at site-ny + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 100)); + + startSenderInVMsAsync("ny", vm2); + + vm2.invoke(() -> WANTestBase.doPutsFrom(getTestMethodName() + "_PR", 100, 1000)); + + vm2.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + vm4.invoke(() -> WANTestBase.verifyTmpDroppedEventSize("ny", 0)); + + vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + vm5.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName() + "_PR", 1000)); + + // verify the secondary's queues are drained at site-ny + vm2.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + vm4.invoke(() -> WANTestBase.validateParallelSenderQueueAllBucketsDrained("ny")); + } + }