diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 398ee00c71851..c2346433f91a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -733,7 +733,7 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { msg = subscriber1.receive(5, TimeUnit.SECONDS); // Verify: cache has to be cleared as there is no message needs to be consumed by active subscriber - assertTrue(entryCache.getSize() == 0); + assertEquals(entryCache.getSize(), 0, 1); /************ usecase-2: *************/ // 1.b Subscriber slower-subscriber @@ -763,7 +763,18 @@ public void testActiveAndInActiveConsumerEntryCacheBehavior() throws Exception { // 3.b Close subscriber2: which will trigger cache to clear the cache subscriber2.close(); - + + // retry strategically until broker clean up closed subscribers and invalidate all cache entries + int retry = 5; + for (int i = 0; i < retry; i++) { + if (entryCache.getSize() != 0) { + if (i != retry - 1) { + Thread.sleep(100); + } + } else { + break; + } + } // Verify: EntryCache should be cleared assertTrue(entryCache.getSize() == 0); subscriber1.close();