Skip to content

Commit

Permalink
Localized Operator Error Handling
Browse files Browse the repository at this point in the history
- use the lift function rather than try/catch in subscribe since this catches at the operator level rather than for an entire sequence
- unit tests with onErrorResumeNext demonstrating the use cases
  • Loading branch information
benjchristensen committed Feb 25, 2014
1 parent 3a1c5f5 commit f18e4d2
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 5 deletions.
12 changes: 11 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,17 @@ public <R> Observable<R> lift(final Operator<? extends R, ? super T> lift) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
f.call(hook.onLift(lift).call(o));
try {
f.call(hook.onLift(lift).call(o));
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
if (e instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) e;
}
o.onError(e);
}
}
});
}
Expand Down
8 changes: 8 additions & 0 deletions rxjava-core/src/main/java/rx/exceptions/OnErrorThrowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ public static OnErrorThrowable from(Throwable t) {
* @return Throwable e passed in
*/
public static Throwable addValueAsLastCause(Throwable e, Object value) {
Throwable lastCause = Exceptions.getFinalCause(e);
if (lastCause != null && lastCause instanceof OnNextValue) {
// purposefully using == for object reference check
if (((OnNextValue) lastCause).getValue() == value) {
// don't add another
return e;
}
}
Exceptions.addCause(e, new OnNextValue(value));
return e;
}
Expand Down
13 changes: 9 additions & 4 deletions rxjava-core/src/test/java/rx/operators/OperatorMapTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.functions.Func2;
Expand Down Expand Up @@ -171,7 +172,7 @@ public String call(String s) {
public void call(Throwable t1) {
t1.printStackTrace();
}

});

m.subscribe(stringObserver);
Expand Down Expand Up @@ -255,7 +256,7 @@ public Integer call(Integer i) {
}).toBlockingObservable().single();
}

@Test(expected = RuntimeException.class)
@Test(expected = OnErrorNotImplementedException.class)
public void verifyExceptionIsThrownIfThereIsNoExceptionHandler() {

Observable.OnSubscribe<Object> creator = new Observable.OnSubscribe<Object>() {
Expand All @@ -273,7 +274,6 @@ public void call(Subscriber<? super Object> observer) {

@Override
public Observable<Object> call(Object object) {

return Observable.from(object);
}
};
Expand All @@ -299,7 +299,12 @@ public void call(Object object) {
}
};

Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext);
try {
Observable.create(creator).flatMap(manyMapper).map(mapper).subscribe(onNext);
} catch (RuntimeException e) {
e.printStackTrace();
throw e;
}
}

private static Map<String, String> getMap(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import org.mockito.Mockito;

import rx.Observable;
import rx.Observable.Operator;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;
import rx.observers.TestSubscriber;
Expand Down Expand Up @@ -143,6 +145,87 @@ public Observable<String> call(Throwable t1) {
verify(observer, times(0)).onCompleted();
}

/**
* Test that we receive the onError if an exception is thrown from an operator that
* does not have manual try/catch handling like map does.
*/
@Test
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperator() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Observable.from(1).lift(new Operator<String, Integer>() {

@Override
public Subscriber<? super Integer> call(Subscriber<? super String> t1) {
throw new RuntimeException("failed");
}

}).onErrorResumeNext(new Func1<Throwable, Observable<String>>() {

@Override
public Observable<String> call(Throwable t1) {
if (t1.getMessage().equals("failed")) {
return Observable.from("success");
} else {
return Observable.error(t1);
}
}

}).subscribe(ts);

ts.assertTerminalEvent();
System.out.println(ts.getOnNextEvents());
ts.assertReceivedOnNext(Arrays.asList("success"));
}

/**
* Test that we receive the onError if an exception is thrown from an operator that
* does not have manual try/catch handling like map does.
*/
@Test
public void testOnErrorResumeReceivesErrorFromPreviousNonProtectedOperatorOnNext() {
TestSubscriber<String> ts = new TestSubscriber<String>();
Observable.from(1).lift(new Operator<String, Integer>() {

@Override
public Subscriber<? super Integer> call(Subscriber<? super String> t1) {
return new Subscriber<Integer>() {

@Override
public void onCompleted() {
throw new RuntimeException("failed");
}

@Override
public void onError(Throwable e) {
throw new RuntimeException("failed");
}

@Override
public void onNext(Integer t) {
throw new RuntimeException("failed");
}

};
}

}).onErrorResumeNext(new Func1<Throwable, Observable<String>>() {

@Override
public Observable<String> call(Throwable t1) {
if (t1.getMessage().equals("failed")) {
return Observable.from("success");
} else {
return Observable.error(t1);
}
}

}).subscribe(ts);

ts.assertTerminalEvent();
System.out.println(ts.getOnNextEvents());
ts.assertReceivedOnNext(Arrays.asList("success"));
}

private static class TestObservable implements Observable.OnSubscribeFunc<String> {

final Subscription s;
Expand Down

0 comments on commit f18e4d2

Please sign in to comment.