diff --git a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala b/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
deleted file mode 100644
index adbf3728aeb..00000000000
--- a/akka-stream/src/main/scala/akka/persistence/stream/PersistentPublisher.scala
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Copyright (C) 2009-2015 Typesafe Inc.
- */
-package akka.persistence.stream
-
-import scala.util.control.NonFatal
-import scala.concurrent.duration._
-
-import org.reactivestreams.api.Producer
-import org.reactivestreams.spi.Subscriber
-
-import akka.actor._
-import akka.persistence._
-import akka.stream._
-import akka.stream.impl._
-import akka.stream.impl.Ast.ProducerNode
-import akka.stream.scaladsl.Flow
-
-// ------------------------------------------------------------------------------------------------
-// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
-// NOTE: "producer" has been changed to "publisher" wherever possible, covering the upcoming
-// changes in reactive-streams.
-// ------------------------------------------------------------------------------------------------
-
-object PersistentFlow {
-  /**
-   * Starts a new [[Persistent]] message flow from the given processor,
-   * identified by `processorId`. Elements are pulled from the processor's
-   * journal (using a [[View]]) in accordance with the demand coming from
-   * the downstream transformation steps.
-   *
-   * Elements pulled from the processor's journal are buffered in memory so that
-   * fine-grained demands (requests) from downstream can be served efficiently.
-   */
-  def fromProcessor(processorId: String): Flow[Persistent] =
-    fromProcessor(processorId, PersistentPublisherSettings())
-
-  /**
-   * Starts a new [[Persistent]] message flow from the given processor,
-   * identified by `processorId`. Elements are pulled from the processor's
-   * journal (using a [[View]]) in accordance with the demand coming from
-   * the downstream transformation steps.
-   *
-   * Elements pulled from the processor's journal are buffered in memory so that
-   * fine-grained demands (requests) from downstream can be served efficiently.
-   * Reads from the journal are done in (coarse-grained) batches of configurable
-   * size (which correspond to the configurable maximum buffer size).
-   *
-   * @see [[PersistentPublisherSettings]]
-   */
-  def fromProcessor(processorId: String, publisherSettings: PersistentPublisherSettings): Flow[Persistent] =
-    FlowImpl(PersistentPublisherNode(processorId, publisherSettings), Nil)
-}
-
-/**
- * Configuration object for a [[Persistent]] stream publisher.
- *
- * @param fromSequenceNr Sequence number where the published stream shall start (inclusive).
- *                       Default is `1L`.
- * @param maxBufferSize Maximum number of persistent messages to be buffered in memory (per publisher).
- *                      Default is `100`.
- * @param idle Optional duration to wait if no more persistent messages can be pulled from the journal
- *             before attempting the next pull. Default is `None` which causes the publisher to take
- *             the value defined by the `akka.persistence.view.auto-update-interval` configuration
- *             key. If defined, the `idle` value is taken directly.
- */
-case class PersistentPublisherSettings(fromSequenceNr: Long = 1L, maxBufferSize: Int = 100, idle: Option[FiniteDuration] = None) {
-  require(fromSequenceNr > 0L, "fromSequenceNr must be > 0")
-}
-
-private object PersistentPublisher {
-  def props(processorId: String, publisherSettings: PersistentPublisherSettings, settings: MaterializerSettings): Props =
-    Props(classOf[PersistentPublisherImpl], processorId, publisherSettings, settings)
-}
-
-private case class PersistentPublisherNode(processorId: String, publisherSettings: PersistentPublisherSettings) extends ProducerNode[Persistent] {
-  def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[Persistent] =
-    new ActorProducer(context.actorOf(PersistentPublisher.props(processorId, publisherSettings, settings)))
-}
-
-private class PersistentPublisherImpl(processorId: String, publisherSettings: PersistentPublisherSettings, materializerSettings: MaterializerSettings)
-  extends Actor
-  with ActorLogging
-  with SubscriberManagement[Persistent]
-  with SoftShutdown {
-
-  import ActorBasedFlowMaterializer._
-  import PersistentPublisherBuffer._
-
-  type S = ActorSubscription[Persistent]
-
-  private val buffer = context.actorOf(Props(classOf[PersistentPublisherBuffer], processorId, publisherSettings, self), "publisherBuffer")
-
-  private var pub: ActorPublisher[Persistent] = _
-  private var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason
-
-  final def receive = {
-    case ExposedPublisher(pub) ⇒
-      this.pub = pub.asInstanceOf[ActorPublisher[Persistent]]
-      context.become(waitingForSubscribers)
-  }
-
-  final def waitingForSubscribers: Receive = {
-    case SubscribePending ⇒
-      pub.takePendingSubscribers() foreach registerSubscriber
-      context.become(active)
-  }
-
-  final def active: Receive = {
-    case SubscribePending ⇒
-      pub.takePendingSubscribers() foreach registerSubscriber
-    case RequestMore(sub, elements) ⇒
-      moreRequested(sub.asInstanceOf[S], elements)
-    case Cancel(sub) ⇒
-      unregisterSubscription(sub.asInstanceOf[S])
-    case Response(ps) ⇒
-      try {
-        ps.foreach(pushToDownstream)
-      } catch {
-        case Stop ⇒ { completeDownstream(); shutdownReason = None }
-        case NonFatal(e) ⇒ { abortDownstream(e); shutdownReason = Some(e) }
-      }
-  }
-
-  override def requestFromUpstream(elements: Int): Unit =
-    buffer ! Request(elements)
-
-  override def initialBufferSize =
-    materializerSettings.initialFanOutBufferSize
-
-  override def maxBufferSize =
-    materializerSettings.maxFanOutBufferSize
-
-  override def createSubscription(subscriber: Subscriber[Persistent]): ActorSubscription[Persistent] =
-    new ActorSubscription(self, subscriber)
-
-  override def cancelUpstream(): Unit = {
-    pub.shutdown(shutdownReason)
-    context.stop(buffer)
-    softShutdown()
-  }
-  override def shutdown(completed: Boolean): Unit = {
-    pub.shutdown(shutdownReason)
-    context.stop(buffer)
-    softShutdown()
-  }
-
-  override def postStop(): Unit = {
-    pub.shutdown(shutdownReason)
-  }
-}
-
-private object PersistentPublisherBuffer {
-  case class Request(num: Int)
-  case class Response(messages: Vector[Persistent])
-
-  case object Fill
-  case object Filled
-}
-
-/**
- * A view that buffers up to `publisherSettings.maxBufferSize` persistent messages in memory.
- * Downstream demands (requests) are served if the buffer is non-empty either while filling
- * the buffer or after having filled the buffer. When the buffer becomes empty new persistent
- * messages are loaded from the journal (in batches up to `publisherSettings.maxBufferSize`).
- */
-private class PersistentPublisherBuffer(override val processorId: String, publisherSettings: PersistentPublisherSettings, publisher: ActorRef) extends View {
-  import PersistentPublisherBuffer._
-  import context.dispatcher
-
-  private var replayed = 0
-  private var requested = 0
-  private var buffer: Vector[Persistent] = Vector.empty
-
-  private val filling: Receive = {
-    case p: Persistent ⇒
-      buffer :+= p
-      replayed += 1
-      if (requested > 0) respond(requested)
-    case Filled ⇒
-      if (buffer.nonEmpty && requested > 0) respond(requested)
-      if (buffer.nonEmpty) pause()
-      else if (replayed > 0) fill()
-      else schedule()
-    case Request(num) ⇒
-      requested += num
-      if (buffer.nonEmpty) respond(requested)
-  }
-
-  private val pausing: Receive = {
-    case Request(num) ⇒
-      requested += num
-      respond(requested)
-      if (buffer.isEmpty) fill()
-  }
-
-  private val scheduled: Receive = {
-    case Fill ⇒
-      fill()
-    case Request(num) ⇒
-      requested += num
-  }
-
-  def receive = filling
-
-  override def onReplaySuccess(receive: Receive, await: Boolean): Unit = {
-    super.onReplaySuccess(receive, await)
-    self ! Filled
-  }
-
-  override def onRecoveryFailure(receive: Receive, await: Boolean, cause: Throwable): Unit = {
-    super.onRecoveryFailure(receive, await, cause)
-    self ! Filled
-  }
-
-  override def lastSequenceNr: Long =
-    math.max(publisherSettings.fromSequenceNr - 1L, super.lastSequenceNr)
-
-  override def autoUpdateInterval: FiniteDuration =
-    publisherSettings.idle.getOrElse(super.autoUpdateInterval)
-
-  override def autoUpdateReplayMax: Long =
-    publisherSettings.maxBufferSize
-
-  override def autoUpdate: Boolean =
-    false
-
-  private def fill(): Unit = {
-    replayed = 0
-    context.become(filling)
-    self ! Update(await = false, autoUpdateReplayMax)
-  }
-
-  private def pause(): Unit = {
-    context.become(pausing)
-  }
-
-  private def schedule(): Unit = {
-    context.become(scheduled)
-    context.system.scheduler.scheduleOnce(autoUpdateInterval, self, Fill)
-  }
-
-  private def respond(num: Int): Unit = {
-    val (res, buf) = buffer.splitAt(num)
-    publisher ! Response(res)
-    buffer = buf
-    requested -= res.size
-  }
-}
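Editor's note: as a reading aid for the file removed above, here is a minimal usage sketch of the deleted PersistentFlow API, assembled only from calls that appear in these sources (FlowMaterializer(MaterializerSettings()), fromProcessor, foreach, consume). The actor system name, the processor id and the println handler are illustrative placeholders, and the sketch targets the pre-1.0 experimental akka-stream API being removed here, so it will not compile against current Akka.

// Sketch only: drives the removed PersistentFlow API end-to-end.
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.persistence.Persistent
import akka.persistence.stream.{ PersistentFlow, PersistentPublisherSettings }
import akka.stream.{ FlowMaterializer, MaterializerSettings }

object PersistentFlowExample extends App {
  implicit val system = ActorSystem("example")                 // illustrative system name
  val materializer = FlowMaterializer(MaterializerSettings())  // same construction as in PersistentPublisherSpec below

  // Stream everything the processor "some-processor-id" has journaled, starting at
  // sequence number 1, polling the journal every 100 ms once the history is exhausted.
  PersistentFlow
    .fromProcessor("some-processor-id", PersistentPublisherSettings(idle = Some(100.millis)))
    .foreach { case Persistent(payload, sequenceNr) ⇒ println(s"${payload}-${sequenceNr}") }
    .consume(materializer)
}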
diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala
deleted file mode 100644
index bb3f5981144..00000000000
--- a/akka-stream/src/test/scala/akka/persistence/stream/PersistenceSpec.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2009-2015 Typesafe Inc.
- */
-
-package akka.persistence.stream
-
-import java.io.File
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.reflect.ClassTag
-
-import com.typesafe.config.ConfigFactory
-
-import org.apache.commons.io.FileUtils
-import org.scalatest.BeforeAndAfterEach
-
-import akka.actor.Props
-import akka.persistence._
-import akka.stream.testkit.AkkaSpec
-
-// ---------------------------------------------------------------------------
-// FIXME: remove this file once going back to project dependencies
-// ---------------------------------------------------------------------------
-
-trait PersistenceSpec extends BeforeAndAfterEach with Cleanup { this: AkkaSpec ⇒
-  private var _name: String = _
-
-  lazy val extension = Persistence(system)
-  val counter = new AtomicInteger(0)
-
-  /**
-   * Unique name per test.
-   */
-  def name = _name
-
-  /**
-   * Prefix for generating a unique name per test.
-   */
-  def namePrefix: String = system.name
-
-  /**
-   * Creates a processor with current name as constructor argument.
-   */
-  def namedProcessor[T <: NamedProcessor: ClassTag] =
-    system.actorOf(Props(implicitly[ClassTag[T]].runtimeClass, name))
-
-  override protected def beforeEach() {
-    _name = s"${namePrefix}-${counter.incrementAndGet()}"
-  }
-}
-
-object PersistenceSpec {
-  def config(plugin: String, test: String, serialization: String = "on") = ConfigFactory.parseString(
-    s"""
-      akka.actor.serialize-creators = ${serialization}
-      akka.actor.serialize-messages = ${serialization}
-      akka.persistence.publish-confirmations = on
-      akka.persistence.publish-plugin-commands = on
-      akka.persistence.journal.plugin = "akka.persistence.journal.${plugin}"
-      akka.persistence.journal.leveldb.dir = "target/journal-${test}"
-      akka.persistence.snapshot-store.local.dir = "target/snapshots-${test}/"
-      akka.test.single-expect-default = 10s
-    """)
-}
-
-trait Cleanup { this: AkkaSpec ⇒
-  val storageLocations = List(
-    "akka.persistence.journal.leveldb.dir",
-    "akka.persistence.journal.leveldb-shared.store.dir",
-    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
-
-  override protected def atStartup() {
-    storageLocations.foreach(FileUtils.deleteDirectory)
-  }
-
-  override protected def afterTermination() {
-    storageLocations.foreach(FileUtils.deleteDirectory)
-  }
-}
-
-abstract class NamedProcessor(name: String) extends Processor {
-  override def processorId: String = name
-}
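Editor's note: for orientation, a hedged sketch of how the removed test helpers fit together. The spec and processor names are illustrative, and the constructor-style `extends PersistenceSpec(...)` simply mirrors how PersistentPublisherSpec in the next file uses these helpers; it is not guaranteed by the trait shown above.

// Sketch only: a test skeleton built from the removed helpers.
import akka.persistence.Persistent

// NamedProcessor takes the per-test name and uses it as its processorId.
class CountingProcessor(name: String) extends NamedProcessor(name) {
  var count = 0
  def receive = {
    case Persistent(_, _) ⇒ count += 1
  }
}

// PersistenceSpec.config selects the leveldb journal and per-test target directories;
// "CountingSpec" is an illustrative test name.
class CountingSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "CountingSpec")) {
  "A counting processor" must {
    "receive persistent messages" in {
      // namedProcessor passes the unique per-test `name` as the constructor argument.
      val processor = namedProcessor[CountingProcessor]
      processor ! Persistent("a")
    }
  }
}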
diff --git a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala b/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala
deleted file mode 100644
index bc15b4b8322..00000000000
--- a/akka-stream/src/test/scala/akka/persistence/stream/PersistentPublisherSpec.scala
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Copyright (C) 2009-2015 Typesafe Inc.
- */
-package akka.persistence.stream
-
-import scala.concurrent.duration._
-
-import akka.actor._
-import akka.persistence._
-import akka.stream._
-import akka.stream.scaladsl._
-import akka.stream.testkit._
-import akka.testkit.TestProbe
-
-// ------------------------------------------------------------------------------------------------
-// FIXME: move this file to akka-persistence-experimental once going back to project dependencies
-// ------------------------------------------------------------------------------------------------
-
-object PersistentPublisherSpec {
-  class TestProcessor(name: String, probe: ActorRef) extends NamedProcessor(name) {
-    def receive = {
-      case Persistent(payload, sequenceNr) ⇒ probe ! s"${payload}-${sequenceNr}"
-    }
-  }
-}
-
-class PersistentPublisherSpec extends PersistenceSpec(PersistenceSpec.config("leveldb", "ViewProducerSpec", serialization = "off")) {
-  import PersistentPublisherSpec._
-
-  val numMessages = 10
-
-  val publisherSettings = PersistentPublisherSettings(idle = Some(100.millis))
-  val materializer = FlowMaterializer(MaterializerSettings())
-
-  var processor1: ActorRef = _
-  var processor2: ActorRef = _
-
-  var processor1Probe: TestProbe = _
-  var processor2Probe: TestProbe = _
-
-  def processorId(num: Int): String =
-    name + num
-
-  override protected def beforeEach(): Unit = {
-    super.beforeEach()
-
-    processor1Probe = TestProbe()
-    processor2Probe = TestProbe()
-
-    processor1 = system.actorOf(Props(classOf[TestProcessor], processorId(1), processor1Probe.ref))
-    processor2 = system.actorOf(Props(classOf[TestProcessor], processorId(2), processor2Probe.ref))
-
-    1 to numMessages foreach { i ⇒
-      processor1 ! Persistent("a")
-      processor2 ! Persistent("b")
-
-      processor1Probe.expectMsg(s"a-${i}")
-      processor2Probe.expectMsg(s"b-${i}")
-    }
-  }
-
-  override protected def afterEach(): Unit = {
-    system.stop(processor1)
-    system.stop(processor2)
-    super.afterEach()
-  }
-
-  "A view producer" must {
-    "pull existing messages from a processor's journal" in {
-      val streamProbe = TestProbe()
-
-      PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
-        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      1 to numMessages foreach { i ⇒
-        streamProbe.expectMsg(s"a-${i}")
-      }
-    }
-    "pull existing and new messages from a processor's journal" in {
-      val streamProbe = TestProbe()
-
-      PersistentFlow.fromProcessor(processorId(1), publisherSettings).foreach {
-        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      1 to numMessages foreach { i ⇒
-        streamProbe.expectMsg(s"a-${i}")
-      }
-
-      processor1 ! Persistent("a")
-      processor1 ! Persistent("a")
-
-      streamProbe.expectMsg(s"a-${numMessages + 1}")
-      streamProbe.expectMsg(s"a-${numMessages + 2}")
-    }
-    "pull existing messages from a processor's journal starting from a specified sequence number" in {
-      val streamProbe = TestProbe()
-      val fromSequenceNr = 5L
-
-      PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr)).foreach {
-        case Persistent(payload, sequenceNr) ⇒ streamProbe.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      fromSequenceNr to numMessages foreach { i ⇒
-        streamProbe.expectMsg(s"a-${i}")
-      }
-    }
-  }
-
-  "A view producer" can {
-    "have several consumers" in {
-      val streamProbe1 = TestProbe()
-      val streamProbe2 = TestProbe()
-
-      val producer = PersistentFlow.fromProcessor(processorId(1), publisherSettings).toProducer(materializer)
-
-      Flow(producer).foreach {
-        case Persistent(payload, sequenceNr) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      // let consumer consume all existing messages
-      1 to numMessages foreach { i ⇒
-        streamProbe1.expectMsg(s"a-${i}")
-      }
-
-      // subscribe another consumer
-      Flow(producer).foreach {
-        case Persistent(payload, sequenceNr) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      // produce new messages and let both consumers handle them
-      1 to 2 foreach { i ⇒
-        processor1 ! Persistent("a")
-        streamProbe1.expectMsg(s"a-${numMessages + i}")
-        streamProbe2.expectMsg(s"a-${numMessages + i}")
-      }
-    }
-  }
-
-  "A consumer" can {
-    "consume from several view producers" in {
-      val streamProbe1 = TestProbe()
-      val streamProbe2 = TestProbe()
-
-      val fromSequenceNr1 = 7L
-      val fromSequenceNr2 = 3L
-
-      val producer1 = PersistentFlow.fromProcessor(processorId(1), publisherSettings.copy(fromSequenceNr = fromSequenceNr1)).toProducer(materializer)
-      val producer2 = PersistentFlow.fromProcessor(processorId(2), publisherSettings.copy(fromSequenceNr = fromSequenceNr2)).toProducer(materializer)
-
-      Flow(producer1).merge(producer2).foreach {
-        case Persistent(payload: String, sequenceNr) if (payload.startsWith("a")) ⇒ streamProbe1.ref ! s"${payload}-${sequenceNr}"
-        case Persistent(payload: String, sequenceNr) if (payload.startsWith("b")) ⇒ streamProbe2.ref ! s"${payload}-${sequenceNr}"
-      }.consume(materializer)
-
-      1 to numMessages foreach { i ⇒
-        if (i >= fromSequenceNr1) streamProbe1.expectMsg(s"a-${i}")
-        if (i >= fromSequenceNr2) streamProbe2.expectMsg(s"b-${i}")
-      }
-    }
-  }
-}
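Editor's note: the three knobs of the removed PersistentPublisherSettings map directly onto the View overrides in the deleted PersistentPublisherBuffer. A small annotated sketch of that mapping, with arbitrary example values:

// Sketch only: what each setting controlled in the removed implementation.
import scala.concurrent.duration._

val settings = PersistentPublisherSettings(
  fromSequenceNr = 5L,               // buffer caps lastSequenceNr at fromSequenceNr - 1, so replay starts at 5
  maxBufferSize  = 100,              // also used as autoUpdateReplayMax, i.e. the journal read batch size
  idle           = Some(100.millis)  // overrides akka.persistence.view.auto-update-interval between journal polls
)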