Skip to content

Commit

Permalink
[Issue 9451] Fix flaky test SimpleProducerConsumerTest.testConcurrent…
Browse files Browse the repository at this point in the history
…ConsumerReceiveWhileReconnect (apache#9575)

- use Awaitility to wait for assertions to pass
  • Loading branch information
lhotari authored Feb 12, 2021
1 parent 341fe5e commit 8b45a65
Showing 1 changed file with 21 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -760,12 +760,9 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
}

barrier.await();
// there will be 10 threads calling receive() from the same consumer and will block
Thread.sleep(100);

// we restart the broker to reconnect
restartBroker();
Thread.sleep(2000);

// publish 100 messages so that the consumers blocked on receive() will now get the messages
ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
Expand All @@ -781,12 +778,14 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
String message = "my-message-" + i;
producer.send(message.getBytes());
}
Thread.sleep(500);

ConsumerImpl<byte[]> consumerImpl = (ConsumerImpl<byte[]>) consumer;
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);

Awaitility.await().untilAsserted(() -> {
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
});

barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
Expand All @@ -797,11 +796,12 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
});
}
barrier.await();
Thread.sleep(100);

// The available permits should be 20 and num messages in the queue should be 80
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
Awaitility.await().untilAsserted(() -> {
// The available permits should be 20 and num messages in the queue should be 80
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads * 2);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - (numConsumersThreads * 2));
});

// clear the queue
while (true) {
Expand All @@ -811,9 +811,11 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
}
}

// The available permits should be 0 and num messages in the queue should be 0
Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
Awaitility.await().untilAsserted(() -> {
// The available permits should be 0 and num messages in the queue should be 0
Assert.assertEquals(consumerImpl.getAvailablePermits(), 0);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), 0);
});

barrier.reset();
for (int i = 0; i < numConsumersThreads; i++) {
Expand All @@ -824,15 +826,14 @@ public void testConcurrentConsumerReceiveWhileReconnect(int batchMessageDelayMs)
});
}
barrier.await();
// we again make 10 threads call receive() and get blocked
Thread.sleep(100);

restartBroker();
Thread.sleep(2000);

// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
Awaitility.await().untilAsserted(() -> {
// The available permits should be 10 and num messages in the queue should be 90
Assert.assertEquals(consumerImpl.getAvailablePermits(), numConsumersThreads);
Assert.assertEquals(consumerImpl.numMessagesInQueue(), recvQueueSize - numConsumersThreads);
});
consumer.close();
executor.shutdown();
}
Expand Down

0 comments on commit 8b45a65

Please sign in to comment.