From 8a7d7715b5131042fae2c20c39e4444deb6fd71e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 26 May 2015 09:00:40 +0200 Subject: [PATCH] clu #17565 Invoke OnMemberRemoved callback when cluster.shutdown * must also be done when the listener actor stops before the MemberRemoved event has been received * add test for this * clarify docs with example that shuts down actor system and exit jvm --- .../scala/akka/cluster/ClusterDaemon.scala | 6 ++++- .../test/scala/akka/cluster/ClusterSpec.scala | 6 +++++ akka-docs/rst/java/cluster-usage.rst | 13 +++++++--- akka-docs/rst/scala/cluster-usage.rst | 13 +++++++--- .../factorial/FactorialFrontendMain.java | 25 ++++++++++++++++--- .../cluster/factorial/FactorialFrontend.scala | 10 ++++++-- 6 files changed, 60 insertions(+), 13 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 6d5fa627d83..0682a1cd3df 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -1133,8 +1133,12 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status: override def preStart(): Unit = cluster.subscribe(self, to) - override def postStop(): Unit = + + override def postStop(): Unit = { + if (status == Removed) + done() cluster.unsubscribe(self) + } def receive = { case state: CurrentClusterState ⇒ diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala index 23c710df253..565985282b8 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala @@ -15,6 +15,7 @@ import akka.cluster.InternalClusterAction._ import java.lang.management.ManagementFactory import javax.management.ObjectName import akka.actor.ActorRef +import akka.testkit.TestProbe object ClusterSpec { val config = """ @@ -93,12 +94,17 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender { // this should be the last test step, since the cluster is shutdown "publish MemberRemoved when shutdown" in { + val callbackProbe = TestProbe() + cluster.registerOnMemberRemoved(callbackProbe.ref ! "OnMemberRemoved") + cluster.subscribe(testActor, classOf[ClusterEvent.MemberRemoved]) // first, is in response to the subscription expectMsgClass(classOf[ClusterEvent.CurrentClusterState]) cluster.shutdown() expectMsgType[ClusterEvent.MemberRemoved].member.address should ===(selfAddress) + + callbackProbe.expectMsg("OnMemberRemoved") } } diff --git a/akka-docs/rst/java/cluster-usage.rst b/akka-docs/rst/java/cluster-usage.rst index d1ba8f6cf28..4dc82f376f4 100644 --- a/akka-docs/rst/java/cluster-usage.rst +++ b/akka-docs/rst/java/cluster-usage.rst @@ -112,7 +112,8 @@ An actor system can only join a cluster once. Additional attempts will be ignore When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again. It can use the same host name and port after the restart, when it come up as new incarnation of existing member in the cluster, -trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join. +trying to join in, then the existing one will be removed from the cluster and then it will +be allowed to join. .. _automatic-vs-manual-downing-java: @@ -287,15 +288,21 @@ has at least the defined number of members. This callback can be used for other things than starting actors. +How To Cleanup when Member is Removed +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + You can do some clean up in a ``registerOnMemberRemoved`` callback, which will be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e. terminate the actor system. +For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM: + .. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved -.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on +.. note:: + Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may - want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting + want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting down when you installing, and depending on the race is not healthy. Cluster Singleton diff --git a/akka-docs/rst/scala/cluster-usage.rst b/akka-docs/rst/scala/cluster-usage.rst index 4b21af5e4e4..57fb77d9021 100644 --- a/akka-docs/rst/scala/cluster-usage.rst +++ b/akka-docs/rst/scala/cluster-usage.rst @@ -107,7 +107,8 @@ An actor system can only join a cluster once. Additional attempts will be ignore When it has successfully joined it must be restarted to be able to join another cluster or to join the same cluster again.It can use the same host name and port after the restart, when it come up as new incarnation of existing member in the cluster, -trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join. +trying to join in, then the existing one will be removed from the cluster and then it will +be allowed to join. .. _automatic-vs-manual-downing-scala: @@ -281,15 +282,21 @@ has at least the defined number of members. This callback can be used for other things than starting actors. +How To Cleanup when Member is Removed +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + You can do some clean up in a ``registerOnMemberRemoved`` callback, which will be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e. terminate the actor system. +For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM: + .. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved -.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on +.. note:: + Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may - want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting + want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting down when you installing, and depending on the race is not healthy. Cluster Singleton diff --git a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java index 69bc399ffc1..ca9ccb14478 100644 --- a/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java +++ b/akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java @@ -1,7 +1,11 @@ package sample.cluster.factorial; +import java.util.concurrent.TimeUnit; + +import scala.concurrent.duration.FiniteDuration; import com.typesafe.config.Config; import com.typesafe.config.ConfigFactory; + import akka.actor.ActorSystem; import akka.actor.Props; import akka.cluster.Cluster; @@ -30,10 +34,23 @@ public void run() { //#registerOnRemoved Cluster.get(system).registerOnMemberRemoved(new Runnable() { - @Override - public void run() { - system.terminate(); - } + @Override + public void run() { + // exit JVM when ActorSystem has been terminated + final Runnable exit = new Runnable() { + @Override + public void run() { + System.exit(-1); + } + }; + system.registerOnTermination(exit); + // in case ActorSystem shutdown takes longer than 10 seconds, + // exit the JVM forcefully anyway + system.scheduler().scheduleOnce(FiniteDuration.create(10, TimeUnit.SECONDS), + exit, system.dispatcher()); + // shut down ActorSystem + system.terminate(); + } }); //#registerOnRemoved diff --git a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala index e4671dae776..64ef9bbdbca 100644 --- a/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala +++ b/akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala @@ -59,10 +59,16 @@ object FactorialFrontend { //#registerOnUp //#registerOnRemoved - Cluster(system).registerOnMemberRemoved{ + Cluster(system).registerOnMemberRemoved { + // exit JVM when ActorSystem has been terminated + system.registerOnTermination(System.exit(-1)) + // in case ActorSystem shutdown takes longer than 10 seconds, + // exit the JVM forcefully anyway + system.scheduler.scheduleOnce(10.seconds)(System.exit(-1))(system.dispatcher) + // shut down ActorSystem system.terminate() } //#registerOnRemoved } -} \ No newline at end of file +}