Skip to content

Commit

Permalink
Merge pull request ReactiveX#904 from benjchristensen/merge-subscript…
Browse files Browse the repository at this point in the history
…ions

Merge: Unsubscribe Completed Inner Observables
  • Loading branch information
benjchristensen committed Feb 19, 2014
2 parents fe7e449 + 94c8b6b commit 44b015f
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 2 deletions.
6 changes: 6 additions & 0 deletions rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,12 @@ public void assertTerminalEvent() {
testObserver.assertTerminalEvent();
}

public void assertUnsubscribed() {
if (!isUnsubscribed()) {
throw new AssertionError("Not unsubscribed.");
}
}

public void awaitTerminalEvent() {
try {
latch.await();
Expand Down
17 changes: 15 additions & 2 deletions rxjava-core/src/main/java/rx/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import rx.Observable.Operator;
import rx.Subscriber;
import rx.observers.SynchronizedSubscriber;
import rx.subscriptions.CompositeSubscription;

/**
* Flattens a list of Observables into one Observable sequence, without any transformation.
Expand All @@ -36,6 +37,9 @@ public final class OperatorMerge<T> implements Operator<T, Observable<? extends
public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> outerOperation) {

final Subscriber<T> o = new SynchronizedSubscriber<T>(outerOperation);
final CompositeSubscription childrenSubscriptions = new CompositeSubscription();
outerOperation.add(childrenSubscriptions);

return new Subscriber<Observable<? extends T>>(outerOperation) {

private volatile boolean completed = false;
Expand All @@ -57,32 +61,41 @@ public void onError(Throwable e) {
@Override
public void onNext(Observable<? extends T> innerObservable) {
runningCount.incrementAndGet();
innerObservable.subscribe(new InnerObserver());
Subscriber<T> i = new InnerObserver();
childrenSubscriptions.add(i);
innerObservable.subscribe(i);
}

final class InnerObserver extends Subscriber<T> {

public InnerObserver() {
super(o);
}

@Override
public void onCompleted() {
if (runningCount.decrementAndGet() == 0 && completed) {
o.onCompleted();
}
cleanup();
}

@Override
public void onError(Throwable e) {
o.onError(e);
cleanup();
}

@Override
public void onNext(T a) {
o.onNext(a);
}

private void cleanup() {
// remove subscription onCompletion so it cleans up immediately and doesn't memory leak
// see https://github.com/Netflix/RxJava/issues/897
childrenSubscriptions.remove(this);
}

};

};
Expand Down
100 changes: 100 additions & 0 deletions rxjava-core/src/test/java/rx/operators/OperatorMergeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,11 +34,16 @@
import org.mockito.MockitoAnnotations;

import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.schedulers.TestScheduler;
import rx.subscriptions.Subscriptions;

public class OperatorMergeTest {
Expand Down Expand Up @@ -372,4 +379,97 @@ public Subscription onSubscribe(Observer<? super String> observer) {
}
}

@Test
public void testUnsubscribeAsObservablesComplete() {
TestScheduler scheduler1 = Schedulers.test();
AtomicBoolean os1 = new AtomicBoolean(false);
Observable<Long> o1 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler1, os1);

TestScheduler scheduler2 = Schedulers.test();
AtomicBoolean os2 = new AtomicBoolean(false);
Observable<Long> o2 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler2, os2);

TestSubscriber<Long> ts = new TestSubscriber<Long>();
Observable.merge(o1, o2).subscribe(ts);

// we haven't incremented time so nothing should be received yet
ts.assertReceivedOnNext(Collections.<Long> emptyList());

scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);
scheduler2.advanceTimeBy(2, TimeUnit.SECONDS);

ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
// not unsubscribed yet
assertFalse(os1.get());
assertFalse(os2.get());

// advance to the end at which point it should complete
scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);

ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L, 3L, 4L));
assertTrue(os1.get());
assertFalse(os2.get());

// both should be completed now
scheduler2.advanceTimeBy(3, TimeUnit.SECONDS);

ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L, 3L, 4L, 2L, 3L, 4L));
assertTrue(os1.get());
assertTrue(os2.get());

ts.assertTerminalEvent();
}

@Test
public void testEarlyUnsubscribe() {
TestScheduler scheduler1 = Schedulers.test();
AtomicBoolean os1 = new AtomicBoolean(false);
Observable<Long> o1 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler1, os1);

TestScheduler scheduler2 = Schedulers.test();
AtomicBoolean os2 = new AtomicBoolean(false);
Observable<Long> o2 = createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(scheduler2, os2);

TestSubscriber<Long> ts = new TestSubscriber<Long>();
Subscription s = Observable.merge(o1, o2).subscribe(ts);

// we haven't incremented time so nothing should be received yet
ts.assertReceivedOnNext(Collections.<Long> emptyList());

scheduler1.advanceTimeBy(3, TimeUnit.SECONDS);
scheduler2.advanceTimeBy(2, TimeUnit.SECONDS);

ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
// not unsubscribed yet
assertFalse(os1.get());
assertFalse(os2.get());

// early unsubscribe
s.unsubscribe();

assertTrue(os1.get());
assertTrue(os2.get());

ts.assertReceivedOnNext(Arrays.asList(0L, 1L, 2L, 0L, 1L));
ts.assertUnsubscribed();
}

private Observable<Long> createObservableOf5IntervalsOf1SecondIncrementsWithSubscriptionHook(final Scheduler scheduler, final AtomicBoolean unsubscribed) {
return Observable.create(new OnSubscribe<Long>() {

@Override
public void call(Subscriber<? super Long> s) {
s.add(Subscriptions.create(new Action0() {

@Override
public void call() {
unsubscribed.set(true);
}

}));
Observable.interval(1, TimeUnit.SECONDS, scheduler).take(5).subscribe(s);
}
});
}

}

0 comments on commit 44b015f

Please sign in to comment.