Skip to content

Commit

Permalink
iss1403: Support @HystrixCommand for rx.Single and rx.Completable sim…
Browse files Browse the repository at this point in the history
…ilar to rx.Observable
  • Loading branch information
dmgcodevil committed Feb 21, 2017
1 parent c674545 commit 8715b71
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import java.lang.reflect.Method;
Expand Down Expand Up @@ -109,8 +111,8 @@ public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinP
return result;
}

private Observable executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return ((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
private Object executeObservable(HystrixInvokable invokable, ExecutionType executionType, final MetaHolder metaHolder) {
return mapObservable(((Observable) CommandExecutor.execute(invokable, executionType, metaHolder))
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand All @@ -122,7 +124,16 @@ public Observable call(Throwable throwable) {
}
return Observable.error(throwable);
}
});
}), metaHolder);
}

private Object mapObservable(Observable observable, final MetaHolder metaHolder) {
if (Completable.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toCompletable();
} else if (Single.class.isAssignableFrom(metaHolder.getMethod().getReturnType())) {
return observable.toSingle();
}
return observable;
}

private Throwable hystrixRuntimeExceptionToThrowable(MetaHolder metaHolder, HystrixRuntimeException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
*/
package com.netflix.hystrix.contrib.javanica.command;

import com.google.common.collect.ImmutableSet;
import rx.Completable;
import rx.Observable;
import rx.Single;

import java.util.Set;
import java.util.concurrent.Future;

/**
Expand All @@ -39,6 +43,9 @@ public enum ExecutionType {
*/
OBSERVABLE;

// RX types
private static final Set<? extends Class> RX_TYPES = ImmutableSet.of(Observable.class, Single.class, Completable.class);

/**
* Gets execution type for specified class type.
* @param type the type
Expand All @@ -47,11 +54,19 @@ public enum ExecutionType {
public static ExecutionType getExecutionType(Class<?> type) {
if (Future.class.isAssignableFrom(type)) {
return ExecutionType.ASYNCHRONOUS;
} else if (Observable.class.isAssignableFrom(type)) {
} else if (isRxType(type)) {
return ExecutionType.OBSERVABLE;
} else {
return ExecutionType.SYNCHRONOUS;
}
}

private static boolean isRxType(Class<?> cl) {
for (Class<?> rxType : RX_TYPES) {
if (rxType.isAssignableFrom(cl)) {
return true;
}
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import com.netflix.hystrix.exception.HystrixBadRequestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Completable;
import rx.Observable;
import rx.Single;
import rx.functions.Func1;

import javax.annotation.concurrent.ThreadSafe;
Expand Down Expand Up @@ -67,7 +69,8 @@ public GenericObservableCommand(HystrixCommandBuilder builder) {
protected Observable construct() {
Observable result;
try {
result = ((Observable) commandActions.getCommandAction().execute(executionType))
Observable observable = toObservable(commandActions.getCommandAction().execute(executionType));
result = observable
.onErrorResumeNext(new Func1<Throwable, Observable>() {
@Override
public Observable call(Throwable throwable) {
Expand Down Expand Up @@ -157,4 +160,16 @@ boolean isIgnorable(Throwable throwable) {
}
return false;
}

private Observable toObservable(Object obj) {
if (Observable.class.isAssignableFrom(obj.getClass())) {
return (Observable) obj;
} else if (Completable.class.isAssignableFrom(obj.getClass())) {
return ((Completable) obj).toObservable();
} else if (Single.class.isAssignableFrom(obj.getClass())) {
return ((Single) obj).toObservable();
} else {
throw new IllegalStateException("unsupported rx type: " + obj.getClass());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test;
import rx.Completable;
import rx.Observable;
import rx.Observer;
import rx.Single;
import rx.Subscriber;
import rx.functions.Action1;
import rx.subjects.ReplaySubject;
Expand Down Expand Up @@ -90,6 +92,19 @@ public void call(User user) {
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testCompletable(){
userService.getUserCompletable("1", "name: ");
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserCompletable");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testSingle(){
userService.getUserSingle("1", "name: ");
com.netflix.hystrix.HystrixInvokableInfo getUserCommand = getHystrixCommandByKey("getUserSingle");
assertTrue(getUserCommand.getExecutionEvents().contains(HystrixEventType.SUCCESS));
}

@Test
public void testGetUserWithRegularFallback() {
Expand Down Expand Up @@ -163,6 +178,18 @@ public Observable<User> getUser(final String id, final String name) {
return createObservable(id, name);
}

@HystrixCommand
public Completable getUserCompletable(final String id, final String name) {
validate(id, name, "getUser has failed");
return createObservable(id, name).toCompletable();
}

@HystrixCommand
public Single getUserSingle(final String id, final String name) {
validate(id, name, "getUser has failed");
return createObservable(id, name).toSingle();
}

@HystrixCommand(fallbackMethod = "regularFallback", observableExecutionMode = ObservableExecutionMode.LAZY)
public Observable<User> getUserRegularFallback(final String id, final String name) {
validate(id, name, "getUser has failed");
Expand Down

0 comments on commit 8715b71

Please sign in to comment.