diff --git a/akka-docs/rst/java/code/docs/persistence/PersistenceMultiDocTest.java b/akka-docs/rst/java/code/docs/persistence/PersistenceMultiDocTest.java new file mode 100644 index 00000000000..92ad569ed7e --- /dev/null +++ b/akka-docs/rst/java/code/docs/persistence/PersistenceMultiDocTest.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +package docs.persistence; + +import akka.persistence.UntypedPersistentActor; + +public class PersistenceMultiDocTest { + + //#default-plugins + abstract class ActorWithDefaultPlugins extends UntypedPersistentActor { + @Override + public String persistenceId() { return "123"; } + } + //#default-plugins + + //#override-plugins + abstract class ActorWithOverridePlugins extends UntypedPersistentActor { + @Override + public String persistenceId() { return "123"; } + // Absolute path to the journal plugin configuration entry in the `reference.conf` + @Override + public String journalPluginId() { return "akka.persistence.chronicle.journal"; } + // Absolute path to the snapshot store plugin configuration entry in the `reference.conf` + @Override + public String snapshotPluginId() { return "akka.persistence.chronicle.snapshot-store"; } + } + //#override-plugins + +} diff --git a/akka-docs/rst/java/lambda-persistence.rst b/akka-docs/rst/java/lambda-persistence.rst index 71a26d9848b..e7258e534c2 100644 --- a/akka-docs/rst/java/lambda-persistence.rst +++ b/akka-docs/rst/java/lambda-persistence.rst @@ -613,3 +613,25 @@ or .. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#shared-store-native-config in your Akka configuration. The LevelDB Java port is for testing purposes only. + +Multiple persistence plugin configurations +========================================== + +By default, persistent actor or view will use "default" journal and snapshot store plugins +configured in the following sections of the ``reference.conf`` configuration resource: + +.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#default-config + +Note that in this case actor or view overrides only ``persistenceId`` method: + +.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins + +When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods, +the actor or view will be serviced by these specific persistence plugins instead of the defaults: + +.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins + +Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf`` +plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.: + +.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#override-config diff --git a/akka-docs/rst/java/persistence.rst b/akka-docs/rst/java/persistence.rst index 18628415941..251f420ce0a 100644 --- a/akka-docs/rst/java/persistence.rst +++ b/akka-docs/rst/java/persistence.rst @@ -666,3 +666,24 @@ Configuration There are several configuration properties for the persistence module, please refer to the :ref:`reference configuration `. +Multiple persistence plugin configurations +========================================== + +By default, persistent actor or view will use "default" journal and snapshot store plugins +configured in the following sections of the ``reference.conf`` configuration resource: + +.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#default-config + +Note that in this case actor or view overrides only ``persistenceId`` method: + +.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#default-plugins + +When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods, +the actor or view will be serviced by these specific persistence plugins instead of the defaults: + +.. includecode:: ../java/code/docs/persistence/PersistenceMultiDocTest.java#override-plugins + +Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf`` +plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.: + +.. includecode:: ../scala/code/docs/persistence/PersistenceMultiDocSpec.scala#override-config diff --git a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst index 0f089fd2481..a9e173228f3 100644 --- a/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-persistence-experimental-2.3.x-2.4.x.rst @@ -178,8 +178,3 @@ persistent actor on the sending side. Read more about at-least-once delivery in the :ref:`documentation for Scala ` and :ref:`documentation for Java `. - - - - - \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala new file mode 100644 index 00000000000..bfb62d38b33 --- /dev/null +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceMultiDocSpec.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ + +import akka.persistence.PersistentActor + +object PersistenceMultiDocSpec { + + val DefaultConfig = """ + //#default-config + # Absolute path to the default journal plugin configuration entry. + akka.persistence.journal.plugin = "akka.persistence.journal.inmem" + # Absolute path to the default snapshot store plugin configuration entry. + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + //#default-config + """ + + //#default-plugins + trait ActorWithDefaultPlugins extends PersistentActor { + override def persistenceId = "123" + } + //#default-plugins + + val OverrideConfig = """ + //#override-config + # Configuration entry for the custom journal plugin, see `journalPluginId`. + akka.persistence.chronicle.journal { + # Standard persistence extension property: provider FQCN. + class = "akka.persistence.chronicle.ChronicleSyncJournal" + # Custom setting specific for the journal `ChronicleSyncJournal`. + folder = ${user.dir}/store/journal + # Standard persistence extension property: plugin actor uses config injection. + inject-config = true + } + # Configuration entry for the custom snapshot store plugin, see `snapshotPluginId`. + akka.persistence.chronicle.snapshot-store { + # Standard persistence extension property: provider FQCN. + class = "akka.persistence.chronicle.ChronicleSnapshotStore" + # Custom setting specific for the snapshot store `ChronicleSnapshotStore`. + folder = ${user.dir}/store/snapshot + # Standard persistence extension property: plugin actor uses config injection. + inject-config = true + } + //#override-config + """ + + //#override-plugins + trait ActorWithOverridePlugins extends PersistentActor { + override def persistenceId = "123" + // Absolute path to the journal plugin configuration entry in the `reference.conf`. + override def journalPluginId = "akka.persistence.chronicle.journal" + // Absolute path to the snapshot store plugin configuration entry in the `reference.conf`. + override def snapshotPluginId = "akka.persistence.chronicle.snapshot-store" + } + //#override-plugins + +} diff --git a/akka-docs/rst/scala/persistence.rst b/akka-docs/rst/scala/persistence.rst index 1c5dd5079d2..cd657ee551d 100644 --- a/akka-docs/rst/scala/persistence.rst +++ b/akka-docs/rst/scala/persistence.rst @@ -668,3 +668,24 @@ Configuration There are several configuration properties for the persistence module, please refer to the :ref:`reference configuration `. +Multiple persistence plugin configurations +========================================== + +By default, persistent actor or view will use "default" journal and snapshot store plugins +configured in the following sections of the ``reference.conf`` configuration resource: + +.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-config + +Note that in this case actor or view overrides only ``persistenceId`` method: + +.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#default-plugins + +When persistent actor or view overrides ``journalPluginId`` and ``snapshotPluginId`` methods, +the actor or view will be serviced by these specific persistence plugins instead of the defaults: + +.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-plugins + +Note that ``journalPluginId`` and ``snapshotPluginId`` must refer to properly configured ``reference.conf`` +plugin entires with standard ``class`` property as well as settings which are specific for those plugins, i.e.: + +.. includecode:: code/docs/persistence/PersistenceMultiDocSpec.scala#override-config diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 5a287874dc4..076fba17b42 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -1,182 +1,163 @@ -########################################## -# Akka Persistence Reference Config File # -########################################## +########################################################### +# Akka Persistence Extension Reference Configuration File # +########################################################### +# This is the reference config file that contains all the default settings. +# Make your edits in your application.conf in order to override these settings. +# Note that both journal and snapshot store plugin configuration entries require few fields: +# `class` : Fully qualified class name providing journal-plugin-api or snapshot-store-plugin-api implementation. +# `inject-config` : Plugin actor has a constructor which expects plugin configuration entry. This boolean field is optional. +# `plugin-dispatcher` : Absolute configuration path to the akka dispatcher configuration entry. This string field is optional. -akka { +# Note that journal and snapshot store plugins included with the extension are suitable for testing purposes only. +# You should change extension defaults or override `journalPluginId` and `snapshotPluginId` in the persistent actor or view. +# Directory of persistence journal and snapshot store plugins is available at the Akka Community Projects page http://akka.io/community/ - # Protobuf serialization for persistent messages - actor { +# Default persistence extension settings. +akka.persistence { + # Default journal settings. + journal { + # Absolute path to the journal plugin configuration entry used by persistent actor or view by default. + # Persistent actor or view can override `journalPluginId` method in order to rely on a different journal plugin. + plugin = "akka.persistence.journal.inmem" + # Maximum size of a persistent message batch written to the journal. + max-message-batch-size = 200 + # Maximum size of a deletion batch written to the journal. + max-deletion-batch-size = 10000 + } + # Default snapshot store settings. + snapshot-store { + # Absolute path to the snapshot plugin configuration entry used by persistent actor or view by default. + # Persistent actor or view can override `snapshotPluginId` method in order to rely on a different snapshot plugin. + plugin = "akka.persistence.snapshot-store.local" + } + # Default persistent view settings. + view { + # Automated incremental view update. + auto-update = on + # Interval between incremental updates. + auto-update-interval = 5s + # Maximum number of messages to replay per incremental view update. Set to -1 for no upper limit. + auto-update-replay-max = -1 + } + # Default reliable delivery settings. + at-least-once-delivery { + # Interval between re-delivery attempts. + redeliver-interval = 5s + # Maximum number of unconfirmed messages that will be sent in one re-delivery burst. + redelivery-burst-limit = 10000 + # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning`, message will be sent to the actor. + warn-after-number-of-unconfirmed-attempts = 5 + # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is allowed to hold in memory. + max-unconfirmed-messages = 100000 + } + # Default persistent extension thread pools. + dispatchers { + # Dispatcher used by every plugin which does not declare explicit `plugin-dispatcher` field. + default-plugin-dispatcher { + type = PinnedDispatcher + executor = "thread-pool-executor" + } + default-replay-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-max = 8 + } + } + default-stream-dispatcher { + type = Dispatcher + executor = "fork-join-executor" + fork-join-executor { + parallelism-min = 2 + parallelism-max = 8 + } + } + } +} +# Protobuf serialization for the persistent extension messages. +akka.actor { serializers { - - akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" - akka-persistence-message = "akka.persistence.serialization.MessageSerializer" + akka-persistence-message = "akka.persistence.serialization.MessageSerializer" + akka-persistence-snapshot = "akka.persistence.serialization.SnapshotSerializer" } - serialization-bindings { - - "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot - "akka.persistence.serialization.Message" = akka-persistence-message + "akka.persistence.serialization.Message" = akka-persistence-message + "akka.persistence.serialization.Snapshot" = akka-persistence-snapshot } - } - - persistence { - - journal { - - # Maximum size of a persistent message batch written to the journal. - max-message-batch-size = 200 - - # Maximum size of a deletion batch written to the journal. - max-deletion-batch-size = 10000 - - # Path to the journal plugin to be used - plugin = "akka.persistence.journal.leveldb" - - # In-memory journal plugin. - inmem { - - # Class name of the plugin. - class = "akka.persistence.journal.inmem.InmemJournal" + serialization-identifiers { + "akka.persistence.serialization.MessageSerializer" = 7 + "akka.persistence.serialization.SnapshotSerializer" = 8 + } +} - # Dispatcher for the plugin actor. - plugin-dispatcher = "akka.actor.default-dispatcher" - } +################################################### +# Persistence plugins included with the extension # +################################################### - # LevelDB journal plugin. - leveldb { +# In-memory journal plugin. +akka.persistence.journal.inmem { + # Class name of the plugin. + class = "akka.persistence.journal.inmem.InmemJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" +} - # Class name of the plugin. - class = "akka.persistence.journal.leveldb.LeveldbJournal" +# Local file system snapshot store plugin. +akka.persistence.snapshot-store.local { + # Class name of the plugin. + class = "akka.persistence.snapshot.local.LocalSnapshotStore" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # Dispatcher for streaming snapshot IO. + stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" + # Storage location of snapshot files. + dir = "snapshots" +} - # Dispatcher for the plugin actor. - plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" +# LevelDB journal plugin. +# TODO move to separate module: https://github.com/akka/akka/issues/15884 +akka.persistence.journal.leveldb { + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.LeveldbJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" + # Dispatcher for message replay. + replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" + # Storage location of LevelDB files. + dir = "journal" + # Use fsync on write. + fsync = on + # Verify checksum on read. + checksum = off + # Native LevelDB (via JNI) or LevelDB Java port. + native = on +} +# Shared LevelDB journal plugin (for testing only). +# TODO move to separate module: https://github.com/akka/akka/issues/15884 +akka.persistence.journal.leveldb-shared { + # Class name of the plugin. + class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" + # Dispatcher for the plugin actor. + plugin-dispatcher = "akka.actor.default-dispatcher" + # Timeout for async journal operations. + timeout = 10s + store { + # Dispatcher for shared store actor. + store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Dispatcher for message replay. - replay-dispatcher = "akka.persistence.dispatchers.default-replay-dispatcher" - + replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" # Storage location of LevelDB files. dir = "journal" - - # Use fsync on write + # Use fsync on write. fsync = on - # Verify checksum on read. checksum = off - - # Native LevelDB (via JNI) or LevelDB Java port + # Native LevelDB (via JNI) or LevelDB Java port. native = on - } - - # Shared LevelDB journal plugin (for testing only). - leveldb-shared { - - # Class name of the plugin. - class = "akka.persistence.journal.leveldb.SharedLeveldbJournal" - - # Dispatcher for the plugin actor. - plugin-dispatcher = "akka.actor.default-dispatcher" - - # timeout for async journal operations - timeout = 10s - - store { - - # Dispatcher for shared store actor. - store-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - - # Dispatcher for message replay. - replay-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - - # Storage location of LevelDB files. - dir = "journal" - - # Use fsync on write - fsync = on - - # Verify checksum on read. - checksum = off - - # Native LevelDB (via JNI) or LevelDB Java port - native = on - } - } - } - - snapshot-store { - - # Path to the snapshot store plugin to be used - plugin = "akka.persistence.snapshot-store.local" - - # Local filesystem snapshot store plugin. - local { - - # Class name of the plugin. - class = "akka.persistence.snapshot.local.LocalSnapshotStore" - - # Dispatcher for the plugin actor. - plugin-dispatcher = "akka.persistence.dispatchers.default-plugin-dispatcher" - - # Dispatcher for streaming snapshot IO. - stream-dispatcher = "akka.persistence.dispatchers.default-stream-dispatcher" - - # Storage location of snapshot files. - dir = "snapshots" - } - } - - view { - - # Automated incremental view update. - auto-update = on - - # Interval between incremental updates - auto-update-interval = 5s - - # Maximum number of messages to replay per incremental view update. Set to - # -1 for no upper limit. - auto-update-replay-max = -1 - } - - at-least-once-delivery { - # Interval between redelivery attempts - redeliver-interval = 5s - - # Maximum number of unconfirmed messages that will be sent in one redelivery burst - redelivery-burst-limit = 10000 - - # After this number of delivery attempts a `ReliableRedelivery.UnconfirmedWarning` - # message will be sent to the actor. - warn-after-number-of-unconfirmed-attempts = 5 - - # Maximum number of unconfirmed messages that an actor with AtLeastOnceDelivery is - # allowed to hold in memory. - max-unconfirmed-messages = 100000 - } - - dispatchers { - default-plugin-dispatcher { - type = PinnedDispatcher - executor = "thread-pool-executor" - } - default-replay-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - parallelism-min = 2 - parallelism-max = 8 - } - } - default-stream-dispatcher { - type = Dispatcher - executor = "fork-join-executor" - fork-join-executor { - parallelism-min = 2 - parallelism-max = 8 - } - } } - } } diff --git a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala index b06a546c414..dfda18e43b7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala @@ -12,6 +12,7 @@ import akka.actor.Stash import akka.actor.StashFactory import akka.event.Logging import akka.event.LoggingAdapter +import akka.actor.ActorRef /** * INTERNAL API @@ -36,13 +37,15 @@ private[persistence] object Eventsourced { * Scala API and implementation details of [[PersistentActor]], [[AbstractPersistentActor]] and * [[UntypedPersistentActor]]. */ -private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory { +private[persistence] trait Eventsourced extends Snapshotter with Stash with StashFactory with PersistenceIdentity { import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult import Eventsourced._ private val extension = Persistence(context.system) - private lazy val journal = extension.journalFor(persistenceId) + + private[persistence] lazy val journal = extension.journalFor(journalPluginId) + private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId) private val instanceId: Int = Eventsourced.instanceIdCounter.getAndIncrement() @@ -68,11 +71,6 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas case _ ⇒ true } - /** - * Id of the persistent entity for which messages should be replayed. - */ - def persistenceId: String - /** * Returns `persistenceId`. */ @@ -114,15 +112,18 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas override def preStart(): Unit = self ! Recover() - /** - * INTERNAL API. - */ + /** INTERNAL API. */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = currentState.stateReceive(receive, message) - /** - * INTERNAL API. - */ + /** INTERNAL API. */ + override protected[akka] def aroundPreStart(): Unit = { + // Fail fast on missing plugins. + val j = journal; val s = snapshotStore + super.aroundPreStart() + } + + /** INTERNAL API. */ override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = { try { internalStash.unstashAll() @@ -145,9 +146,7 @@ private[persistence] trait Eventsourced extends Snapshotter with Stash with Stas } } - /** - * INTERNAL API. - */ + /** INTERNAL API. */ override protected[akka] def aroundPostStop(): Unit = try { internalStash.unstashAll() diff --git a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala index cca664d91f3..6a8480eb3ef 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Persistence.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Persistence.scala @@ -5,13 +5,15 @@ package akka.persistence import scala.concurrent.duration._ - import com.typesafe.config.Config - import akka.actor._ import akka.dispatch.Dispatchers import akka.persistence.journal.AsyncWriteJournal import akka.util.Helpers.ConfigOps +import akka.event.LoggingAdapter +import akka.event.Logging +import java.util.concurrent.atomic.AtomicReference +import scala.annotation.tailrec /** * Persistence configuration. @@ -70,68 +72,142 @@ final class PersistenceSettings(config: Config) { } /** - * Persistence extension. + * Identification of [[PersistentActor]] or [[PersistentView]]. */ -object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { +//#persistence-identity +trait PersistenceIdentity { + /** - * Java API. + * Id of the persistent entity for which messages should be replayed. */ - override def get(system: ActorSystem): Persistence = super.get(system) + def persistenceId: String - def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) + /** + * Configuration id of the journal plugin servicing this persistent actor or view. + * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. + * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. + * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. + */ + def journalPluginId: String = "" + /** + * Configuration id of the snapshot plugin servicing this persistent actor or view. + * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. + * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. + * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. + */ + def snapshotPluginId: String = "" + +} +//#persistence-identity + +/** + * Persistence extension provider. + */ +object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider { + /** Java API. */ + override def get(system: ActorSystem): Persistence = super.get(system) + def createExtension(system: ExtendedActorSystem): Persistence = new Persistence(system) def lookup() = Persistence + /** INTERNAL API. */ + private[persistence] case class PluginHolder(actor: ActorRef) extends Extension } /** * Persistence extension. */ class Persistence(val system: ExtendedActorSystem) extends Extension { + import Persistence._ + + private def log: LoggingAdapter = Logging(system, getClass.getName) + private val DefaultPluginDispatcherId = "akka.persistence.dispatchers.default-plugin-dispatcher" + private val config = system.settings.config.getConfig("akka.persistence") + // Lazy, so user is not forced to configure defaults when she is not using them. + private lazy val defaultJournalPluginId = config.getString("journal.plugin") + + // Lazy, so user is not forced to configure defaults when she is not using them. + private lazy val defaultSnapshotPluginId = config.getString("snapshot-store.plugin") + val settings = new PersistenceSettings(config) - private val snapshotStore = createPlugin("snapshot-store") { _ ⇒ + private def journalDispatchSelector(klaz: Class[_]): String = + if (classOf[AsyncWriteJournal].isAssignableFrom(klaz)) Dispatchers.DefaultDispatcherId else DefaultPluginDispatcherId + + private def snapshotDispatchSelector(klaz: Class[_]): String = DefaultPluginDispatcherId - } - private val journal = createPlugin("journal") { clazz ⇒ - if (classOf[AsyncWriteJournal].isAssignableFrom(clazz)) Dispatchers.DefaultDispatcherId - else DefaultPluginDispatcherId - } + /** Check for default identity. */ + private def isDefault(text: String) = text == null || text.length == 0 - /** - * Creates a canonical persistent actor id from a persistent actor ref. - */ - def persistenceId(persistentActor: ActorRef): String = id(persistentActor) + /** Discovered persistence journal plugins. */ + private val journalPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) + + /** Discovered persistence snapshot store plugins. */ + private val snapshotPluginExtensionId = new AtomicReference[Map[String, ExtensionId[PluginHolder]]](Map.empty) /** - * Returns a snapshot store for a persistent actor identified by `persistenceId`. + * Returns a journal plugin actor identified by `journalPluginId`. + * When empty, looks in `akka.persistence.journal.plugin` to find configuration entry path. + * When configured, uses `journalPluginId` as absolute path to the journal configuration entry. + * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - def snapshotStoreFor(persistenceId: String): ActorRef = { - // Currently returns a snapshot store singleton but this methods allows for later - // optimizations where each persistent actor can have its own snapshot store actor. - snapshotStore + @tailrec final def journalFor(journalPluginId: String): ActorRef = { + val configPath = if (isDefault(journalPluginId)) defaultJournalPluginId else journalPluginId + val extensionIdMap = journalPluginExtensionId.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).actor + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = + PluginHolder(createPlugin(configPath)(journalDispatchSelector)) + } + journalPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + journalFor(journalPluginId) // Recursive invocation. + } } /** - * Returns a journal for a persistent actor identified by `persistenceId`. + * Returns a snapshot store plugin actor identified by `snapshotPluginId`. + * When empty, looks in `akka.persistence.snapshot-store.plugin` to find configuration entry path. + * When configured, uses `snapshotPluginId` as absolute path to the snapshot store configuration entry. + * Configuration entry must contain few required fields, such as `class`. See `src/main/resources/reference.conf`. */ - def journalFor(persistenceId: String): ActorRef = { - // Currently returns a journal singleton but this methods allows for later - // optimizations where each persistent actor can have its own journal actor. - journal + @tailrec final def snapshotStoreFor(snapshotPluginId: String): ActorRef = { + val configPath = if (isDefault(snapshotPluginId)) defaultSnapshotPluginId else snapshotPluginId + val extensionIdMap = snapshotPluginExtensionId.get + extensionIdMap.get(configPath) match { + case Some(extensionId) ⇒ + extensionId(system).actor + case None ⇒ + val extensionId = new ExtensionId[PluginHolder] { + override def createExtension(system: ExtendedActorSystem): PluginHolder = + PluginHolder(createPlugin(configPath)(snapshotDispatchSelector)) + } + snapshotPluginExtensionId.compareAndSet(extensionIdMap, extensionIdMap.updated(configPath, extensionId)) + snapshotStoreFor(snapshotPluginId) // Recursive invocation. + } } - private def createPlugin(pluginType: String)(dispatcherSelector: Class[_] ⇒ String) = { - val pluginConfigPath = config.getString(s"${pluginType}.plugin") - val pluginConfig = system.settings.config.getConfig(pluginConfigPath) + private def createPlugin(configPath: String)(dispatcherSelector: Class[_] ⇒ String) = { + val pluginActorName = configPath + val pluginConfig = system.settings.config.getConfig(configPath) val pluginClassName = pluginConfig.getString("class") + log.debug(s"Create plugin: ${pluginActorName} ${pluginClassName}") val pluginClass = system.dynamicAccess.getClassFor[AnyRef](pluginClassName).get + val pluginInjectConfig = if (pluginConfig.hasPath("inject-config")) pluginConfig.getBoolean("inject-config") else false val pluginDispatcherId = if (pluginConfig.hasPath("plugin-dispatcher")) pluginConfig.getString("plugin-dispatcher") else dispatcherSelector(pluginClass) - system.systemActorOf(Props(pluginClass).withDispatcher(pluginDispatcherId), pluginType) + val pluginActorArgs = if (pluginInjectConfig) List(pluginConfig) else Nil + val pluginActorProps = Props(Deploy(dispatcher = pluginDispatcherId), pluginClass, pluginActorArgs) + system.systemActorOf(pluginActorProps, pluginActorName) } + /** Creates a canonical persistent actor id from a persistent actor ref. */ + def persistenceId(persistentActor: ActorRef): String = id(persistentActor) + private def id(ref: ActorRef) = ref.path.toStringWithoutAddress + } diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala index 2d9b9f76ed4..48717fad6e7 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentActor.scala @@ -116,14 +116,14 @@ object Recover { /** * An persistent Actor - can be used to implement command or event sourcing. */ -trait PersistentActor extends Eventsourced { +trait PersistentActor extends Eventsourced with PersistenceIdentity { def receive = receiveCommand } /** * Java API: an persistent actor - can be used to implement command or event sourcing. */ -abstract class UntypedPersistentActor extends UntypedActor with Eventsourced { +abstract class UntypedPersistentActor extends UntypedActor with Eventsourced with PersistenceIdentity { final def onReceive(message: Any) = onReceiveCommand(message) diff --git a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala index 141befac730..f1961c9bffb 100644 --- a/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala +++ b/akka-persistence/src/main/scala/akka/persistence/PersistentView.scala @@ -80,7 +80,7 @@ private[akka] object PersistentView { * - [[autoUpdate]] for turning automated updates on or off * - [[autoUpdateReplayMax]] for limiting the number of replayed messages per view update cycle */ -trait PersistentView extends Actor with Snapshotter with Stash with StashFactory { +trait PersistentView extends Actor with Snapshotter with Stash with StashFactory with PersistenceIdentity { import PersistentView._ import JournalProtocol._ import SnapshotProtocol.LoadSnapshotResult @@ -88,7 +88,9 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory private val extension = Persistence(context.system) private val viewSettings = extension.settings.view - private lazy val journal = extension.journalFor(persistenceId) + + private[persistence] lazy val journal = extension.journalFor(journalPluginId) + private[persistence] lazy val snapshotStore = extension.snapshotStoreFor(snapshotPluginId) private var schedule: Option[Cancellable] = None @@ -118,11 +120,6 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory */ def viewId: String - /** - * Id of the persistent entity for which messages should be replayed. - */ - def persistenceId: String - /** * Returns `viewId`. */ @@ -187,13 +184,18 @@ trait PersistentView extends Actor with Snapshotter with Stash with StashFactory self, ScheduledUpdate(autoUpdateReplayMax))) } - /** - * INTERNAL API. - */ + /** INTERNAL API. */ override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit = { currentState.stateReceive(receive, message) } + /** INTERNAL API. */ + override protected[akka] def aroundPreStart(): Unit = { + // Fail fast on missing plugins. + val j = journal; val s = snapshotStore + super.aroundPreStart() + } + override def preRestart(reason: Throwable, message: Option[Any]): Unit = { try internalStash.unstashAll() finally super.preRestart(reason, message) } diff --git a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala index f03408322b6..413b386d4a5 100644 --- a/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala +++ b/akka-persistence/src/main/scala/akka/persistence/Snapshotter.scala @@ -11,7 +11,9 @@ import akka.persistence.SnapshotProtocol._ * Snapshot API on top of the internal snapshot protocol. */ trait Snapshotter extends Actor { - private lazy val snapshotStore = Persistence(context.system).snapshotStoreFor(snapshotterId) + + /** Snapshot store plugin actor. */ + private[persistence] def snapshotStore: ActorRef /** * Snapshotter id. diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 4fce00eb681..7d056f1a313 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -30,8 +30,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends Serializer { val PersistentImplClass = classOf[PersistentImpl] val AtLeastOnceDeliverySnapshotClass = classOf[AtLeastOnceDeliverySnap] - def identifier: Int = 7 - def includeManifest: Boolean = true + val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]] + override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""") + override val includeManifest: Boolean = true private lazy val transportInformation: Option[Serialization.Information] = { val address = system.provider.getDefaultAddress diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala index cea4366907a..7af2ef9e850 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/SnapshotSerializer.scala @@ -31,8 +31,10 @@ private[serialization] final case class SnapshotHeader(serializerId: Int, manife * [[Snapshot]] serializer. */ class SnapshotSerializer(system: ExtendedActorSystem) extends Serializer { - def identifier: Int = 8 - def includeManifest: Boolean = false + + val SerializationIdentifiers = "akka.actor.serialization-identifiers" // TODO move to [[Serializer]] + override val identifier: Int = system.settings.config.getInt(s"""${SerializationIdentifiers}."${getClass.getName}"""") + override val includeManifest: Boolean = false private lazy val transportInformation: Option[Serialization.Information] = { val address = system.provider.getDefaultAddress