From 0000b2ceeacfe848e36e5316291e3936ef692349 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 14 Feb 2014 13:25:47 +0800 Subject: [PATCH] Add timeout to CoundDownLatch, ignore InterruptException and fix the test to be consistent with the new subscribeOn --- .../OperatorTimeoutWithSelectorTest.java | 68 ++++++++++++------- 1 file changed, 42 insertions(+), 26 deletions(-) diff --git a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java index 65d7854a2f..709a82caa3 100644 --- a/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java @@ -15,12 +15,20 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertFalse; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import java.util.Arrays; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; import org.mockito.InOrder; @@ -30,6 +38,7 @@ import rx.Observable; import rx.Observable.OnSubscribe; import rx.Observer; +import rx.Scheduler; import rx.Subscriber; import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; @@ -329,6 +338,8 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr final CountDownLatch observerReceivedTwo = new CountDownLatch(1); final CountDownLatch timeoutEmittedOne = new CountDownLatch(1); final CountDownLatch observerCompleted = new CountDownLatch(1); + final CountDownLatch enteredTimeoutOne = new CountDownLatch(1); + final AtomicBoolean latchTimeout = new AtomicBoolean(false); final Func1> timeoutFunc = new Func1>() { @Override @@ -338,31 +349,23 @@ public Observable call(Integer t1) { return Observable.create(new OnSubscribe() { @Override public void call(Subscriber subscriber) { - subscriber.add(Subscriptions.create(new Action0() { - @Override - public void call() { - try { - // emulate "unsubscribe" is busy and finishes after timeout.onNext(1) - timeoutEmittedOne.await(); - } catch (InterruptedException e) { - // if we are interrupted then we complete (as this can happen when unsubscribed) - observerCompleted.countDown(); - e.printStackTrace(); + enteredTimeoutOne.countDown(); + // force the timeout message be sent after observer.onNext(2) + while (true) { + try { + if (!observerReceivedTwo.await(30, TimeUnit.SECONDS)) { + // CountDownLatch timeout + // There should be something wrong + latchTimeout.set(true); } + break; + } catch (InterruptedException e) { + // Since we just want to emulate a busy method, + // we ignore the interrupt signal from Scheduler. } - })); - // force the timeout message be sent after observer.onNext(2) - try { - observerReceivedTwo.await(); - } catch (InterruptedException e) { - // if we are interrupted then we complete (as this can happen when unsubscribed) - observerCompleted.countDown(); - e.printStackTrace(); - } - if (!subscriber.isUnsubscribed()) { - subscriber.onNext(1); - timeoutEmittedOne.countDown(); } + subscriber.onNext(1); + timeoutEmittedOne.countDown(); } }).subscribeOn(Schedulers.newThread()); } else { @@ -401,9 +404,18 @@ public void run() { PublishSubject source = PublishSubject.create(); source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts); source.onNext(1); // start timeout + try { + if(!enteredTimeoutOne.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } source.onNext(2); // disable timeout try { - timeoutEmittedOne.await(); + if(!timeoutEmittedOne.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } } catch (InterruptedException e) { e.printStackTrace(); } @@ -412,7 +424,11 @@ public void run() { }).start(); - observerCompleted.await(); + if(!observerCompleted.await(30, TimeUnit.SECONDS)) { + latchTimeout.set(true); + } + + assertFalse("CoundDownLatch timeout", latchTimeout.get()); InOrder inOrder = inOrder(o); inOrder.verify(o).onNext(1);