
Commit 2bb2082

Merge pull request akka#17154 from akka/wip-13869-cluster-singleton-speedup-patriknw

=con akka#13869 Speedup startup of Cluster singleton
rkuhn committed Apr 14, 2015
2 parents b205800 + 2ecfa42 commit 2bb2082
Showing 3 changed files with 260 additions and 7 deletions.
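
In substance: before this commit the manager skipped the hand-over protocol only when it was alone in the cluster (memberCount == 1); now it does so whenever it is the oldest matching member and the initial cluster state contains no Down or Exiting members (the new safeToBeOldest flag), so a freshly started multi-node cluster gets its singleton almost immediately instead of waiting out hand-over retries. A condensed, hypothetical sketch of that decision (decideAtStartup and the returned strings are illustrative; only safeToBeOldest comes from the diff below):

// Hypothetical condensation of the new startup decision, not the literal source.
def decideAtStartup(iAmOldest: Boolean, safeToBeOldest: Boolean): String =
  if (iAmOldest && safeToBeOldest) "become Oldest immediately and start the singleton"
  else if (iAmOldest) "enter BecomingOldest and run the hand-over protocol"
  else "stay Younger and track the current oldest node"
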
@@ -145,7 +145,7 @@ object ClusterSingletonManager {
/**
* The first event, corresponding to CurrentClusterState.
*/
-final case class InitialOldestState(oldest: Option[Address], memberCount: Int)
+final case class InitialOldestState(oldest: Option[Address], safeToBeOldest: Boolean)

final case class OldestChanged(oldest: Option[Address])
}
@@ -191,8 +191,9 @@ object ClusterSingletonManager {

def handleInitial(state: CurrentClusterState): Unit = {
membersByAge = immutable.SortedSet.empty(ageOrdering) ++ state.members.filter(m ⇒
-  m.status == MemberStatus.Up && matchingRole(m))
-val initial = InitialOldestState(membersByAge.headOption.map(_.address), membersByAge.size)
+  (m.status == MemberStatus.Up || m.status == MemberStatus.Leaving) && matchingRole(m))
+val safeToBeOldest = !state.members.exists { m ⇒ (m.status == MemberStatus.Down || m.status == MemberStatus.Exiting) }
+val initial = InitialOldestState(membersByAge.headOption.map(_.address), safeToBeOldest)
changes :+= initial
}
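
The handleInitial change does two things: members in Leaving state now count towards the age ordering, and a safeToBeOldest flag is derived from the snapshot. It is false as soon as any member is Down or Exiting, since such a member could be an older node whose removal and hand-over have not completed, in which case immediate takeover would be unsafe. A self-contained restatement of the predicate (MemberStatus is the real Akka type; the helper and the sample inputs are illustrative only):

import akka.cluster.MemberStatus

// Illustrative helper mirroring the new logic in handleInitial.
def safeToBeOldest(statuses: Seq[MemberStatus]): Boolean =
  !statuses.exists(s ⇒ s == MemberStatus.Down || s == MemberStatus.Exiting)

safeToBeOldest(Seq(MemberStatus.Up, MemberStatus.Up))      // true:  start the singleton right away
safeToBeOldest(Seq(MemberStatus.Up, MemberStatus.Leaving)) // true:  a Leaving member alone is fine
safeToBeOldest(Seq(MemberStatus.Up, MemberStatus.Exiting)) // false: a node is still on its way out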

@@ -421,10 +422,10 @@ class ClusterSingletonManager(
getNextOldestChanged()
stay

-case Event(InitialOldestState(oldestOption, memberCount), _) ⇒
+case Event(InitialOldestState(oldestOption, safeToBeOldest), _) ⇒
oldestChangedReceived = true
-if (oldestOption == selfAddressOption && memberCount == 1)
-  // alone, oldest immediately
+if (oldestOption == selfAddressOption && safeToBeOldest)
+  // oldest immediately
gotoOldest()
else if (oldestOption == selfAddressOption)
goto(BecomingOldest) using BecomingOldestData(None)
@@ -592,7 +593,10 @@ class ClusterSingletonManager(
val newOldest = handOverTo.map(_.path.address)
logInfo("Singleton terminated, hand-over done [{} -> {}]", cluster.selfAddress, newOldest)
handOverTo foreach { _ ! HandOverDone }
-if (selfExited || removed.contains(cluster.selfAddress))
+if (removed.contains(cluster.selfAddress)) {
+  logInfo("Self removed, stopping ClusterSingletonManager")
+  stop()
+} else if (selfExited)
goto(End) using EndData
else
goto(Younger) using YoungerData(newOldest)
@@ -612,6 +616,9 @@ class ClusterSingletonManager(
logInfo("Exited [{}]", m.address)
}
stay
+case Event(MemberRemoved(m, _), _) if m.address == cluster.selfAddress && !selfExited ⇒
+  logInfo("Self removed, stopping ClusterSingletonManager")
+  stop()
case Event(MemberRemoved(m, _), _) ⇒
if (!selfExited) logInfo("Member removed [{}]", m.address)
addRemoved(m.address)
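
Together with the changed hand-over branch above, the new guarded case makes the manager stop itself as soon as its own node has been removed from the cluster, instead of only reaching the End state via selfExited. Note the placement: the self-removal case sits before the generic MemberRemoved case, because Akka FSM (like any Scala pattern match) tries cases top to bottom. A minimal plain-Scala illustration of that ordering (Msg, Removed and handle are hypothetical):

sealed trait Msg
final case class Removed(addr: String) extends Msg

// The more specific, guarded case must come first or it would never match.
def handle(selfAddr: String): Msg ⇒ String = {
  case Removed(a) if a == selfAddr ⇒ "self removed: stop the manager"
  case Removed(_)                  ⇒ "record the removal and stay"
}
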
New file (135 lines added): the ClusterSingletonManagerLeaveSpec multi-node test.
@@ -0,0 +1,135 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.contrib.pattern

import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.ActorSelection
import akka.cluster.MemberStatus

object ClusterSingletonManagerLeaveSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")

commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = off
"""))

case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
override def postStop(): Unit = {
testActor ! "stopped"
}

def receive = {
case _ ⇒
sender() ! self
}
}
}

class ClusterSingletonManagerLeaveMultiJvmNode1 extends ClusterSingletonManagerLeaveSpec
class ClusterSingletonManagerLeaveMultiJvmNode2 extends ClusterSingletonManagerLeaveSpec
class ClusterSingletonManagerLeaveMultiJvmNode3 extends ClusterSingletonManagerLeaveSpec

class ClusterSingletonManagerLeaveSpec extends MultiNodeSpec(ClusterSingletonManagerLeaveSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerLeaveSpec._

override def initialParticipants = roles.size

lazy val cluster = Cluster(system)

def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
createSingleton()
}
}

def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),
name = "singleton")
}

lazy val echoProxy: ActorRef = {
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/echo",
role = None),
name = "echoProxy")
}

"Leaving ClusterSingletonManager" must {

"hand-over to new instance" in {
join(first, first)

runOn(first) {
echoProxy ! "hello"
expectMsgType[ActorRef](5.seconds)
}
enterBarrier("first-active")

join(second, first)
join(third, first)
within(10.seconds) {
awaitAssert(cluster.state.members.count(m ⇒ m.status == MemberStatus.Up) should be(3))
}

runOn(second) {
cluster.leave(node(first).address)
}

runOn(first) {
expectMsg(10.seconds, "stopped")
}
enterBarrier("first-stopped")

runOn(second, third) {
val p = TestProbe()
val firstAddress = node(first).address
p.within(10.seconds) {
p.awaitAssert {
echoProxy.tell("hello2", p.ref)
p.expectMsgType[ActorRef](1.seconds).path.address should not be (firstAddress)

}
}
}

enterBarrier("hand-over-done")
}

}
}
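
Outside the testkit, the scenario exercised above corresponds to gracefully removing the node that currently hosts the singleton; a minimal sketch, assuming an ActorSystem named system is in scope:

val cluster = Cluster(system)
// Leaving the cluster makes the local ClusterSingletonManager hand over: it sends the
// configured terminationMessage (PoisonPill in the spec above) to the singleton, and the
// next oldest node starts a new instance.
cluster.leave(cluster.selfAddress)
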
New file (111 lines added): the ClusterSingletonManagerStartupSpec multi-node test.
@@ -0,0 +1,111 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.contrib.pattern

import language.postfixOps
import scala.collection.immutable
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Address
import akka.actor.Props
import akka.actor.PoisonPill
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.Member
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import akka.testkit.TestEvent._
import akka.actor.Terminated
import akka.actor.ActorSelection
import akka.cluster.MemberStatus

object ClusterSingletonManagerStartupSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")

commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
"""))

case object EchoStarted
/**
* The singleton actor
*/
class Echo(testActor: ActorRef) extends Actor {
def receive = {
case _ ⇒
sender() ! self
}
}
}

class ClusterSingletonManagerStartupMultiJvmNode1 extends ClusterSingletonManagerStartupSpec
class ClusterSingletonManagerStartupMultiJvmNode2 extends ClusterSingletonManagerStartupSpec
class ClusterSingletonManagerStartupMultiJvmNode3 extends ClusterSingletonManagerStartupSpec

class ClusterSingletonManagerStartupSpec extends MultiNodeSpec(ClusterSingletonManagerStartupSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterSingletonManagerStartupSpec._

override def initialParticipants = roles.size

def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
createSingleton()
}
}

def createSingleton(): ActorRef = {
system.actorOf(ClusterSingletonManager.props(
singletonProps = Props(classOf[Echo], testActor),
singletonName = "echo",
terminationMessage = PoisonPill,
role = None),
name = "singleton")
}

lazy val echoProxy: ActorRef = {
system.actorOf(ClusterSingletonProxy.props(
singletonPath = "/user/singleton/echo",
role = None),
name = "echoProxy")
}

"Startup of Cluster Singleton" must {

"be quick" in {
join(first, first)
join(second, first)
join(third, first)

within(7.seconds) {
awaitAssert {
val members = Cluster(system).state.members
members.size should be(3)
members.forall(_.status == MemberStatus.Up) should be(true)
}
}
enterBarrier("all-up")

// the singleton instance is expected to start "instantly"
echoProxy ! "hello"
expectMsgType[ActorRef](3.seconds)

enterBarrier("done")
}

}
}
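
The 3-second expectation at the end is the essence of the speedup: once all members are Up, the proxy should resolve the freshly started singleton almost immediately. Roughly the same check expressed with the ask pattern outside the testkit (the timeout value is assumed for illustration):

import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.duration._

implicit val timeout: Timeout = Timeout(3.seconds)
// Echo replies with its own ActorRef, so the future completes with the singleton's ref.
val reply = echoProxy ? "hello"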
