!clt akka#15110 Use buffer instead of stash in singleton proxy
* drop first in singleton proxy
patriknw committed Jun 18, 2015
1 parent b8ef08a commit e2608e7
Showing 5 changed files with 77 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -27,6 +27,7 @@
.scalastyle
.settings
.cache*
.tmpBin
.tags
.tags_sorted_by_file
.target
11 changes: 10 additions & 1 deletion akka-cluster-tools/src/main/resources/reference.conf
@@ -143,5 +143,14 @@ akka.cluster.singleton-proxy {
role = ""

# Interval at which the proxy will try to resolve the singleton instance.
singleton-identification-interval = 1s

# If the location of the singleton is unknown the proxy will buffer this
# number of messages and deliver them when the singleton is identified.
# When the buffer is full old messages will be dropped when new messages are
# sent via the proxy.
# Use 0 to disable buffering, i.e. messages will be dropped immediately if
# the location of the singleton is unknown.
# Maximum allowed buffer size is 10000.
buffer-size = 1000
}
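With these defaults shipped in ``reference.conf``, a deployment can tune or disable buffering per system in ``application.conf``. A minimal sketch (the override value is illustrative, not part of this commit):

```hocon
akka.cluster.singleton-proxy {
  # keep at most 5000 messages while the singleton is unidentified;
  # 0 would drop messages immediately instead of buffering (max allowed: 10000)
  buffer-size = 5000
}
```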
@@ -34,7 +34,8 @@ object ClusterSingletonProxySettings
def apply(config: Config): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(
role = roleOption(config.getString("role")),
singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis)
singletonIdentificationInterval = config.getDuration("singleton-identification-interval", MILLISECONDS).millis,
bufferSize = config.getInt("buffer-size"))

/**
* Java API: Create settings from the default configuration
@@ -59,10 +60,17 @@ object ClusterSingletonProxySettings
/**
* @param role The role of the cluster nodes where the singleton can be deployed. If None, then any node will do.
* @param singletonIdentificationInterval Interval at which the proxy will try to resolve the singleton instance.
* @param bufferSize If the location of the singleton is unknown the proxy will buffer this number of messages
* and deliver them when the singleton is identified. When the buffer is full old messages will be dropped
* when new messages are sent via the proxy. Use 0 to disable buffering, i.e. messages will be dropped
* immediately if the location of the singleton is unknown.
*/
final class ClusterSingletonProxySettings(
val role: Option[String],
val singletonIdentificationInterval: FiniteDuration) extends NoSerializationVerificationNeeded {
val singletonIdentificationInterval: FiniteDuration,
val bufferSize: Int) extends NoSerializationVerificationNeeded {

require(bufferSize >= 0 && bufferSize <= 10000, "bufferSize must be >= 0 and <= 10000")

def withRole(role: String): ClusterSingletonProxySettings = copy(role = ClusterSingletonProxySettings.roleOption(role))

@@ -71,9 +79,13 @@ final class ClusterSingletonProxySettings(
def withSingletonIdentificationInterval(singletonIdentificationInterval: FiniteDuration): ClusterSingletonProxySettings =
copy(singletonIdentificationInterval = singletonIdentificationInterval)

def withBufferSize(bufferSize: Int): ClusterSingletonProxySettings =
copy(bufferSize = bufferSize)

private def copy(role: Option[String] = role,
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(role, singletonIdentificationInterval)
singletonIdentificationInterval: FiniteDuration = singletonIdentificationInterval,
bufferSize: Int = bufferSize): ClusterSingletonProxySettings =
new ClusterSingletonProxySettings(role, singletonIdentificationInterval, bufferSize)
}
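The new field composes with the existing fluent API. A hedged usage sketch, assuming the Akka 2.4 package layout (``akka.cluster.singleton``) and the ``apply(system)`` factory referenced above; the role name and size are illustrative:

```scala
import akka.actor.ActorSystem
import akka.cluster.singleton.ClusterSingletonProxySettings

val system = ActorSystem("Example")

// Start from the configured defaults, then adjust per proxy instance.
val settings = ClusterSingletonProxySettings(system)
  .withRole("backend")
  .withBufferSize(5000) // must be 0..10000, enforced by the require above
```

Out-of-range sizes fail fast at construction time thanks to the `require` check, rather than surfacing later as runtime buffer behavior.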

object ClusterSingletonProxy {
@@ -96,19 +108,19 @@ object ClusterSingletonProxy {
*
* The proxy can be started on every node where the singleton needs to be reached and used as if it were the singleton
* itself. It will then act as a router to the currently running singleton instance. If the singleton is not currently
* available, e.g., during hand off or startup, the proxy will stash the messages sent to the singleton and then unstash
* them when the singleton is finally available. The proxy mixes in the [[akka.actor.Stash]] trait, so it can be
* configured accordingly.
* available, e.g., during hand off or startup, the proxy will buffer the messages sent to the singleton and then deliver
* them when the singleton is finally available. The size of the buffer is configurable and it can be disabled by using
* a buffer size of 0. When the buffer is full old messages will be dropped when new messages are sent via the proxy.
*
* The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g., because
* The proxy works by keeping track of the oldest cluster member. When a new oldest member is identified, e.g. because
* the older one left the cluster, or at startup, the proxy will try to identify the singleton on the oldest member by
* periodically sending an [[akka.actor.Identify]] message until the singleton responds with its
* [[akka.actor.ActorIdentity]].
*
* Note that this is a best effort implementation: messages can always be lost due to the distributed nature of the
* actors involved.
*/
class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with Stash with ActorLogging {
final class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingletonProxySettings) extends Actor with ActorLogging {
import settings._
val singletonPath = singletonPathString.split("/")
var identifyCounter = 0
@@ -124,6 +136,8 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
}
var membersByAge: immutable.SortedSet[Member] = immutable.SortedSet.empty(ageOrdering)

var buffer = new java.util.LinkedList[(Any, ActorRef)]

// subscribe to MemberEvent, re-subscribe when restart
override def preStart(): Unit = {
cancelTimer()
@@ -206,11 +220,12 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle

// singleton identification logic
case ActorIdentity(identifyId, Some(s)) ⇒
// if the new singleton is defined, unstash all messages
// if the new singleton is defined, deliver all buffered messages
log.info("Singleton identified: {}", s.path)
singleton = Some(s)
context.watch(s)
cancelTimer()
unstashAll()
sendBuffered()
case _: ActorIdentity ⇒ // do nothing
case ClusterSingletonProxy.TryToIdentifySingleton if identifyTimer.isDefined ⇒
membersByAge.headOption.foreach {
@@ -219,16 +234,41 @@ class ClusterSingletonProxy(singletonPathString: String, settings: ClusterSingle
log.debug("Trying to identify singleton at {}", singletonAddress)
context.actorSelection(singletonAddress) ! Identify(identifyId)
}
case Terminated(ref) ⇒
  if (singleton.exists(_ == ref)) {
    // buffering mode, identification of the new singleton will start when the old node is removed
    singleton = None
  }

// forwarding/buffering logic
case msg: Any ⇒
singleton match {
case Some(s) ⇒
log.debug("Forwarding message to current singleton instance {}", msg)
log.debug("Forwarding message type [{}] to current singleton instance", msg.getClass.getName)
s forward msg
case None ⇒
log.debug("No singleton available, stashing message {}", msg)
stash()
buffer(msg)
}
}

def buffer(msg: Any): Unit =
if (settings.bufferSize == 0)
log.debug("Singleton not available and buffering is disabled, dropping message [{}]", msg.getClass.getName)
else if (buffer.size == settings.bufferSize) {
val (m, _) = buffer.removeFirst()
log.debug("Singleton not available, buffer is full, dropping first message [{}]", m.getClass.getName)
buffer.addLast((msg, sender()))
} else {
log.debug("Singleton not available, buffering message type [{}]", msg.getClass.getName)
buffer.addLast((msg, sender()))
}

def sendBuffered(): Unit = {
log.debug("Sending buffered messages to current singleton instance")
val target = singleton.get
while (!buffer.isEmpty) {
val (msg, snd) = buffer.removeFirst()
target.tell(msg, snd)
}
}
}
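The drop-first policy implemented by ``buffer``/``sendBuffered`` above can be sketched standalone, without the actor machinery. All names here are illustrative, not from the commit; only the policy (drop immediately at capacity 0, otherwise evict the oldest entry when full, drain in arrival order) mirrors the code:

```scala
import java.util.LinkedList

// Minimal model of the proxy's bounded buffer: when full, the oldest
// entry is dropped so the newest message is always admitted.
final class DropFirstBuffer[A](capacity: Int) {
  require(capacity >= 0, "capacity must be >= 0")
  private val buf = new LinkedList[A]

  // Returns the dropped element, if any (the message itself when capacity is 0).
  def offer(msg: A): Option[A] =
    if (capacity == 0) Some(msg)
    else if (buf.size == capacity) {
      val dropped = buf.removeFirst()
      buf.addLast(msg)
      Some(dropped)
    } else {
      buf.addLast(msg)
      None
    }

  // Drains in arrival order, mirroring sendBuffered.
  def drain(): Vector[A] = {
    var out = Vector.empty[A]
    while (!buf.isEmpty) out :+= buf.removeFirst()
    out
  }
}
```

A capacity-2 buffer offered 1, 2, 3 drops 1 and drains to Vector(2, 3), matching the "dropping first message" log path above.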
4 changes: 4 additions & 0 deletions akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst
@@ -251,6 +251,10 @@ Parameters to the ``Props`` factory methods have been moved to settings object `
and ``ClusterSingletonProxySettings``. These can be created from system configuration properties and also
amended with API as needed.

The buffer size of the ``ClusterSingletonProxy`` can be defined in the ``ClusterSingletonProxySettings``
instead of defining ``stash-capacity`` of the mailbox. Buffering can be disabled by using a
buffer size of 0.
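In configuration terms, the migration amounts to setting the new property added to ``reference.conf`` in this commit (the value shown is the shipped default):

```hocon
# 2.4.x: size the proxy's buffer directly; a mailbox stash-capacity
# setting no longer applies to ClusterSingletonProxy
akka.cluster.singleton-proxy.buffer-size = 1000  # 0 disables buffering
```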

DistributedPubSub construction
==============================

12 changes: 8 additions & 4 deletions akka-docs/rst/scala/cluster-singleton.rst
@@ -44,10 +44,14 @@ the oldest node in the cluster and resolve the singleton's ``ActorRef`` by expli
singleton's ``actorSelection`` the ``akka.actor.Identify`` message and waiting for it to reply.
This is performed periodically if the singleton doesn't reply within a certain (configurable) time.
Given the implementation, there might be periods of time during which the ``ActorRef`` is unavailable,
e.g., when a node leaves the cluster. In these cases, the proxy will stash away all messages until it
is able to identify the singleton. It's worth noting that messages can always be lost because of the
distributed nature of these actors. As always, additional logic should be implemented in the singleton
(acknowledgement) and in the client (retry) actors to ensure at-least-once message delivery.
e.g., when a node leaves the cluster. In these cases, the proxy will buffer the messages sent to the
singleton and then deliver them when the singleton is finally available. If the buffer is full
the ``ClusterSingletonProxy`` will drop old messages when new messages are sent via the proxy.
The size of the buffer is configurable and it can be disabled by using a buffer size of 0.

It's worth noting that messages can always be lost because of the distributed nature of these actors.
As always, additional logic should be implemented in the singleton (acknowledgement) and in the
client (retry) actors to ensure at-least-once message delivery.
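For completeness, a hedged sketch of starting such a proxy. The manager path and actor names are assumptions, and the package reflects the 2.4 layout rather than anything shown in this diff:

```scala
import akka.actor.ActorSystem
import akka.cluster.singleton.{ ClusterSingletonProxy, ClusterSingletonProxySettings }

val system = ActorSystem("ClusterSystem")

// Messages sent to `proxy` are forwarded to the singleton, or buffered
// (dropping oldest-first when full) until the singleton is identified.
val proxy = system.actorOf(
  ClusterSingletonProxy.props(
    "/user/consumerSingletonManager",
    ClusterSingletonProxySettings(system).withBufferSize(1000)),
  name = "consumerProxy")
```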

Potential problems to be aware of
---------------------------------
