diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java index e526c3c34..a09aaaa03 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/AbstractCommand.java @@ -382,6 +382,7 @@ public void call() { final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call() { + circuitBreaker.markNonSuccess(); if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); diff --git a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java index 29e8f7899..93c542993 100644 --- a/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java +++ b/hystrix-core/src/main/java/com/netflix/hystrix/HystrixCircuitBreaker.java @@ -272,8 +272,11 @@ public boolean attemptExecution() { return true; } else { if (isAfterSleepWindow()) { + //only the first request after sleep window should execute + //if the executing command succeeds, the status will transition to CLOSED + //if the executing command fails, the status will transition to OPEN + //if the executing command gets unsubscribed, the status will transition to OPEN if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) { - //only the first request after sleep window should execute return true; } else { return false; diff --git a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java index b0d0b13f2..bdfcd5588 100644 --- a/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java +++ b/hystrix-core/src/test/java/com/netflix/hystrix/HystrixCircuitBreakerTest.java @@ -32,6 +32,7 @@ import com.netflix.hystrix.strategy.HystrixPlugins; import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; import rx.Observable; +import rx.Subscription; /** * These tests each use a different command key to ensure that running them in parallel doesn't allow the state @@ -565,6 +566,56 @@ public void testLowVolumeDoesNotTripCircuit() { } } + @Test + public void testUnsubscriptionDoesNotLeaveCircuitStuckHalfOpen() { + String key = "cmd-J"; + try { + int sleepWindow = 200; + + // fail + HystrixCommand cmd1 = new FailureCommand(key, 1, sleepWindow); + HystrixCommand cmd2 = new FailureCommand(key, 1, sleepWindow); + HystrixCommand cmd3 = new FailureCommand(key, 1, sleepWindow); + HystrixCommand cmd4 = new FailureCommand(key, 1, sleepWindow); + cmd1.execute(); + cmd2.execute(); + cmd3.execute(); + cmd4.execute(); + + HystrixCircuitBreaker cb = cmd1.circuitBreaker; + + // everything has failed in the test window so we should return false now + Thread.sleep(100); + assertFalse(cb.allowRequest()); + assertTrue(cb.isOpen()); + + //this should occur after the sleep window, so get executed + //however, it is unsubscribed, so never updates state on the circuit-breaker + HystrixCommand cmd5 = new SuccessCommand(key, 5000, sleepWindow); + + //wait for sleep window to pass + Thread.sleep(sleepWindow + 50); + + Observable o = cmd5.observe(); + Subscription s = o.subscribe(); + s.unsubscribe(); + + //wait for 10 sleep windows, then try a successful command. this should return the circuit to CLOSED + + Thread.sleep(10 * sleepWindow); + HystrixCommand cmd6 = new SuccessCommand(key, 1, sleepWindow); + cmd6.execute(); + + Thread.sleep(100); + assertTrue(cb.allowRequest()); + assertFalse(cb.isOpen()); + + } catch (Exception e) { + e.printStackTrace(); + fail("Error occurred: " + e.getMessage()); + } + } + /** * Utility method for creating {@link HystrixCommandMetrics} for unit tests. */