Skip to content

Commit

Permalink
Add 'replyWhileStopped' param to BackoffSupervisor
Browse files Browse the repository at this point in the history
  • Loading branch information
chris-martin authored and richardimaoka committed May 9, 2017
1 parent de668e8 commit f5a9422
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,8 @@ class BackoffSupervisorSpec extends AkkaSpec with ImplicitSender {
.withSupervisorStrategy(restartingStrategy)))
}
}

"Reply " in {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import akka.actor.SupervisorStrategy._
* with ``akka.pattern.Backoff.onFailure``.
*/
private class BackoffOnRestartSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: OneForOneStrategy)
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: OneForOneStrategy,
val replyWhileStopped: Option[Any])
extends Actor with HandleBackoff
with ActorLogging {

Expand Down
18 changes: 15 additions & 3 deletions akka-actor/src/main/scala/akka/pattern/BackoffOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import akka.actor.{ Props, OneForOneStrategy, SupervisorStrategy }
* case e: RetryableException => Restart
* }
* )
* .withReplyWhileStopped(TheSystemIsDown)
*
* }}}
*/
Expand Down Expand Up @@ -177,6 +178,15 @@ trait BackoffOptions {
*/
def withDefaultStoppingStrategy: BackoffOptions

/**
* Returns a new BackoffOptions with a constant reply to messages that the supervisor receives while its
* child is stopped. By default, a message received while the child is stopped is forwarded to `deadLetters`.
* With this option, the supervisor will reply to the sender instead.
* @param replyWhileStopped The message that the supervisor will send in response to all messages while
* its child is stopped.
*/
def withReplyWhileStopped(replyWhileStopped: Any): BackoffOptions

/**
* Returns the props to create the back-off supervisor.
*/
Expand All @@ -191,14 +201,16 @@ private final case class BackoffOptionsImpl(
maxBackoff: FiniteDuration,
randomFactor: Double,
reset: Option[BackoffReset] = None,
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider)) extends BackoffOptions {
supervisorStrategy: OneForOneStrategy = OneForOneStrategy()(SupervisorStrategy.defaultStrategy.decider),
replyWhileStopped: Option[Any] = None) extends BackoffOptions {

val backoffReset = reset.getOrElse(AutoReset(minBackoff))

def withAutoReset(resetBackoff: FiniteDuration) = copy(reset = Some(AutoReset(resetBackoff)))
def withManualReset = copy(reset = Some(ManualReset))
def withSupervisorStrategy(supervisorStrategy: OneForOneStrategy) = copy(supervisorStrategy = supervisorStrategy)
def withDefaultStoppingStrategy = copy(supervisorStrategy = OneForOneStrategy()(SupervisorStrategy.stoppingStrategy.decider))
def withReplyWhileStopped(replyWhileStopped: Any) = copy(replyWhileStopped = Some(replyWhileStopped))

def props = {
require(minBackoff > Duration.Zero, "minBackoff must be > 0")
Expand All @@ -212,9 +224,9 @@ private final case class BackoffOptionsImpl(

backoffType match {
case RestartImpliesFailure
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy))
Props(new BackoffOnRestartSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
case StopImpliesFailure
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy))
Props(new BackoffSupervisor(childProps, childName, minBackoff, maxBackoff, backoffReset, randomFactor, supervisorStrategy, replyWhileStopped))
}
}
}
Expand Down
25 changes: 14 additions & 11 deletions akka-actor/src/main/scala/akka/pattern/BackoffSupervisor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,14 @@ object BackoffSupervisor {
* with `Backoff.onStop`.
*/
final class BackoffSupervisor(
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy)
val childProps: Props,
val childName: String,
minBackoff: FiniteDuration,
maxBackoff: FiniteDuration,
val reset: BackoffReset,
randomFactor: Double,
strategy: SupervisorStrategy,
val replyWhileStopped: Option[Any])
extends Actor with HandleBackoff {

import BackoffSupervisor._
Expand All @@ -203,7 +204,7 @@ final class BackoffSupervisor(
maxBackoff: FiniteDuration,
randomFactor: Double,
supervisorStrategy: SupervisorStrategy) =
this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy)
this(childProps, childName, minBackoff, maxBackoff, AutoReset(minBackoff), randomFactor, supervisorStrategy, None)

// for binary compatibility with 2.4.0
def this(
Expand All @@ -229,6 +230,7 @@ private[akka] trait HandleBackoff { this: Actor ⇒
def childProps: Props
def childName: String
def reset: BackoffReset
def replyWhileStopped: Option[Any]

var child: Option[ActorRef] = None
var restartCount = 0
Expand Down Expand Up @@ -274,9 +276,10 @@ private[akka] trait HandleBackoff { this: Actor ⇒
// use the BackoffSupervisor as sender
context.parent ! msg

case msg child match {
case Some(c) c.forward(msg)
case None context.system.deadLetters.forward(msg)
case msg (child, replyWhileStopped) match {
case (Some(c), _) c.forward(msg)
case (_, Some(r)) sender ! r
case _ context.system.deadLetters.forward(msg)
}
}
}

0 comments on commit f5a9422

Please sign in to comment.