Skip to content

Commit

Permalink
clu akka#17565 Invoke OnMemberRemoved callback when
Browse files Browse the repository at this point in the history
 cluster.shutdown

* must also be done when the listener actor stops before the
  MemberRemoved event has been received
* add test for this
* clarify docs with example that shuts down actor system and
  exit jvm
  • Loading branch information
patriknw committed May 27, 2015
1 parent ba8756d commit 8a7d771
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 13 deletions.
6 changes: 5 additions & 1 deletion akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,12 @@ private[cluster] class OnMemberStatusChangedListener(callback: Runnable, status:

override def preStart(): Unit =
cluster.subscribe(self, to)
override def postStop(): Unit =

override def postStop(): Unit = {
if (status == Removed)
done()
cluster.unsubscribe(self)
}

def receive = {
case state: CurrentClusterState
Expand Down
6 changes: 6 additions & 0 deletions akka-cluster/src/test/scala/akka/cluster/ClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import akka.cluster.InternalClusterAction._
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import akka.actor.ActorRef
import akka.testkit.TestProbe

object ClusterSpec {
val config = """
Expand Down Expand Up @@ -93,12 +94,17 @@ class ClusterSpec extends AkkaSpec(ClusterSpec.config) with ImplicitSender {

// this should be the last test step, since the cluster is shutdown
"publish MemberRemoved when shutdown" in {
val callbackProbe = TestProbe()
cluster.registerOnMemberRemoved(callbackProbe.ref ! "OnMemberRemoved")

cluster.subscribe(testActor, classOf[ClusterEvent.MemberRemoved])
// first, is in response to the subscription
expectMsgClass(classOf[ClusterEvent.CurrentClusterState])

cluster.shutdown()
expectMsgType[ClusterEvent.MemberRemoved].member.address should ===(selfAddress)

callbackProbe.expectMsg("OnMemberRemoved")
}

}
Expand Down
13 changes: 10 additions & 3 deletions akka-docs/rst/java/cluster-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ An actor system can only join a cluster once. Additional attempts will be ignore
When it has successfully joined it must be restarted to be able to join another
cluster or to join the same cluster again. It can use the same host name and port
after the restart, when it come up as new incarnation of existing member in the cluster,
trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join.
trying to join in, then the existing one will be removed from the cluster and then it will
be allowed to join.


.. _automatic-vs-manual-downing-java:
Expand Down Expand Up @@ -287,15 +288,21 @@ has at least the defined number of members.

This callback can be used for other things than starting actors.

How To Cleanup when Member is Removed
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can do some clean up in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e.
terminate the actor system.

For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM:

.. includecode:: ../../../akka-samples/akka-sample-cluster-java/src/main/java/sample/cluster/factorial/FactorialFrontendMain.java#registerOnRemoved

.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
.. note::
Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting
down when you installing, and depending on the race is not healthy.

Cluster Singleton
Expand Down
13 changes: 10 additions & 3 deletions akka-docs/rst/scala/cluster-usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ An actor system can only join a cluster once. Additional attempts will be ignore
When it has successfully joined it must be restarted to be able to join another
cluster or to join the same cluster again.It can use the same host name and port
after the restart, when it come up as new incarnation of existing member in the cluster,
trying to join in ,then the existing one will be removed from the cluster and then it will be allowed to join.
trying to join in, then the existing one will be removed from the cluster and then it will
be allowed to join.

.. _automatic-vs-manual-downing-scala:

Expand Down Expand Up @@ -281,15 +282,21 @@ has at least the defined number of members.

This callback can be used for other things than starting actors.

How To Cleanup when Member is Removed
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can do some clean up in a ``registerOnMemberRemoved`` callback, which will
be invoked when the current member status is changed to 'Removed' or the cluster have been shutdown,i.e.
terminate the actor system.

For example, this is how to shut down the ``ActorSystem`` and thereafter exit the JVM:

.. includecode:: ../../../akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/factorial/FactorialFrontend.scala#registerOnRemoved

.. note:: Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
.. note::
Register a OnMemberRemoved callback on a cluster that have been shutdown, the callback will be invoked immediately on
the caller thread, otherwise it will be invoked later when the current member status changed to 'Removed'. You may
want to install some cleanup handling after the cluster was started up,but the cluster might already be shutting
want to install some cleanup handling after the cluster was started up, but the cluster might already be shutting
down when you installing, and depending on the race is not healthy.

Cluster Singleton
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package sample.cluster.factorial;

import java.util.concurrent.TimeUnit;

import scala.concurrent.duration.FiniteDuration;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.Cluster;
Expand Down Expand Up @@ -30,10 +34,23 @@ public void run() {

//#registerOnRemoved
Cluster.get(system).registerOnMemberRemoved(new Runnable() {
@Override
public void run() {
system.terminate();
}
@Override
public void run() {
// exit JVM when ActorSystem has been terminated
final Runnable exit = new Runnable() {
@Override
public void run() {
System.exit(-1);
}
};
system.registerOnTermination(exit);
// in case ActorSystem shutdown takes longer than 10 seconds,
// exit the JVM forcefully anyway
system.scheduler().scheduleOnce(FiniteDuration.create(10, TimeUnit.SECONDS),
exit, system.dispatcher());
// shut down ActorSystem
system.terminate();
}
});
//#registerOnRemoved

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,16 @@ object FactorialFrontend {
//#registerOnUp

//#registerOnRemoved
Cluster(system).registerOnMemberRemoved{
Cluster(system).registerOnMemberRemoved {
// exit JVM when ActorSystem has been terminated
system.registerOnTermination(System.exit(-1))
// in case ActorSystem shutdown takes longer than 10 seconds,
// exit the JVM forcefully anyway
system.scheduler.scheduleOnce(10.seconds)(System.exit(-1))(system.dispatcher)
// shut down ActorSystem
system.terminate()
}
//#registerOnRemoved

}
}
}

0 comments on commit 8a7d771

Please sign in to comment.