Skip to content

Commit

Permalink
Switch operator should now propagate onError in subsequences.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeldejong committed May 15, 2013
1 parent 9869e78 commit 58cc48d
Showing 1 changed file with 34 additions and 19 deletions.
53 changes: 34 additions & 19 deletions rxjava-core/src/main/java/rx/operators/OperationSwitch.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -30,9 +27,19 @@
import rx.Subscription;
import rx.concurrency.TestScheduler;
import rx.subscriptions.Subscriptions;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;


/**
* This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single
Expand Down Expand Up @@ -67,54 +74,64 @@ public Switch(Observable<Observable<T>> sequences) {

@Override
public Subscription call(Observer<T> observer) {
return sequences.subscribe(new SwitchObserver<T>(observer));
AtomicObservableSubscription subscription = new AtomicObservableSubscription();
subscription.wrap(sequences.subscribe(new SwitchObserver<T>(observer, subscription)));
return subscription;
}
}

private static class SwitchObserver<T> implements Observer<Observable<T>> {

private final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();

private final Observer<T> observer;
private final AtomicObservableSubscription parent;
private final AtomicReference<Subscription> subsequence = new AtomicReference<Subscription>();

public SwitchObserver(Observer<T> observer) {
public SwitchObserver(Observer<T> observer, AtomicObservableSubscription parent) {
this.observer = observer;
this.parent = parent;
}

@Override
public void onCompleted() {
unsubscribeFromSubSequence();
observer.onCompleted();
}

@Override
public void onError(Exception e) {
unsubscribeFromSubSequence();
observer.onError(e);
}

@Override
public void onNext(Observable<T> args) {
Subscription previousSubscription = subscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
unsubscribeFromSubSequence();

subscription.set(args.subscribe(new Observer<T>() {
subsequence.set(args.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
// Do nothing.
// Do nothing.
}

@Override
public void onError(Exception e) {
// Do nothing.
parent.unsubscribe();
observer.onError(e);
}

@Override
@Override
public void onNext(T args) {
observer.onNext(args);
}
}));
}

private void unsubscribeFromSubSequence() {
Subscription previousSubscription = subsequence.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
}
}

public static class UnitTest {
Expand Down Expand Up @@ -299,7 +316,6 @@ public Subscription call(Observer<String> observer) {
verify(observer, never()).onError(any(Exception.class));

scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Exception.class));
Expand Down Expand Up @@ -355,10 +371,9 @@ public Subscription call(Observer<String> observer) {
verify(observer, never()).onError(any(Exception.class));

scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
inOrder.verify(observer, never()).onNext("three");
verify(observer, never()).onCompleted();
verify(observer, never()).onError(any(Exception.class));
verify(observer, times(1)).onError(any(TestException.class));
}

private <T> void publishCompleted(final Observer<T> observer, long delay) {
Expand Down

0 comments on commit 58cc48d

Please sign in to comment.