Skip to content

Commit

Permalink
=str akka#16959: MapAsync and MapAsyncUnordered should not ignore cancel
Browse files Browse the repository at this point in the history
added TCK verification as well
  • Loading branch information
Endre Sándor Varga committed Feb 27, 2015
1 parent fbe80f2 commit 38df267
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 2 deletions.
34 changes: 34 additions & 0 deletions akka-stream-tck/src/test/scala/akka/stream/tck/MapAsyncTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck

import java.util.concurrent.atomic.AtomicInteger

import akka.stream.scaladsl.{ Flow, OperationAttributes }
import akka.stream.{ ActorFlowMaterializer, ActorFlowMaterializerSettings }
import org.reactivestreams.{ Processor, Publisher }

import scala.concurrent.Future

class MapAsyncTest extends AkkaIdentityProcessorVerification[Int] {

val processorCounter = new AtomicInteger

override def createIdentityProcessor(maxBufferSize: Int): Processor[Int, Int] = {
val settings = ActorFlowMaterializerSettings(system)
.withInputBuffer(initialSize = maxBufferSize / 2, maxSize = maxBufferSize)

implicit val materializer = ActorFlowMaterializer(settings)(system)

processorFromFlow(
Flow[Int].mapAsync(Future.successful).withAttributes(OperationAttributes.name("identity")))
}

override def createHelperPublisher(elements: Long): Publisher[Int] = {
implicit val mat = ActorFlowMaterializer()(system)

createSimpleIntPublisher(elements)(mat)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -137,5 +137,20 @@ class FlowMapAsyncSpec extends AkkaSpec {
c.expectComplete()
}

"should handle cancel properly" in {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()

Source(pub).mapAsync(Future.successful).runWith(Sink(sub))

val upstream = pub.expectSubscription()
upstream.expectRequest()

sub.expectSubscription().cancel()

upstream.expectCancellation()

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,20 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec {
c.probe.receiveN(5).toSet should be(expected)
}

"should handle cancel properly" in {
val pub = StreamTestKit.PublisherProbe[Int]()
val sub = StreamTestKit.SubscriberProbe[Int]()

Source(pub).mapAsyncUnordered(Future.successful).runWith(Sink(sub))

val upstream = pub.expectSubscription()
upstream.expectRequest()

sub.expectSubscription().cancel()

upstream.expectCancellation()

}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ private[akka] class MapAsyncProcessorImpl(_settings: ActorFlowMaterializerSettin
object RunningPhaseCondition extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - gap > 0) ||
(primaryInputs.inputsDepleted && gap == 0)
def isCompleted = false
def isCompleted = primaryOutputs.isClosed
}

val running: TransferPhase = TransferPhase(RunningPhaseCondition) { ()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private[akka] class MapAsyncUnorderedProcessorImpl(_settings: ActorFlowMateriali
object RunningPhaseCondition extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandCount - inProgressCount > 0) ||
(primaryInputs.inputsDepleted && inProgressCount == 0)
def isCompleted = false
def isCompleted = primaryOutputs.isClosed
}

val running: TransferPhase = TransferPhase(RunningPhaseCondition) { ()
Expand Down

0 comments on commit 38df267

Please sign in to comment.