Skip to content

Commit

Permalink
fix a flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
qiangdavidliu committed Jan 10, 2018
1 parent 1ee6ae2 commit 88a3d6a
Showing 1 changed file with 18 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ public void testInfiniteOnNextStream() throws Exception {

System.out.println("Total lines:" + writes.get());
assertTrue(writes.get() >= 9); // Observable is configured to emit events in every 100 ms. So expect at least 9 in a second.
assertTrue(stream.getConcurrentConnections().get() == 0); // Provider is expected to decrement connection count when streaming process is terminated.

// Provider is expected to decrement connection count when streaming process is terminated.
assertTrue(hasNoMoreConcurrentConnections(stream.getConcurrentConnections(), 200, 10, TimeUnit.MILLISECONDS));
}

@Test
Expand All @@ -143,6 +145,20 @@ public void testOnComplete() throws Exception {
testStreamOnce(streamOfOnNextThenOnCompleted);
}

// as the concurrentConnections count is decremented asynchronously, we need to potentially give the check a little bit of time
private static boolean hasNoMoreConcurrentConnections(AtomicInteger concurrentConnectionsCount, long waitDuration, long pollInterval, TimeUnit timeUnit) throws InterruptedException {
long period = (pollInterval > waitDuration) ? waitDuration : pollInterval;

for (long i = 0; i < waitDuration; i += period) {
if (concurrentConnectionsCount.get() == 0) {
return true;
}
Thread.sleep(timeUnit.toMillis(period));
}

return false;
}

private void testStreamOnce(Observable<String> observable) throws Exception {
final PipedInputStream is = new PipedInputStream();
final PipedOutputStream os = new PipedOutputStream(is);
Expand All @@ -157,8 +173,8 @@ private void testStreamOnce(Observable<String> observable) throws Exception {

System.out.println("Total lines:" + writes.get());
assertTrue(writes.get() == 1);
assertTrue(stream.getConcurrentConnections().get() == 0);

assertTrue(hasNoMoreConcurrentConnections(stream.getConcurrentConnections(), 200, 10, TimeUnit.MILLISECONDS));
}

private static Thread startStreamingThread(final HystrixStream stream, final OutputStream outputSteam) {
Expand Down

0 comments on commit 88a3d6a

Please sign in to comment.