Skip to content

Commit

Permalink
Merge pull request akka#16282 from akka/wip-16263-subscription-timeou…
Browse files Browse the repository at this point in the history
…t-ban

!str akka#16263 Made subscription timeout handling run mostly inside the actor
  • Loading branch information
bantonsson committed Nov 12, 2014
2 parents d8ae408 + 5a26718 commit d20d057
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,46 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
// it should be terminated after none of it's substreams are used within the timeout
expectTerminated(groupByActor, 1000.millis)
}

"not timeout and cancel substream publishers when they have been subscribed to" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)

val upstreamSubscription = publisherProbe.expectSubscription()

val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)

upstreamSubscription.sendNext(1)
upstreamSubscription.sendNext(2)

val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe)
val s1Sub = s1SubscriberProbe.expectSubscription()
s1Sub.request(1)
s1SubscriberProbe.expectNext(1)

val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe)
val s2Sub = s2SubscriberProbe.expectSubscription()

// sleep long enough for tiemout to trigger if not cancelled
Thread.sleep(1000)

s2Sub.request(100)
s2SubscriberProbe.expectNext(2)
s1Sub.request(100)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendNext(4)
s1SubscriberProbe.expectNext(3)
s2SubscriberProbe.expectNext(4)
}
}

private def watchGroupByActor(flowNr: Int): ActorRef = {
Expand Down
5 changes: 0 additions & 5 deletions akka-stream/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,6 @@ akka {

# Cleanup leaked publishers and subscribers when they are not used within a given deadline
subscription-timeout {
# Fully qualified config path which holds the dispatcher configuration
# to be used for the scheduled stream cancellations.
# When this value is left empty, the containing actor context's dispatcher will be used.
dispatcher = ""

# when the subscription timeout is reached one of the following strategies on the "stale" publisher:
# cancel - cancel it (via `onError` or subscribing to the publisher and `cancel()`ing the subscription right away
# warn - log a warning statement about the stale element (then drop the reference to it)
Expand Down
15 changes: 9 additions & 6 deletions akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ final case class MaterializerSettings(
}

object StreamSubscriptionTimeoutSettings {
import StreamSubscriptionTimeoutTerminationMode._

/** Java API */
def create(config: Config): StreamSubscriptionTimeoutSettings =
Expand All @@ -263,21 +264,23 @@ object StreamSubscriptionTimeoutSettings {
case "warn" WarnTermination
case "cancel" CancelTermination
},
timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis,
dispatcher = c.getString("dispatcher"))
timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis)
}
}
final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration, dispatcher: String)
final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration)

sealed abstract class StreamSubscriptionTimeoutTerminationMode

object StreamSubscriptionTimeoutTerminationMode {
case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode
case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode
case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode

/** Java API */
def noop = NoopTermination
/** Java API */
def warn = WarnTermination
/** Java API */
def cancel = CancelTermination

}
case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode
case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode
case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorLogging
import akka.actor.Cancellable
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import akka.stream.ReactiveStreamsConstants
import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

/**
* INTERNAL API
Expand All @@ -20,6 +21,7 @@ private[akka] object MultiStreamOutputProcessor {
case class SubstreamRequestMore(substream: SubstreamKey, demand: Long)
case class SubstreamCancel(substream: SubstreamKey)
case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any])
case class SubstreamSubscriptionTimeout(substream: SubstreamKey)

class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription {
override def request(elements: Long): Unit =
Expand All @@ -38,7 +40,7 @@ private[akka] object MultiStreamOutputProcessor {
final case class Failed(e: Throwable) extends CompletedState
}

class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout)
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: Cancellable)
extends SimpleOutputs(actor, pump) with Publisher[Any] {

import SubstreamOutput._
Expand Down Expand Up @@ -69,6 +71,7 @@ private[akka] object MultiStreamOutputProcessor {
}

private def closePublisher(withState: CompletedState): Unit = {
subscriptionTimeout.cancel()
state.getAndSet(withState) match {
case Attached(sub) closeSubscriber(sub, withState)
case _: CompletedState throw new IllegalStateException("Attempted to double shutdown publisher")
Expand All @@ -82,14 +85,13 @@ private[akka] object MultiStreamOutputProcessor {
}

override def subscribe(s: Subscriber[_ >: Any]): Unit = {
if (subscriptionTimeout.cancelAndHandle(s)) {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
subscriptionTimeout.cancel()
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
else {
state.get() match {
case _: Attached s.onError(new IllegalStateException("Substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
}
}
Expand All @@ -109,18 +111,18 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
this: Actor with ActorLogging

import MultiStreamOutputProcessor._
import StreamSubscriptionTimeoutSupport._

protected def nextId(): Long

private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutput]

protected def createSubstreamOutput(): SubstreamOutput = {
val id = SubstreamKey(nextId())
val outputs = publisherWithStreamSubscriptionTimeout {
new SubstreamOutput(id, self, this, _)
}
substreamOutputs(outputs.key) = outputs
outputs
val cancellable = scheduleSubscriptionTimeout(self, SubstreamSubscriptionTimeout(id))
val output = new SubstreamOutput(id, self, this, cancellable)
substreamOutputs(output.key) = output
output
}

protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
Expand Down Expand Up @@ -148,6 +150,14 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
case SubstreamCancel(key) invalidateSubstreamOutput(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
case SubstreamSubscriptionTimeout(key) subscriptionTimedOut(substreamOutputs(key))
}

override protected def handleSubscriptionTimeout(target: Publisher[_], cause: Exception) = target match {
case s: SubstreamOutput
s.cancel(cause)
s.attachSubscriber(CancelingSubscriber)
case _ // ignore
}
}

Expand Down
Loading

0 comments on commit d20d057

Please sign in to comment.