diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index a4034d13f93..390d2fc05cb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -101,6 +101,46 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { // it should be terminated after none of it's substreams are used within the timeout expectTerminated(groupByActor, 1000.millis) } + + "not timeout and cancel substream publishers when they have been subscribed to" in { + val publisherProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + upstreamSubscription.sendNext(2) + + val (_, s1) = subscriber.expectNext() + // should not break normal usage + val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() + s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) + val s1Sub = s1SubscriberProbe.expectSubscription() + s1Sub.request(1) + s1SubscriberProbe.expectNext(1) + + val (_, s2) = subscriber.expectNext() + // should not break normal usage + val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() + s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) + val s2Sub = s2SubscriberProbe.expectSubscription() + + // sleep long enough for tiemout to trigger if not cancelled + Thread.sleep(1000) + + s2Sub.request(100) + s2SubscriberProbe.expectNext(2) + s1Sub.request(100) + upstreamSubscription.sendNext(3) + upstreamSubscription.sendNext(4) + s1SubscriberProbe.expectNext(3) + s2SubscriberProbe.expectNext(4) + } } private def watchGroupByActor(flowNr: Int): ActorRef = { diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 49c055c2889..9d6451b3804 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -25,11 +25,6 @@ akka { # Cleanup leaked publishers and subscribers when they are not used within a given deadline subscription-timeout { - # Fully qualified config path which holds the dispatcher configuration - # to be used for the scheduled stream cancellations. - # When this value is left empty, the containing actor context's dispatcher will be used. - dispatcher = "" - # when the subscription timeout is reached one of the following strategies on the "stale" publisher: # cancel - cancel it (via `onError` or subscribing to the publisher and `cancel()`ing the subscription right away # warn - log a warning statement about the stale element (then drop the reference to it) diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 7268e685766..2ec2704f78f 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -250,6 +250,7 @@ final case class MaterializerSettings( } object StreamSubscriptionTimeoutSettings { + import StreamSubscriptionTimeoutTerminationMode._ /** Java API */ def create(config: Config): StreamSubscriptionTimeoutSettings = @@ -263,21 +264,23 @@ object StreamSubscriptionTimeoutSettings { case "warn" ⇒ WarnTermination case "cancel" ⇒ CancelTermination }, - timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis, - dispatcher = c.getString("dispatcher")) + timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis) } } -final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration, dispatcher: String) +final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration) sealed abstract class StreamSubscriptionTimeoutTerminationMode + object StreamSubscriptionTimeoutTerminationMode { + case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode + case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode + case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode + /** Java API */ def noop = NoopTermination /** Java API */ def warn = WarnTermination /** Java API */ def cancel = CancelTermination + } -case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode -case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode -case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 4b919623a25..460fdd6fcd6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -6,11 +6,12 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorLogging import akka.actor.Cancellable -import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } +import akka.stream.ReactiveStreamsConstants import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.collection.mutable +import scala.concurrent.duration.FiniteDuration /** * INTERNAL API @@ -20,6 +21,7 @@ private[akka] object MultiStreamOutputProcessor { case class SubstreamRequestMore(substream: SubstreamKey, demand: Long) case class SubstreamCancel(substream: SubstreamKey) case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) + case class SubstreamSubscriptionTimeout(substream: SubstreamKey) class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { override def request(elements: Long): Unit = @@ -38,7 +40,7 @@ private[akka] object MultiStreamOutputProcessor { final case class Failed(e: Throwable) extends CompletedState } - class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout) + class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable) extends SimpleOutputs(actor, pump) with Publisher[Any] { import SubstreamOutput._ @@ -69,6 +71,7 @@ private[akka] object MultiStreamOutputProcessor { } private def closePublisher(withState: CompletedState): Unit = { + subscriptionTimeout.cancel() state.getAndSet(withState) match { case Attached(sub) ⇒ closeSubscriber(sub, withState) case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher") @@ -82,14 +85,13 @@ private[akka] object MultiStreamOutputProcessor { } override def subscribe(s: Subscriber[_ >: Any]): Unit = { - if (subscriptionTimeout.cancelAndHandle(s)) { - if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) - else { - state.get() match { - case _: Attached ⇒ s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber)) - case c: CompletedState ⇒ closeSubscriber(s, c) - case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") - } + subscriptionTimeout.cancel() + if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) + else { + state.get() match { + case _: Attached ⇒ s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber)) + case c: CompletedState ⇒ closeSubscriber(s, c) + case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") } } } @@ -109,6 +111,7 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc this: Actor with ActorLogging ⇒ import MultiStreamOutputProcessor._ + import StreamSubscriptionTimeoutSupport._ protected def nextId(): Long @@ -116,11 +119,10 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc protected def createSubstreamOutput(): SubstreamOutput = { val id = SubstreamKey(nextId()) - val outputs = publisherWithStreamSubscriptionTimeout { - new SubstreamOutput(id, self, this, _) - } - substreamOutputs(outputs.key) = outputs - outputs + val cancellable = scheduleSubscriptionTimeout(self, SubstreamSubscriptionTimeout(id)) + val output = new SubstreamOutput(id, self, this, cancellable) + substreamOutputs(output.key) = output + output } protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = { @@ -148,6 +150,14 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc } case SubstreamCancel(key) ⇒ invalidateSubstreamOutput(key) case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) + case SubstreamSubscriptionTimeout(key) ⇒ subscriptionTimedOut(substreamOutputs(key)) + } + + override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match { + case s: SubstreamOutput ⇒ + s.cancel(cause) + s.attachSubscriber(CancelingSubscriber) + case _ ⇒ // ignore } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala index a6ba2353495..a1dcbfd5403 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala @@ -3,39 +3,35 @@ */ package akka.stream.impl -import java.util.concurrent.atomic.AtomicBoolean - import akka.actor._ -import akka.dispatch.ExecutionContexts -import akka.stream.CancelTermination -import akka.stream.NoopTermination +import akka.stream.StreamSubscriptionTimeoutTerminationMode.{ CancelTermination, NoopTermination, WarnTermination } import akka.stream.StreamSubscriptionTimeoutSettings -import akka.stream.WarnTermination -import org.reactivestreams.Processor -import org.reactivestreams.Publisher -import org.reactivestreams.Subscriber -import org.reactivestreams.Subscription - +import org.reactivestreams._ import scala.concurrent.duration.FiniteDuration -import scala.concurrent.Future -import scala.concurrent.Promise import scala.util.control.NoStackTrace -/** - * Handed to a [[Publisher]] participating in subscription-timeouts. - * - * It *MUST* cancel this timeout the earliest it can in it's `subscribe(Subscriber[T])` method to prevent the timeout from being triggered spuriously. - */ -trait SubscriptionTimeout { +object StreamSubscriptionTimeoutSupport { + /** - * Cancels the subscription timeout and returns `true` if the given `Subscriber` is valid to be processed. - * For example, if termination is in progress already the Processor should not process this incoming subscriber. - * In case of returning `false` as in "do not handle this subscriber", this method takes care of cancelling the Subscriber - * automatically by signalling `onError` with an adequate description of the subscription-timeout being exceeded. + * A subscriber who calls `cancel` directly from `onSubscribe` and ignores all other callbacks. + */ + final case object CancelingSubscriber extends Subscriber[Any] { + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onError(t: Throwable): Unit = () + override def onComplete(): Unit = () + override def onNext(t: Any): Unit = () + } + + /** + * INTERNAL API * - * [[Publisher]] implementations *MUST* use this method to guard any handling of Subscribers (in `Publisher#subscribe`). + * Subscription timeout which does not start any scheduled events and always returns `true`. + * This specialized implementation is to be used for "noop" timeout mode. */ - def cancelAndHandle(s: Subscriber[_]): Boolean + private final case object NoopSubscriptionTimeout extends Cancellable { + override def cancel() = true + override def isCancelled = true + } } /** @@ -47,109 +43,59 @@ trait SubscriptionTimeout { trait StreamSubscriptionTimeoutSupport { this: Actor with ActorLogging ⇒ - /** Default settings for subscription timeouts. */ - def subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings + import StreamSubscriptionTimeoutSupport._ /** - * Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination, - * using the default timeout from the configuration. - * - * The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block. - * This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber. + * Default settings for subscription timeouts. */ - def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](mkPublisher: SubscriptionTimeout ⇒ Pub): Pub = - publisherWithStreamSubscriptionTimeout(subscriptionTimeoutSettings.timeout)(mkPublisher) + protected def subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings /** - * Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination, - * using the passed in timeout. - * - * The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block. - * This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber. + * Schedules a Subscription timeout. + * The actor will receive the message created by the provided block if the timeout triggers. */ - def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](timeoutOverride: FiniteDuration)(mkPublisher: SubscriptionTimeout ⇒ Pub): Pub = { - val p = Promise[Publisher[_]]() // to break chicken-and-egg with subscriptionTimeout - - val subscriptionTimeout = scheduleSubscriptionTimeout(p.future, timeoutOverride) - val pub = mkPublisher(subscriptionTimeout) - p.success(pub) - - pub - } - - private def scheduleSubscriptionTimeout(rs: Future[_], timeout: FiniteDuration): SubscriptionTimeout = { - implicit val dispatcher = - if (subscriptionTimeoutSettings.dispatcher.trim.isEmpty) context.dispatcher - else context.system.dispatchers.lookup(subscriptionTimeoutSettings.dispatcher) - - new SubscriptionTimeout { - private val safeToCancelTimer = new AtomicBoolean(true) - - val subscriptionTimeout = context.system.scheduler.scheduleOnce(timeout, new Runnable { - override def run(): Unit = { - if (safeToCancelTimer.compareAndSet(true, false)) - onReactiveStream { terminate(_, timeout) } - } - }) - - override def cancelAndHandle(s: Subscriber[_]): Boolean = s match { - case _ if subscriptionTimeout.isCancelled ⇒ - // there was some initial subscription already, which cancelled the timeout => continue normal operation - true - - case _ if safeToCancelTimer.get ⇒ - // first subscription signal, cancel the subscription-timeout - safeToCancelTimer.compareAndSet(true, false) && subscriptionTimeout.cancel() - true - - case CancellingSubscriber if !safeToCancelTimer.get ⇒ - // publisher termination in progress - normally we'd onError all subscribers, except the CancellationSubscriber (!) - // guaranteed that no other subscribers are coming in now - true - - case _ ⇒ - // terminated - kill incoming subscribers - onReactiveStream { rs ⇒ - s.onError(new SubscriptionTimeoutException(s"Publisher (${rs}) you are trying to subscribe to has been shut-down " + - s"because exceeding it's subscription-timeout.") with NoStackTrace) - } - - false - } + protected def scheduleSubscriptionTimeout(actor: ActorRef, message: Any): Cancellable = + subscriptionTimeoutSettings.mode match { + case NoopTermination ⇒ + NoopSubscriptionTimeout + case _ ⇒ + import context.dispatcher + val cancellable = context.system.scheduler.scheduleOnce(subscriptionTimeoutSettings.timeout, actor, message) + cancellable + } - private final def onReactiveStream(block: Any ⇒ Unit) = - rs.foreach { rs ⇒ block(rs) }(ExecutionContexts.sameThreadExecutionContext) + private def cancel(target: Publisher[_], timeout: FiniteDuration): Unit = { + val millis = timeout.toMillis + target match { + case p: Processor[_, _] ⇒ + log.debug("Cancelling {} Processor's publisher and subscriber sides (after {} ms)", p, millis) + handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${millis}) ms") with NoStackTrace) + + case p: Publisher[_] ⇒ + log.debug("Cancelling {} (after: {} ms)", p, millis) + handleSubscriptionTimeout(target, new SubscriptionTimeoutException(s"Publisher (${p}) you are trying to subscribe to has been shut-down " + + s"because exceeding it's subscription-timeout.") with NoStackTrace) } } - private def cancel(rs: Any, timeout: FiniteDuration): Unit = rs match { - case p: Processor[_, _] ⇒ - log.debug("Cancelling {} Processor's publisher and subscriber sides (after {})", p, timeout) - p.subscribe(CancellingSubscriber) - p.onError(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${timeout})") with NoStackTrace) - - case p: Publisher[_] ⇒ - log.debug("Cancelling {} using CancellingSubscriber (after: {})", p, timeout) - p.subscribe(CancellingSubscriber) + private def warn(target: Publisher[_], timeout: FiniteDuration): Unit = { + log.warning("Timed out {} detected (after {} ms)! You should investigate if you either cancel or consume all {} instances", + target, timeout.toMillis, target.getClass.getCanonicalName) } - private def warn(rs: Any, timeout: FiniteDuration): Unit = { - log.warning("Timed out {} detected (after {})! You should investigate if you either cancel or consume all {} instances", - rs, timeout, rs.getClass.getCanonicalName) - } - private def terminate(el: Any, timeout: FiniteDuration): Unit = subscriptionTimeoutSettings.mode match { + /** + * Called by the actor when a subscription has timed out. Expects the actual `Publisher` or `Processor` target. + */ + protected def subscriptionTimedOut(target: Publisher[_]): Unit = subscriptionTimeoutSettings.mode match { case NoopTermination ⇒ // ignore... - case WarnTermination ⇒ warn(el, timeout) - case CancelTermination ⇒ cancel(el, timeout) - } - - private final case object CancellingSubscriber extends Subscriber[Any] { - override def onSubscribe(s: Subscription): Unit = s.cancel() - override def onError(t: Throwable): Unit = () - override def onComplete(): Unit = () - override def onNext(t: Any): Unit = () + case WarnTermination ⇒ warn(target, subscriptionTimeoutSettings.timeout) + case CancelTermination ⇒ cancel(target, subscriptionTimeoutSettings.timeout) } + /** + * Callback that should ensure that the target is cancelled with the given cause. + */ + protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception): Unit } -class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg) \ No newline at end of file +class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg)