Skip to content

Commit

Permalink
+act akka#15163 allows to suppress certain messages from being dead-l…
Browse files Browse the repository at this point in the history
…etter-ed

Conflicts:
	akka-actor/src/main/scala/akka/actor/ActorRef.scala
  • Loading branch information
ktoso committed Dec 1, 2014
1 parent 3cf00ce commit 73cd1d7
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

import akka.event.Logging
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestActors
import akka.testkit.TestProbe

import scala.concurrent.duration._

object DeadLetterSupressionSpec {

case object NormalMsg

case object SuppressedMsg extends DeadLetterSuppression

}

class DeadLetterSupressionSpec extends AkkaSpec with ImplicitSender {
import DeadLetterSupressionSpec._

val deadActor = system.actorOf(TestActors.echoActorProps)
watch(deadActor)
deadActor ! PoisonPill
expectTerminated(deadActor)

s"must suppress message from default dead-letters logging (sent to dead: ${Logging.simpleName(deadActor)})" in {
val deadListener = TestProbe()
system.eventStream.subscribe(deadListener.ref, classOf[DeadLetter])

val suppressedListener = TestProbe()
system.eventStream.subscribe(suppressedListener.ref, classOf[SuppressedDeadLetter])

val allListener = TestProbe()
system.eventStream.subscribe(allListener.ref, classOf[AllDeadLetters])

deadActor ! SuppressedMsg
deadActor ! NormalMsg

deadListener.expectMsg(DeadLetter(NormalMsg, testActor, deadActor))
deadListener.expectNoMsg(200.millis)

suppressedListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
suppressedListener.expectNoMsg(200.millis)

allListener.expectMsg(SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
allListener.expectMsg(DeadLetter(NormalMsg, testActor, deadActor))
allListener.expectNoMsg(200.millis)
}

s"must suppress message from default dead-letters logging (sent to dead: ${Logging.simpleName(system.deadLetters)})" in {
val deadListener = TestProbe()
system.eventStream.subscribe(deadListener.ref, classOf[DeadLetter])

val suppressedListener = TestProbe()
system.eventStream.subscribe(suppressedListener.ref, classOf[SuppressedDeadLetter])

val allListener = TestProbe()
system.eventStream.subscribe(allListener.ref, classOf[AllDeadLetters])

system.deadLetters ! SuppressedMsg
system.deadLetters ! NormalMsg

deadListener.expectMsg(200.millis, DeadLetter(NormalMsg, testActor, system.deadLetters))

suppressedListener.expectMsg(200.millis, SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))

allListener.expectMsg(200.millis, SuppressedDeadLetter(SuppressedMsg, testActor, system.deadLetters))
allListener.expectMsg(200.millis, DeadLetter(NormalMsg, testActor, system.deadLetters))

Thread.sleep(200)
deadListener.expectNoMsg(Duration.Zero)
suppressedListener.expectNoMsg(Duration.Zero)
allListener.expectNoMsg(Duration.Zero)
}
}

30 changes: 29 additions & 1 deletion akka-actor/src/main/scala/akka/actor/ActorRef.scala
Original file line number Diff line number Diff line change
Expand Up @@ -459,12 +459,37 @@ private[akka] trait MinimalActorRef extends InternalActorRef with LocalRef {
protected def writeReplace(): AnyRef = SerializedActorRef(this)
}

/** Subscribe to this class to be notified about all DeadLetters (also the supressed ones). */
sealed trait AllDeadLetters {
def message: Any
def sender: ActorRef
def recipient: ActorRef
}

/**
* When a message is sent to an Actor that is terminated before receiving the message, it will be sent as a DeadLetter
* to the ActorSystem's EventStream
*/
@SerialVersionUID(1L)
final case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) {
final case class DeadLetter(message: Any, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters {
require(sender ne null, "DeadLetter sender may not be null")
require(recipient ne null, "DeadLetter recipient may not be null")
}

/**
* Use with caution: Messages extending this trait will not be logged by the default dead-letters listener.
* Instead they will be wrapped as [[SuppressedDeadLetter]] and may be subscribed for explicitly.
*/
trait DeadLetterSuppression

/**
* Similar to [[DeadLetter]] with the slight twist of NOT being logged by the default dead letters listener.
* Messages which end up being suppressed dead letters are internal messages for which ending up as dead-letter is both expected and harmless.
*
* It is possible to subscribe to suppressed dead letters on the ActorSystem's EventStream explicitly.
*/
@SerialVersionUID(1L)
final case class SuppressedDeadLetter(message: DeadLetterSuppression, sender: ActorRef, recipient: ActorRef) extends AllDeadLetters {
require(sender ne null, "DeadLetter sender may not be null")
require(recipient ne null, "DeadLetter recipient may not be null")
}
Expand Down Expand Up @@ -523,6 +548,9 @@ private[akka] class EmptyLocalActorRef(override val provider: ActorRefProvider,
eventStream.publish(DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
}
true
case m: DeadLetterSuppression
eventStream.publish(SuppressedDeadLetter(m, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
true
case _ false
}
}
Expand Down
29 changes: 28 additions & 1 deletion akka-docs/rst/java/code/docs/event/LoggingDocTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
package docs.event;

//#imports
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.AllDeadLetters;
import akka.actor.SuppressedDeadLetter;
import akka.event.Logging;
import akka.event.LoggingAdapter;

Expand Down Expand Up @@ -64,7 +68,30 @@ public void subscribeToDeadLetters() {
//#deadletters
JavaTestKit.shutdownActorSystem(system);
}


@Test
public void subscribeToSuppressedDeadLetters() {
final ActorSystem system = ActorSystem.create("SuppressedDeadLetters");
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));

//#suppressed-deadletters
system.eventStream().subscribe(actor, SuppressedDeadLetter.class);
//#suppressed-deadletters

JavaTestKit.shutdownActorSystem(system);
}
@Test
public void subscribeToAllDeadLetters() {
final ActorSystem system = ActorSystem.create("AllDeadLetters");
final ActorRef actor = system.actorOf(Props.create(DeadLetterActor.class));

//#all-deadletters
system.eventStream().subscribe(actor, AllDeadLetters.class);
//#all-deadletters

JavaTestKit.shutdownActorSystem(system);
}

@Test
public void demonstrateMultipleArgs() {
final ActorSystem system = ActorSystem.create("multiArg");
Expand Down
13 changes: 13 additions & 0 deletions akka-docs/rst/java/event-bus.rst
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,19 @@ which by default will publish the messages wrapped in :class:`DeadLetter`. This
wrapper holds the original sender, receiver and message of the envelope which
was redirected.

Some internal messages (marked with the :class:`DeadLetterSuppression` trait) will not end up as
dead letters like normal messages. These are by design safe and expected to sometimes arrive at a terminated actor
and since they are nothing to worry about, they are suppressed from the default dead letters logging mechanism.

However, in case you find yourself in need of debugging these kinds of low level suppressed dead letters,
it's still possible to subscribe to them explicitly:

.. includecode:: code/docs/event/LoggingDocTest.java#suppressed-deadletters

or all dead letters (including the suppressed ones):

.. includecode:: code/docs/event/LoggingDocTest.java#all-deadletters

Other Uses
----------

Expand Down
18 changes: 18 additions & 0 deletions akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
*/
package docs.event

import akka.actor.AllDeadLetters
import akka.testkit.AkkaSpec
import akka.actor.Actor
import akka.actor.Props
Expand Down Expand Up @@ -149,6 +150,23 @@ class LoggingDocSpec extends AkkaSpec {
}
}

"allow registration to suppressed dead letters" in {
new AnyRef {
import akka.actor.Props
val listener = system.actorOf(Props[MyActor])

//#suppressed-deadletters
import akka.actor.SuppressedDeadLetter
system.eventStream.subscribe(listener, classOf[SuppressedDeadLetter])
//#suppressed-deadletters

//#all-deadletters
import akka.actor.AllDeadLetters
system.eventStream.subscribe(listener, classOf[AllDeadLetters])
//#all-deadletters
}
}

"demonstrate logging more arguments" in {
//#array
val args = Array("The", "brown", "fox", "jumps", 42)
Expand Down
13 changes: 13 additions & 0 deletions akka-docs/rst/scala/event-bus.rst
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,19 @@ which by default will publish the messages wrapped in :class:`DeadLetter`. This
wrapper holds the original sender, receiver and message of the envelope which
was redirected.

Some internal messages (marked with the :class:`DeadLetterSuppression` trait) will not end up as
dead letters like normal messages. These are by design safe and expected to sometimes arrive at a terminated actor
and since they are nothing to worry about, they are suppressed from the default dead letters logging mechanism.

However, in case you find yourself in need of debugging these kinds of low level suppressed dead letters,
it's still possible to subscribe to them explicitly:

.. includecode:: code/docs/event/LoggingDocSpec.scala#suppressed-deadletters

or all dead letters (including the suppressed ones):

.. includecode:: code/docs/event/LoggingDocSpec.scala#all-deadletters

Other Uses
----------

Expand Down

0 comments on commit 73cd1d7

Please sign in to comment.