From c2ddc96c9e1bbbfaec156e1a9985979bda7b3e36 Mon Sep 17 00:00:00 2001 From: Dan Smith Date: Tue, 11 Oct 2016 16:46:18 -0700 Subject: [PATCH] GEODE-1991: Removing sleeps from HARegionQueueJUnitTest Getting rid of a bunch of sleeps in HARegionQueueJUnitTest to fix a bunch of tests with race conditions. Tests of expiration were sleeping for short amounts of time and then asserting that expiration happened or didn't. Changing these sleeps to use Awailitily. --- .../ha/BlockingHARegionQueueJUnitTest.java | 182 ++++++------- .../cache/ha/HARegionQueueJUnitTest.java | 252 +++++++----------- 2 files changed, 172 insertions(+), 262 deletions(-) diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java index 48fb3a276999..436cc0cfa95f 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/BlockingHARegionQueueJUnitTest.java @@ -22,6 +22,9 @@ import static org.junit.Assert.fail; import java.io.IOException; +import java.util.concurrent.TimeUnit; + +import com.jayway.awaitility.Awaitility; import org.junit.Ignore; import org.junit.Test; @@ -82,39 +85,30 @@ protected HARegionQueue createHARegionQueue(String name, * */ @Test - public void testBlockingPutAndTake() + public void testBlockingPutAndTake() throws InterruptedException, IOException, ClassNotFoundException { - try { - HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); - hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", - hrqa); - hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. - EventID id1 = new EventID(new byte[] { 1 }, 1, 1); - hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() { - try{ - EventID id2 = new EventID(new byte[] { 1 }, 1, 2); - hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); - }catch(Exception e) { - encounteredException=true; - } + HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); + hrqa.setBlockingQueueCapacity(1); + final HARegionQueue hrq = this.createHARegionQueue("testBlockingPutAndTake", + hrqa); + hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] { 1 }, 1, 1); + hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); + Thread t1 = new Thread(new Runnable() { + public void run() { + try{ + EventID id2 = new EventID(new byte[] { 1 }, 1, 2); + hrq.put(new ConflatableObject("key1", "val2", id2, false, "testing")); + }catch(Exception e) { + encounteredException=true; } - }); - t1.start(); - Thread.sleep(4000); - assertTrue(t1.isAlive()); - Conflatable conf = (Conflatable)hrq.take(); - assertNotNull(conf); - Thread.sleep(2000); - assertTrue(!t1.isAlive()); - - } - catch (Exception e) { - e.printStackTrace(); - fail("Test failed because of exception " + e); - } + } + }); + t1.start(); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + Conflatable conf = (Conflatable)hrq.take(); + assertNotNull(conf); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); } /** @@ -123,45 +117,37 @@ public void run() { * */ @Test - public void testBlockingPutAndPeekRemove() + public void testBlockingPutAndPeekRemove() throws InterruptedException, IOException, ClassNotFoundException { - try { - HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); - hrqa.setBlockingQueueCapacity(1); - final HARegionQueue hrq = this.createHARegionQueue( - "testBlockingPutAndPeekRemove", hrqa); - hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. - EventID id1 = new EventID(new byte[] { 1 }, 1, 1); - hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() - { - try { - EventID id2 = new EventID(new byte[] { 1 }, 1, 2); - hrq - .put(new ConflatableObject("key1", "val2", id2, false, - "testing")); - } - catch (Exception e) { - encounteredException = true; - } + HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); + hrqa.setBlockingQueueCapacity(1); + final HARegionQueue hrq = this.createHARegionQueue( + "testBlockingPutAndPeekRemove", hrqa); + hrq.setPrimary(true);//fix for 40314 - capacity constraint is checked for primary only. + EventID id1 = new EventID(new byte[] { 1 }, 1, 1); + hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); + Thread t1 = new Thread(new Runnable() { + public void run() + { + try { + EventID id2 = new EventID(new byte[] { 1 }, 1, 2); + hrq + .put(new ConflatableObject("key1", "val2", id2, false, + "testing")); } - }); - t1.start(); - Thread.sleep(4000); - assertTrue("put-thread expected to blocked, but was not ", t1.isAlive()); - Conflatable conf = (Conflatable)hrq.peek(); - assertNotNull(conf); - hrq.remove(); - Thread.sleep(2000); - assertFalse("Put-thread blocked unexpectedly", t1.isAlive()); - assertFalse("Exception occured in put-thread", encounteredException); + catch (Exception e) { + encounteredException = true; + } + } + }); + t1.start(); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + Conflatable conf = (Conflatable)hrq.peek(); + assertNotNull(conf); + hrq.remove(); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> !t1.isAlive()); + assertFalse("Exception occured in put-thread", encounteredException); - } - catch (Exception e) { - e.printStackTrace(); - fail("Test failed because of exception " + e); - } } /** @@ -173,42 +159,36 @@ public void run() //expiry is not applicable on primary so marking this test as invalid. @Ignore @Test - public void testBlockingPutAndExpiry() + public void testBlockingPutAndExpiry() throws InterruptedException, IOException, ClassNotFoundException { - try { - HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); - hrqa.setBlockingQueueCapacity(1); - hrqa.setExpiryTime(4); - final HARegionQueue hrq = this.createHARegionQueue( - "testBlockingPutAndExpiry", hrqa); - - EventID id1 = new EventID(new byte[] { 1 }, 1, 1); - hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); - Thread t1 = new Thread(new Runnable() { - public void run() - { - try { - EventID id2 = new EventID(new byte[] { 1 }, 1, 2); - hrq - .put(new ConflatableObject("key1", "val2", id2, false, - "testing")); - } - catch (Exception e) { - encounteredException = true; - } + HARegionQueueAttributes hrqa = new HARegionQueueAttributes(); + hrqa.setBlockingQueueCapacity(1); + hrqa.setExpiryTime(1); + final HARegionQueue hrq = this.createHARegionQueue( + "testBlockingPutAndExpiry", hrqa); + + EventID id1 = new EventID(new byte[] { 1 }, 1, 1); + long start = System.currentTimeMillis(); + hrq.put(new ConflatableObject("key1", "val1", id1, false, "testing")); + Thread t1 = new Thread(new Runnable() { + public void run() + { + try { + EventID id2 = new EventID(new byte[] { 1 }, 1, 2); + hrq + .put(new ConflatableObject("key1", "val2", id2, false, + "testing")); } - }); - t1.start(); - Thread.sleep(2000); - assertTrue("put-thread expected to blocked, but was not ", t1.isAlive()); - Thread.sleep(2500); + catch (Exception e) { + encounteredException = true; + } + } + }); + t1.start(); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(() -> t1.isAlive()); + waitAtLeast(1000, start, () -> { assertFalse("Put-thread blocked unexpectedly", t1.isAlive()); - assertFalse("Exception occured in put-thread", encounteredException); - - } - catch (Exception e) { - e.printStackTrace(); - fail("Test failed because of exception " + e); - } + }); + assertFalse("Exception occured in put-thread", encounteredException); } } \ No newline at end of file diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java index 37047583d477..a161b12699f9 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/HARegionQueueJUnitTest.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -29,6 +30,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.TimeUnit; + +import com.jayway.awaitility.Awaitility; import org.junit.After; import org.junit.Before; @@ -436,49 +440,31 @@ public void run() { * tests whether expiry of entry in the regin queue occurs as expected */ @Test - public void testExpiryPositive() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(1); - //HARegionQueue regionqueue = new HARegionQueue("testing", cache, haa); - HARegionQueue regionqueue = createHARegionQueue("testing",haa); - regionqueue.put(new ConflatableObject("key", "value", new EventID( - new byte[] { 1 }, 1, 1), true, "testing")); - Map map = (Map)regionqueue.getConflationMapForTesting().get("testing"); - assertTrue(!map.isEmpty()); - Thread.sleep(3000); - assertTrue(" Expected region size to be zero since expiry time has been exceeded but it is " - + regionqueue.getRegion().keys().size(), regionqueue - .getRegion().keys().size() == 0); - - assertTrue(map.isEmpty()); - } - catch (Exception e) { - throw new AssertionError(" test failed due to ", e); - } + public void testExpiryPositive() throws InterruptedException, IOException, ClassNotFoundException { + HARegionQueueAttributes haa = new HARegionQueueAttributes(); + haa.setExpiryTime(1); + HARegionQueue regionqueue = createHARegionQueue("testing", haa); + long start = System.currentTimeMillis(); + regionqueue.put(new ConflatableObject("key", "value", new EventID( + new byte[] { 1 }, 1, 1), true, "testing")); + Map map = (Map) regionqueue.getConflationMapForTesting().get("testing"); + waitAtLeast(1000, start, () -> { + assertEquals(Collections.EMPTY_MAP, map); + assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys()); + }); } /** - * tests whether things are not deleted before expiry + * Wait until a given runnable stops throwing exceptions. It should take + * at least minimumElapsedTime after the supplied start time to happen. + * + * This is useful for validating that an entry doesn't expire until + * a certain amount of time has passed */ - @Test - public void testExpiryNegative() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(100); - //RegionQueue regionqueue = new HARegionQueue("testing", cache, haa); - HARegionQueue regionqueue = createHARegionQueue("testing",haa); - regionqueue.put(new ConflatableObject("key", "value", new EventID( - new byte[] { 1 }, 1, 1), false, "testing")); - Thread.sleep(1200); - assertTrue(" Expected region size to be 2, since expiry time has not been exceeded but it is : " - + regionqueue.getRegion().keys().size(), regionqueue - .getRegion().keys().size() == 2); - - } - catch (Exception e) { - throw new AssertionError(" test failed due to ", e); - } + protected void waitAtLeast(final int minimumElapsedTIme, final long start, final Runnable runnable) { + Awaitility.await().atMost(1, TimeUnit.MINUTES).until(runnable); + long elapsed = System.currentTimeMillis() - start; + assertTrue(elapsed >= minimumElapsedTIme); } /** @@ -486,82 +472,34 @@ public void testExpiryNegative() { * expected */ @Test - public void testExpiryPositiveWithConflation() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(2); - //HARegionQueue regionqueue = new HARegionQueue("testing", cache, haa); - HARegionQueue regionqueue = createHARegionQueue("testing",haa); - regionqueue.put(new ConflatableObject("key", "value", new EventID( - new byte[] { 1 }, 1, 1), true, "testing")); - regionqueue.put(new ConflatableObject("key", "newValue", new EventID( - new byte[] { 1 }, 1, 2), true, "testing")); - assertTrue(" Expected region size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.size() == 0)); - assertTrue(" Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getAvalaibleIds().size() == 0)); - assertTrue(" Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so " - + ((((Map)(regionqueue.getConflationMapForTesting() - .get("testing"))).get("key"))), - !((((Map)(regionqueue.getConflationMapForTesting().get("testing"))) - .get("key")) == null)); - assertTrue(" Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getEventsMapForTesting().size() == 0)); - Thread.sleep(5000); - - ThreadIdentifier tid = new ThreadIdentifier(new byte[] { 1 }, 1); - System.out.println(" it still contains thread id : " - + regionqueue.getRegion().containsKey(tid)); - assertTrue(" Expected region size to be zero since expiry time has been exceeded but it is not so ", - regionqueue.getRegion().keys().size() == 0); - assertTrue(" Expected the available id's size to be zero since expiry time has been exceeded but it is not so ", - regionqueue.getAvalaibleIds().size() == 0); - System.out.println((((Map)(regionqueue.getConflationMapForTesting() - .get("testing"))).get("key"))); - assertTrue(" Expected conflation map size to be zero since expiry time has been exceeded but it is not so ", - ((((Map)(regionqueue.getConflationMapForTesting().get("testing"))) - .get("key")) == null)); - assertTrue(" Expected eventID to be zero since expiry time has been exceeded but it is not so ", - (regionqueue.getEventsMapForTesting().size() == 0)); - } - catch (Exception e) { - throw new AssertionError("test failed due to ", e); - } - } - - /** - * test no expiry of events or data if expiry time not exceeded - */ - @Test - public void testExpiryNegativeWithConflation() { - try { - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(100); - //RegionQueue regionqueue = new HARegionQueue("testing", cache, haa); - HARegionQueue regionqueue = createHARegionQueue("testing",haa); - regionqueue.put(new ConflatableObject("key", "value", new EventID( - new byte[] { 1 }, 1, 1), true, "testing")); - regionqueue.put(new ConflatableObject("key", "newValue", new EventID( - new byte[] { 1 }, 1, 2), true, "testing")); - Thread.sleep(1200); - assertTrue( - " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.size() == 0)); - assertTrue( - " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getAvalaibleIds().size() == 0)); - assertTrue( - " Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so ", - !(((Map)(regionqueue - .getConflationMapForTesting().get("testing"))).size() == 0)); - assertTrue( - " Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", - !(regionqueue.getEventsMapForTesting().size() == 0)); - - } - catch (Exception e) { - throw new AssertionError("test failed due to ", e); - } + public void testExpiryPositiveWithConflation() throws InterruptedException, IOException, ClassNotFoundException { + HARegionQueueAttributes haa = new HARegionQueueAttributes(); + haa.setExpiryTime(1); + HARegionQueue regionqueue = createHARegionQueue("testing", haa); + long start = System.currentTimeMillis(); + regionqueue.put(new ConflatableObject("key", "value", new EventID( + new byte[] { 1 }, 1, 1), true, "testing")); + regionqueue.put(new ConflatableObject("key", "newValue", new EventID( + new byte[] { 1 }, 1, 2), true, "testing")); + assertTrue(" Expected region size not to be zero since expiry time has not been exceeded but it is not so ", + !(regionqueue.size() == 0)); + assertTrue( + " Expected the available id's size not to be zero since expiry time has not been exceeded but it is not so ", + !(regionqueue.getAvalaibleIds().size() == 0)); + assertTrue(" Expected conflation map size not to be zero since expiry time has not been exceeded but it is not so " + + ((((Map) (regionqueue.getConflationMapForTesting() + .get("testing"))).get("key"))), + !((((Map) (regionqueue.getConflationMapForTesting().get("testing"))) + .get("key")) == null)); + assertTrue(" Expected eventID map size not to be zero since expiry time has not been exceeded but it is not so ", + !(regionqueue.getEventsMapForTesting().size() == 0)); + + waitAtLeast(1000, start, () -> { + assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys()); + assertEquals(Collections.EMPTY_SET, regionqueue.getAvalaibleIds()); + assertEquals(Collections.EMPTY_MAP, regionqueue.getConflationMapForTesting().get("testing")); + assertEquals(Collections.EMPTY_MAP, regionqueue.getEventsMapForTesting()); + }); } /** @@ -571,7 +509,7 @@ public void testExpiryNegativeWithConflation() { public void testNoExpiryOfThreadId() { try { HARegionQueueAttributes haa = new HARegionQueueAttributes(); - haa.setExpiryTime(3); + haa.setExpiryTime(45); //RegionQueue regionqueue = new HARegionQueue("testing", cache, haa); HARegionQueue regionqueue = createHARegionQueue("testing",haa); EventID ev1 = new EventID(new byte[] { 1 }, 1, 1); @@ -581,9 +519,11 @@ public void testNoExpiryOfThreadId() { Conflatable cf2 = new ConflatableObject("key", "value2", ev2, true, "testing"); regionqueue.put(cf1); - Thread.sleep(2000); + final long tailKey = regionqueue.tailKey.get(); regionqueue.put(cf2); - Thread.sleep(1500); + //Invalidate will trigger the expiration of the entry + //See HARegionQueue.createCacheListenerForHARegion + regionqueue.getRegion().invalidate(tailKey); assertTrue( " Expected region size not to be zero since expiry time has not been exceeded but it is not so ", !(regionqueue.size() == 0)); @@ -637,27 +577,22 @@ public void testQRMComingBeforeLocalPut() { * corresponding put comes */ @Test - public void testOnlyQRMComing() { - try { - HARegionQueueAttributes harqAttr = new HARegionQueueAttributes(); - harqAttr.setExpiryTime(1); - //RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr); - HARegionQueue regionqueue = createHARegionQueue("testing",harqAttr); - EventID id = new EventID(new byte[] { 1 }, 1, 1); - regionqueue.removeDispatchedEvents(id); - assertTrue( - " Expected testingID to be present since only QRM achieved ", - regionqueue.getRegion().containsKey( - new ThreadIdentifier(new byte[] { 1 }, 1))); - Thread.sleep(2500); - assertTrue( - " Expected testingID not to be present since it should have expired after 2.5 seconds", - !regionqueue.getRegion().containsKey( - new ThreadIdentifier(new byte[] { 1 }, 1))); - } - catch (Exception e) { - throw new AssertionError("test failed due to ", e); - } + public void testOnlyQRMComing() throws InterruptedException, IOException, ClassNotFoundException { + HARegionQueueAttributes harqAttr = new HARegionQueueAttributes(); + harqAttr.setExpiryTime(1); + //RegionQueue regionqueue = new HARegionQueue("testing", cache, harqAttr); + HARegionQueue regionqueue = createHARegionQueue("testing",harqAttr); + EventID id = new EventID(new byte[] { 1 }, 1, 1); + long start = System.currentTimeMillis(); + regionqueue.removeDispatchedEvents(id); + assertTrue( + " Expected testingID to be present since only QRM achieved ", + regionqueue.getRegion().containsKey( + new ThreadIdentifier(new byte[] { 1 }, 1))); + waitAtLeast(1000, start, () -> + assertTrue(" Expected testingID not to be present since it should have expired after 2.5 seconds", + !regionqueue.getRegion().containsKey( + new ThreadIdentifier(new byte[] { 1 }, 1)))); } /** @@ -1821,29 +1756,26 @@ private void testBatchPeekWithRemove(boolean createBlockingQueue) { * system property to set expiry */ @Test - public void testExpiryUsingSystemProperty() { - try { - System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME,"1"); - - HARegionQueueAttributes haa = new HARegionQueueAttributes(); - HARegionQueue regionqueue = createHARegionQueue("testing",haa); + public void testExpiryUsingSystemProperty() throws InterruptedException, IOException, ClassNotFoundException { + try { + System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME, "1"); + + HARegionQueueAttributes haa = new HARegionQueueAttributes(); + HARegionQueue regionqueue = createHARegionQueue("testing", haa); + long start = System.currentTimeMillis(); regionqueue.put(new ConflatableObject("key", "value", new EventID( - new byte[] { 1 }, 1, 1), true, "testing")); - Map map = (Map)regionqueue.getConflationMapForTesting().get("testing"); + new byte[] { 1 }, 1, 1), true, "testing")); + Map map = (Map) regionqueue.getConflationMapForTesting().get("testing"); assertTrue(!map.isEmpty()); - Thread.sleep(3000); - assertTrue( - " Expected region size to be zero since expiry time has been exceeded but it is " - + regionqueue.getRegion().keys().size(), regionqueue - .getRegion().keys().size() == 0); - assertTrue(map.isEmpty()); + waitAtLeast(1000, start, () -> { + assertEquals(Collections.EMPTY_MAP, map); + assertEquals(Collections.EMPTY_SET, regionqueue.getRegion().keys()); + }); + } finally { // [yogi]system property set to null, to avoid using it in the subsequent tests System.setProperty(HARegionQueue.REGION_ENTRY_EXPIRY_TIME,""); } - catch (Exception e) { - throw new AssertionError(" test failed due to ", e); - } } /** @@ -1862,11 +1794,9 @@ public void testUpdationOfMessageSyncInterval() throws Exception { int updatedMessageSyncInterval = 10; cache.setMessageSyncInterval(updatedMessageSyncInterval); - // sleep for a time just more the intial messageSyncInterval1 , so that - // the value is updated in QRM run loop. - Thread.sleep((initialMessageSyncInterval + 1) * 1000); - assertEquals("messageSyncInterval not updated.", - updatedMessageSyncInterval, HARegionQueue.getMessageSyncInterval()); + Awaitility.await().atMost(1, TimeUnit.MINUTES).until( () -> + assertEquals("messageSyncInterval not updated.", + updatedMessageSyncInterval, HARegionQueue.getMessageSyncInterval())); } }