Skip to content

Commit

Permalink
Implement proper error handling for Rx types.
Browse files Browse the repository at this point in the history
Since we've implemented our own observables instead of using the built-in factories we have to do the heavy lifting of these edge cases ourselves.
  • Loading branch information
JakeWharton committed Aug 19, 2016
1 parent eccfe20 commit 504dd09
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import retrofit2.Response;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.plugins.RxJavaPlugins;

final class BodyOnSubscribe<T> implements OnSubscribe<T> {
private final OnSubscribe<Response<T>> upstream;
Expand Down Expand Up @@ -45,15 +47,27 @@ private static class BodySubscriber<R> extends Subscriber<Response<R>> {
subscriber.onNext(response.body());
} else {
subscriberTerminated = true;
subscriber.onError(new HttpException(response));
Throwable t = new HttpException(response);
try {
subscriber.onError(t);
} catch (Throwable inner) {
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
}
}

@Override public void onError(Throwable throwable) {
if (!subscriberTerminated) {
subscriber.onError(throwable);
} else {
// This should never happen! onNext handles and forwards errors automatically.
Throwable broken = new AssertionError(
"This should never happen! Report as a Retrofit bug with the full stacktrace.");
//noinspection UnnecessaryInitCause Two-arg AssertionError constructor is 1.7+ only.
broken.initCause(throwable);
RxJavaPlugins.getInstance().getErrorHandler().handleError(broken);
}
// TODO else send to plugin as unhandled error
}

@Override public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import rx.Producer;
import rx.Subscriber;
import rx.Subscription;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaPlugins;

final class CallOnSubscribe<T> implements OnSubscribe<Response<T>> {
private final Call<T> originalCall;
Expand Down Expand Up @@ -62,18 +64,27 @@ private static final class CallArbiter<T> extends AtomicBoolean
return;
}

boolean terminated = false;
try {
Response<T> response = call.execute();
if (!call.isCanceled()) {
subscriber.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
subscriber.onCompleted();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (!call.isCanceled()) {
subscriber.onError(t);
if (terminated) {
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
} else if (!call.isCanceled()) {
try {
subscriber.onError(t);
} catch (Throwable inner) {
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import retrofit2.Response;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.plugins.RxJavaPlugins;

final class ResultOnSubscribe<T> implements OnSubscribe<Result<T>> {
private final OnSubscribe<Response<T>> upstream;
Expand Down Expand Up @@ -46,7 +48,12 @@ private static class ResultSubscriber<R> extends Subscriber<Response<R>> {
try {
subscriber.onNext(Result.<R>error(throwable));
} catch (Throwable t) {
subscriber.onError(t);
try {
subscriber.onError(t);
} catch (Throwable inner) {
CompositeException composite = new CompositeException(t, inner);
RxJavaPlugins.getInstance().getErrorHandler().handleError(composite);
}
return;
}
subscriber.onCompleted();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,27 @@
package retrofit2.adapter.rxjava;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestRule;
import retrofit2.Retrofit;
import retrofit2.http.GET;
import rx.Completable;
import rx.exceptions.CompositeException;
import rx.exceptions.Exceptions;
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaPlugins;

import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;

public final class CompletableTest {
@Rule public final MockWebServer server = new MockWebServer();
@Rule public final TestRule pluginsReset = new RxJavaPluginsResetRule();
@Rule public final RecordingSubscriber.Rule subscriberRule = new RecordingSubscriber.Rule();

interface Service {
Expand Down Expand Up @@ -69,4 +76,56 @@ interface Service {
service.completable().unsafeSubscribe(subscriber);
subscriber.assertError(IOException.class);
}

@Test public void bodyThrowingInOnCompletedDeliveredToPlugin() {
server.enqueue(new MockResponse().setBody("Hi"));

final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override public void handleError(Throwable throwable) {
if (!throwableRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable);
}
}
});

RecordingSubscriber<String> subscriber = subscriberRule.create();
final RuntimeException e = new RuntimeException();
service.completable().unsafeSubscribe(new ForwardingSubscriber<String>(subscriber) {
@Override public void onCompleted() {
throw e;
}
});

assertThat(throwableRef.get()).isSameAs(e);
}

@Test public void bodyThrowingInOnErrorDeliveredToPlugin() {
server.enqueue(new MockResponse().setResponseCode(404));

final AtomicReference<Throwable> throwableRef = new AtomicReference<>();
RxJavaPlugins.getInstance().registerErrorHandler(new RxJavaErrorHandler() {
@Override public void handleError(Throwable throwable) {
if (!throwableRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable);
}
}
});

RecordingSubscriber<String> subscriber = subscriberRule.create();
final AtomicReference<Throwable> errorRef = new AtomicReference<>();
final RuntimeException e = new RuntimeException();
service.completable().unsafeSubscribe(new ForwardingSubscriber<String>(subscriber) {
@Override public void onError(Throwable throwable) {
if (!errorRef.compareAndSet(null, throwable)) {
throw Exceptions.propagate(throwable);
}
throw e;
}
});

//noinspection ThrowableResultOfMethodCallIgnored
CompositeException composite = (CompositeException) throwableRef.get();
assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;

import rx.Subscriber;

abstract class ForwardingSubscriber<T> extends Subscriber<T> {
private final Subscriber<T> delegate;

ForwardingSubscriber(Subscriber<T> delegate) {
this.delegate = delegate;
}

@Override public void onNext(T value) {
delegate.onNext(value);
}

@Override public void onCompleted() {
delegate.onCompleted();
}

@Override public void onError(Throwable throwable) {
delegate.onError(throwable);
}
}
Loading

0 comments on commit 504dd09

Please sign in to comment.