Skip to content

Commit

Permalink
fixes akka#20414 Allow different ActorMaterializer subtypes
Browse files Browse the repository at this point in the history
  • Loading branch information
kkasravi committed Jun 7, 2016
1 parent 47c1b5b commit 049b950
Show file tree
Hide file tree
Showing 21 changed files with 114 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ private[http] object HttpServerBluePrint {
})

private var activeTimers = 0
private def timeout = ActorMaterializer.downcast(materializer).settings.subscriptionTimeoutSettings.timeout
private def timeout = ActorMaterializerHelper.downcast(materializer).settings.subscriptionTimeoutSettings.timeout
private def addTimeout(s: SubscriptionTimeout): Unit = {
if (activeTimers == 0) setKeepGoing(true)
activeTimers += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.http.scaladsl.server

import scala.concurrent.{ Future, ExecutionContextExecutor }
import akka.stream.{ ActorMaterializer, Materializer }
import scala.concurrent.{ExecutionContextExecutor, Future}
import akka.stream.{ActorMaterializer, ActorMaterializerHelper, Materializer}
import akka.event.LoggingAdapter
import akka.http.scaladsl.settings.{ RoutingSettings, ParserSettings }
import akka.http.scaladsl.marshalling.{ Marshal, ToResponseMarshallable }
import akka.http.scaladsl.settings.{ParserSettings, RoutingSettings}
import akka.http.scaladsl.marshalling.{Marshal, ToResponseMarshallable}
import akka.http.scaladsl.model._
import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.util.FastFuture._
Expand All @@ -29,7 +29,7 @@ private[http] class RequestContextImpl(
this(request, request.uri.path, ec, materializer, log, settings, parserSettings)

def this(request: HttpRequest, log: LoggingAdapter, settings: RoutingSettings)(implicit ec: ExecutionContextExecutor, materializer: Materializer) =
this(request, request.uri.path, ec, materializer, log, settings, ParserSettings(ActorMaterializer.downcast(materializer).system))
this(request, request.uri.path, ec, materializer, log, settings, ParserSettings(ActorMaterializerHelper.downcast(materializer).system))

def reconfigure(executionContext: ExecutionContextExecutor, materializer: Materializer, log: LoggingAdapter, settings: RoutingSettings): RequestContext =
copy(executionContext = executionContext, materializer = materializer, log = log, routingSettings = settings)
Expand Down
10 changes: 5 additions & 5 deletions akka-http/src/main/scala/akka/http/scaladsl/server/Route.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package akka.http.scaladsl.server

import akka.NotUsed
import akka.http.scaladsl.settings.{ RoutingSettings, ParserSettings }
import akka.stream.{ ActorMaterializer, Materializer }
import akka.http.scaladsl.settings.{ParserSettings, RoutingSettings}
import akka.stream.{ActorMaterializer, ActorMaterializerHelper, Materializer}

import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.{ExecutionContextExecutor, Future}
import akka.stream.scaladsl.Flow
import akka.http.scaladsl.model.{ HttpRequest, HttpResponse }
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.util.FastFuture._

object Route {
Expand Down Expand Up @@ -66,7 +66,7 @@ object Route {

{
implicit val executionContext = effectiveEC // overrides parameter
val effectiveParserSettings = if (parserSettings ne null) parserSettings else ParserSettings(ActorMaterializer.downcast(materializer).system)
val effectiveParserSettings = if (parserSettings ne null) parserSettings else ParserSettings(ActorMaterializerHelper.downcast(materializer).system)

val sealedRoute = seal(route)
request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import akka.http.scaladsl.settings.ParserSettings
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder
import akka.util.ByteString
import akka.event.{ NoLogging, LoggingAdapter }
import akka.stream.ActorMaterializer
import akka.event.{LoggingAdapter, NoLogging}
import akka.stream.{ActorMaterializer, ActorMaterializerHelper}
import akka.stream.impl.fusing.IteratorInterpreter
import akka.stream.scaladsl._
import akka.http.impl.engine.parsing.BodyPartParser
Expand Down Expand Up @@ -75,7 +75,7 @@ trait MultipartUnmarshallers {
FastFuture.failed(new RuntimeException("Content-Type with a multipart media type must have a 'boundary' parameter"))
case Some(boundary)
import BodyPartParser._
val effectiveParserSettings = Option(parserSettings).getOrElse(ParserSettings(ActorMaterializer.downcast(mat).system))
val effectiveParserSettings = Option(parserSettings).getOrElse(ParserSettings(ActorMaterializerHelper.downcast(mat).system))
val parser = new BodyPartParser(defaultContentType, boundary, log, effectiveParserSettings)
FastFuture.successful {
entity match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
*/
package akka.stream.impl

import akka.stream._
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import org.reactivestreams.{ Subscription, Subscriber, Publisher }
import akka.stream._
import org.reactivestreams.{ Publisher, Subscriber, Subscription }

import scala.concurrent.duration._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import akka.stream.ActorAttributes._
import akka.stream.Attributes.LogLevels
import akka.stream.Supervision._
import akka.stream.testkit.ScriptedTest
import akka.stream.javadsl
import akka.stream.{ ActorMaterializer, Materializer, Attributes }
import akka.stream._
import akka.testkit.TestProbe

import scala.concurrent.duration._
import scala.concurrent.Await
import scala.util.control.NoStackTrace
Expand All @@ -29,7 +29,7 @@ class FlowLogSpec extends AkkaSpec("akka.loglevel = DEBUG") with ScriptedTest {

"A Log" must {

val supervisorPath = ActorMaterializer.downcast(mat).supervisor.path
val supervisorPath = ActorMaterializerHelper.downcast(mat).supervisor.path
val LogSrc = s"akka.stream.Log($supervisorPath)"
val LogClazz = classOf[Materializer]

Expand Down
17 changes: 13 additions & 4 deletions akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package akka.stream

import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.{ AtomicBoolean }
import java.util.concurrent.atomic.AtomicBoolean

import akka.actor.{ ActorContext, ActorRef, ActorRefFactory, ActorSystem, ExtendedActorSystem, Props }
import akka.event.LoggingAdapter
Expand All @@ -16,6 +16,7 @@ import com.typesafe.config.Config

import scala.concurrent.duration._
import akka.japi.function
import akka.stream.impl.fusing.GraphInterpreterShell

import scala.util.control.NoStackTrace

Expand Down Expand Up @@ -126,6 +127,12 @@ object ActorMaterializer {
system
}

}

/**
* INTERNAL API
*/
private[akka] object ActorMaterializerHelper {
/**
* INTERNAL API
*/
Expand Down Expand Up @@ -163,21 +170,23 @@ abstract class ActorMaterializer extends Materializer {
def isShutdown: Boolean

/**
* INTERNAL API: this might become public later
* INTERNAL API
*/
private[akka] def actorOf(context: MaterializationContext, props: Props): ActorRef

/**
* INTERNAL API
*/
private[akka] def system: ActorSystem
def system: ActorSystem

/**
* INTERNAL API
*/
private[akka] def logger: LoggingAdapter

/** INTERNAL API */
/**
* INTERNAL API
*/
private[akka] def supervisor: ActorRef

}
Expand Down
3 changes: 1 addition & 2 deletions akka-stream/src/main/scala/akka/stream/Materializer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ private[akka] object NoMaterializer extends Materializer {
}

/**
* INTERNAL API: this might become public later
*
* Context parameter to the `create` methods of sources and sinks.
*/
private[akka] case class MaterializationContext(
case class MaterializationContext(
materializer: Materializer,
effectiveAttributes: Attributes,
stageName: String)
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,59 @@
*/
package akka.stream.impl

import java.util.concurrent.atomic.{ AtomicBoolean }
import java.util.concurrent.atomic.AtomicBoolean
import java.{ util ju }

import akka.NotUsed
import akka.actor._
import akka.event.Logging
import akka.event.{ Logging, LoggingAdapter }
import akka.dispatch.Dispatchers
import akka.pattern.ask
import akka.stream._
import akka.stream.impl.StreamLayout.{ Module, AtomicModule }
import akka.stream.impl.StreamLayout.{ AtomicModule, Module }
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
import akka.stream.impl.io.TLSActor
import akka.stream.impl.io.TlsModule
import org.reactivestreams._

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ Await, ExecutionContextExecutor }
import akka.stream.impl.fusing.GraphStageModule
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
import akka.stream.impl.fusing.Fusing
import akka.stream.impl.fusing.GraphInterpreterShell

/**
* ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell
*/
abstract class ExtendedActorMaterializer extends ActorMaterializer {

override def withNamePrefix(name: String): ExtendedActorMaterializer

/**
* INTERNAL API
*/
def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat

/**
* INTERNAL API
*/
def actorOf(context: MaterializationContext, props: Props): ActorRef

/**
* INTERNAL API
*/
override def logger: LoggingAdapter

/**
* INTERNAL API
*/
override def supervisor: ActorRef

}

/**
* INTERNAL API
*/
Expand All @@ -32,7 +65,7 @@ private[akka] case class ActorMaterializerImpl(
dispatchers: Dispatchers,
supervisor: ActorRef,
haveShutDown: AtomicBoolean,
flowNames: SeqActorName) extends ActorMaterializer {
flowNames: SeqActorName) extends ExtendedActorMaterializer {
import akka.stream.impl.Stages._
private val _logger = Logging.getLogger(system, this)
override def logger = _logger
Expand Down Expand Up @@ -79,7 +112,7 @@ private[akka] case class ActorMaterializerImpl(
override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
materialize(_runnableGraph, null)

private[stream] def materialize[Mat](
override def materialize[Mat](
_runnableGraph: Graph[ClosedShape, Mat],
subflowFuser: GraphInterpreterShell ActorRef): Mat = {
val runnableGraph =
Expand Down Expand Up @@ -213,7 +246,7 @@ private[akka] case class ActorMaterializerImpl(

}

private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializerImpl, registerShell: GraphInterpreterShell ActorRef) extends Materializer {
private[akka] class SubFusingActorMaterializerImpl(val delegate: ExtendedActorMaterializer, registerShell: GraphInterpreterShell ActorRef) extends Materializer {
override def executionContext: ExecutionContextExecutor = delegate.executionContext

override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat = delegate.materialize(runnable, registerShell)
Expand All @@ -223,7 +256,7 @@ private[akka] class SubFusingActorMaterializerImpl(val delegate: ActorMaterializ
override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
delegate.schedulePeriodically(initialDelay, interval, task)

def withNamePrefix(name: String): SubFusingActorMaterializerImpl =
override def withNamePrefix(name: String): SubFusingActorMaterializerImpl =
new SubFusingActorMaterializerImpl(delegate.withNamePrefix(name), registerShell)
}

Expand Down
4 changes: 2 additions & 2 deletions akka-stream/src/main/scala/akka/stream/impl/Modules.scala
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ private[akka] final class MaybeSource[Out](val attributes: Attributes, shape: So
private[akka] final class ActorPublisherSource[Out](props: Props, val attributes: Attributes, shape: SourceShape[Out]) extends SourceModule[Out, ActorRef](shape) {

override def create(context: MaterializationContext) = {
val publisherRef = ActorMaterializer.downcast(context.materializer).actorOf(context, props)
val publisherRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
(akka.stream.actor.ActorPublisher[Out](publisherRef), publisherRef)
}

Expand All @@ -113,7 +113,7 @@ private[akka] final class ActorRefSource[Out](
override protected def label: String = s"ActorRefSource($bufferSize, $overflowStrategy)"

override def create(context: MaterializationContext) = {
val mat = ActorMaterializer.downcast(context.materializer)
val mat = ActorMaterializerHelper.downcast(context.materializer)
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
(akka.stream.actor.ActorPublisher[Out](ref), ref)
}
Expand Down
8 changes: 4 additions & 4 deletions akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ private[akka] final class FanoutPublisherSink[In](
extends SinkModule[In, Publisher[In]](shape) {

override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
val impl = actorMaterializer.actorOf(
context,
FanoutProcessorImpl.props(actorMaterializer.effectiveSettings(attributes)))
Expand All @@ -124,7 +124,7 @@ private[akka] final class FanoutPublisherSink[In](
private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Done]](shape) {

override def create(context: MaterializationContext) = {
val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
val effectiveSettings = ActorMaterializerHelper.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
val p = Promise[Done]()
(new SinkholeSubscriber[Any](p), p.future)
}
Expand Down Expand Up @@ -163,7 +163,7 @@ private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShap
private[akka] final class ActorSubscriberSink[In](props: Props, val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, ActorRef](shape) {

override def create(context: MaterializationContext) = {
val subscriberRef = ActorMaterializer.downcast(context.materializer).actorOf(context, props)
val subscriberRef = ActorMaterializerHelper.downcast(context.materializer).actorOf(context, props)
(akka.stream.actor.ActorSubscriber[In](subscriberRef), subscriberRef)
}

Expand All @@ -179,7 +179,7 @@ private[akka] final class ActorRefSink[In](ref: ActorRef, onCompleteMessage: Any
shape: SinkShape[In]) extends SinkModule[In, NotUsed](shape) {

override def create(context: MaterializationContext) = {
val actorMaterializer = ActorMaterializer.downcast(context.materializer)
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
val effectiveSettings = actorMaterializer.effectiveSettings(context.effectiveAttributes)
val subscriberRef = actorMaterializer.actorOf(
context,
Expand Down
12 changes: 6 additions & 6 deletions akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Pub
/**
* INERNAL API
*/
private[stream] object MaterializerSession {
object MaterializerSession {
class MaterializationPanic(cause: Throwable) extends RuntimeException("Materialization aborted.", cause) with NoStackTrace

final val Debug = false
Expand All @@ -810,7 +810,7 @@ private[stream] object MaterializerSession {
/**
* INTERNAL API
*/
private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) {
abstract class MaterializerSession(val topLevel: StreamLayout.Module, val initialAttributes: Attributes) {
import StreamLayout._

// the contained maps store either Subscriber[Any] or VirtualPublisher, but the type system cannot express that
Expand Down Expand Up @@ -839,7 +839,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// Enters a copied module and establishes a scope that prevents internals to leak out and interfere with copies
// of the same module.
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def enterScope(enclosing: CopiedModule): Unit = {
protected def enterScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"entering scope [${System.identityHashCode(enclosing)}%08x]")
subscribersStack ::= new ju.HashMap
publishersStack ::= new ju.HashMap
Expand All @@ -851,7 +851,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
// them to the copied ports instead of the original ones (since there might be multiple copies of the same module
// leading to port identity collisions)
// We don't store the enclosing CopiedModule itself as state since we don't use it anywhere else than exit and enter
private def exitScope(enclosing: CopiedModule): Unit = {
protected def exitScope(enclosing: CopiedModule): Unit = {
if (MaterializerSession.Debug) println(f"exiting scope [${System.identityHashCode(enclosing)}%08x]")
val scopeSubscribers = subscribers
val scopePublishers = publishers
Expand Down Expand Up @@ -969,7 +969,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
ret
}

final protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = {
protected def assignPort(in: InPort, subscriberOrVirtual: AnyRef): Unit = {
subscribers.put(in, subscriberOrVirtual)

currentLayout.upstreams.get(in) match {
Expand All @@ -981,7 +981,7 @@ private[stream] abstract class MaterializerSession(val topLevel: StreamLayout.Mo
}
}

final protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = {
protected def assignPort(out: OutPort, publisher: Publisher[Any]): Unit = {
publishers.put(out, publisher)

currentLayout.downstreams.get(out) match {
Expand Down
Loading

0 comments on commit 049b950

Please sign in to comment.