diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java index 8b0c71951a8d..1595e998e982 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/AsyncEventQueueTestBase.java @@ -35,8 +35,11 @@ import java.util.Set; import java.util.StringTokenizer; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.geode.internal.cache.control.InternalResourceManager; +import org.apache.geode.internal.cache.control.InternalResourceManager.ResourceObserver; import org.junit.experimental.categories.Category; import org.apache.geode.DataSerializable; @@ -664,10 +667,22 @@ public static void createPartitionedRegionWithCacheLoaderAndAsyncQueue(String re * Create PartitionedRegion with 1 redundant copy */ public static void createPRWithRedundantCopyWithAsyncEventQueue(String regionName, - String asyncEventQueueId, Boolean offHeap) { + String asyncEventQueueId, Boolean offHeap) throws InterruptedException { IgnoredException exp = IgnoredException.addIgnoredException(ForceReattemptException.class.getName()); + + CountDownLatch recoveryDone = new CountDownLatch(2); + + ResourceObserver observer = new InternalResourceManager.ResourceObserverAdapter() { + @Override + public void recoveryFinished(Region region) { + recoveryDone.countDown(); + } + }; + InternalResourceManager.setResourceObserver(observer); + + try { AttributesFactory fact = new AttributesFactory(); @@ -679,6 +694,7 @@ public static void createPRWithRedundantCopyWithAsyncEventQueue(String regionNam Region r = cache.createRegionFactory(fact.create()).addAsyncEventQueueId(asyncEventQueueId) .create(regionName); assertNotNull(r); + recoveryDone.await(); } finally { exp.remove(); } diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java index 69c2caa648b9..b7adff08b12c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/asyncqueue/AsyncEventListenerDUnitTest.java @@ -1466,7 +1466,6 @@ public void testParallelAsyncEventQueueHA_Scenario1() { * Test case to test possibleDuplicates. vm1 & vm2 are hosting the PR. vm2 is killed and * subsequently vm3 is brought up. Buckets are now rebalanced between vm1 & vm3. */ - @Category(FlakyTest.class) // GEODE-688: random ports, thread sleeps, async actions @Test public void testParallelAsyncEventQueueHA_Scenario2() { Integer lnPort = @@ -1493,8 +1492,6 @@ public void testParallelAsyncEventQueueHA_Scenario2() { vm1.invoke(pauseAsyncEventQueueRunnable()); vm2.invoke(pauseAsyncEventQueueRunnable()); - Wait.pause(1000);// pause for the batchTimeInterval to make sure the AsyncQueue - // is paused LogWriterUtils.getLogWriter().info("Paused the AsyncEventQueue"); @@ -1519,9 +1516,9 @@ public void testParallelAsyncEventQueueHA_Scenario2() { // ------------------------------------------------------------------ - Wait.pause(1000);// give some time for rebalancing to happen - Set primaryBucketsvm3 = (Set) vm3.invoke( - () -> AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(getTestMethodName() + "_PR")); + String regionName = getTestMethodName() + "_PR"; + Set primaryBucketsvm3 = (Set) vm3 + .invoke(() -> AsyncEventQueueTestBase.getAllPrimaryBucketsOnTheNode(regionName)); vm1.invoke(() -> AsyncEventQueueTestBase.resumeAsyncEventQueue("ln"));