Skip to content

Commit

Permalink
=per akka#15212 Avoid half written snapshots in LocalSnapshotStore
Browse files Browse the repository at this point in the history
Conflicts:
	project/AkkaBuild.scala
  • Loading branch information
bantonsson committed May 30, 2014
1 parent f0d18cf commit 2c3c13e
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,31 +73,36 @@ private[persistence] class LocalSnapshotStore extends SnapshotStore with ActorLo
Try(withInputStream(md)(deserialize)) match {
case Success(s) Some(SelectedSnapshot(md, s.data))
case Failure(e)
log.error(e, s"error loading snapshot ${md}")
log.error(e, s"Error loading snapshot [${md}]")
load(metadata.init) // try older snapshot
}
}

private def save(metadata: SnapshotMetadata, snapshot: Any): Unit =
withOutputStream(metadata)(serialize(_, Snapshot(snapshot)))
protected def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
val tmpFile = withOutputStream(metadata)(serialize(_, Snapshot(snapshot)))
tmpFile.renameTo(snapshotFile(metadata))
}

protected def deserialize(inputStream: InputStream): Snapshot =
serializationExtension.deserialize(streamToBytes(inputStream), classOf[Snapshot]).get

protected def serialize(outputStream: OutputStream, snapshot: Snapshot): Unit =
outputStream.write(serializationExtension.findSerializerFor(snapshot).toBinary(snapshot))

private def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit): Unit =
withStream(new BufferedOutputStream(new FileOutputStream(snapshotFile(metadata))), p)
protected def withOutputStream(metadata: SnapshotMetadata)(p: (OutputStream) Unit): File = {
val tmpFile = snapshotFile(metadata, extension = "tmp")
withStream(new BufferedOutputStream(new FileOutputStream(tmpFile)), p)
tmpFile
}

private def withInputStream[T](metadata: SnapshotMetadata)(p: (InputStream) T): T =
withStream(new BufferedInputStream(new FileInputStream(snapshotFile(metadata))), p)

private def withStream[A <: Closeable, B](stream: A, p: A B): B =
try { p(stream) } finally { stream.close() }

private def snapshotFile(metadata: SnapshotMetadata): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}")
private def snapshotFile(metadata: SnapshotMetadata, extension: String = ""): File =
new File(snapshotDir, s"snapshot-${URLEncoder.encode(metadata.processorId, "UTF-8")}-${metadata.sequenceNr}-${metadata.timestamp}${extension}")

private def snapshotMetadata(processorId: String, criteria: SnapshotSelectionCriteria): immutable.Seq[SnapshotMetadata] =
snapshotDir.listFiles(new SnapshotFilenameFilter(processorId)).map(_.getName).collect {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/

package akka.persistence

import akka.actor.{ Props, ActorRef }
import akka.testkit.{ TestEvent, EventFilter, ImplicitSender, AkkaSpec }
import scala.concurrent.duration._
import akka.persistence.snapshot.local.LocalSnapshotStore
import akka.persistence.serialization.Snapshot
import akka.event.Logging

import scala.language.postfixOps

object SnapshotFailureRobustnessSpec {

class SaveSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, snr) saveSnapshot(payload)
case SaveSnapshotSuccess(md) probe ! md.sequenceNr
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
}
}

class LoadSnapshotTestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
def receive = {
case Persistent(payload, snr) probe ! s"${payload}-${snr}"
case SnapshotOffer(md, s) probe ! ((md, s))
case other probe ! other
}
override def preStart() = ()
}

class FailingLocalSnapshotStore extends LocalSnapshotStore {
override def save(metadata: SnapshotMetadata, snapshot: Any): Unit = {
if (metadata.sequenceNr == 2) {
val bytes = "b0rk".getBytes("UTF-8")
withOutputStream(metadata)(_.write(bytes))
} else super.save(metadata, snapshot)
}
}
}

class SnapshotFailureRobustnessSpec extends AkkaSpec(PersistenceSpec.config("leveldb", "SnapshotFailureRobustnessSpec", serialization = "off", extraConfig = Some(
"""
|akka.persistence.snapshot-store.local.class = "akka.persistence.SnapshotFailureRobustnessSpec$FailingLocalSnapshotStore"
""".stripMargin))) with PersistenceSpec with ImplicitSender {

import SnapshotFailureRobustnessSpec._

"A processor with a failing snapshot" must {
"recover state starting from the most recent complete snapshot" in {
val sProcessor = system.actorOf(Props(classOf[SaveSnapshotTestProcessor], name, testActor))
val processorId = name

sProcessor ! Persistent("blahonga")
expectMsg(1)
sProcessor ! Persistent("kablama")
expectMsg(2)
system.eventStream.publish(TestEvent.Mute(
EventFilter.error(start = "Error loading snapshot [")))
system.eventStream.subscribe(testActor, classOf[Logging.Error])
try {
val lProcessor = system.actorOf(Props(classOf[LoadSnapshotTestProcessor], name, testActor))
lProcessor ! Recover()
expectMsgPF() {
case (SnapshotMetadata(`processorId`, 1, timestamp), state)
state should be("blahonga")
timestamp should be > (0L)
}
expectMsg("kablama-2")
expectNoMsg(1 second)
} finally {
system.eventStream.unsubscribe(testActor, classOf[Logging.Error])
system.eventStream.publish(TestEvent.UnMute(
EventFilter.error(start = "Error loading snapshot [")))
}
}
}
}

0 comments on commit 2c3c13e

Please sign in to comment.