From b0ab6f4b31783e08ae88aad5f2d002541895a8e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Thu, 23 Jan 2014 17:19:36 +0100 Subject: [PATCH] =act #3562: Do not generate Terminated when unwatched in preRestart - also avoid duplicate DWN for parent --- .../test/scala/akka/actor/UidClashTest.scala | 99 +++++++++++++++++++ .../scala/akka/actor/dungeon/DeathWatch.scala | 9 +- .../akka/actor/dungeon/FaultHandling.scala | 2 +- 3 files changed, 105 insertions(+), 5 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/actor/UidClashTest.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/UidClashTest.scala b/akka-actor-tests/src/test/scala/akka/actor/UidClashTest.scala new file mode 100644 index 00000000000..133bf2b12c7 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/UidClashTest.scala @@ -0,0 +1,99 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.actor + +import akka.testkit.{ TestProbe, AkkaSpec } +import akka.actor.SupervisorStrategy.{ Restart, Stop } +import akka.dispatch.sysmsg.SystemMessage +import akka.event.EventStream +import scala.util.control.NoStackTrace + +object UidClashTest { + + class TerminatedForNonWatchedActor extends Exception("Received Terminated for actor that was not actually watched") + with NoStackTrace + + @volatile var oldActor: ActorRef = _ + + class EvilCollidingActorRef(override val provider: ActorRefProvider, + override val path: ActorPath, + val eventStream: EventStream) extends MinimalActorRef { + + //Ignore everything + override def isTerminated(): Boolean = true + override def sendSystemMessage(message: SystemMessage): Unit = () + override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = () + } + + def createCollidingRef(system: ActorSystem): ActorRef = + new EvilCollidingActorRef(system.asInstanceOf[ActorSystemImpl].provider, oldActor.path, system.eventStream) + + case object PleaseRestart + case object PingMyself + case object RestartedSafely + + class RestartedActor extends Actor { + + def receive = { + case PleaseRestart ⇒ throw new Exception("restart") + case Terminated(ref) ⇒ throw new TerminatedForNonWatchedActor + // This is the tricky part to make this test a positive one (avoid expectNoMsg). + // Since anything enqueued in postRestart will arrive before the Terminated + // the bug triggers, there needs to be a bounce: + // 1. Ping is sent from postRestart to self + // 2. As a response to pint, RestartedSafely is sent to self + // 3a. if Terminated was enqueued during the restart procedure it will arrive before the RestartedSafely message + // 3b. otherwise only the RestartedSafely message arrives + case PingMyself ⇒ self ! RestartedSafely + case RestartedSafely ⇒ context.parent ! RestartedSafely + } + + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { + context.children foreach { child ⇒ + oldActor = child + context.unwatch(child) + context.stop(child) + } + } + + override def preStart(): Unit = context.watch(context.actorOf(Props.empty, "child")) + + override def postRestart(reason: Throwable): Unit = { + context.watch(createCollidingRef(context.system)) + self ! PingMyself + } // Simulate UID clash + } + + class RestartingActor(probe: ActorRef) extends Actor { + override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) { + case _: TerminatedForNonWatchedActor ⇒ + context.stop(self) + Stop + case _ ⇒ Restart + } + val theRestartedOne = context.actorOf(Props[RestartedActor], "theRestartedOne") + + def receive = { + case PleaseRestart ⇒ theRestartedOne ! PleaseRestart + case RestartedSafely ⇒ probe ! RestartedSafely + } + } + +} + +class UidClashTest extends AkkaSpec { + import UidClashTest._ + + "The Terminated message for an old child stopped in preRestart" should { + "not arrive after restart" in { + val watcher = TestProbe() + val topActor = system.actorOf(Props(classOf[RestartingActor], watcher.ref), "top") + watcher.watch(topActor) + + topActor ! PleaseRestart + watcher.expectMsg(RestartedSafely) + } + } + +} diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index 11628f2ca01..3d334c0f475 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -50,7 +50,6 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * it will be propagated to user's receive. */ protected def watchedActorTerminated(actor: ActorRef, existenceConfirmed: Boolean, addressTerminated: Boolean): Unit = { - if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) if (watchingContains(actor)) { maintainAddressTerminatedSubscription(actor) { watching = removeFromSet(actor, watching) @@ -60,6 +59,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ terminatedQueuedFor(actor) } } + if (childrenRefs.getByRef(actor).isDefined) handleChildTerminated(actor) } private[akka] def terminatedQueuedFor(subject: ActorRef): Unit = @@ -77,12 +77,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ if (subject.path.uid != ActorCell.undefinedUid) (set - subject) - new UndefinedUidActorRef(subject) else set filterNot (_.path == subject.path) - protected def tellWatchersWeDied(actor: Actor): Unit = + protected def tellWatchersWeDied(): Unit = if (!watchedBy.isEmpty) { try { + // Don't need to send to parent parent since it receives a DWN by default def sendTerminated(ifLocal: Boolean)(watcher: ActorRef): Unit = - if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal) watcher.asInstanceOf[InternalActorRef].sendSystemMessage( - DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) + if (watcher.asInstanceOf[ActorRefScope].isLocal == ifLocal && watcher != parent) + watcher.asInstanceOf[InternalActorRef].sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) /* * It is important to notify the remote watchers first, otherwise RemoteDaemon might shut down, causing diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 159a371eedd..4d7d30ee522 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -202,7 +202,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(a), e.getMessage)) } finally try dispatcher.detach(this) finally try parent.sendSystemMessage(DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false)) - finally try tellWatchersWeDied(a) + finally try tellWatchersWeDied() finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure finally { if (system.settings.DebugLifecycle)