Skip to content

Commit

Permalink
Merge pull request akka#16361 from smlin/more-informative-timeout-exc…
Browse files Browse the repository at this point in the history
…eption

=act akka#16361 Added more information to AskTimeoutException
  • Loading branch information
ktoso committed Nov 27, 2014
2 parents 9073b2c + dee5ad3 commit 978200f
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 18 deletions.
23 changes: 20 additions & 3 deletions akka-actor-tests/src/test/scala/akka/pattern/AskSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,14 @@ class AskSpec extends AkkaSpec {
f.isCompleted should be(true)
intercept[IllegalArgumentException] {
Await.result(f, timeout.duration)
}.getMessage should be("Unsupported recipient ActorRef type, question not sent to [null]")
}.getMessage should be("Unsupported recipient ActorRef type, question not sent to [null]. Sender[null] sent the message of type \"java.lang.Double\".")
}

"return broken promises on 0 timeout" in {
implicit val timeout = Timeout(0 seconds)
val echo = system.actorOf(Props(new Actor { def receive = { case x sender() ! x } }))
val f = echo ? "foo"
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo
intercept[IllegalArgumentException] {
Await.result(f, timeout.duration)
}.getMessage should be(expectedMsg)
Expand All @@ -61,7 +61,7 @@ class AskSpec extends AkkaSpec {
implicit val timeout = Timeout(-1000 seconds)
val echo = system.actorOf(Props(new Actor { def receive = { case x sender() ! x } }))
val f = echo ? "foo"
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]" format echo
val expectedMsg = "Timeout length must not be negative, question not sent to [%s]. Sender[null] sent the message of type \"java.lang.String\"." format echo
intercept[IllegalArgumentException] {
Await.result(f, timeout.duration)
}.getMessage should be(expectedMsg)
Expand All @@ -84,6 +84,23 @@ class AskSpec extends AkkaSpec {
}.getMessage should include(timeout.duration.toMillis.toString)
}

"include sender information in AskTimeout" in {
implicit val timeout = Timeout(0.5 seconds)
implicit val sender = system.actorOf(Props.empty)
val f = system.actorOf(Props.empty) ? "noreply"
intercept[AskTimeoutException] {
Await.result(f, 1 second)
}.getMessage.contains(sender.toString) should be(true)
}

"include message class information in AskTimeout" in {
implicit val timeout = Timeout(0.5 seconds)
val f = system.actorOf(Props.empty) ? "noreply"
intercept[AskTimeoutException] {
Await.result(f, 1 second)
}.getMessage.contains("\"java.lang.String\"") should be(true)
}

"work for ActorSelection" in {
implicit val timeout = Timeout(5 seconds)
import system.dispatcher
Expand Down
32 changes: 19 additions & 13 deletions akka-actor/src/main/scala/akka/pattern/AskSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ trait AskSupport {
*
*/
def ask(actorRef: ActorRef, message: Any)(implicit timeout: Timeout): Future[Any] = actorRef ? message
def ask(actorRef: ActorRef, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
actorRef.?(message)(timeout, sender)

/**
* Import this implicit conversion to gain `?` and `ask` methods on
Expand Down Expand Up @@ -119,50 +121,52 @@ trait AskSupport {
*
*/
def ask(actorSelection: ActorSelection, message: Any)(implicit timeout: Timeout): Future[Any] = actorSelection ? message
def ask(actorSelection: ActorSelection, message: Any, sender: ActorRef)(implicit timeout: Timeout): Future[Any] =
actorSelection.?(message)(timeout, sender)
}

/*
* Implementation class of the “ask” pattern enrichment of ActorRef
*/
final class AskableActorRef(val actorRef: ActorRef) extends AnyVal {

def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorRef match {
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorRef match {
case ref: InternalActorRef if ref.isTerminated
actorRef ! message
Future.failed[Any](new AskTimeoutException(s"Recipient[$actorRef] had already been terminated."))
Future.failed[Any](new AskTimeoutException(s"""Recipient[$actorRef] had already been terminated. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
case ref: InternalActorRef
if (timeout.duration.length <= 0)
Future.failed[Any](new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorRef]"))
Future.failed[Any](new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString)
val a = PromiseActorRef(ref.provider, timeout, targetName = actorRef.toString, message, sender)
actorRef.tell(message, a)
a.result.future
}
case _ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorRef]"))
case _ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorRef]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
}

def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender)
}

/*
* Implementation class of the “ask” pattern enrichment of ActorSelection
*/
final class AskableActorSelection(val actorSel: ActorSelection) extends AnyVal {

def ask(message: Any)(implicit timeout: Timeout): Future[Any] = actorSel.anchor match {
def ask(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = actorSel.anchor match {
case ref: InternalActorRef
if (timeout.duration.length <= 0)
Future.failed[Any](
new IllegalArgumentException(s"Timeout length must not be negative, question not sent to [$actorSel]"))
new IllegalArgumentException(s"""Timeout length must not be negative, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
else {
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString)
val a = PromiseActorRef(ref.provider, timeout, targetName = actorSel.toString, message, sender)
actorSel.tell(message, a)
a.result.future
}
case _ Future.failed[Any](new IllegalArgumentException(s"Unsupported recipient ActorRef type, question not sent to [$actorSel]"))
case _ Future.failed[Any](new IllegalArgumentException(s"""Unsupported recipient ActorRef type, question not sent to [$actorSel]. Sender[$sender] sent the message of type "${message.getClass.getName}"."""))
}

def ?(message: Any)(implicit timeout: Timeout): Future[Any] = ask(message)(timeout)
def ?(message: Any)(implicit timeout: Timeout, sender: ActorRef = Actor.noSender): Future[Any] = ask(message)(timeout, sender)
}

/**
Expand Down Expand Up @@ -324,13 +328,15 @@ private[akka] object PromiseActorRef {
private case object Stopped
private final case class StoppedWithPath(path: ActorPath)

def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String): PromiseActorRef = {
def apply(provider: ActorRefProvider, timeout: Timeout, targetName: String, message: Any, sender: ActorRef = Actor.noSender): PromiseActorRef = {
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result)
implicit val ec = a.internalCallingThreadExecutionContext
val messageClassName = message.getClass.getName
val f = scheduler.scheduleOnce(timeout.duration) {
result tryComplete Failure(new AskTimeoutException(s"Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]"))
result tryComplete Failure(
new AskTimeoutException(s"""Ask timed out on [$targetName] after [${timeout.duration.toMillis} ms]. Sender[$sender] sent message of type "$messageClassName"."""))
}
result.future onComplete { _ try a.stop() finally f.cancel() }
a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ trait GracefulStopSupport {
if (target.isTerminated) Future successful true
else {
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), targetName = target.toString)
val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout), targetName = target.toString, message = stopMessage)
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
target.tell(stopMessage, Actor.noSender)
ref.result.future.transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
if (target.isTerminated) Future successful SetThrottleAck
else {
val internalTarget = target.asInstanceOf[InternalActorRef]
val ref = PromiseActorRef(internalTarget.provider, timeout, target.toString)
val ref = PromiseActorRef(internalTarget.provider, timeout, target.toString, mode)
internalTarget.sendSystemMessage(Watch(internalTarget, ref))
target.tell(mode, ref)
ref.result.future.transform({
Expand Down

0 comments on commit 978200f

Please sign in to comment.