Skip to content

Commit

Permalink
Merge pull request square#1702 from square/jw/under-pressure
Browse files Browse the repository at this point in the history
Honor backpressure in Observable creation.
  • Loading branch information
JakeWharton committed Mar 30, 2016
2 parents 6b2bb3a + c107778 commit 1b05fc9
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (C) 2016 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit2.adapter.rxjava;

import retrofit2.Response;
import rx.Observable;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.functions.Func1;

/**
* A version of {@link Observable#map(Func1)} which lets us trigger {@code onError} without having
* to use {@link Observable#flatMap(Func1)} which breaks producer requests from propagating.
*/
final class OperatorMapResponseToBodyOrError<T> implements Operator<T, Response<T>> {
private static final OperatorMapResponseToBodyOrError<Object> INSTANCE =
new OperatorMapResponseToBodyOrError<>();

@SuppressWarnings("unchecked") // Safe because of erasure.
static <R> OperatorMapResponseToBodyOrError<R> instance() {
return (OperatorMapResponseToBodyOrError<R>) INSTANCE;
}

@Override public Subscriber<? super Response<T>> call(final Subscriber<? super T> child) {
return new Subscriber<Response<T>>(child) {
@Override public void onNext(Response<T> response) {
if (response.isSuccessful()) {
child.onNext(response.body());
} else {
child.onError(new HttpException(response));
}
}

@Override public void onCompleted() {
child.onCompleted();
}

@Override public void onError(Throwable e) {
child.onError(e);
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.lang.annotation.Annotation;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.atomic.AtomicBoolean;
import retrofit2.Call;
import retrofit2.CallAdapter;
import retrofit2.Response;
import retrofit2.Retrofit;
import rx.Observable;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.exceptions.Exceptions;
Expand Down Expand Up @@ -131,14 +133,28 @@ static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response

@Override public void call(final Subscriber<? super Response<T>> subscriber) {
// Since Call is a one-shot type, clone it for each new subscriber.
final Call<T> call = originalCall.clone();
Call<T> call = originalCall.clone();

// Attempt to cancel the call if it is still in-flight on unsubscription.
subscriber.add(Subscriptions.create(new Action0() {
@Override public void call() {
call.cancel();
}
}));
// Wrap the call in a helper which handles both unsubscription and backpressure.
RequestArbiter<T> requestArbiter = new RequestArbiter<>(call, subscriber);
subscriber.add(Subscriptions.create(requestArbiter));
subscriber.setProducer(requestArbiter);
}
}

static final class RequestArbiter<T> extends AtomicBoolean implements Action0, Producer {
private final Call<T> call;
private final Subscriber<? super Response<T>> subscriber;

RequestArbiter(Call<T> call, Subscriber<? super Response<T>> subscriber) {
this.call = call;
this.subscriber = subscriber;
}

@Override public void request(long n) {
if (n < 0) throw new IllegalArgumentException("n < 0: " + n);
if (n == 0) return; // Nothing to do when requesting 0.
if (!compareAndSet(false, true)) return; // Request was already triggered.

try {
Response<T> response = call.execute();
Expand All @@ -157,6 +173,10 @@ static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response
subscriber.onCompleted();
}
}

@Override public void call() {
call.cancel();
}
}

static final class ResponseCallAdapter implements CallAdapter<Observable<?>> {
Expand Down Expand Up @@ -196,14 +216,7 @@ static final class SimpleCallAdapter implements CallAdapter<Observable<?>> {

@Override public <R> Observable<R> adapt(Call<R> call) {
Observable<R> observable = Observable.create(new CallOnSubscribe<>(call)) //
.flatMap(new Func1<Response<R>, Observable<R>>() {
@Override public Observable<R> call(Response<R> response) {
if (response.isSuccessful()) {
return Observable.just(response.body());
}
return Observable.error(new HttpException(response));
}
});
.lift(OperatorMapResponseToBodyOrError.<R>instance());
if (scheduler != null) {
return observable.subscribeOn(scheduler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import retrofit2.http.GET;
import rx.Observable;
import rx.observables.BlockingObservable;
import rx.observers.TestSubscriber;

import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST;
import static org.assertj.core.api.Assertions.assertThat;
Expand Down Expand Up @@ -83,6 +84,22 @@ interface Service {
}
}

@Test public void bodyRespectsBackpressure() {
server.enqueue(new MockResponse().setBody("Hi"));

TestSubscriber<String> subscriber = new TestSubscriber<>(0);
Observable<String> o = service.body();

o.subscribe(subscriber);
assertThat(server.getRequestCount()).isEqualTo(0);

subscriber.requestMore(1);
assertThat(server.getRequestCount()).isEqualTo(1);

subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests.
assertThat(server.getRequestCount()).isEqualTo(1);
}

@Test public void responseSuccess200() {
server.enqueue(new MockResponse().setBody("Hi"));

Expand Down Expand Up @@ -113,6 +130,22 @@ interface Service {
}
}

@Test public void responseRespectsBackpressure() {
server.enqueue(new MockResponse().setBody("Hi"));

TestSubscriber<Response<String>> subscriber = new TestSubscriber<>(0);
Observable<Response<String>> o = service.response();

o.subscribe(subscriber);
assertThat(server.getRequestCount()).isEqualTo(0);

subscriber.requestMore(1);
assertThat(server.getRequestCount()).isEqualTo(1);

subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests.
assertThat(server.getRequestCount()).isEqualTo(1);
}

@Test public void resultSuccess200() {
server.enqueue(new MockResponse().setBody("Hi"));

Expand Down Expand Up @@ -143,4 +176,20 @@ interface Service {
assertThat(result.isError()).isTrue();
assertThat(result.error()).isInstanceOf(IOException.class);
}

@Test public void resultRespectsBackpressure() {
server.enqueue(new MockResponse().setBody("Hi"));

TestSubscriber<Result<String>> subscriber = new TestSubscriber<>(0);
Observable<Result<String>> o = service.result();

o.subscribe(subscriber);
assertThat(server.getRequestCount()).isEqualTo(0);

subscriber.requestMore(1);
assertThat(server.getRequestCount()).isEqualTo(1);

subscriber.requestMore(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP requests.
assertThat(server.getRequestCount()).isEqualTo(1);
}
}

0 comments on commit 1b05fc9

Please sign in to comment.