Skip to content

Commit

Permalink
=act #3562: Do not generate Terminated when unwatched in preRestart
Browse files Browse the repository at this point in the history
 - also avoid duplicate DWN for parent
  • Loading branch information
drewhk committed Jan 30, 2014
1 parent 2416be7 commit b0ab6f4
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 5 deletions.
99 changes: 99 additions & 0 deletions akka-actor-tests/src/test/scala/akka/actor/UidClashTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor

import akka.testkit.{ TestProbe, AkkaSpec }
import akka.actor.SupervisorStrategy.{ Restart, Stop }
import akka.dispatch.sysmsg.SystemMessage
import akka.event.EventStream
import scala.util.control.NoStackTrace

object UidClashTest {

class TerminatedForNonWatchedActor extends Exception("Received Terminated for actor that was not actually watched")
with NoStackTrace

@volatile var oldActor: ActorRef = _

class EvilCollidingActorRef(override val provider: ActorRefProvider,
override val path: ActorPath,
val eventStream: EventStream) extends MinimalActorRef {

//Ignore everything
override def isTerminated(): Boolean = true
override def sendSystemMessage(message: SystemMessage): Unit = ()
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = ()
}

def createCollidingRef(system: ActorSystem): ActorRef =
new EvilCollidingActorRef(system.asInstanceOf[ActorSystemImpl].provider, oldActor.path, system.eventStream)

case object PleaseRestart
case object PingMyself
case object RestartedSafely

class RestartedActor extends Actor {

def receive = {
case PleaseRestart throw new Exception("restart")
case Terminated(ref) throw new TerminatedForNonWatchedActor
// This is the tricky part to make this test a positive one (avoid expectNoMsg).
// Since anything enqueued in postRestart will arrive before the Terminated
// the bug triggers, there needs to be a bounce:
// 1. Ping is sent from postRestart to self
// 2. As a response to pint, RestartedSafely is sent to self
// 3a. if Terminated was enqueued during the restart procedure it will arrive before the RestartedSafely message
// 3b. otherwise only the RestartedSafely message arrives
case PingMyself self ! RestartedSafely
case RestartedSafely context.parent ! RestartedSafely
}

override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
context.children foreach { child
oldActor = child
context.unwatch(child)
context.stop(child)
}
}

override def preStart(): Unit = context.watch(context.actorOf(Props.empty, "child"))

override def postRestart(reason: Throwable): Unit = {
context.watch(createCollidingRef(context.system))
self ! PingMyself
} // Simulate UID clash
}

class RestartingActor(probe: ActorRef) extends Actor {
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case _: TerminatedForNonWatchedActor
context.stop(self)
Stop
case _ Restart
}
val theRestartedOne = context.actorOf(Props[RestartedActor], "theRestartedOne")

def receive = {
case PleaseRestart theRestartedOne ! PleaseRestart
case RestartedSafely probe ! RestartedSafely
}
}

}

class UidClashTest extends AkkaSpec {
import UidClashTest._

"The Terminated message for an old child stopped in preRestart" should {
"not arrive after restart" in {
val watcher = TestProbe()
val topActor = system.actorOf(Props(classOf[RestartingActor], watcher.ref), "top")
watcher.watch(topActor)

topActor ! PleaseRestart
watcher.expectMsg(RestartedSafely)
}
}

}
9 changes: 5 additions & 4 deletions akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
* it will be propagated to user's receive.
*/
protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = {
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
if (watchingContains(actor)) {
maintainAddressTerminatedSubscription(actor) {
watching = removeFromSet(actor, watching)
Expand All @@ -60,6 +59,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
terminatedQueuedFor(actor)
}
}
if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor)
}

private[akka] def terminatedQueuedFor(subject: ActorRef): Unit =
Expand All @@ -77,12 +77,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject)
else set filterNot (_.path == subject.path)

protected def tellWatchersWeDied(actor: Actor): Unit =
protected def tellWatchersWeDied(): Unit =
if (!watchedBy.isEmpty) {
try {
// Don't need to send to parent parent since it receives a DWN by default
def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit =
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage(
DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal && watcher != parent)
watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))

/*
* It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
catch handleNonFatalOrInterruptedException { e publish(Error(e, self.path.toString, clazz(a), e.getMessage)) }
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
finally try tellWatchersWeDied(a)
finally try tellWatchersWeDied()
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
finally {
if (system.settings.DebugLifecycle)
Expand Down

0 comments on commit b0ab6f4

Please sign in to comment.