Skip to content

Commit

Permalink
Merge pull request akka#143 from akka/wip-snap-patriknw
Browse files Browse the repository at this point in the history
Fail snapshot loading after max attempts
  • Loading branch information
patriknw authored Nov 22, 2016
2 parents 20803c4 + 59b831f commit 50366a8
Show file tree
Hide file tree
Showing 21 changed files with 19 additions and 33 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
// disable parallel tests
parallelExecution in Test := false

val AkkaVersion = "2.4.12"
val AkkaVersion = "2.4.14"

libraryDependencies ++= Seq(
"com.datastax.cassandra" % "cassandra-driver-core" % "3.1.0",
Expand Down
7 changes: 7 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,13 @@ cassandra-snapshot-store {
# use the default from the underlying Netty transport (Java NIO or native epoll)
receive-buffer-size = 0
}

# Number load attempts when recovering from the latest snapshot fails
# yet older snapshot files are available. Each recovery attempt will try
# to recover using an older than previously failed-on snapshot file
# (if any are present). If all attempts fail the recovery will fail and
# the persistent actor will be stopped.
max-load-attempts = 3
}

# This configures the default settings for all CassandraReadJournal plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,21 @@ class CassandraSnapshotStore(cfg: Config) extends SnapshotStore with CassandraSt

override def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = for {
prepStmt <- preparedSelectSnapshotMetadataForLoad
mds <- metadata(prepStmt, persistenceId, criteria).map(_.take(3))
mds <- metadata(prepStmt, persistenceId, criteria).map(_.take(maxLoadAttempts))
res <- loadNAsync(mds)
} yield res

def loadNAsync(metadata: immutable.Seq[SnapshotMetadata]): Future[Option[SelectedSnapshot]] = metadata match {
case Seq() => Future.successful(None)
case Seq() => Future.successful(None) // no snapshots stored
case md +: mds => load1Async(md) map {
case Snapshot(s) => Some(SelectedSnapshot(md, s))
} recoverWith {
case e =>
log.warning("Failed to load snapshot, trying older one. Caused by: [{}: {}]", e.getClass.getName, e.getMessage)
loadNAsync(mds) // try older snapshot
if (mds.isEmpty)
Future.failed(e) // all attempts failed
else
loadNAsync(mds) // try older snapshot
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@ import akka.actor.ActorSystem

class CassandraSnapshotStoreConfig(system: ActorSystem, config: Config) extends CassandraPluginConfig(system, config) {
val maxMetadataResultSize = config.getInt("max-metadata-result-size")
val maxLoadAttempts = config.getInt("max-load-attempts")
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import java.util.concurrent.TimeUnit

object CassandraLifecycle {

val config = ConfigFactory.parseString("""
val config = ConfigFactory.parseString(s"""
akka.persistence.journal.plugin = "cassandra-journal"
akka.persistence.snapshot-store.plugin = "cassandra-snapshot-store"
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
cassandra-journal.circuit-breaker.call-timeout = 30s
akka.test.single-expect-default = 20s
akka.actor.serialize-messages=on
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import scala.concurrent.Await
object CassandraCompactionStrategySpec {
lazy val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraCompactionStrategySpec
|cassandra-snapshot-store.keyspace=CassandraCompactionStrategySpecSnapshot
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ object CassandraConfigCheckerSpec {
|akka.persistence.publish-plugin-commands = on
|cassandra-journal.target-partition-size = 5
|cassandra-journal.max-result-size = 3
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
""".stripMargin
).withFallback(CassandraLifecycle.config)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ object CassandraIntegrationSpec {
|akka.persistence.publish-plugin-commands = on
|cassandra-journal.target-partition-size = 5
|cassandra-journal.max-result-size = 3
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraIntegrationSpec
|cassandra-snapshot-store.keyspace=CassandraIntegrationSpecSnapshot
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import com.typesafe.config.ConfigFactory
object CassandraJournalConfiguration {
lazy val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraJournalSpec
|cassandra-snapshot-store.keyspace=CassandraJournalSpecSnapshot
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import scala.language.postfixOps
object CassandraLoadSpec {
val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.replication-strategy = NetworkTopologyStrategy
|cassandra-journal.data-center-replication-factors = ["dc1:1"]
|cassandra-journal.keyspace=CassandraLoadSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ object CassandraSerializationSpec {
|akka.persistence.publish-plugin-commands = on
|cassandra-journal.target-partition-size = 5
|cassandra-journal.max-result-size = 3
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraIntegrationSpec
|cassandra-snapshot-store.keyspace=CassandraIntegrationSpecSnapshot
|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ object CassandraSslSpec {
|akka.persistence.publish-plugin-commands = on
|cassandra-journal.target-partition-size = 5
|cassandra-journal.max-result-size = 3
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraSslSpec${if (keyStore) 1 else 2}
|cassandra-snapshot-store.keyspace=CassandraSslSpec${if (keyStore) 1 else 2}Snapshot
|cassandra-snapshot-store.ssl.truststore.path="src/test/resources/security/cts_truststore.jks"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import akka.persistence.cassandra.CassandraLifecycle.AwaitPersistenceInit
object ReconnectSpec {
val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=ReconnectSpec
|cassandra-snapshot-store.keyspace=ReconnectSpecSnapshot
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import org.scalatest._
object StartupLoadSpec {
val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=StartupLoadSpec
|cassandra-snapshot-store.keyspace=StartupLoadSpecSnapshot
""".stripMargin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import akka.NotUsed
object AllPersistenceIdsSpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=AllPersistenceIdsSpec
cassandra-query-journal.max-buffer-size = 10
cassandra-query-journal.refresh-interval = 0.5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ object EventAdaptersReadSpec {

val config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=EventAdaptersReadSpec
cassandra-query-journal.max-buffer-size = 10
cassandra-query-journal.refresh-interval = 0.5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ object EventsByPersistenceIdSpec {
akka.loglevel = INFO
akka.actor.serialize-messages = on
akka.actor.serialize-creators = on
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=EventsByPersistenceIdSpec
cassandra-query-journal.max-buffer-size = 10
cassandra-query-journal.refresh-interval = 0.5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.stream.testkit.scaladsl.TestSink
object CassandraReadJournalSpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=JavadslCassandraReadJournalSpec
cassandra-query-journal.max-buffer-size = 10
cassandra-query-journal.refresh-interval = 0.5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import akka.stream.testkit.scaladsl.TestSink
object CassandraReadJournalSpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel = INFO
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=ScaladslCassandraReadJournalSpec
cassandra-query-journal.max-buffer-size = 10
cassandra-query-journal.refresh-interval = 0.5s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ object CassandraSessionSpec {
lazy val config = ConfigFactory.parseString(
s"""
akka.loglevel = INFO
cassandra-journal.port = ${CassandraLauncher.randomPort}
cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
cassandra-journal.keyspace=CassandraSessionSpec

test-cassandra-session-config {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ import scala.concurrent.Await
object CassandraSnapshotStoreConfiguration {
lazy val config = ConfigFactory.parseString(
s"""
|cassandra-journal.port = ${CassandraLauncher.randomPort}
|cassandra-snapshot-store.port = ${CassandraLauncher.randomPort}
|cassandra-journal.keyspace=CassandraSnapshotStoreSpec
|cassandra-snapshot-store.keyspace=CassandraSnapshotStoreSpecSnapshot
|cassandra-snapshot-store.max-metadata-result-size = 2
Expand Down Expand Up @@ -107,7 +105,7 @@ class CassandraSnapshotStoreSpec extends SnapshotStoreSpec(CassandraSnapshotStor
snapshotStore.tell(LoadSnapshot(pid, SnapshotSelectionCriteria.Latest, Long.MaxValue), probe.ref)

// no 4th attempt has been made
probe.expectMsg(LoadSnapshotResult(None, Long.MaxValue))
probe.expectMsgType[LoadSnapshotFailed]
}
}
}
Expand Down

0 comments on commit 50366a8

Please sign in to comment.