!per #3729 Journaling protocol optimization
- internal batching of individually received Persistent messages
- testing fault tolerance of Processor in presence of random
  * journaling failures
  * processing failures
krasserm committed Nov 27, 2013
1 parent 5af6d07 commit 6e2f80b
Showing 39 changed files with 562 additions and 363 deletions.
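
Before the diff, a minimal sketch of the adaptive batching idea this commit introduces (illustrative Scala only — the class and method names here are hypothetical, not the actual Processor internals): messages that arrive while the journal is still writing are buffered and flushed together as one batch of at most max-batch-size, so batch sizes grow from 1 under low load toward the configured maximum under high load.

  class WriteBatcher[A](maxBatchSize: Int) {
    private var buffer = Vector.empty[A]
    private var writing = false

    // called for each individually received message; returns a batch to
    // write immediately if the journal is currently idle
    def enqueue(msg: A): Option[Vector[A]] = {
      buffer :+= msg
      flushIfIdle()
    }

    // called when the journal confirms the previous batch write
    def writeComplete(): Option[Vector[A]] = {
      writing = false
      flushIfIdle()
    }

    private def flushIfIdle(): Option[Vector[A]] =
      if (!writing && buffer.nonEmpty) {
        val (batch, rest) = buffer.splitAt(maxBatchSize)
        buffer = rest
        writing = true
        Some(batch)
      } else None
  }

Under low load the buffer rarely holds more than one message, so writes degenerate to single-message batches with no added latency; as the docs below state, batch writes are never timer-based.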
@@ -45,12 +45,7 @@ public Future<Long> doReplayAsync(String processorId, long fromSequenceNr, long
}

@Override
public Future<Void> doWriteAsync(PersistentRepr persistent) {
return null;
}

@Override
public Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch) {
public Future<Void> doWriteAsync(Iterable<PersistentRepr> persistentBatch) {
return null;
}

27 changes: 19 additions & 8 deletions akka-docs/rst/java/persistence.rst
@@ -381,17 +381,28 @@ another behavior, defined by ``otherCommandHandler``, and back using ``getContex
Batch writes
============

Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message.
To optimize throughput, an ``UntypedProcessor`` internally batches received ``Persistent`` messages under high load before
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
to a configurable maximum size (default is ``200``) under high load.

.. includecode:: ../scala/code/docs/persistence/PersistencePluginDocSpec.scala#max-batch-size

A processor triggers a new batch write as soon as the batch reaches the maximum size or when the journal has
completed writing the previous batch. Batch writes are never timer-based, which keeps latencies as low as possible.

Applications that want more explicit control over batch writes and batch sizes can send ``PersistentBatch``
messages to processors.

.. includecode:: code/docs/persistence/PersistenceDocTest.java#batch-write

``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing-java`, all events that are generated and persisted by a single command are batch-written to
the journal (even if ``persist`` is called multiple times per command). The recovery of an
``UntypedEventsourcedProcessor`` will therefore never be done partially i.e. with only a subset of events persisted
by a single command.
``Persistent`` messages contained in a ``PersistentBatch`` message are always written atomically, even if the batch
size is greater than ``max-batch-size``. A ``PersistentBatch`` is also written in isolation from other batches.
The ``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor.

``PersistentBatch`` messages are used internally by an ``UntypedEventsourcedProcessor``, for example, to ensure
atomic writes of events. All events that are persisted in the context of a single command are written as a single
batch to the journal (even if ``persist`` is called multiple times per command). The recovery of an
``UntypedEventsourcedProcessor`` will therefore never be done partially, i.e. with only a subset of the events
persisted by a single command.
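
The Java example referenced by the includecode directive above is not part of this diff; as a hedged illustration of the explicit-control path, a Scala sketch using the same API (MyProcessor here is a stand-in processor):

  import akka.actor.{ ActorSystem, Props }
  import akka.persistence.{ Persistent, PersistentBatch, Processor }

  class MyProcessor extends Processor {
    def receive = {
      // batch members still arrive (and replay) one by one
      case Persistent(payload, sequenceNr) ⇒ println(s"received ${payload}")
    }
  }

  object BatchExample extends App {
    val system = ActorSystem("example")
    val processor = system.actorOf(Props[MyProcessor])
    // both messages are written atomically, in isolation from other batches
    processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))
  }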

Storage plugins
===============
9 changes: 4 additions & 5 deletions akka-docs/rst/scala/code/docs/event/LoggingDocSpec.scala
@@ -60,22 +60,21 @@ object LoggingDocSpec {
reqId += 1
val always = Map("requestId" -> reqId)
val perMessage = currentMessage match {
case r: Req => Map("visitorId" -> r.visitorId)
case _ => Map()
case r: Req ⇒ Map("visitorId" -> r.visitorId)
case _ ⇒ Map()
}
always ++ perMessage
}

def receive: Receive = {
case r: Req => {
case r: Req ⇒ {
log.info(s"Starting new request: ${r.work}")
}
}
}

//#mdc-actor


//#my-event-listener
import akka.event.Logging.InitializeLogger
import akka.event.Logging.LoggerInitialized
@@ -117,7 +116,7 @@ object LoggingDocSpec {

class LoggingDocSpec extends AkkaSpec {

import LoggingDocSpec.{MyActor, MdcActor, MdcActorMixin, Req}
import LoggingDocSpec.{ MyActor, MdcActor, MdcActorMixin, Req }

"use a logging actor" in {
val myActor = system.actorOf(Props[MyActor])
34 changes: 13 additions & 21 deletions akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala
@@ -18,15 +18,12 @@ trait PersistenceDocSpec {

class MyProcessor extends Processor {
def receive = {
case Persistent(payload, sequenceNr) ⇒ {
// message successfully written to journal
}
case PersistenceFailure(payload, sequenceNr, cause) ⇒ {
// message failed to be written to journal
}
case other ⇒ {
// message not written to journal
}
case Persistent(payload, sequenceNr) ⇒
// message successfully written to journal
case PersistenceFailure(payload, sequenceNr, cause) ⇒
// message failed to be written to journal
case other ⇒
// message not written to journal
}
}
//#definition
@@ -109,18 +106,16 @@ trait PersistenceDocSpec {
val channel = context.actorOf(Channel.props(), name = "myChannel")

def receive = {
case p @ Persistent(payload, _) ⇒ {
case p @ Persistent(payload, _) ⇒
channel ! Deliver(p.withPayload(s"processed ${payload}"), destination)
}
}
}

class MyDestination extends Actor {
def receive = {
case p @ ConfirmablePersistent(payload, _) ⇒ {
case p @ ConfirmablePersistent(payload, _) ⇒
println(s"received ${payload}")
p.confirm()
}
}
}
//#channel-example
@@ -135,7 +130,7 @@
//#channel-id-override

def receive = {
case p @ Persistent(payload, _) ⇒ {
case p @ Persistent(payload, _) ⇒
//#channel-example-reply
channel ! Deliver(p.withPayload(s"processed ${payload}"), sender)
//#channel-example-reply
@@ -144,8 +139,7 @@
//#resolve-destination
//#resolve-sender
channel forward Deliver(p, destination, Resolve.Sender)
//#resolve-sender
}
//#resolve-sender
}
}

@@ -175,15 +169,13 @@
startWith("closed", 0)

when("closed") {
case Event(Persistent("open", _), counter) ⇒ {
case Event(Persistent("open", _), counter) ⇒
goto("open") using (counter + 1) replying (counter)
}
}

when("open") {
case Event(Persistent("close", _), counter) ⇒ {
case Event(Persistent("close", _), counter) ⇒
goto("closed") using (counter + 1) replying (counter)
}
}
}
//#fsm-example
@@ -239,7 +231,7 @@ trait PersistenceDocSpec {
val system = ActorSystem("example")
val processor = system.actorOf(Props[MyProcessor])

processor ! PersistentBatch(Vector(Persistent("a"), Persistent("b")))
processor ! PersistentBatch(List(Persistent("a"), Persistent("b")))
//#batch-write
system.shutdown()
}
@@ -23,6 +23,9 @@ import akka.persistence.snapshot._
object PersistencePluginDocSpec {
val config =
"""
//#max-batch-size
akka.persistence.journal.max-batch-size = 200
//#max-batch-size
//#journal-config
akka.persistence.journal.leveldb.dir = "target/journal"
//#journal-config
@@ -69,8 +72,7 @@ class PersistencePluginDocSpec extends WordSpec {
}

class MyJournal extends AsyncWriteJournal {
def writeAsync(persistent: PersistentRepr): Future[Unit] = ???
def writeBatchAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = ???
def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = ???
def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] = ???
def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: (PersistentRepr) ⇒ Unit): Future[Long] = ???
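
To make the consolidated plugin API concrete, a minimal in-memory sketch against the new batch-only writeAsync, using only the four method signatures shown in this file (the map-based store is illustrative; a real plugin must make the batch write atomic in its storage backend and honor the permanent flag on deletes):

  import scala.concurrent.Future
  import akka.persistence.PersistentRepr
  import akka.persistence.journal.AsyncWriteJournal

  class InMemoryJournal extends AsyncWriteJournal {
    private var store = Map.empty[(String, Long), PersistentRepr]

    // one call per batch: all messages are added in a single step, so the
    // write is trivially atomic for this toy store
    def writeAsync(persistentBatch: Seq[PersistentRepr]): Future[Unit] = {
      store ++= persistentBatch.map(p ⇒ (p.processorId, p.sequenceNr) → p)
      Future.successful(())
    }

    def deleteAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long, permanent: Boolean): Future[Unit] = {
      // `permanent` (logical vs. physical deletion) is ignored in this sketch
      (fromSequenceNr to toSequenceNr).foreach(n ⇒ store -= ((processorId, n)))
      Future.successful(())
    }

    def confirmAsync(processorId: String, sequenceNr: Long, channelId: String): Future[Unit] =
      Future.successful(()) // channel confirmation tracking elided

    def replayAsync(processorId: String, fromSequenceNr: Long, toSequenceNr: Long)(replayCallback: PersistentRepr ⇒ Unit): Future[Long] = {
      (fromSequenceNr to toSequenceNr).flatMap(n ⇒ store.get((processorId, n))).foreach(replayCallback)
      Future.successful(toSequenceNr)
    }
  }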
22 changes: 17 additions & 5 deletions akka-docs/rst/scala/persistence.rst
@@ -392,14 +392,26 @@ See also the API docs of ``persist`` for further details.
Batch writes
============

Applications may also send a batch of ``Persistent`` messages to a processor via a ``PersistentBatch`` message.
To optimize throughput, a ``Processor`` internally batches received ``Persistent`` messages under high load before
writing them to the journal (as a single batch). The batch size dynamically grows from 1 under low and moderate loads
to a configurable maximum size (default is ``200``) under high load.

.. includecode:: code/docs/persistence/PersistencePluginDocSpec.scala#max-batch-size

A processor triggers a new batch write as soon as the batch reaches the maximum size or when the journal has
completed writing the previous batch. Batch writes are never timer-based, which keeps latencies as low as possible.

Applications that want more explicit control over batch writes and batch sizes can send ``PersistentBatch``
messages to processors.

.. includecode:: code/docs/persistence/PersistenceDocSpec.scala#batch-write

``Persistent`` messages contained in a ``PersistentBatch`` message are written to the journal atomically but are
received by the processor separately (as ``Persistent`` messages). They are also replayed separately. Batch writes
can not only increase the throughput of a processor but may also be necessary for consistency reasons. For example,
in :ref:`event-sourcing`, all events that are generated and persisted by a single command are batch-written to the
``Persistent`` messages contained in a ``PersistentBatch`` message are always written atomically, even if the batch
size is greater than ``max-batch-size``. A ``PersistentBatch`` is also written in isolation from other batches.
The ``Persistent`` messages contained in a ``PersistentBatch`` are received individually by a processor.

``PersistentBatch`` messages are used internally by an ``EventsourcedProcessor``, for example, to ensure atomic
writes of events. All events that are persisted in the context of a single command are written as a single batch
to the journal (even if ``persist`` is called multiple times per command). The recovery of an
``EventsourcedProcessor`` will therefore never be done partially, i.e. with only a subset of the events persisted
by a single command.
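
As an illustration of the recovery guarantee just described, a sketch of an event sourced processor (hedged: it assumes the receiveReplay/receiveCommand method names of this vintage of the API, and the domain types are made up). Both events persisted for one command land in a single journal batch, so replay can never observe Debited without Credited:

  import akka.persistence.EventsourcedProcessor

  case class Transfer(amount: Int)
  sealed trait Event
  case class Debited(amount: Int) extends Event
  case class Credited(amount: Int) extends Event

  class TransferProcessor extends EventsourcedProcessor {
    var balanceA, balanceB = 0

    def updateState(e: Event): Unit = e match {
      case Debited(a)  ⇒ balanceA -= a
      case Credited(a) ⇒ balanceB += a
    }

    def receiveReplay: Receive = {
      case e: Event ⇒ updateState(e)
    }

    def receiveCommand: Receive = {
      case Transfer(amount) ⇒
        // two persist calls, one command: written as a single batch
        persist(Debited(amount))(updateState)
        persist(Credited(amount))(updateState)
    }
  }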

@@ -10,17 +10,12 @@

interface AsyncWritePlugin {
//#async-write-plugin-api
/**
* Java API, Plugin API: asynchronously writes a `persistent` message to the journal.
*/
Future<Void> doWriteAsync(PersistentRepr persistent);

/**
* Java API, Plugin API: asynchronously writes a batch of persistent messages to the
* journal. The batch write must be atomic i.e. either all persistent messages in the
* batch are written or none.
*/
Future<Void> doWriteBatchAsync(Iterable<PersistentRepr> persistentBatch);
Future<Void> doWriteAsync(Iterable<PersistentRepr> persistentBatch);

/**
* Java API, Plugin API: asynchronously deletes all persistent messages within the
@@ -8,17 +8,12 @@

interface SyncWritePlugin {
//#sync-write-plugin-api
/**
* Java API, Plugin API: synchronously writes a `persistent` message to the journal.
*/
void doWrite(PersistentRepr persistent) throws Exception;

/**
* Java API, Plugin API: synchronously writes a batch of persistent messages to the
* journal. The batch write must be atomic i.e. either all persistent messages in the
* batch are written or none.
*/
void doWriteBatch(Iterable<PersistentRepr> persistentBatch);
void doWrite(Iterable<PersistentRepr> persistentBatch);

/**
* Java API, Plugin API: synchronously deletes all persistent messages within the
6 changes: 6 additions & 0 deletions akka-persistence/src/main/resources/reference.conf
@@ -26,6 +26,12 @@ akka {

journal {

# Maximum size of a persistent message batch that is written to the journal. This setting
# only applies to batches created internally by processors that receive persistent
# messages individually. Application-defined batches are always written as a single,
# isolated batch, even if they are larger than this setting.
max-batch-size = 200

# Path to the journal plugin to be used
plugin = "akka.persistence.journal.leveldb"
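
One possible way to override the new setting from application code, following the ConfigFactory pattern already used by PersistencePluginDocSpec above (the value 500 is only an example):

  import akka.actor.ActorSystem
  import com.typesafe.config.ConfigFactory

  object MaxBatchSizeExample extends App {
    val config = ConfigFactory
      .parseString("akka.persistence.journal.max-batch-size = 500")
      .withFallback(ConfigFactory.load())
    val system = ActorSystem("example", config)
  }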

39 changes: 21 additions & 18 deletions akka-persistence/src/main/scala/akka/persistence/Channel.scala
@@ -23,10 +23,9 @@ import akka.persistence.serialization.Message
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) => {
* case m @ Persistent(payload, _) =>
* // forward modified message to destination
* channel forward Deliver(m.withPayload(s"fw: ${payload}"), destination)
* }
* }
* }
* }}}
Expand All @@ -39,10 +38,9 @@ import akka.persistence.serialization.Message
* val channel = context.actorOf(Channel.props(), "myChannel")
*
* def receive = {
* case m @ Persistent(payload, _) => {
* case m @ Persistent(payload, _) =>
* // reply modified message to sender
* channel ! Deliver(m.withPayload(s"re: ${payload}"), sender)
* }
* }
* }
* }}}
@@ -78,28 +76,27 @@ sealed class Channel private[akka] (_channelId: Option[String]) extends Actor wi
import ResolvedDelivery._

private val delivering: Actor.Receive = {
case Deliver(persistent: PersistentRepr, destination, resolve) ⇒ {
case Deliver(persistent: PersistentRepr, destination, resolve) ⇒
if (!persistent.confirms.contains(id)) {
val prepared = prepareDelivery(persistent)
resolve match {
case Resolve.Sender if !prepared.resolved ⇒ {
case Resolve.Sender if !prepared.resolved ⇒
context.actorOf(Props(classOf[ResolvedSenderDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false)
}
case Resolve.Destination if !prepared.resolved ⇒ {
case Resolve.Destination if !prepared.resolved ⇒
context.actorOf(Props(classOf[ResolvedDestinationDelivery], prepared, destination, sender)) ! DeliverResolved
context.become(buffering, false)
}
case _ ⇒ destination tell (prepared, sender)
}
}
unstash()
}
}

private val buffering: Actor.Receive = {
case DeliveredResolved | DeliveredUnresolved ⇒ { context.unbecome(); unstash() }
case _: Deliver ⇒ stash()
case DeliveredResolved | DeliveredUnresolved ⇒
context.unbecome()
unstash()
case _: Deliver ⇒ stash()
}

def receive = delivering
@@ -164,7 +161,7 @@ final class PersistentChannel private[akka] (_channelId: Option[String], persist
}

def receiveCommand: Receive = {
case d @ Deliver(persistent: PersistentRepr, destination, resolve) ⇒ {
case d @ Deliver(persistent: PersistentRepr, destination, resolve) ⇒
if (!persistent.confirms.contains(processorId)) {
persist(d) { _ ⇒
val prepared = prepareDelivery(persistent)
@@ -179,7 +176,6 @@ final class PersistentChannel private[akka] (_channelId: Option[String], persist
deliver(prepared, destination, resolve)
}
}
}
case c: Confirm ⇒ deleteMessage(c.sequenceNr, true)
case DisableDelivery ⇒ deliveryEnabled = false
case EnableDelivery if (!deliveryEnabled) ⇒ throw new ChannelRestartRequiredException
@@ -393,10 +389,17 @@ private trait ResolvedDelivery extends Actor {
def onResolveFailure(): Unit

def receive = {
case DeliverResolved ⇒ context.actorSelection(path) ! Identify(1)
case ActorIdentity(1, Some(ref)) ⇒ { onResolveSuccess(ref); shutdown(DeliveredResolved) }
case ActorIdentity(1, None) ⇒ { onResolveFailure(); shutdown(DeliveredUnresolved) }
case ReceiveTimeout ⇒ { onResolveFailure(); shutdown(DeliveredUnresolved) }
case DeliverResolved ⇒
context.actorSelection(path) ! Identify(1)
case ActorIdentity(1, Some(ref)) ⇒
onResolveSuccess(ref)
shutdown(DeliveredResolved)
case ActorIdentity(1, None) ⇒
onResolveFailure()
shutdown(DeliveredUnresolved)
case ReceiveTimeout ⇒
onResolveFailure()
shutdown(DeliveredUnresolved)
}

def shutdown(message: Any) {