Skip to content

Commit

Permalink
Made circuit-opening happen in background, as health counts stream pr…
Browse files Browse the repository at this point in the history
…oduces new values

* This allows for methods which imply reading the circuit-breaker status (isOpen/allowRequest) to be idempotent
* Added new method (attemptExecution), which command internally uses to actually manipulate status
  • Loading branch information
mattrjacobs committed May 7, 2017
1 parent d3e0991 commit c9241ac
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
executionHook.onStart(_cmd);

/* determine if we're allowed to execute */
if (circuitBreaker.allowRequest()) {
if (circuitBreaker.attemptExecution()) {
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
Expand Down Expand Up @@ -601,6 +601,7 @@ public void call() {
final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
@Override
public Observable<R> call(Throwable t) {
circuitBreaker.markNonSuccess();
Exception e = getExceptionFromThrowable(t);
executionResult = executionResult.setExecutionException(e);
if (e instanceof RejectedExecutionException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,58 @@
package com.netflix.hystrix;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import com.netflix.hystrix.HystrixCommandMetrics.HealthCounts;
import rx.Subscriber;
import rx.Subscription;

/**
* Circuit-breaker logic that is hooked into {@link HystrixCommand} execution and will stop allowing executions if failures have gone past the defined threshold.
* <p>
* It will then allow single retries after a defined sleepWindow until the execution succeeds at which point it will again close the circuit and allow executions again.
* The default (and only) implementation will then allow a single retry after a defined sleepWindow until the execution
* succeeds at which point it will again close the circuit and allow executions again.
*/
public interface HystrixCircuitBreaker {

/**
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not.
* <p>
* This takes into account the half-open logic which allows some requests through when determining if it should be closed again.
* Every {@link HystrixCommand} requests asks this if it is allowed to proceed or not. It is idempotent and does
* not modify any internal state, and takes into account the half-open logic which allows some requests through
* after the circuit has been opened
*
* @return boolean whether a request should be permitted
*/
public boolean allowRequest();
boolean allowRequest();

/**
* Whether the circuit is currently open (tripped).
*
* @return boolean state of circuit breaker
*/
public boolean isOpen();
boolean isOpen();

/**
* Invoked on successful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
/* package */void markSuccess();
void markSuccess();

/**
* Invoked on unsuccessful executions from {@link HystrixCommand} as part of feedback mechanism when in a half-open state.
*/
void markNonSuccess();

/**
* Invoked at start of command execution to attempt an execution. This is non-idempotent - it may modify internal
* state.
*/
boolean attemptExecution();

/**
* @ExcludeFromJavadoc
* @ThreadSafe
*/
public static class Factory {
class Factory {
// String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

Expand Down Expand Up @@ -114,102 +128,161 @@ public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
}
}


/**
* The default production implementation of {@link HystrixCircuitBreaker}.
*
* @ExcludeFromJavadoc
* @ThreadSafe
*/
/* package */static class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
/* package */class HystrixCircuitBreakerImpl implements HystrixCircuitBreaker {
private final HystrixCommandProperties properties;
private final HystrixCommandMetrics metrics;

/* track whether this circuit is open/closed at any given point in time (default to false==closed) */
private AtomicBoolean circuitOpen = new AtomicBoolean(false);
enum Status {
CLOSED, OPEN, HALF_OPEN;
}

/* when the circuit was marked open or was last allowed to try a 'singleTest' */
private AtomicLong circuitOpenedOrLastTestedTime = new AtomicLong();
private final AtomicReference<Status> status = new AtomicReference<Status>(Status.CLOSED);
private final AtomicLong circuitOpened = new AtomicLong(-1);
private final AtomicReference<Subscription> activeSubscription = new AtomicReference<Subscription>(null);

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, final HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
this.properties = properties;
this.metrics = metrics;

//On a timer, this will set the circuit between OPEN/CLOSED as command executions occur
Subscription s = subscribeToStream();
activeSubscription.set(s);
}

private Subscription subscribeToStream() {
/*
* This stream will recalculate the OPEN/CLOSED status on every onNext from the health stream
*/
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber<HealthCounts>() {
@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(HealthCounts hc) {
// check if we are past the statisticalWindowVolumeThreshold
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
//we are not past the minimum error threshold for the stat window,
// so no change to circuit status.
// if it was CLOSED, it stays CLOSED
// if it was half-open, we need to wait for a successful command execution
// if it was open, we need to wait for sleep window to elapse
} else {
// our failure rate is too high, we need to set the state to OPEN
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}

@Override
public void markSuccess() {
if (circuitOpen.get()) {
if (circuitOpen.compareAndSet(true, false)) {
//win the thread race to reset metrics
//Unsubscribe from the current stream to reset the health counts stream. This only affects the health counts view,
//and all other metric consumers are unaffected by the reset
metrics.resetStream();
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//This thread wins the race to close the circuit - it resets the stream to start it over from 0
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
circuitOpened.set(-1L);
}
}

@Override
public void markNonSuccess() {
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//This thread wins the race to re-open the circuit - it resets the start time for the sleep window
circuitOpened.set(System.currentTimeMillis());
}
}

@Override
public boolean isOpen() {
if (properties.circuitBreakerForceOpen().get()) {
return true;
}
if (properties.circuitBreakerForceClosed().get()) {
return false;
}
return circuitOpened.get() >= 0;
}

@Override
public boolean allowRequest() {
if (properties.circuitBreakerForceOpen().get()) {
// properties have asked us to force the circuit open so we will allow NO requests
return false;
}
if (properties.circuitBreakerForceClosed().get()) {
// we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
isOpen();
// properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
return true;
}
return !isOpen() || allowSingleTest();
}

public boolean allowSingleTest() {
long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
// 1) if the circuit is open
// 2) and it's been longer than 'sleepWindow' since we opened the circuit
if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
// We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
// If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
// if this returns true that means we set the time so we'll return true to allow the singleTest
// if it returned false it means another thread raced us and allowed the singleTest before we did
return true;
if (circuitOpened.get() == -1) {
return true;
} else {
if (status.get().equals(Status.HALF_OPEN)) {
return false;
} else {
return isAfterSleepWindow();
}
}
return false;
}

@Override
public boolean isOpen() {
if (circuitOpen.get()) {
// if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
return true;
}

// we're closed, so let's see if errors have made us so we should trip the circuit open
HealthCounts health = metrics.getHealthCounts();
private boolean isAfterSleepWindow() {
final long circuitOpenTime = circuitOpened.get();
final long currentTime = System.currentTimeMillis();
final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
return currentTime > circuitOpenTime + sleepWindowTime;
}

// check if we are past the statisticalWindowVolumeThreshold
if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
// we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
@Override
public boolean attemptExecution() {
if (properties.circuitBreakerForceOpen().get()) {
return false;
}

if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
return false;
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
if (circuitOpened.get() == -1) {
return true;
} else {
// our failure rate is too high, trip the circuit
if (circuitOpen.compareAndSet(false, true)) {
// if the previousValue was false then we want to set the currentTime
circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
return true;
if (isAfterSleepWindow()) {
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
//only the first request after sleep window should execute
return true;
} else {
return false;
}
} else {
// How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
// caused another thread to set it to true already even though we were in the process of doing the same
// In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
return true;
return false;
}
}
}

}

/**
Expand All @@ -234,6 +307,15 @@ public void markSuccess() {

}

@Override
public void markNonSuccess() {

}

@Override
public boolean attemptExecution() {
return true;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,10 @@ public int getCurrentConcurrentExecutionCount() {
}
}

/* package-private */ HealthCountsStream getHealthCountsStream() {
return healthCountsStream;
}

/**
* Retrieve a snapshot of total requests, error count and error percentage.
*
Expand Down
Loading

0 comments on commit c9241ac

Please sign in to comment.