Skip to content

Commit

Permalink
=per akka#17598 Add writerUuid to PersistentRepr
Browse files Browse the repository at this point in the history
* to support detection of multiple writers, facility to do
  that automatically is not part of this commit
  • Loading branch information
patriknw committed Jun 25, 2015
1 parent abd430c commit 00449cd
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -1,40 +1,40 @@
package akka.persistence

import java.util.concurrent.atomic.AtomicInteger

import scala.reflect.ClassTag

import akka.actor._
import akka.testkit._

import com.typesafe.config._

import org.scalatest._
import java.util.UUID

abstract class PluginSpec(val config: Config) extends TestKitBase with WordSpecLike with Matchers with BeforeAndAfterAll with BeforeAndAfterEach {
private val counter = new AtomicInteger(0)

private var _extension: Persistence = _
private var _pid: String = _
private var _writerUuid: String = _

// used to avoid messages be delivered to a restarted actor,
// this is akka-persistence internals and journals themselves don't really care
protected val actorInstanceId = 1

override protected def beforeEach(): Unit =
override protected def beforeEach(): Unit = {
_pid = s"p-${counter.incrementAndGet()}"
_writerUuid = UUID.randomUUID.toString
}

override protected def beforeAll(): Unit =
_extension = Persistence(system)

override protected def afterAll(): Unit =
shutdown(system)

def extension: Persistence =
_extension
def extension: Persistence = _extension

def pid: String = _pid

def pid: String =
_pid
def writerUuid: String = _writerUuid

def subscribe[T: ClassTag](subscriber: ActorRef) =
system.eventStream.subscribe(subscriber, implicitly[ClassTag[T]].runtimeClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
super.beforeEach()
senderProbe = TestProbe()
receiverProbe = TestProbe()
writeMessages(1, 5, pid, senderProbe.ref)
writeMessages(1, 5, pid, senderProbe.ref, writerUuid)
}

/**
Expand All @@ -52,22 +52,26 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
extension.journalFor(null)

def replayedMessage(snr: Long, deleted: Boolean = false, confirms: Seq[String] = Nil): ReplayedMessage =
ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender))
ReplayedMessage(PersistentImpl(s"a-${snr}", snr, pid, "", deleted, Actor.noSender, writerUuid))

def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef): Unit = {
def writeMessages(fromSnr: Int, toSnr: Int, pid: String, sender: ActorRef, writerUuid: String): Unit = {
val msgs =
if (supportsAtomicPersistAllOfSeveralEvents)
(fromSnr to toSnr).map { i
AtomicWrite(PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender))
AtomicWrite(PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender,
writerUuid = writerUuid))
}
else
(fromSnr to toSnr - 1).map { i
if (i == toSnr - 1)
AtomicWrite(List(
PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender),
PersistentRepr(payload = s"a-${i + 1}", sequenceNr = i + 1, persistenceId = pid, sender = sender)))
PersistentRepr(payload = s"a-$i", sequenceNr = i, persistenceId = pid, sender = sender,
writerUuid = writerUuid),
PersistentRepr(payload = s"a-${i + 1}", sequenceNr = i + 1, persistenceId = pid, sender = sender,
writerUuid = writerUuid)))
else
AtomicWrite(PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender))
AtomicWrite(PersistentRepr(payload = s"a-${i}", sequenceNr = i, persistenceId = pid, sender = sender,
writerUuid = writerUuid))
}

val probe = TestProbe()
Expand All @@ -76,7 +80,10 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {

probe.expectMsg(WriteMessagesSuccessful)
fromSnr to toSnr foreach { i
probe.expectMsgPF() { case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`), _) payload should be(s"a-${i}") }
probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, `i`, `pid`, _, _, `sender`, `writerUuid`), _)
payload should be(s"a-${i}")
}
}
}

Expand Down Expand Up @@ -158,23 +165,25 @@ abstract class JournalSpec(config: Config) extends PluginSpec(config) {
val notSerializableEvent = new Object { override def toString = "not serializable" }
val msgs = (6 to 8).map { i
val event = if (i == 7) notSerializableEvent else s"b-$i"
AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender))
AtomicWrite(PersistentRepr(payload = event, sequenceNr = i, persistenceId = pid, sender = Actor.noSender,
writerUuid = writerUuid))
}

val probe = TestProbe()
journal ! WriteMessages(msgs, probe.ref, actorInstanceId)

probe.expectMsg(WriteMessagesSuccessful)
val Pid = pid
val WriterUuid = writerUuid
probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender), _) payload should be(s"b-6")
case WriteMessageSuccess(PersistentImpl(payload, 6L, Pid, _, _, Actor.noSender, WriterUuid), _) payload should be(s"b-6")
}
probe.expectMsgPF() {
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender), _, _)
case WriteMessageRejected(PersistentImpl(payload, 7L, Pid, _, _, Actor.noSender, WriterUuid), _, _)
payload should be(notSerializableEvent)
}
probe.expectMsgPF() {
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender), _) payload should be(s"b-8")
case WriteMessageSuccess(PersistentImpl(payload, 8L, Pid, _, _, Actor.noSender, WriterUuid), _) payload should be(s"b-8")
}

}
Expand Down
Loading

0 comments on commit 00449cd

Please sign in to comment.