Skip to content

Commit

Permalink
Hooked up thread-interrupt behavior as defined by HystrixCommandPrope…
Browse files Browse the repository at this point in the history
…rties to HystrixContextScheduler

* There are now 2 varieties of HystrixContextScheduler, 1 that produces Actions which cause thread-interrupts on unsubscription
and 1 that produces Actions which never interrupt threads.
* The proper Scheduler gets chosen when the subscribeOn is wired into the command flow
  • Loading branch information
Matt Jacobs committed Feb 7, 2015
1 parent 24c01e8 commit a2ec5b4
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,16 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import rx.Notification;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Operator;
import rx.Producer;
import rx.Scheduler;
import rx.Subscriber;
import rx.functions.Action0;
import rx.functions.Action1;
Expand Down Expand Up @@ -521,8 +524,7 @@ public void call(Subscriber<? super R> s) {
getExecutionObservableWithLifecycle().unsafeSubscribe(s); //the getExecutionObservableWithLifecycle method already wraps sync exceptions, so no need to catch here
}
}

}).subscribeOn(threadPool.getScheduler());
}).subscribeOn(threadPool.getScheduler(properties.executionIsolationThreadInterruptOnTimeout().get()));
} else {
// semaphore isolated
executionHook.onRunStart(_self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ public interface HystrixThreadPool {

public Scheduler getScheduler();

public Scheduler getScheduler(boolean shouldInterruptThread);

/**
* Mark when a thread begins executing a command.
*/
Expand Down Expand Up @@ -153,7 +155,8 @@ public interface HystrixThreadPool {
private final BlockingQueue<Runnable> queue;
private final ThreadPoolExecutor threadPool;
private final HystrixThreadPoolMetrics metrics;
private final Scheduler scheduler;
private final Scheduler nonInterruptingScheduler;
private final Scheduler interruptingScheduler;

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
Expand All @@ -164,7 +167,8 @@ public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThrea
concurrencyStrategy.getThreadPool(threadPoolKey, properties.coreSize(), properties.coreSize(), properties.keepAliveTimeMinutes(), TimeUnit.MINUTES, queue),
properties);
this.threadPool = metrics.getThreadPool();
this.scheduler = new HystrixContextScheduler(concurrencyStrategy, this);
this.nonInterruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, false);
this.interruptingScheduler = new HystrixContextScheduler(concurrencyStrategy, this, true);

/* strategy: HystrixMetricsPublisherThreadPool */
HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
Expand All @@ -178,8 +182,18 @@ public ThreadPoolExecutor getExecutor() {

@Override
public Scheduler getScheduler() {
//by default, interrupt underlying threads on timeout
return getScheduler(true);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
touchConfig();
return scheduler;
if (shouldInterruptThread) {
return interruptingScheduler;
} else {
return nonInterruptingScheduler;
}
}

// allow us to change things via fast-properties by setting it each time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,14 @@ public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, S
}

public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool) {
this(concurrencyStrategy, threadPool, true);
}


public HystrixContextScheduler(HystrixConcurrencyStrategy concurrencyStrategy, HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.concurrencyStrategy = concurrencyStrategy;
this.threadPool = threadPool;
this.actualScheduler = new ThreadPoolScheduler(threadPool);
this.actualScheduler = new ThreadPoolScheduler(threadPool, shouldInterruptThread);
}

@Override
Expand Down Expand Up @@ -101,14 +106,16 @@ public Subscription schedule(Action0 action) {
private static class ThreadPoolScheduler extends Scheduler {

private final HystrixThreadPool threadPool;
private final boolean shouldInterruptThread;

public ThreadPoolScheduler(HystrixThreadPool threadPool) {
public ThreadPoolScheduler(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
public Worker createWorker() {
return new ThreadPoolWorker(threadPool);
return new ThreadPoolWorker(threadPool, shouldInterruptThread);
}

}
Expand All @@ -126,9 +133,11 @@ private static class ThreadPoolWorker extends Worker {

private final HystrixThreadPool threadPool;
private final CompositeSubscription subscription = new CompositeSubscription();
private final boolean shouldInterruptThread;

public ThreadPoolWorker(HystrixThreadPool threadPool) {
public ThreadPoolWorker(HystrixThreadPool threadPool, boolean shouldInterruptThread) {
this.threadPool = threadPool;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
Expand All @@ -147,16 +156,19 @@ public Subscription schedule(final Action0 action) {
// don't schedule, we are unsubscribed
return Subscriptions.unsubscribed();
}


//Schedulers.submitTo(executor, action, subscription, shouldInterrupt);


// This is internal RxJava API but it is too useful.
ScheduledAction sa = new ScheduledAction(action);

subscription.add(sa);
sa.addParent(subscription);

Future<?> f = threadPool.getExecutor().submit(sa);
sa.add(f);

sa.add(new FutureCompleterWithConfigurableInterrupt(f, shouldInterruptThread));

return sa;
}
Expand All @@ -168,4 +180,26 @@ public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {

}

/**
* Very similar to rx.internal.schedulers.ScheduledAction.FutureCompleter, but with configurable interrupt behavior
*/
private static class FutureCompleterWithConfigurableInterrupt implements Subscription {
private final Future<?> f;
private final boolean shouldInterruptThread;

private FutureCompleterWithConfigurableInterrupt(Future<?> f, boolean shouldInterruptThread) {
this.f = f;
this.shouldInterruptThread = shouldInterruptThread;
}

@Override
public void unsubscribe() {
f.cancel(shouldInterruptThread);
}
@Override
public boolean isUnsubscribed() {
return f.isCancelled();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -1680,6 +1680,7 @@ public void testRejectedThreadUsingQueueSize() {
SingleThreadedPoolWithQueue pool = new SingleThreadedPoolWithQueue(10, 1);
// put 1 item in the queue
// the thread pool won't pick it up because we're bypassing the pool and adding to the queue directly so this will keep the queue full

pool.queue.add(new Runnable() {

@Override
Expand All @@ -1694,6 +1695,7 @@ public void run() {

});


TestCommandRejection command = null;
try {
// this should fail as we already have 1 in the queue
Expand Down Expand Up @@ -5295,30 +5297,82 @@ public void testExceptionConvertedToBadRequestExceptionInExecutionHookBypassesCi
@Test
public void testInterruptFutureOnTimeout() throws InterruptedException, ExecutionException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker());
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true);

// when
Future<Boolean> f = cmd.queue();

// then
Thread.sleep(3000);
System.out.println("RESULT : " + f.get());
Thread.sleep(500);
assertTrue(cmd.hasBeenInterrupted());
}

@Test
public void testInterruptObservableOnTimeout() throws InterruptedException {
public void testInterruptObserveOnTimeout() throws InterruptedException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker());
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true);

// when
cmd.observe().subscribe();

// then
Thread.sleep(3000);
Thread.sleep(500);
assertTrue(cmd.hasBeenInterrupted());
}

@Test
public void testInterruptToObservableOnTimeout() throws InterruptedException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), true);

// when
cmd.toObservable().subscribe();

// then
Thread.sleep(500);
assertTrue(cmd.hasBeenInterrupted());
}

@Test
public void testDoNotInterruptFutureOnTimeoutIfPropertySaysNotTo() throws InterruptedException, ExecutionException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), false);

// when
Future<Boolean> f = cmd.queue();

// then
Thread.sleep(500);
assertFalse(cmd.hasBeenInterrupted());
}

@Test
public void testDoNotInterruptObserveOnTimeoutIfPropertySaysNotTo() throws InterruptedException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), false);

// when
cmd.observe().subscribe();

// then
Thread.sleep(500);
assertFalse(cmd.hasBeenInterrupted());
}

@Test
public void testDoNotInterruptToObservableOnTimeoutIfPropertySaysNotTo() throws InterruptedException {
// given
InterruptibleCommand cmd = new InterruptibleCommand(new TestCircuitBreaker(), false);

// when
cmd.toObservable().subscribe();

// then
Thread.sleep(500);
assertFalse(cmd.hasBeenInterrupted());
}


/* ******************************************************************************** */
/* ******************************************************************************** */
/* private HystrixCommand class implementations for unit testing */
Expand Down Expand Up @@ -5791,6 +5845,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

@Override
public void markThreadExecution() {
// not used for this test
Expand Down Expand Up @@ -5831,6 +5890,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

@Override
public void markThreadExecution() {
// not used for this test
Expand Down Expand Up @@ -5928,7 +5992,7 @@ private static class TestSemaphoreCommandWithSlowFallback extends TestHystrixCom

private TestSemaphoreCommandWithSlowFallback(TestCircuitBreaker circuitBreaker, int fallbackSemaphoreExecutionCount, long fallbackSleep) {
super(testPropsBuilder().setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withFallbackIsolationSemaphoreMaxConcurrentRequests(fallbackSemaphoreExecutionCount)));
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter().withFallbackIsolationSemaphoreMaxConcurrentRequests(fallbackSemaphoreExecutionCount).withExecutionIsolationThreadInterruptOnTimeout(false)));
this.fallbackSleep = fallbackSleep;
}

Expand Down Expand Up @@ -6207,6 +6271,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

}));
this.completionLatch = completionLatch;
}
Expand Down Expand Up @@ -6317,11 +6386,11 @@ protected Boolean run() throws Exception {

private static class InterruptibleCommand extends TestHystrixCommand<Boolean> {

public InterruptibleCommand(TestCircuitBreaker circuitBreaker) {
public InterruptibleCommand(TestCircuitBreaker circuitBreaker, boolean shouldInterrupt) {
super(testPropsBuilder()
.setCircuitBreaker(circuitBreaker).setMetrics(circuitBreaker.metrics)
.setCommandPropertiesDefaults(HystrixCommandPropertiesTest.getUnitTestPropertiesSetter()
.withExecutionIsolationThreadInterruptOnTimeout(true)
.withExecutionIsolationThreadInterruptOnTimeout(shouldInterrupt)
.withExecutionIsolationThreadTimeoutInMilliseconds(100)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5506,6 +5506,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

})) {

@Override
Expand Down Expand Up @@ -7518,6 +7523,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

@Override
public void markThreadExecution() {
// not used for this test
Expand Down Expand Up @@ -7558,6 +7568,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

@Override
public void markThreadExecution() {
// not used for this test
Expand Down Expand Up @@ -8121,6 +8136,11 @@ public Scheduler getScheduler() {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this);
}

@Override
public Scheduler getScheduler(boolean shouldInterruptThread) {
return new HystrixContextScheduler(HystrixPlugins.getInstance().getConcurrencyStrategy(), this, shouldInterruptThread);
}

}));
this.completionLatch = completionLatch;
}
Expand Down

0 comments on commit a2ec5b4

Please sign in to comment.