Skip to content

Commit

Permalink
=per #3915 Make become work during recovery for EventsourcedProcessor…
Browse files Browse the repository at this point in the history
… et.c.
  • Loading branch information
bantonsson committed Mar 21, 2014
1 parent 82ee7e2 commit 26c493e
Show file tree
Hide file tree
Showing 21 changed files with 136 additions and 86 deletions.
71 changes: 39 additions & 32 deletions akka-docs/rst/java/lambda-persistence.rst

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions akka-docs/rst/java/persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -487,13 +487,20 @@ about successful state changes by publishing events.

When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command. The example also shows how to switch between command different command handlers
with ``getContext().become()`` and ``getContext().unbecome()``.
calls in context of a single command.

The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Java <http://www.typesafe.com/activator/template/akka-sample-persistence-java>`_.
It contains instructions on how to run the ``EventsourcedExample``.

.. note::

It's also possible to switch between different command handlers during normal processing and recovery
with ``getContext().become()`` and ``getContext().unbecome()``. To get the actor into the same state after
recovery you need to take special care to perform the same state transitions with ``become`` and
``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler.


Reliable event delivery
-----------------------

Expand Down
11 changes: 9 additions & 2 deletions akka-docs/rst/scala/persistence.rst
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,20 @@ about successful state changes by publishing events.

When persisting events with ``persist`` it is guaranteed that the processor will not receive further commands between
the ``persist`` call and the execution(s) of the associated event handler. This also holds for multiple ``persist``
calls in context of a single command. The example also shows how to switch between command different command handlers
with ``context.become()`` and ``context.unbecome()``.
calls in context of a single command.

The easiest way to run this example yourself is to download `Typesafe Activator <http://www.typesafe.com/platform/getstarted>`_
and open the tutorial named `Akka Persistence Samples with Scala <http://www.typesafe.com/activator/template/akka-sample-persistence-scala>`_.
It contains instructions on how to run the ``EventsourcedExample``.

.. note::

It's also possible to switch between different command handlers during normal processing and recovery
with ``context.become()`` and ``context.unbecome()``. To get the actor into the same state after
recovery you need to take special care to perform the same state transitions with ``become`` and
``unbecome`` in the ``receiveRecover`` method as you would have done in the command handler.


Reliable event delivery
-----------------------

Expand Down
35 changes: 25 additions & 10 deletions akka-persistence/src/main/scala/akka/persistence/Eventsourced.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ private[persistence] trait Eventsourced extends Processor {
* `processingCommands`
*/
private val recovering: State = new State {
// cache the recoveryBehavior since it's a def for binary compatibility in 2.3.x
private val _recoveryBehavior: Receive = recoveryBehavior

override def toString: String = "recovering"

def aroundReceive(receive: Receive, message: Any) {
Eventsourced.super.aroundReceive(receive, message)
// Since we are recovering we can ignore the receive behavior from the stack
Eventsourced.super.aroundReceive(_recoveryBehavior, message)
message match {
case _: ReadHighestSequenceNrSuccess | _: ReadHighestSequenceNrFailure
currentState = processingCommands
Expand Down Expand Up @@ -93,6 +97,21 @@ private[persistence] trait Eventsourced extends Processor {
}
}

/**
* INTERNAL API.
*
* This is a def and not a val because of binary compatibility in 2.3.x.
* It is cached where it is used.
*/
private def recoveryBehavior: Receive = {
case Persistent(payload, _) if recoveryRunning && receiveRecover.isDefinedAt(payload)
receiveRecover(payload)
case s: SnapshotOffer if receiveRecover.isDefinedAt(s)
receiveRecover(s)
case f: RecoveryFailure if receiveRecover.isDefinedAt(f)
receiveRecover(f)
}

private var persistInvocations: List[(Any, Any Unit)] = Nil
private var persistentEventBatch: List[PersistentRepr] = Nil

Expand Down Expand Up @@ -190,14 +209,10 @@ private[persistence] trait Eventsourced extends Processor {

/**
* INTERNAL API.
*
* Only here for binary compatibility in 2.3.x.
*/
protected[persistence] val initialBehavior: Receive = {
case Persistent(payload, _) if receiveRecover.isDefinedAt(payload) && recoveryRunning
receiveRecover(payload)
case s: SnapshotOffer if receiveRecover.isDefinedAt(s)
receiveRecover(s)
case f: RecoveryFailure if receiveRecover.isDefinedAt(f)
receiveRecover(f)
protected[persistence] val initialBehavior: Receive = recoveryBehavior orElse {
case msg if receiveCommand.isDefinedAt(msg)
receiveCommand(msg)
}
Expand All @@ -207,14 +222,14 @@ private[persistence] trait Eventsourced extends Processor {
* An event sourced processor.
*/
trait EventsourcedProcessor extends Processor with Eventsourced {
final def receive = initialBehavior
final def receive = receiveCommand
}

/**
* Java API: an event sourced processor.
*/
abstract class UntypedEventsourcedProcessor extends UntypedProcessor with Eventsourced {
final def onReceive(message: Any) = initialBehavior(message)
final def onReceive(message: Any) = onReceiveCommand(message)

final def receiveRecover: Receive = {
case msg onReceiveRecover(msg)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,23 @@ object EventsourcedSpec {
}
}

class SnapshottingBecomingEventsourcedProcessor(name: String, probe: ActorRef) extends SnapshottingEventsourcedProcessor(name, probe) {
val becomingRecover: Receive = {
case msg: SnapshotOffer
context.become(becomingCommand)
// sending ourself a normal message here also tests
// that we stash them until recovery is complete
self ! "It's changing me"
super.receiveRecover(msg)
}

override def receiveRecover = becomingRecover.orElse(super.receiveRecover)

val becomingCommand: Receive = receiveCommand orElse {
case "It's changing me" probe ! "I am becoming"
}
}

class ReplyInEventHandlerProcessor(name: String) extends ExampleProcessor(name) {
val receiveCommand: Receive = {
case Cmd("a") persist(Evt("a"))(evt sender ! evt.data)
Expand Down Expand Up @@ -297,6 +314,21 @@ abstract class EventsourcedSpec(config: Config) extends AkkaSpec(config) with Pe
processor2 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))
}
"support context.become during recovery" in {
val processor1 = system.actorOf(Props(classOf[SnapshottingEventsourcedProcessor], name, testActor))
processor1 ! Cmd("b")
processor1 ! "snap"
processor1 ! Cmd("c")
expectMsg("saved")
processor1 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))

val processor2 = system.actorOf(Props(classOf[SnapshottingBecomingEventsourcedProcessor], name, testActor))
expectMsg("offered")
expectMsg("I am becoming")
processor2 ! GetState
expectMsg(List("a-1", "a-2", "b-41", "b-42", "c-41", "c-42"))
}
"be able to reply within an event handler" in {
val processor = namedProcessor[ReplyInEventHandlerProcessor]
processor ! Cmd("a")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
name=akka-sample-persistence-java-lambda
title=Akka Persistence Samples in Java with Lambdas
description=Akka Persistence Samples in Java with Lambdas
tags=akka,java,java8,sample,persistence,sample
authorName=Akka Team
authorLink=http://akka.io/
sourceLink=https://github.com/akka/akka
12 changes: 12 additions & 0 deletions akka-samples/akka-sample-persistence-java-lambda/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
name := "akka-sample-persistence-java-lambda"

version := "1.0"

scalaVersion := "2.10.3"

javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-persistence-experimental" % "2.3-SNAPSHOT"
)

Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=0.13.1
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,12 @@ public int getNumEvents() {
state.update(evt);
if (evt.equals(evt2)) {
context().system().eventStream().publish(evt);
if (data.equals("foo")) { context().become(och, true); }
}
});
}).
match(String.class, s -> s.equals("snap"), s -> saveSnapshot(state.copy())).
match(String.class, s -> s.equals("print"), s -> System.out.println(state)).build();
}

PartialFunction<Object, BoxedUnit> och = ReceiveBuilder.
match(Cmd.class, cmd -> cmd.getData().equals("bar"), cmd -> {
persist(new Evt("bar-" + getNumEvents()), event -> {
state.update(event);
context().unbecome();
});
unstashAll();
}).
matchAny(o -> stash()).build();
}
//#eventsourced-example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ public void apply(Evt evt) throws Exception {
state.update(evt);
if (evt.equals(evt2)) {
getContext().system().eventStream().publish(evt);
if (data.equals("foo")) getContext().become(otherCommandHandler);
}
}
});
Expand All @@ -100,22 +99,6 @@ public void apply(Evt evt) throws Exception {
System.out.println(state);
}
}

private Procedure<Object> otherCommandHandler = new Procedure<Object>() {
public void apply(Object msg) throws Exception {
if (msg instanceof Cmd && ((Cmd)msg).getData().equals("bar")) {
persist(new Evt("bar-" + getNumEvents()), new Procedure<Evt>() {
public void apply(Evt event) throws Exception {
state.update(event);
getContext().unbecome();
}
});
unstashAll();
} else {
stash();
}
}
};
}
//#eventsourced-example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,11 @@ class ExampleProcessor extends EventsourcedProcessor {
persist(Evt(s"${data}-${numEvents + 1}")) { event =>
updateState(event)
context.system.eventStream.publish(event)
if (data == "foo") context.become(otherCommandHandler)
}
case "snap" => saveSnapshot(state)
case "print" => println(state)
}

val otherCommandHandler: Receive = {
case Cmd("bar") =>
persist(Evt(s"bar-${numEvents}")) { event =>
updateState(event)
context.unbecome()
}
unstashAll()
case other => stash()
}
}
//#eventsourced-example

Expand All @@ -57,7 +47,7 @@ object EventsourcedExample extends App {
val processor = system.actorOf(Props[ExampleProcessor], "processor-4-scala")

processor ! Cmd("foo")
processor ! Cmd("baz") // will be stashed
processor ! Cmd("baz")
processor ! Cmd("bar")
processor ! "snap"
processor ! Cmd("buzz")
Expand Down
2 changes: 1 addition & 1 deletion scripts/build/extra-build-steps.sh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@ mvncleantest "$java8_home" "akka-samples/akka-docs-java-lambda"

mvncleantest "$java8_home" "akka-samples/akka-sample-fsm-java-lambda"

mvncleantest "$java8_home" "akka-samples/akka-sample-persistence-java8"
mvncleantest "$java8_home" "akka-samples/akka-sample-persistence-java-lambda"

mvncleantest "$java8_home" "akka-samples/akka-sample-supervision-java-lambda"

0 comments on commit 26c493e

Please sign in to comment.