Skip to content

Commit

Permalink
Stream Testkit: new-API-friendly (akka#29815)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Nov 16, 2020
1 parent d253c64 commit d64bb0a
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,20 @@ import scala.collection.immutable
import scala.concurrent.duration._
import scala.reflect.ClassTag

import org.reactivestreams.{ Publisher, Subscriber, Subscription }

import akka.actor.{ ActorRef, ActorSystem, DeadLetterSuppression, NoSerializationVerificationNeeded }
import akka.actor.ActorRef
import akka.actor.ClassicActorSystemProvider
import akka.actor.DeadLetterSuppression
import akka.actor.NoSerializationVerificationNeeded
import akka.stream._
import akka.stream.impl._
import akka.testkit.{ TestActor, TestProbe }
import akka.testkit.TestActor.AutoPilot
import akka.testkit.TestActor
import akka.testkit.TestProbe
import akka.util.JavaDurationConverters
import akka.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

/**
* Provides factory methods for various Publishers.
Expand Down Expand Up @@ -66,27 +71,28 @@ object TestPublisher {
/**
* Probe that implements [[org.reactivestreams.Publisher]] interface.
*/
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ActorSystem): ManualProbe[T] =
def manualProbe[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T] =
new ManualProbe(autoOnSubscribe)

/**
* Probe that implements [[org.reactivestreams.Publisher]] interface and tracks demand.
*/
def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T] =
def probe[T](initialPendingRequests: Long = 0)(implicit system: ClassicActorSystemProvider): Probe[T] =
new Probe(initialPendingRequests)

/**
* Implementation of [[org.reactivestreams.Publisher]] that allows various assertions.
* This probe does not track demand. Therefore you need to expect demand before sending
* elements downstream.
*/
class ManualProbe[I] private[TestPublisher] (autoOnSubscribe: Boolean = true)(implicit system: ActorSystem)
class ManualProbe[I] private[TestPublisher] (autoOnSubscribe: Boolean = true)(
implicit system: ClassicActorSystemProvider)
extends Publisher[I] {

type Self <: ManualProbe[I]

@ccompatUsedUntil213
private val probe: TestProbe = TestProbe()
private val probe: TestProbe = TestProbe()(system.classicSystem)

//this is a way to pause receiving message from probe until subscription is done
private val subscribed = new CountDownLatch(1)
Expand Down Expand Up @@ -212,7 +218,7 @@ object TestPublisher {
/**
* Single subscription and demand tracking for [[TestPublisher.ManualProbe]].
*/
class Probe[T] private[TestPublisher] (initialPendingRequests: Long)(implicit system: ActorSystem)
class Probe[T] private[TestPublisher] (initialPendingRequests: Long)(implicit system: ClassicActorSystemProvider)
extends ManualProbe[T] {

type Self = Probe[T]
Expand Down Expand Up @@ -302,21 +308,21 @@ object TestSubscriber {
/**
* Probe that implements [[org.reactivestreams.Subscriber]] interface.
*/
def manualProbe[T]()(implicit system: ActorSystem): ManualProbe[T] = new ManualProbe()
def manualProbe[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T] = new ManualProbe()

def probe[T]()(implicit system: ActorSystem): Probe[T] = new Probe()
def probe[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = new Probe()

/**
* Implementation of [[org.reactivestreams.Subscriber]] that allows various assertions.
*
* All timeouts are dilated automatically, for more details about time dilation refer to [[akka.testkit.TestKit]].
*/
class ManualProbe[I] private[TestSubscriber] ()(implicit system: ActorSystem) extends Subscriber[I] {
class ManualProbe[I] private[TestSubscriber] ()(implicit system: ClassicActorSystemProvider) extends Subscriber[I] {
import akka.testkit._

type Self <: ManualProbe[I]

private val probe = TestProbe()
private val probe = TestProbe()(system.classicSystem)

@volatile private var _subscription: Subscription = _

Expand Down Expand Up @@ -356,7 +362,7 @@ object TestSubscriber {
* Expect and return a stream element.
*/
def expectNext(): I = {
expectNext(probe.testKitSettings.SingleExpectDefaultTimeout.dilated)
expectNext(probe.testKitSettings.SingleExpectDefaultTimeout.dilated(system.classicSystem))
}

/**
Expand Down Expand Up @@ -788,7 +794,7 @@ object TestSubscriber {
/**
* Single subscription tracking for [[ManualProbe]].
*/
class Probe[T] private[TestSubscriber] ()(implicit system: ActorSystem) extends ManualProbe[T] {
class Probe[T] private[TestSubscriber] ()(implicit system: ClassicActorSystemProvider) extends ManualProbe[T] {

override type Self = Probe[T]

Expand Down Expand Up @@ -885,7 +891,8 @@ private[testkit] object StreamTestKit {
def sendOnSubscribe(): Unit = subscriber.onSubscribe(this)
}

final class ProbeSource[T](val attributes: Attributes, shape: SourceShape[T])(implicit system: ActorSystem)
final class ProbeSource[T](val attributes: Attributes, shape: SourceShape[T])(
implicit system: ClassicActorSystemProvider)
extends SourceModule[T, TestPublisher.Probe[T]](shape) {
override def create(context: MaterializationContext) = {
val probe = TestPublisher.probe[T]()
Expand All @@ -897,7 +904,7 @@ private[testkit] object StreamTestKit {
new ProbeSource[T](attr, amendShape(attr))
}

final class ProbeSink[T](val attributes: Attributes, shape: SinkShape[T])(implicit system: ActorSystem)
final class ProbeSink[T](val attributes: Attributes, shape: SinkShape[T])(implicit system: ClassicActorSystemProvider)
extends SinkModule[T, TestSubscriber.Probe[T]](shape) {
override def create(context: MaterializationContext) = {
val probe = TestSubscriber.probe[T]()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.testkit.javadsl

import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream.javadsl.Sink
import akka.stream.testkit._

Expand All @@ -14,7 +14,7 @@ object TestSink {
/**
* A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]].
*/
def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]] =
def probe[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]] =
new Sink(scaladsl.TestSink.probe[T](system))

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.testkit.javadsl

import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream.javadsl.Source
import akka.stream.testkit._

Expand All @@ -14,7 +14,7 @@ object TestSource {
/**
* A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]].
*/
def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]] =
def probe[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]] =
new Source(scaladsl.TestSource.probe[T](system))

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.testkit.scaladsl

import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream._
import akka.stream.Attributes.none
import akka.stream.scaladsl._
Expand All @@ -20,7 +20,7 @@ object TestSink {
/**
* A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]].
*/
def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] =
def probe[T](implicit system: ClassicActorSystemProvider): Sink[T, Probe[T]] =
Sink.fromGraph[T, TestSubscriber.Probe[T]](new ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package akka.stream.testkit.scaladsl

import akka.actor.ActorSystem
import akka.actor.ClassicActorSystemProvider
import akka.stream._
import akka.stream.Attributes.none
import akka.stream.scaladsl._
Expand All @@ -19,7 +19,7 @@ object TestSource {
/**
* A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]].
*/
def probe[T](implicit system: ActorSystem) =
def probe[T](implicit system: ClassicActorSystemProvider) =
Source.fromGraph[T, TestPublisher.Probe[T]](new ProbeSource(none, SourceShape(Outlet("ProbeSource.out"))))

}

0 comments on commit d64bb0a

Please sign in to comment.