Skip to content

Commit

Permalink
RxJava 0.18
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed Apr 25, 2014
1 parent 65ad7ec commit f39fcdc
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import rx.Observable;
import rx.Observer;
import rx.util.functions.Action1;
import rx.functions.Action1;

import static com.netflix.hystrix.contrib.javanica.CommonUtils.getHystrixCommandByKey;
import static org.junit.Assert.assertEquals;
Expand Down
2 changes: 1 addition & 1 deletion hystrix-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ apply plugin: 'idea'

dependencies {
compile 'com.netflix.archaius:archaius-core:0.4.1'
compile 'com.netflix.rxjava:rxjava-core:0.17.3'
compile 'com.netflix.rxjava:rxjava-core:0.18.1'
compile 'org.slf4j:slf4j-api:1.7.0'
compile 'com.google.code.findbugs:jsr305:2.0.0'
testCompile 'junit:junit-dep:4.10'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ public HystrixExecutableBase<R> getCommand() {

@Override
public void call(Subscriber<? super R> observer) {
originalObservable.subscribe(observer);
originalObservable.unsafeSubscribe(observer);
}
});
this.command = command;
Expand All @@ -673,7 +673,7 @@ protected static class CachedObservableOriginal<R> extends ObservableCommand<R>

@Override
public void call(final Subscriber<? super R> observer) {
actual.subscribe(observer);
actual.unsafeSubscribe(observer);
}
}, command);
this.originalCommand = command;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,22 +315,22 @@ public void call() {
executionSemaphore.release();

}
}).subscribe(observer);
}).unsafeSubscribe(observer);
} catch (RuntimeException e) {
observer.onError(e);
}
} else {
metrics.markSemaphoreRejection();
logger.debug("HystrixCommand Execution Rejection by Semaphore."); // debug only since we're throwing the exception and someone higher will do something with it
// retrieve a fallback or throw an exception if no fallback available
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").subscribe(observer);
getFallbackOrThrowException(HystrixEventType.SEMAPHORE_REJECTED, FailureType.REJECTED_SEMAPHORE_EXECUTION, "could not acquire a semaphore for execution").unsafeSubscribe(observer);
}
} else {
// record that we are returning a short-circuited fallback
metrics.markShortCircuited();
// short-circuit and go directly to fallback (or throw an exception if no fallback implemented)
try {
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").subscribe(observer);
getFallbackOrThrowException(HystrixEventType.SHORT_CIRCUITED, FailureType.SHORTCIRCUIT, "short-circuited").unsafeSubscribe(observer);
} catch (Exception e) {
observer.onError(e);
}
Expand Down Expand Up @@ -441,11 +441,11 @@ public void call() {
executionHook.onThreadComplete(_self);
endCurrentThread.call();
}
}).subscribe(s);
}).unsafeSubscribe(s);
} catch (Throwable t) {
// the run() method is a user provided implementation so can throw instead of using Observable.onError
// so we catch it here and turn it into Observable.error
Observable.<R> error(t).subscribe(s);
Observable.<R> error(t).unsafeSubscribe(s);
}
}
}
Expand Down Expand Up @@ -798,7 +798,7 @@ public void run() {
* THREAD isolation pool as it may be saturated and that's the reason we're in fallback. The fallback logic
* should not perform IO and thus we run on the computation event loops.
*/
v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).subscribe(child);
v.subscribeOn(new HystrixContextScheduler(originalCommand.concurrencyStrategy, Schedulers.computation())).unsafeSubscribe(child);
} catch (HystrixRuntimeException re) {
child.onError(re);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,11 @@
package com.netflix.hystrix.strategy.concurrency;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler.Inner;
import rx.functions.Action1;
import rx.functions.Action0;
import rx.functions.Func2;

import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler.HystrixContextInnerScheduler;

/**
* Wrapper around {@link Func2} that manages the {@link HystrixRequestContext} initialization and cleanup for the execution of the {@link Func2}
Expand All @@ -33,30 +30,17 @@
*
* @ExcludeFromJavadoc
*/
public class HystrixContexSchedulerAction implements Action1<Inner> {
public class HystrixContexSchedulerAction implements Action0 {

private final Action1<Inner> actual;
private final Action0 actual;
private final HystrixRequestContext parentThreadState;
private final Callable<Void> c;

/*
* This is a workaround to needing to use Callable<Void> but
* needing to pass `Inner t1` into it after construction.
*
* Think of it like sticking t1 on the stack and then calling the function
* that uses them.
*
* This should all be thread-safe without issues despite multi-step execution
* because this Action0 is only ever executed once by Hystrix and construction will always
* precede `call` being invoked once.
*/
private final AtomicReference<Inner> t1Holder = new AtomicReference<Inner>();

public HystrixContexSchedulerAction(Action1<Inner> action) {
public HystrixContexSchedulerAction(Action0 action) {
this(HystrixPlugins.getInstance().getConcurrencyStrategy(), action);
}

public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action1<Inner> action) {
public HystrixContexSchedulerAction(final HystrixConcurrencyStrategy concurrencyStrategy, Action0 action) {
this.actual = action;
this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();

Expand All @@ -68,8 +52,8 @@ public Void call() throws Exception {
try {
// set the state of this thread to that of its parent
HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
// execute actual Action1<Inner> with the state of the parent
actual.call(new HystrixContextInnerScheduler(concurrencyStrategy, t1Holder.get()));
// execute actual Action0 with the state of the parent
actual.call();
return null;
} finally {
// restore this thread back to its original state
Expand All @@ -80,12 +64,11 @@ public Void call() throws Exception {
}

@Override
public void call(Inner inner) {
public void call() {
try {
this.t1Holder.set(inner);
c.call();
} catch (Exception e) {
throw new RuntimeException("Failed executing wrapped Func2", e);
throw new RuntimeException("Failed executing wrapped Action0", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import rx.functions.Action0;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;

import com.netflix.hystrix.HystrixThreadPool;
import com.netflix.hystrix.strategy.HystrixPlugins;
Expand All @@ -42,7 +44,7 @@ public HystrixContextScheduler(Scheduler scheduler) {
this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
this.threadPool = null;
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Scheduler scheduler) {
this.actualScheduler = scheduler;
this.concurrencyStrategy = concurrencyStrategy;
Expand All @@ -52,26 +54,22 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S
public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = Schedulers.executor(threadPool.getExecutor());
this.actualScheduler = new ThreadPoolScheduler(threadPool);
}

@Override
public Subscription schedule(Action1<Inner> action) {
InnerHystrixContextScheduler inner = new InnerHystrixContextScheduler();
inner.schedule(action);
return inner;
public Worker createWorker() {
return new HystrixContextSchedulerWorker(actualScheduler.createWorker());
}

@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHystrixContextScheduler inner = new InnerHystrixContextScheduler();
inner.schedule(action, delayTime, unit);
return inner;
}

private class InnerHystrixContextScheduler extends Inner {
private class HystrixContextSchedulerWorker extends Worker {

private BooleanSubscription s = new BooleanSubscription();
private final Worker worker;

private HystrixContextSchedulerWorker(Worker actualWorker) {
this.worker = actualWorker;
}

@Override
public void unsubscribe() {
Expand All @@ -84,56 +82,108 @@ public boolean isUnsubscribed() {
}

@Override
public void schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
actualScheduler.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
}

@Override
public void schedule(Action1<Inner> action) {
public Subscription schedule(Action0 action) {
if (threadPool != null) {
if (!threadPool.isQueueSpaceAvailable()) {
throw new RejectedExecutionException("Rejected command because thread-pool queueSize is at rejection threshold.");
}
}
actualScheduler.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
return worker.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
}

}

public static class HystrixContextInnerScheduler extends Inner {
private static class ThreadPoolScheduler extends Scheduler {

private final HystrixThreadPool threadPool;

public ThreadPoolScheduler(HystrixThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool);
}

private final HystrixConcurrencyStrategy concurrencyStrategy;
private final Inner actual;
}

HystrixContextInnerScheduler(HystrixConcurrencyStrategy concurrencyStrategy, Inner actual) {
this.concurrencyStrategy = concurrencyStrategy;
this.actual = actual;
/**
* Purely for scheduling work on a thread-pool.
* <p>
* This is not natively supported by RxJava as of 0.18.0 because thread-pools
* are contrary to sequential execution.
* <p>
* For the Hystrix case, each Command invocation has a single action so the concurrency
* issue is not a problem.
*/
private static class ThreadPoolWorker extends Worker {

private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();

public ThreadPoolWorker(HystrixThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public void unsubscribe() {
actual.unsubscribe();
subscription.unsubscribe();
}

@Override
public boolean isUnsubscribed() {
return actual.isUnsubscribed();
return subscription.isUnsubscribed();
}

@Override
public void schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
actual.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action), delayTime, unit);
public Subscription schedule(final Action0 action) {
if (subscription.isUnsubscribed()) {
// don't schedule, we are unsubscribed
return Subscriptions.empty();
}

final AtomicReference<Subscription> sf = new AtomicReference<Subscription>();
Subscription s = Subscriptions.from(threadPool.getExecutor().submit(new Runnable() {

@Override
public void run() {
try {
if (subscription.isUnsubscribed()) {
return;
}
action.call();
} finally {
// remove the subscription now that we're completed
Subscription s = sf.get();
if (s != null) {
subscription.remove(s);
}
}
}
}));

sf.set(s);
subscription.add(s);
return s;
}

@Override
public void schedule(Action1<Inner> action) {
actual.schedule(new HystrixContexSchedulerAction(concurrencyStrategy, action));
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
System.out.println("delayed scheduling");
throw new IllegalStateException("Hystrix does not support delayed scheduling");
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import rx.Observable;
import rx.Observer;
import rx.util.functions.Action1;
import rx.functions.Action1;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
Expand Down

0 comments on commit f39fcdc

Please sign in to comment.