Skip to content

Commit

Permalink
Merge pull request square#981 from square/jw/rxjava-single
Browse files Browse the repository at this point in the history
Add optional support for RxJava's experimental Single type.
  • Loading branch information
JakeWharton committed Aug 22, 2015
2 parents 38a7779 + d1ca096 commit 53942cc
Show file tree
Hide file tree
Showing 7 changed files with 255 additions and 55 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<okhttp.version>2.5.0-SNAPSHOT</okhttp.version>

<!-- Adapter Dependencies -->
<rxjava.version>1.0.10</rxjava.version>
<rxjava.version>1.0.13</rxjava.version>

<!-- Converter Dependencies -->
<gson.version>2.3.1</gson.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright (C) 2015 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit.mock;

import rx.Observable;
import rx.functions.Func1;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public final class RxJavaBehaviorAdapter implements BehaviorAdapter<Object> {
public static RxJavaBehaviorAdapter create() {
return new RxJavaBehaviorAdapter();
}

private RxJavaBehaviorAdapter() {
}

@Override public Object applyBehavior(Behavior behavior, Object value) {
if (value instanceof Observable) {
return applyObservableBehavior(behavior, (Observable<?>) value);
}
String name = value.getClass().getCanonicalName();
if ("rx.Single".equals(name)) {
// Apply behavior to the Single from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
return SingleHelper.applySingleBehavior(behavior, value);
}
throw new IllegalStateException("Unsupported type " + name);
}

public Observable<?> applyObservableBehavior(final Behavior behavior, final Observable<?> value) {
return Observable.timer(behavior.calculateDelay(MILLISECONDS), MILLISECONDS)
.flatMap(new Func1<Long, Observable<?>>() {
@Override public Observable<?> call(Long ignored) {
if (behavior.calculateIsFailure()) {
return Observable.error(behavior.failureException());
}
return value;
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,24 @@
package retrofit.mock;

import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import static java.util.concurrent.TimeUnit.MILLISECONDS;

public final class ObservableBehaviorAdapter implements BehaviorAdapter<Observable<?>> {
public static ObservableBehaviorAdapter create() {
return new ObservableBehaviorAdapter();
}

private ObservableBehaviorAdapter() {
}

@Override public Observable<?> applyBehavior(final Behavior behavior, final Observable<?> value) {
final class SingleHelper {
@SuppressWarnings("unchecked") // Caller must instanceof / getClass() verify 'value' is Single.
public static Object applySingleBehavior(final Behavior behavior, Object value) {
final Single<Object> single = (Single<Object>) value;
return Observable.timer(behavior.calculateDelay(MILLISECONDS), MILLISECONDS)
.flatMap(new Func1<Long, Observable<?>>() {
@Override public Observable<?> call(Long ignored) {
if (behavior.calculateIsFailure()) {
return Observable.error(behavior.failureException());
}
return value;
return single.toObservable();
}
});
})
.toSingle();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,39 +23,107 @@
import org.junit.Before;
import org.junit.Test;
import rx.Observable;
import rx.Single;
import rx.Subscriber;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertTrue;

public final class ObservableBehaviorAdapterTest {
public final class RxJavaBehaviorAdapterTest {
interface DoWorkService {
Observable<String> response();
Observable<String> observableResponse();
Single<String> singleResponse();
}

private final Behavior behavior = Behavior.create(new Random(2847));
private DoWorkService service;

@Before public void setUp() {
DoWorkService mockService = new DoWorkService() {
@Override public Observable<String> response() {
@Override public Observable<String> observableResponse() {
return Observable.just("Hi!");
}

@Override public Single<String> singleResponse() {
return Single.just("Hi!");
}
};

BehaviorAdapter<?> adapter = ObservableBehaviorAdapter.create();
BehaviorAdapter<?> adapter = RxJavaBehaviorAdapter.create();
MockRetrofit mockRetrofit = new MockRetrofit(adapter, behavior);
service = mockRetrofit.create(DoWorkService.class, mockService);
}

@Test public void failureAfterDelay() throws InterruptedException {
@Test public void observableFailureAfterDelay() throws InterruptedException {
behavior.setDelay(100, MILLISECONDS);
behavior.setVariancePercent(0);
behavior.setFailurePercent(100);

Observable<String> observable = service.observableResponse();

final long startNanos = System.nanoTime();
final AtomicLong tookMs = new AtomicLong();
final AtomicReference<Throwable> failureRef = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribe(new Subscriber<String>() {
@Override public void onNext(String s) {
throw new AssertionError();
}

@Override public void onError(Throwable throwable) {
tookMs.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
failureRef.set(throwable);
latch.countDown();
}

@Override public void onCompleted() {
}
});
assertTrue(latch.await(1, SECONDS));

assertThat(failureRef.get()).isSameAs(behavior.failureException());
assertThat(tookMs.get()).isGreaterThanOrEqualTo(100);
}

@Test public void observableSuccessAfterDelay() throws InterruptedException {
behavior.setDelay(100, MILLISECONDS);
behavior.setVariancePercent(0);
behavior.setFailurePercent(0);

Observable<String> observable = service.observableResponse();

final long startNanos = System.nanoTime();
final AtomicLong tookMs = new AtomicLong();
final AtomicReference<String> actual = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
observable.subscribe(new Subscriber<String>() {
@Override public void onNext(String value) {
tookMs.set(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
actual.set(value);
latch.countDown();
}

@Override public void onError(Throwable throwable) {
throw new AssertionError();
}

@Override public void onCompleted() {
}
});
assertTrue(latch.await(1, SECONDS));

assertThat(actual.get()).isEqualTo("Hi!");
assertThat(tookMs.get()).isGreaterThanOrEqualTo(100);
}

@Test public void singleFailureAfterDelay() throws InterruptedException {
behavior.setDelay(100, MILLISECONDS);
behavior.setVariancePercent(0);
behavior.setFailurePercent(100);

Observable<String> observable = service.response();
Single<String> observable = service.singleResponse();

final long startNanos = System.nanoTime();
final AtomicLong tookMs = new AtomicLong();
Expand All @@ -81,12 +149,12 @@ interface DoWorkService {
assertThat(tookMs.get()).isGreaterThanOrEqualTo(100);
}

@Test public void successAfterDelay() throws InterruptedException {
@Test public void singleSuccessAfterDelay() throws InterruptedException {
behavior.setDelay(100, MILLISECONDS);
behavior.setVariancePercent(0);
behavior.setFailurePercent(0);

Observable<String> observable = service.response();
Single<String> observable = service.singleResponse();

final long startNanos = System.nanoTime();
final AtomicLong tookMs = new AtomicLong();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,41 @@
/**
* TODO docs
*/
public final class ObservableCallAdapterFactory implements CallAdapter.Factory {
public final class RxJavaCallAdapterFactory implements CallAdapter.Factory {
/**
* TODO
*/
public static ObservableCallAdapterFactory create() {
return new ObservableCallAdapterFactory();
public static RxJavaCallAdapterFactory create() {
return new RxJavaCallAdapterFactory();
}

private ObservableCallAdapterFactory() {
private RxJavaCallAdapterFactory() {
}

@Override public CallAdapter<?> get(Type returnType) {
if (Utils.getRawType(returnType) != Observable.class) {
Class<?> rawType = Utils.getRawType(returnType);
boolean isSingle = "rx.Single".equals(rawType.getCanonicalName());
if (rawType != Observable.class && !isSingle) {
return null;
}
if (!(returnType instanceof ParameterizedType)) {
throw new IllegalStateException("Observable return type must be parameterized"
+ " as Observable<Foo> or Observable<? extends Foo>");
String name = isSingle ? "Single" : "Observable";
throw new IllegalStateException(name + " return type must be parameterized"
+ " as " + name + "<Foo> or " + name + "<? extends Foo>");
}

CallAdapter<Object> callAdapter = getCallAdapter(returnType);
if (isSingle) {
// Add Single-converter wrapper from a separate class. This defers classloading such that
// regular Observable operation can be leveraged without relying on this unstable RxJava API.
callAdapter = SingleHelper.makeSingle(callAdapter);
}
return callAdapter;
}

private CallAdapter<Object> getCallAdapter(Type returnType) {
Type observableType = Utils.getSingleParameterUpperBound((ParameterizedType) returnType);
Class<?> rawObservableType = Utils.getRawType(observableType);

if (rawObservableType == Response.class) {
if (!(observableType instanceof ParameterizedType)) {
throw new IllegalStateException("Response must be parameterized"
Expand All @@ -67,7 +79,7 @@ private ObservableCallAdapterFactory() {
return new ResultCallAdapter<>(responseType);
}

return new SimpleCallAdapter(observableType);
return new SimpleCallAdapter<>(observableType);
}

static final class CallOnSubscribe<T> implements Observable.OnSubscribe<Response<T>> {
Expand Down
35 changes: 35 additions & 0 deletions retrofit-adapters/rxjava/src/main/java/retrofit/SingleHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2015 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package retrofit;

import java.lang.reflect.Type;
import rx.Observable;
import rx.Single;

final class SingleHelper {
static CallAdapter<Object> makeSingle(final CallAdapter<Object> callAdapter) {
return new CallAdapter<Object>() {
@Override public Type responseType() {
return callAdapter.responseType();
}

@Override public Single<?> adapt(Call<Object> call) {
Observable<?> observable = (Observable<?>) callAdapter.adapt(call);
return observable.toSingle();
}
};
}
}
Loading

0 comments on commit 53942cc

Please sign in to comment.