Skip to content

Commit

Permalink
Merge pull request akka#22586 from akka/wip-22295-circuit-breaker-imp…
Browse files Browse the repository at this point in the history
…rovement2-patriknw

22295 Allow user define what is failure in terms on circuit breaker
  • Loading branch information
patriknw authored Mar 17, 2017
2 parents edee6ae + c879c03 commit ef3b0f7
Show file tree
Hide file tree
Showing 4 changed files with 277 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;

Expand All @@ -39,4 +41,19 @@ public void useCircuitBreakerWithCompletableFuture() throws Exception {
assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds));
}

@Test
public void useCircuitBreakerWithCompletableFutureAndCustomDefineFailure() throws Exception {
final FiniteDuration fiveSeconds = FiniteDuration.create(5, TimeUnit.SECONDS);
final FiniteDuration fiveHundredMillis = FiniteDuration.create(500, TimeUnit.MILLISECONDS);
final CircuitBreaker breaker = new CircuitBreaker(system.dispatcher(), system.scheduler(), 1, fiveSeconds, fiveHundredMillis);

final BiFunction<Optional<String>, Optional<Throwable>, java.lang.Boolean> fn =
(result, err) -> (result.isPresent() && result.get().equals("hello"));

final CompletableFuture<String> f = new CompletableFuture<>();
f.complete("hello");
final CompletionStage<String> res = breaker.callWithCircuitBreakerCS(() -> f, fn);
assertEquals("hello", Await.result(FutureConverters.toScala(res), fiveSeconds));
assertEquals(1, breaker.currentFailureCount());
}
}
124 changes: 120 additions & 4 deletions akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
package akka.pattern

import akka.actor.ActorSystem
import language.postfixOps
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException }
import scala.util.{ Try, Success, Failure }
import akka.testkit._
import org.mockito.ArgumentCaptor
import org.scalatest.BeforeAndAfter
import org.scalatest.mockito.MockitoSugar
import scala.concurrent.duration._
import scala.concurrent.{ Await, ExecutionContext, Future, TimeoutException }
import scala.language.postfixOps
import org.mockito.Mockito._

object CircuitBreakerSpec {
Expand Down Expand Up @@ -71,6 +72,11 @@ object CircuitBreakerSpec {

def nonOneFactorCb()(implicit system: ActorSystem, ec: ExecutionContext): Breaker =
new Breaker(new CircuitBreaker(system.scheduler, 1, 2000.millis.dilated, 1000.millis.dilated, 1.day.dilated, 5))

val evenNumberIsFailure: Try[Int] Boolean = {
case Success(i) i % 2 == 0
case _ true
}
}

class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar {
Expand Down Expand Up @@ -159,6 +165,19 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.closedLatch)
}

"pass through next call and close on exception" when {
"exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)

val allReturnIsSuccess: Try[String] Boolean = _ false

intercept[TestException] { breaker().withSyncCircuitBreaker(throwException, allReturnIsSuccess) }
checkLatch(breaker.closedLatch)
}
}

"open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
Expand All @@ -168,6 +187,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch)
}

"open on even number" when {
"even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset
breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure)
checkLatch(breaker.openLatch)
}
}

"open on calling fail method" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
Expand Down Expand Up @@ -270,6 +300,18 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(1)
}

"increment failure count on even number" when {
"even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0)
val result = breaker().withSyncCircuitBreaker(2, CircuitBreakerSpec.evenNumberIsFailure)
checkLatch(breaker.openLatch)

breaker().currentFailureCount should ===(1)
result should ===(2)
}
}

"increment failure count on fail method" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0)
Expand All @@ -290,6 +332,30 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(0)
}

"reset failure count after exception in call" when {
"exception is defined as Success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().currentFailureCount should ===(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
}
breaker().currentFailureCount should ===(1)

val harmlessException = new TestException
val harmlessExceptionAsSuccess: Try[String] Boolean = {
case Success(_) false
case Failure(ex) ex != harmlessException
}

intercept[TestException] {
breaker().withSyncCircuitBreaker(throw harmlessException, harmlessExceptionAsSuccess)
}

breaker().currentFailureCount should ===(0)
}
}

"reset failure count after success method" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().currentFailureCount should ===(0)
Expand Down Expand Up @@ -446,6 +512,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.closedLatch)
}

"pass through next call and close on exception" when {
"exception is defined as call succeeded" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
checkLatch(breaker.halfOpenLatch)
val allReturnIsSuccess: Try[String] Boolean = _ false
Await.ready(breaker().withCircuitBreaker(Future(throwException), allReturnIsSuccess), awaitTimeout)
checkLatch(breaker.closedLatch)
}
}

"re-open on exception in call" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
Expand All @@ -455,6 +532,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch)
}

"re-open on even number" when {
"even number is defined as failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
intercept[TestException] { breaker().withSyncCircuitBreaker(throwException) }
checkLatch(breaker.halfOpenLatch)
breaker.openLatch.reset
Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch)
}
}

"re-open on async failure" in {
val breaker = CircuitBreakerSpec.shortResetTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
Expand Down Expand Up @@ -549,6 +637,17 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
breaker().currentFailureCount should ===(1)
}

"increment failure count on even number" when {
"even number is considered failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().currentFailureCount should ===(0)
val result = Await.result(breaker().withCircuitBreaker(Future(2), CircuitBreakerSpec.evenNumberIsFailure), awaitTimeout)
checkLatch(breaker.openLatch)
breaker().currentFailureCount should ===(1)
result should ===(2)
}
}

"increment failure count on async failure" in {
val breaker = CircuitBreakerSpec.longCallTimeoutCb()
breaker().withCircuitBreaker(Future(throwException))
Expand All @@ -565,6 +664,24 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
awaitCond(breaker().currentFailureCount == 0, awaitTimeout)
}

"reset failure count after exception in call" when {
"exception is defined as Success" in {
val breaker = CircuitBreakerSpec.multiFailureCb()
breaker().withCircuitBreaker(Future(sayHi))
for (n 1 to 4) breaker().withCircuitBreaker(Future(throwException))
awaitCond(breaker().currentFailureCount == 4, awaitTimeout)

val harmlessException = new TestException
val harmlessExceptionAsSuccess: Try[String] Boolean = {
case Success(_) false
case Failure(ex) ex != harmlessException
}

breaker().withCircuitBreaker(Future(throw harmlessException), harmlessExceptionAsSuccess)
awaitCond(breaker().currentFailureCount == 0, awaitTimeout)
}
}

"increment failure count on callTimeout" in {
val breaker = CircuitBreakerSpec.shortCallTimeoutCb()

Expand Down Expand Up @@ -625,5 +742,4 @@ class CircuitBreakerSpec extends AkkaSpec with BeforeAndAfter with MockitoSugar
checkLatch(breaker.openLatch)
}
}

}
Loading

0 comments on commit ef3b0f7

Please sign in to comment.