Skip to content

Commit

Permalink
Merge pull request ReactiveX#2809 from akarnokd/TakeUntilTerminationFix
Browse files Browse the repository at this point in the history
Fixed takeUntil not unsubscribing from either of the observables in case
  • Loading branch information
benjchristensen committed Mar 6, 2015
2 parents ecbd27d + d6eb4dd commit 4f29be4
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 12 deletions.
52 changes: 43 additions & 9 deletions src/main/java/rx/internal/operators/OperatorTakeUntil.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,62 @@ public OperatorTakeUntil(final Observable<? extends E> other) {

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new SerializedSubscriber<T>(child);

other.unsafeSubscribe(new Subscriber<E>(child) {

final Subscriber<T> serial = new SerializedSubscriber<T>(child, false);

final Subscriber<T> main = new Subscriber<T>(serial, false) {
@Override
public void onNext(T t) {
serial.onNext(t);
}
@Override
public void onError(Throwable e) {
try {
serial.onError(e);
} finally {
serial.unsubscribe();
}
}
@Override
public void onCompleted() {
parent.onCompleted();
try {
serial.onCompleted();
} finally {
serial.unsubscribe();
}
}
};

final Subscriber<E> so = new Subscriber<E>() {
@Override
public void onStart() {
request(Long.MAX_VALUE);
}

@Override
public void onCompleted() {
main.onCompleted();
}

@Override
public void onError(Throwable e) {
parent.onError(e);
main.onError(e);
}

@Override
public void onNext(E t) {
parent.onCompleted();
onCompleted();
}

});
};

serial.add(main);
serial.add(so);

child.add(serial);

other.unsafeSubscribe(so);

return parent;
return main;
}

}
104 changes: 101 additions & 3 deletions src/test/java/rx/internal/operators/OperatorTakeUntilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@
*/
package rx.internal.operators;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;

import java.util.Arrays;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.observers.TestSubscriber;
import rx.subjects.PublishSubject;

public class OperatorTakeUntilTest {

Expand Down Expand Up @@ -188,4 +192,98 @@ public void call(Subscriber<? super String> observer) {
observer.add(s);
}
}

@Test
public void testUntilFires() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> until = PublishSubject.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.takeUntil(until).unsafeSubscribe(ts);

assertTrue(source.hasObservers());
assertTrue(until.hasObservers());

source.onNext(1);

ts.assertReceivedOnNext(Arrays.asList(1));
until.onNext(1);

ts.assertReceivedOnNext(Arrays.asList(1));
ts.assertNoErrors();
ts.assertTerminalEvent();

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
}
@Test
public void testMainCompletes() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> until = PublishSubject.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.takeUntil(until).unsafeSubscribe(ts);

assertTrue(source.hasObservers());
assertTrue(until.hasObservers());

source.onNext(1);
source.onCompleted();

ts.assertReceivedOnNext(Arrays.asList(1));
ts.assertNoErrors();
ts.assertTerminalEvent();

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
}
@Test
public void testDownstreamUnsubscribes() {
PublishSubject<Integer> source = PublishSubject.create();
PublishSubject<Integer> until = PublishSubject.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

source.takeUntil(until).take(1).unsafeSubscribe(ts);

assertTrue(source.hasObservers());
assertTrue(until.hasObservers());

source.onNext(1);

ts.assertReceivedOnNext(Arrays.asList(1));
ts.assertNoErrors();
ts.assertTerminalEvent();

assertFalse("Source still has observers", source.hasObservers());
assertFalse("Until still has observers", until.hasObservers());
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
}
public void testBackpressure() {
PublishSubject<Integer> until = PublishSubject.create();

TestSubscriber<Integer> ts = new TestSubscriber<Integer>() {
@Override
public void onStart() {
requestMore(0);
}
};

Observable.range(1, 10).takeUntil(until).unsafeSubscribe(ts);

assertTrue(until.hasObservers());

ts.requestMore(1);

ts.assertReceivedOnNext(Arrays.asList(1));
ts.assertNoErrors();
assertTrue("TestSubscriber completed", ts.getOnCompletedEvents().isEmpty());

assertFalse("Until still has observers", until.hasObservers());
assertFalse("TestSubscriber is unsubscribed", ts.isUnsubscribed());
}
}

0 comments on commit 4f29be4

Please sign in to comment.