Skip to content

Commit

Permalink
Merge pull request akka#1757 from eligosource/wip-3652-custom-seriali…
Browse files Browse the repository at this point in the history
…zation-krasserm

!per #3652 Custom snapshot and persistent message serialization
  • Loading branch information
patriknw committed Oct 14, 2013
2 parents 4a2171e + 2a30399 commit a30ca0d
Show file tree
Hide file tree
Showing 35 changed files with 4,045 additions and 386 deletions.
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence;

import scala.Option;
Expand Down
151 changes: 34 additions & 117 deletions akka-docs/rst/java/code/docs/persistence/PersistencePluginDocTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence;

//#plugin-imports
Expand All @@ -10,132 +14,45 @@
//#plugin-imports

public class PersistencePluginDocTest {
static Object o1 = new Object() {
abstract class MySnapshotStore extends SnapshotStore {
//#snapshot-store-plugin-api
/**
* Plugin Java API.
*
* Asynchronously loads a snapshot.
*
* @param processorId processor id.
* @param criteria selection criteria for loading.
*/
public abstract Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria);

/**
* Plugin Java API.
*
* Asynchronously saves a snapshot.
*
* @param metadata snapshot metadata.
* @param snapshot snapshot.
*/
public abstract Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot);

/**
* Plugin Java API.
*
* Called after successful saving of a snapshot.
*
* @param metadata snapshot metadata.
*/
public abstract void onSaved(SnapshotMetadata metadata) throws Exception;

/**
* Plugin Java API.
*
* Deletes the snapshot identified by `metadata`.
*
* @param metadata snapshot metadata.
*/
public abstract void doDelete(SnapshotMetadata metadata) throws Exception;
//#snapshot-store-plugin-api
class MySnapshotStore extends SnapshotStore {
@Override
public Future<Option<SelectedSnapshot>> doLoadAsync(String processorId, SnapshotSelectionCriteria criteria) {
return null;
}

abstract class MySyncWriteJournal extends SyncWriteJournal {
//#sync-write-plugin-api
/**
* Plugin Java API.
*
* Synchronously writes a `persistent` message to the journal.
*/
@Override
public abstract void doWrite(PersistentImpl persistent) throws Exception;
@Override
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
return null;
}

/**
* Plugin Java API.
*
* Synchronously marks a `persistent` message as deleted.
*/
@Override
public abstract void doDelete(PersistentImpl persistent) throws Exception;
@Override
public void onSaved(SnapshotMetadata metadata) throws Exception {
}

/**
* Plugin Java API.
*
* Synchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract void doConfirm(String processorId, long sequenceNr, String channelId) throws Exception;
//#sync-write-plugin-api
@Override
public void doDelete(SnapshotMetadata metadata) throws Exception {
}
}

abstract class MyAsyncWriteJournal extends AsyncWriteJournal {
//#async-write-plugin-api
/**
* Plugin Java API.
*
* Asynchronously writes a `persistent` message to the journal.
*/
@Override
public abstract Future<Void> doWriteAsync(PersistentImpl persistent);
class MyAsyncJournal extends AsyncWriteJournal {
@Override
public Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback) {
return null;
}

/**
* Plugin Java API.
*
* Asynchronously marks a `persistent` message as deleted.
*/
@Override
public abstract Future<Void> doDeleteAsync(PersistentImpl persistent);
@Override
public Future<Void> doWriteAsync(PersistentImpl persistent) {
return null;
}

/**
* Plugin Java API.
*
* Asynchronously writes a delivery confirmation to the journal.
*/
@Override
public abstract Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId);
//#async-write-plugin-api
@Override
public Future<Void> doDeleteAsync(PersistentImpl persistent) {
return null;
}

abstract class MyAsyncReplay extends AsyncReplay {
//#async-replay-plugin-api
/**
* Plugin Java API.
*
* Asynchronously replays persistent messages. Implementations replay a message
* by calling `replayCallback`. The returned future must be completed when all
* messages (matching the sequence number bounds) have been replayed. The future
* `Long` value must be the highest stored sequence number in the journal for the
* specified processor. The future must be completed with a failure if any of
* the persistent messages could not be replayed.
*
* The `replayCallback` must also be called with messages that have been marked
* as deleted. In this case a replayed message's `deleted` field must be set to
* `true`.
*
* The channel ids of delivery confirmations that are available for a replayed
* message must be contained in that message's `confirms` sequence.
*
* @param processorId processor id.
* @param fromSequenceNr sequence number where replay should start.
* @param toSequenceNr sequence number where replay should end (inclusive).
* @param replayCallback called to replay a single message.
*/
@Override
public abstract Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long toSequenceNr, Procedure<PersistentImpl> replayCallback);
//#async-replay-plugin-api
@Override
public Future<Void> doConfirmAsync(String processorId, long sequenceNr, String channelId) {
return null;
}
};
}
}
63 changes: 49 additions & 14 deletions akka-docs/rst/java/persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,21 @@
Persistence
###########

This section describes an early access version of the Akka persistence module. Akka persistence is heavily inspired
by the `eventsourced`_ library. It follows the same concepts and architecture of `eventsourced`_ but significantly
differs on API and implementation level.
Akka persistence enables stateful actors to persist their internal state so that it can be recovered when an actor
is started, restarted by a supervisor or migrated in a cluster. It also allows stateful actors to recover from JVM
crashes, for example. The key concept behind Akka persistence is that only changes to an actor's internal state are
persisted but never its current state directly (except for optional snapshots). These changes are only ever appended
to storage, nothing is ever mutated, which allows for very high transaction rates and efficient replication. Stateful
actors are recovered by replaying stored changes to these actors from which they can rebuild internal state. This can
be either the full history of changes or starting from a snapshot of internal actor state which can dramatically
reduce recovery times.

Storage backends for state changes and snapshots are pluggable in Akka persistence. Currently, these are written to
the local filesystem. Distributed and replicated storage, with the possibility of scaling writes, will be available
soon.

Akka persistence is inspired by the `eventsourced`_ library. It follows the same concepts and architecture of
`eventsourced`_ but significantly differs on API and implementation level.

.. warning::

Expand All @@ -31,13 +43,20 @@ Akka persistence is a separate jar file. Make sure that you have the following d
Architecture
============

* *Processor*: A processor is a persistent actor. Messages sent to a processor are written to a journal before
its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed
* *Processor*: A processor is a persistent, stateful actor. Messages sent to a processor are written to a journal
before its ``onReceive`` method is called. When a processor is started or restarted, journaled messages are replayed
to that processor, so that it can recover internal state from these messages.

* *Channel*: Channels are used by processors to communicate with other actors. They prevent that replayed messages
are redundantly delivered to these actors.

* *Journal*: A journal stores the sequence of messages sent to a processor. An application can control which messages
are stored and which are received by the processor without being journaled. The storage backend of a journal is
pluggable.

* *Snapshot store*: A snapshot store persists snapshots of a processor's internal state. Snapshots are used for
optimizing recovery times. The storage backend of a snapshot store is pluggable.

Use cases
=========

Expand Down Expand Up @@ -69,10 +88,11 @@ A processor can be implemented by extending the abstract ``UntypedProcessor`` cl

Processors only write messages of type ``Persistent`` to the journal, others are received without being persisted.
When a processor's ``onReceive`` method is called with a ``Persistent`` message it can safely assume that this message
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
receives a ``PersistenceFailure`` message instead of a ``Persistent`` message. In this case, a processor may want to
inform the sender about the failure, so that the sender can re-send the message, if needed, under the assumption that
the journal recovered from a temporary failure.
has been successfully written to the journal. If a journal fails to write a ``Persistent`` message then the processor
is stopped, by default. If an application wants that a processors continues to run on persistence failures it must
handle ``PersistenceFailure`` messages. In this case, a processor may want to inform the sender about the failure,
so that the sender can re-send the message, if needed, under the assumption that the journal recovered from a
temporary failure.

An ``UntypedProcessor`` itself is an ``Actor`` and can therefore be instantiated with ``actorOf``.

Expand Down Expand Up @@ -268,16 +288,16 @@ A journal plugin either extends ``SyncWriteJournal`` or ``AsyncWriteJournal``.
actor that should be extended when the storage backend API only supports synchronous, blocking writes. The
methods to be implemented in this case are:

.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#sync-write-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/SyncWritePlugin.java#sync-write-plugin-api

``AsyncWriteJournal`` is an actor that should be extended if the storage backend API supports asynchronous,
non-blocking writes. The methods to be implemented in that case are:

.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-write-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncWritePlugin.java#async-write-plugin-api

Message replays are always asynchronous, therefore, any journal plugin must implement:

.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#async-replay-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/journal/japi/AsyncReplayPlugin.java#async-replay-plugin-api

A journal plugin can be activated with the following minimal configuration:

Expand All @@ -292,7 +312,7 @@ Snapshot store plugin API

A snapshot store plugin must extend the ``SnapshotStore`` actor and implement the following methods:

.. includecode:: code/docs/persistence/PersistencePluginDocTest.java#snapshot-store-plugin-api
.. includecode:: ../../../akka-persistence/src/main/java/akka/persistence/snapshot/japi/SnapshotStorePlugin.java#snapshot-store-plugin-api

A snapshot store plugin can be activated with the following minimal configuration:

Expand All @@ -301,10 +321,25 @@ A snapshot store plugin can be activated with the following minimal configuratio
The specified plugin ``class`` must have a no-arg constructor. The ``plugin-dispatcher`` is the dispatcher
used for the plugin actor. If not specified, it defaults to ``akka.persistence.dispatchers.default-plugin-dispatcher``.

Custom serialization
====================

Serialization of snapshots and payloads of ``Persistent`` messages is configurable with Akka's
:ref:`serialization-java` infrastructure. For example, if an application wants to serialize

* payloads of type ``MyPayload`` with a custom ``MyPayloadSerializer`` and
* snapshots of type ``MySnapshot`` with a custom ``MySnapshotSerializer``

it must add

.. includecode:: ../scala/code/docs/persistence/PersistenceSerializerDocSpec.scala#custom-serializer-config

to the application configuration. If not specified, a default serializer is used, which is the ``JavaSerializer``
in this example.

Upcoming features
=================

* Reliable channels
* Custom serialization of messages and snapshots
* Extended deletion of messages and snapshots
* ...
14 changes: 1 addition & 13 deletions akka-docs/rst/java/serialization.rst
Original file line number Diff line number Diff line change
Expand Up @@ -166,19 +166,7 @@ There is also a default remote address which is the one used by cluster support
Deep serialization of Actors
----------------------------

The current recommended approach to do deep serialization of internal actor state is to use Event Sourcing,
for more reading on the topic, see these examples:

`Martin Krasser on EventSourcing Part1 <http://krasserm.blogspot.com/2011/11/building-event-sourced-web-application.html>`_

`Martin Krasser on EventSourcing Part2 <http://krasserm.blogspot.com/2012/01/building-event-sourced-web-application.html>`_


.. note::

Built-in API support for persisting Actors will come in a later release, see the roadmap for more info:

`Akka 2.0 roadmap <https://docs.google.com/a/typesafe.com/document/d/18W9-fKs55wiFNjXL9q50PYOnR7-nnsImzJqHOPPbM4E>`_
The recommended approach to do deep serialization of internal actor state is to use Akka :ref:`persistence-java`.

A Word About Java Serialization
===============================
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence

import akka.actor.ActorSystem
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence

//#plugin-imports
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/

package docs.persistence

import com.typesafe.config._

import org.scalatest.WordSpec

import akka.actor.ActorSystem
import akka.serialization.{ Serializer, SerializationExtension }

class PersistenceSerializerDocSpec extends WordSpec {

val customSerializerConfig =
"""
//#custom-serializer-config
akka.actor {
serializers {
my-payload = "docs.persistence.MyPayloadSerializer"
my-snapshot = "docs.persistence.MySnapshotSerializer"
}
serialization-bindings {
"docs.persistence.MyPayload" = my-payload
"docs.persistence.MySnapshot" = my-snapshot
}
}
//#custom-serializer-config
""".stripMargin

SerializationExtension(ActorSystem("doc", ConfigFactory.parseString(customSerializerConfig)))
}

class MyPayload
class MySnapshot

class MyPayloadSerializer extends Serializer {
def identifier: Int = 77124
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}

class MySnapshotSerializer extends Serializer {
def identifier: Int = 77125
def includeManifest: Boolean = false
def toBinary(o: AnyRef): Array[Byte] = ???
def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = ???
}
Loading

0 comments on commit a30ca0d

Please sign in to comment.