Skip to content

Commit

Permalink
fix ActorPublisher state machine, fixes akka#20031
Browse files Browse the repository at this point in the history
  • Loading branch information
rkuhn committed Mar 15, 2016
1 parent c735403 commit 19d0bdb
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
package akka.stream.io

import java.io.File
import java.io.FileWriter
import java.util.Random
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
Expand All @@ -24,6 +23,9 @@ import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.testkit.AkkaSpec
import java.io.OutputStreamWriter
import java.io.FileOutputStream
import java.nio.charset.StandardCharsets.UTF_8

object FileSourceSpec {
final case class Settings(chunkSize: Int, readAhead: Int)
Expand All @@ -45,7 +47,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {

val testFile = {
val f = File.createTempFile("file-source-spec", ".tmp")
new FileWriter(f).append(TestText).close()
new OutputStreamWriter(new FileOutputStream(f), UTF_8).append(TestText).close()
f
}

Expand All @@ -60,7 +62,7 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {

val manyLines = {
val f = File.createTempFile(s"file-source-spec-lines_$LinesCount", "tmp")
val w = new FileWriter(f)
val w = new OutputStreamWriter(new FileOutputStream(f), UTF_8)
(1 to LinesCount).foreach { l
w.append("a" * l).append("\n")
}
Expand Down Expand Up @@ -196,6 +198,14 @@ class FileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
try assertDispatcher(ref, "akka.actor.default-dispatcher") finally p.cancel()
} finally shutdown(sys)
}

"not signal onComplete more than once" in {
FileIO.fromFile(testFile, 2 * TestText.length)
.runWith(TestSink.probe)
.requestNext(ByteString(TestText, UTF_8.name))
.expectComplete()
.expectNoMsg(1.second)
}
}

override def afterTermination(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ trait ActorPublisher[T] extends Actor {
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*/
def onComplete(): Unit = lifecycleState match {
case Active | PreSubscriber | CompleteThenStop
case Active | PreSubscriber
lifecycleState = Completed
if (subscriber ne null) // otherwise onComplete will be called when the subscription arrives
try tryOnComplete(subscriber) finally subscriber = null
case Completed
case Completed | CompleteThenStop
throw new IllegalStateException("onComplete must only be called once")
case _: ErrorEmitted
throw new IllegalStateException("onComplete must not be called after onError")
Expand Down Expand Up @@ -225,13 +225,13 @@ trait ActorPublisher[T] extends Actor {
* call [[#onNext]], [[#onError]] and [[#onComplete]].
*/
def onError(cause: Throwable): Unit = lifecycleState match {
case Active | PreSubscriber | CompleteThenStop
case Active | PreSubscriber
lifecycleState = ErrorEmitted(cause, stop = false)
if (subscriber ne null) // otherwise onError will be called when the subscription arrives
try tryOnError(subscriber, cause) finally subscriber = null
case _: ErrorEmitted
throw new IllegalStateException("onError must only be called once")
case Completed
case Completed | CompleteThenStop
throw new IllegalStateException("onError must not be called after onComplete")
case Canceled // drop
}
Expand Down Expand Up @@ -260,8 +260,6 @@ trait ActorPublisher[T] extends Actor {
if (n < 1) {
if (lifecycleState == Active)
onError(numberOfElementsInRequestMustBePositiveException)
else
super.aroundReceive(receive, msg)
} else {
demand += n
if (demand < 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ private[akka] final class FilePublisher(f: File, completionPromise: Promise[IORe

def readAndSignal(maxReadAhead: Int): Unit =
if (isActive) {
// Write previously buffered, read into buffer, write newly buffered
availableChunks = signalOnNexts(readAhead(maxReadAhead, signalOnNexts(availableChunks)))
// Write previously buffered, then refill buffer
availableChunks = readAhead(maxReadAhead, signalOnNexts(availableChunks))
if (totalDemand > 0 && isActive) self ! Continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ private[akka] object InputStreamPublisher {

/** INTERNAL API */
private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Promise[IOResult], chunkSize: Int)
extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging {
extends akka.stream.actor.ActorPublisher[ByteString]
with ActorLogging {

// TODO possibly de-duplicate with FilePublisher?

Expand All @@ -47,7 +47,7 @@ private[akka] class InputStreamPublisher(is: InputStream, completionPromise: Pro
def readAndSignal(): Unit =
if (isActive) {
readAndEmit()
if (totalDemand > 0) self ! Continue
if (totalDemand > 0 && isActive) self ! Continue
}

def readAndEmit(): Unit = if (totalDemand > 0) try {
Expand Down

0 comments on commit 19d0bdb

Please sign in to comment.