Skip to content

Commit

Permalink
=rem #3606 Properly reset system message buffer between remote sys re…
Browse files Browse the repository at this point in the history
…starts
  • Loading branch information
drewhk committed Sep 18, 2013
1 parent ea9d244 commit 9fcae8a
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote

import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.remote.testconductor.RoleName
import akka.remote.transport.ThrottlerTransportAdapter.{ForceDisassociate, Direction}
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.actor.ActorIdentity
import akka.remote.testconductor.RoleName
import akka.actor.Identify
import scala.concurrent.Await

object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")

commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString("""
akka.loglevel = INFO
akka.remote.log-remote-lifecycle-events = INFO
#akka.remote.retry-gate-closed-for = 0.5 s
akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s
akka.remote.gate-invalid-addresses-for = 0.5 s
""")))

testTransport(on = true)

class Subject extends Actor {
def receive = {
case "shutdown" => context.system.shutdown()
case msg sender ! msg
}
}

}

class RemoteNodeShutdownAndComesBackMultiJvmNode1 extends RemoteNodeShutdownAndComesBackSpec
class RemoteNodeShutdownAndComesBackMultiJvmNode2 extends RemoteNodeShutdownAndComesBackSpec

abstract class RemoteNodeShutdownAndComesBackSpec
extends MultiNodeSpec(RemoteNodeShutdownAndComesBackSpec)
with STMultiNodeSpec with ImplicitSender {

import RemoteNodeShutdownAndComesBackSpec._

override def initialParticipants = roles.size

def identify(role: RoleName, actorName: String): ActorRef = {
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
expectMsgType[ActorIdentity].ref.get
}

"RemoteNodeShutdownAndComesBack" must {

"properly reset system message buffer state when new system with same Address comes up" taggedAs LongRunningTest in {
runOn(first) {
val secondAddress = node(second).address
system.actorOf(Props[Subject], "subject1")
enterBarrier("actors-started")

val subject = identify(second, "subject")
val sysmsgBarrier = identify(second, "sysmsgBarrier")

// Prime up the system message buffer
watch(subject)
enterBarrier("watch-established")

// Wait for proper system message propagation
// (Using a helper actor to ensure that all previous system messages arrived)
watch(sysmsgBarrier)
system.stop(sysmsgBarrier)
expectTerminated(sysmsgBarrier)

// Drop all messages from this point so no SHUTDOWN is ever received
testConductor.blackhole(second, first, Direction.Send).await
// Shut down all existing connections so that the system can enter recovery mode (association attempts)
Await.result(RARP(system).provider.transport.managementCommand(ForceDisassociate(node(second).address)), 3.seconds)

// Trigger reconnect attempt and also queue up a system message to be in limbo state (UID of remote system
// is unknown, and system message is pending)
system.stop(subject)
subject ! "hello"
subject ! "hello"
subject ! "hello"

// Get rid of old system -- now SHUTDOWN is lost
testConductor.shutdown(second).await
expectTerminated(subject, 10.seconds)

// At this point the second node is restarting, while the first node is trying to reconnect without resetting
// the system message send state

// Now wait until second system becomes alive again
within(30.seconds) {
// retry because the Subject actor might not be started yet
awaitAssert {
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! "echo"
expectMsg(1.second, "echo")
}
}

// Establish watch with the new system. This triggers additional system message traffic. If buffers are out
// of synch the remote system will be quarantined and the rest of the test will fail (or even in earlier
// stages depending on circumstances).
system.actorSelection(RootActorPath(secondAddress) / "user" / "subject") ! Identify("subject")
val subjectNew = expectMsgType[ActorIdentity].ref.get
watch(subjectNew)

subjectNew ! "shutdown"
expectTerminated(subjectNew)
}

runOn(second) {
val addr = system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
system.actorOf(Props[Subject], "subject")
system.actorOf(Props[Subject], "sysmsgBarrier")
val path = node(first)
enterBarrier("actors-started")

enterBarrier("watch-established")

system.awaitTermination(30.seconds)

val freshSystem = ActorSystem(system.name, ConfigFactory.parseString(s"""
akka.remote.netty.tcp {
hostname = ${addr.host.get}
port = ${addr.port.get}
}
""").withFallback(system.settings.config))
freshSystem.actorOf(Props[Subject], "subject")


freshSystem.awaitTermination(30.seconds)
}

}

}
}
62 changes: 43 additions & 19 deletions akka-remote/src/main/scala/akka/remote/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ private[remote] class ReliableDeliverySupervisor(
override val supervisorStrategy = OneForOneStrategy(settings.MaximumRetriesInWindow, settings.RetryWindow, loggingEnabled = false) {
case e @ (_: InvalidAssociation | _: HopelessAssociation | _: QuarantinedUidException) Escalate
case NonFatal(e)
uidConfirmed = false // Need confirmation of UID again
if (retryGateEnabled) {
context.become(gated)
context.system.scheduler.scheduleOnce(settings.RetryGateClosedFor, self, Ungate)
Expand All @@ -198,12 +199,14 @@ private[remote] class ReliableDeliverySupervisor(
var resendDeadline: Deadline = _
var lastCumulativeAck: SeqNo = _
var seqCounter: Long = _
var pendingAcks = Vector.empty[Ack]

def reset() {
resendBuffer = new AckedSendBuffer[Send](settings.SysMsgBufferSize)
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = SeqNo(-1)
seqCounter = 0L
pendingAcks = Vector.empty
}

reset()
Expand All @@ -216,6 +219,18 @@ private[remote] class ReliableDeliverySupervisor(

var writer: ActorRef = createWriter()
var uid: Option[Int] = handleOrActive map { _.handshakeInfo.uid }
// Processing of Acks has to be delayed until the UID after a reconnect is discovered. Depending whether the
// UID matches the expected one, pending Acks can be processed, or must be dropped. It is guaranteed that for
// any inbound connections (calling createWriter()) the first message from that connection is GotUid() therefore
// it serves a separator.
// If we already have an inbound handle then UID is initially confirmed.
// (This actor is never restarted)
var uidConfirmed: Boolean = uid.isDefined

def unstashAcks(): Unit = {
pendingAcks foreach (self ! _)
pendingAcks = Vector.empty
}

override def postStop(): Unit = {
// All remaining messages in the buffer has to be delivered to dead letters. It is important to clear the sequence
Expand All @@ -242,38 +257,45 @@ private[remote] class ReliableDeliverySupervisor(
case s: Send
handleSend(s)
case ack: Ack
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e)
throw new InvalidAssociationException("Error encountered while processing system message acknowledgement", e)
}
if (!uidConfirmed) pendingAcks = pendingAcks :+ ack
else {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e)
throw new InvalidAssociationException(s"Error encountered while processing system message acknowledgement $resendBuffer $ack", e)
}

if (lastCumulativeAck < ack.cumulativeAck) {
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = ack.cumulativeAck
} else if (resendDeadline.isOverdue()) {
resendAll()
resendDeadline = Deadline.now + settings.SysResendTimeout
if (lastCumulativeAck < ack.cumulativeAck) {
resendDeadline = Deadline.now + settings.SysResendTimeout
lastCumulativeAck = ack.cumulativeAck
} else if (resendDeadline.isOverdue()) {
resendAll()
resendDeadline = Deadline.now + settings.SysResendTimeout
}
resendNacked()
}
resendNacked()
case Terminated(_)
currentHandle = None
context.parent ! StoppedReading(self)
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty)
context.system.scheduler.scheduleOnce(settings.SysResendTimeout, self, AttemptSysMsgRedelivery)
context.become(idle)
case GotUid(u)
case GotUid(receivedUid)
// New system that has the same address as the old - need to start from fresh state
if (uid.isDefined && uid.get != u) reset()
uid = Some(u)
uidConfirmed = true
if (uid.exists(_ != receivedUid)) reset()
else unstashAcks()
uid = Some(receivedUid)
resendAll()

case s: EndpointWriter.StopReading writer forward s
}

def gated: Receive = {
case Ungate
if (resendBuffer.nonAcked.nonEmpty || resendBuffer.nacked.nonEmpty) {
writer = createWriter()
resendAll()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
} else context.become(idle)
case s @ Send(msg: SystemMessage, _, _, _) tryBuffer(s.copy(seqOpt = Some(nextSeq())))
Expand All @@ -286,12 +308,12 @@ private[remote] class ReliableDeliverySupervisor(
def idle: Receive = {
case s: Send
writer = createWriter()
resendAll()
// Resending will be triggered by the incoming GotUid message after the connection finished
handleSend(s)
context.become(receive)
case AttemptSysMsgRedelivery
writer = createWriter()
resendAll()
// Resending will be triggered by the incoming GotUid message after the connection finished
context.become(receive)
case EndpointWriter.FlushAndStop context.stop(self)
case EndpointWriter.StopReading(w) sender ! EndpointWriter.StoppedReading(w)
Expand All @@ -310,7 +332,9 @@ private[remote] class ReliableDeliverySupervisor(
if (send.message.isInstanceOf[SystemMessage]) {
val sequencedSend = send.copy(seqOpt = Some(nextSeq()))
tryBuffer(sequencedSend)
writer ! sequencedSend
// If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
// GotUid will kick resendAll() causing the messages to be properly written
if (uidConfirmed) writer ! sequencedSend
} else writer ! send

private def resendNacked(): Unit = resendBuffer.nacked foreach { writer ! _ }
Expand Down

0 comments on commit 9fcae8a

Please sign in to comment.