diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala new file mode 100644 index 00000000000..b675b4474ef --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala @@ -0,0 +1,147 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.remote + +import language.postfixOps +import scala.concurrent.duration._ +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.remote.testconductor.RoleName +import akka.remote.transport.ThrottlerTransportAdapter.{ForceDisassociate, Direction} +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.testkit._ +import akka.actor.ActorIdentity +import akka.remote.testconductor.RoleName +import akka.actor.Identify +import scala.concurrent.Await + +object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(debugConfig(on = false).withFallback( + ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.remote.log-remote-lifecycle-events = INFO + #akka.remote.retry-gate-closed-for = 0.5 s + akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s + akka.remote.gate-invalid-addresses-for = 0.5 s + """))) + + testTransport(on = true) + + class Subject extends Actor { + def receive = { + case "shutdown" => context.system.shutdown() + case msg ⇒ sender ! msg + } + } + +} + +class RemoteNodeShutdownAndComesBackMultiJvmNode1 extends RemoteNodeShutdownAndComesBackSpec +class RemoteNodeShutdownAndComesBackMultiJvmNode2 extends RemoteNodeShutdownAndComesBackSpec + +abstract class RemoteNodeShutdownAndComesBackSpec + extends MultiNodeSpec(RemoteNodeShutdownAndComesBackSpec) + with STMultiNodeSpec with ImplicitSender { + + import RemoteNodeShutdownAndComesBackSpec._ + + override def initialParticipants = roles.size + + def identify(role: RoleName, actorName: String): ActorRef = { + system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) + expectMsgType[ActorIdentity].ref.get + } + + "RemoteNodeShutdownAndComesBack" must { + + "properly reset system message buffer state when new system with same Address comes up" taggedAs LongRunningTest in { + runOn(first) { + val secondAddress = node(second).address + system.actorOf(Props[Subject], "subject1") + enterBarrier("actors-started") + + val subject = identify(second, "subject") + val sysmsgBarrier = identify(second, "sysmsgBarrier") + + // Prime up the system message buffer + watch(subject) + enterBarrier("watch-established") + + // Wait for proper system message propagation + // (Using a helper actor to ensure that all previous system messages arrived) + watch(sysmsgBarrier) + system.stop(sysmsgBarrier) + expectTerminated(sysmsgBarrier) + + // Drop all messages from this point so no SHUTDOWN is ever received + testConductor.blackhole(second, first, Direction.Send).await + // Shut down all existing connections so that the system can enter recovery mode (association attempts) + Await.result(RARP(system).provider.transport.managementCommand(ForceDisassociate(node(second).address)), 3.seconds) + + // Trigger reconnect attempt and also queue up a system message to be in limbo state (UID of remote system + // is unknown, and system message is pending) + system.stop(subject) + subject ! "hello" + subject ! "hello" + subject ! "hello" + + // Get rid of old system -- now SHUTDOWN is lost + testConductor.shutdown(second).await + expectTerminated(subject, 10.seconds) + + // At this point the second node is restarting, while the first node is trying to reconnect without resetting + // the system message send state + + // Now wait until second system becomes alive again + within(30.seconds) { + // retry because the Subject actor might not be started yet + awaitAssert { + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "echo" + expectMsg(1.second, "echo") + } + } + + // Establish watch with the new system. This triggers additional system message traffic. If buffers are out + // of synch the remote system will be quarantined and the rest of the test will fail (or even in earlier + // stages depending on circumstances). + system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject") + val subjectNew = expectMsgType[ActorIdentity].ref.get + watch(subjectNew) + + subjectNew ! "shutdown" + expectTerminated(subjectNew) + } + + runOn(second) { + val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress + system.actorOf(Props[Subject], "subject") + system.actorOf(Props[Subject], "sysmsgBarrier") + val path = node(first) + enterBarrier("actors-started") + + enterBarrier("watch-established") + + system.awaitTermination(30.seconds) + + val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s""" + akka.remote.netty.tcp { + hostname = ${addr.host.get} + port = ${addr.port.get} + } + """).withFallback(system.settings.config)) + freshSystem.actorOf(Props[Subject], "subject") + + + freshSystem.awaitTermination(30.seconds) + } + + } + + } +} diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 4c460287daa..ef276f5a830 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -180,6 +180,7 @@ private[remote] class ReliableDeliverySupervisor( override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) { case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) ⇒ Escalate case NonFatal(e) ⇒ + uidConfirmed = false // Need confirmation of UID again if (retryGateEnabled) { context.become(gated) context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate) @@ -198,12 +199,14 @@ private[remote] class ReliableDeliverySupervisor( var resendDeadline: Deadline = _ var lastCumulativeAck: SeqNo = _ var seqCounter: Long = _ + var pendingAcks = Vector.empty[Ack] def reset() { resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize) resendDeadline = Deadline.now + settings.SysResendTimeout lastCumulativeAck = SeqNo(-1) seqCounter = 0L + pendingAcks = Vector.empty } reset() @@ -216,6 +219,18 @@ private[remote] class ReliableDeliverySupervisor( var writer: ActorRef = createWriter() var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid } + // Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the + // UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for + // any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore + // it serves a separator. + // If we already have an inbound handle then UID is initially confirmed. + // (This actor is never restarted) + var uidConfirmed: Boolean = uid.isDefined + + def unstashAcks(): Unit = { + pendingAcks foreach (self ! _) + pendingAcks = Vector.empty + } override def postStop(): Unit = { // All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence @@ -242,30 +257,37 @@ private[remote] class ReliableDeliverySupervisor( case s: Send ⇒ handleSend(s) case ack: Ack ⇒ - try resendBuffer = resendBuffer.acknowledge(ack) - catch { - case NonFatal(e) ⇒ - throw new InvalidAssociationException("Error encountered while processing system message acknowledgement", e) - } + if (!uidConfirmed) pendingAcks = pendingAcks :+ ack + else { + try resendBuffer = resendBuffer.acknowledge(ack) + catch { + case NonFatal(e) ⇒ + throw new InvalidAssociationException(s"Error encountered while processing system message acknowledgement $resendBuffer $ack", e) + } - if (lastCumulativeAck < ack.cumulativeAck) { - resendDeadline = Deadline.now + settings.SysResendTimeout - lastCumulativeAck = ack.cumulativeAck - } else if (resendDeadline.isOverdue()) { - resendAll() - resendDeadline = Deadline.now + settings.SysResendTimeout + if (lastCumulativeAck < ack.cumulativeAck) { + resendDeadline = Deadline.now + settings.SysResendTimeout + lastCumulativeAck = ack.cumulativeAck + } else if (resendDeadline.isOverdue()) { + resendAll() + resendDeadline = Deadline.now + settings.SysResendTimeout + } + resendNacked() } - resendNacked() case Terminated(_) ⇒ currentHandle = None context.parent ! StoppedReading(self) if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery) context.become(idle) - case GotUid(u) ⇒ + case GotUid(receivedUid) ⇒ // New system that has the same address as the old - need to start from fresh state - if (uid.isDefined && uid.get != u) reset() - uid = Some(u) + uidConfirmed = true + if (uid.exists(_ != receivedUid)) reset() + else unstashAcks() + uid = Some(receivedUid) + resendAll() + case s: EndpointWriter.StopReading ⇒ writer forward s } @@ -273,7 +295,7 @@ private[remote] class ReliableDeliverySupervisor( case Ungate ⇒ if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) { writer = createWriter() - resendAll() + // Resending will be triggered by the incoming GotUid message after the connection finished context.become(receive) } else context.become(idle) case s @ Send(msg: SystemMessage, _, _, _) ⇒ tryBuffer(s.copy(seqOpt = Some(nextSeq()))) @@ -286,12 +308,12 @@ private[remote] class ReliableDeliverySupervisor( def idle: Receive = { case s: Send ⇒ writer = createWriter() - resendAll() + // Resending will be triggered by the incoming GotUid message after the connection finished handleSend(s) context.become(receive) case AttemptSysMsgRedelivery ⇒ writer = createWriter() - resendAll() + // Resending will be triggered by the incoming GotUid message after the connection finished context.become(receive) case EndpointWriter.FlushAndStop ⇒ context.stop(self) case EndpointWriter.StopReading(w) ⇒ sender ! EndpointWriter.StoppedReading(w) @@ -310,7 +332,9 @@ private[remote] class ReliableDeliverySupervisor( if (send.message.isInstanceOf[SystemMessage]) { val sequencedSend = send.copy(seqOpt = Some(nextSeq())) tryBuffer(sequencedSend) - writer ! sequencedSend + // If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it. + // GotUid will kick resendAll() causing the messages to be properly written + if (uidConfirmed) writer ! sequencedSend } else writer ! send private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }