diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/OperatorMapResponseToBodyOrError.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/OperatorMapResponseToBodyOrError.java new file mode 100644 index 0000000000..321f463f96 --- /dev/null +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/OperatorMapResponseToBodyOrError.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava; + +import retrofit2.Response; +import rx.Observable; +import rx.Observable.Operator; +import rx.Subscriber; +import rx.functions.Func1; + +/** + * A version of {@link Observable#map(Func1)} which lets us trigger {@code onError} without having + * to use {@link Observable#flatMap(Func1)} which breaks producer requests from propagating. + */ +final class OperatorMapResponseToBodyOrError implements Operator> { + private static final OperatorMapResponseToBodyOrError INSTANCE = + new OperatorMapResponseToBodyOrError<>(); + + @SuppressWarnings("unchecked") // Safe because of erasure. + static OperatorMapResponseToBodyOrError instance() { + return (OperatorMapResponseToBodyOrError) INSTANCE; + } + + @Override public Subscriber> call(final Subscriber child) { + return new Subscriber>(child) { + @Override public void onNext(Response response) { + if (response.isSuccessful()) { + child.onNext(response.body()); + } else { + child.onError(new HttpException(response)); + } + } + + @Override public void onCompleted() { + child.onCompleted(); + } + + @Override public void onError(Throwable e) { + child.onError(e); + } + }; + } +} diff --git a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java index aa135e6c45..43a2105e2c 100644 --- a/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java +++ b/retrofit-adapters/rxjava/src/main/java/retrofit2/adapter/rxjava/RxJavaCallAdapterFactory.java @@ -18,11 +18,13 @@ import java.lang.annotation.Annotation; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.concurrent.atomic.AtomicBoolean; import retrofit2.Call; import retrofit2.CallAdapter; import retrofit2.Response; import retrofit2.Retrofit; import rx.Observable; +import rx.Producer; import rx.Scheduler; import rx.Subscriber; import rx.exceptions.Exceptions; @@ -131,14 +133,28 @@ static final class CallOnSubscribe implements Observable.OnSubscribe> subscriber) { // Since Call is a one-shot type, clone it for each new subscriber. - final Call call = originalCall.clone(); + Call call = originalCall.clone(); - // Attempt to cancel the call if it is still in-flight on unsubscription. - subscriber.add(Subscriptions.create(new Action0() { - @Override public void call() { - call.cancel(); - } - })); + // Wrap the call in a helper which handles both unsubscription and backpressure. + RequestArbiter requestArbiter = new RequestArbiter<>(call, subscriber); + subscriber.add(Subscriptions.create(requestArbiter)); + subscriber.setProducer(requestArbiter); + } + } + + static final class RequestArbiter extends AtomicBoolean implements Action0, Producer { + private final Call call; + private final Subscriber> subscriber; + + RequestArbiter(Call call, Subscriber> subscriber) { + this.call = call; + this.subscriber = subscriber; + } + + @Override public void request(long n) { + if (n < 0) throw new IllegalArgumentException("n < 0: " + n); + if (n == 0) return; // Nothing to do when requesting 0. + if (!compareAndSet(false, true)) return; // Request was already triggered. try { Response response = call.execute(); @@ -157,6 +173,10 @@ static final class CallOnSubscribe implements Observable.OnSubscribe> { @@ -196,14 +216,7 @@ static final class SimpleCallAdapter implements CallAdapter> { @Override public Observable adapt(Call call) { Observable observable = Observable.create(new CallOnSubscribe<>(call)) // - .flatMap(new Func1, Observable>() { - @Override public Observable call(Response response) { - if (response.isSuccessful()) { - return Observable.just(response.body()); - } - return Observable.error(new HttpException(response)); - } - }); + .lift(OperatorMapResponseToBodyOrError.instance()); if (scheduler != null) { return observable.subscribeOn(scheduler); } diff --git a/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/ObservableTest.java b/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/ObservableTest.java index a854f63bd1..f3e05adea5 100644 --- a/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/ObservableTest.java +++ b/retrofit-adapters/rxjava/src/test/java/retrofit2/adapter/rxjava/ObservableTest.java @@ -26,6 +26,7 @@ import retrofit2.http.GET; import rx.Observable; import rx.observables.BlockingObservable; +import rx.observers.TestSubscriber; import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; import static org.assertj.core.api.Assertions.assertThat; @@ -83,6 +84,22 @@ interface Service { } } + @Test public void bodyRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + TestSubscriber subscriber = new TestSubscriber<>(0); + Observable o = service.body(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(0); + + subscriber.requestMore(1); + assertThat(server.getRequestCount()).isEqualTo(1); + + subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests. + assertThat(server.getRequestCount()).isEqualTo(1); + } + @Test public void responseSuccess200() { server.enqueue(new MockResponse().setBody("Hi")); @@ -113,6 +130,22 @@ interface Service { } } + @Test public void responseRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + TestSubscriber> subscriber = new TestSubscriber<>(0); + Observable> o = service.response(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(0); + + subscriber.requestMore(1); + assertThat(server.getRequestCount()).isEqualTo(1); + + subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests. + assertThat(server.getRequestCount()).isEqualTo(1); + } + @Test public void resultSuccess200() { server.enqueue(new MockResponse().setBody("Hi")); @@ -143,4 +176,20 @@ interface Service { assertThat(result.isError()).isTrue(); assertThat(result.error()).isInstanceOf(IOException.class); } + + @Test public void resultRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + TestSubscriber> subscriber = new TestSubscriber<>(0); + Observable> o = service.result(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(0); + + subscriber.requestMore(1); + assertThat(server.getRequestCount()).isEqualTo(1); + + subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests. + assertThat(server.getRequestCount()).isEqualTo(1); + } }