Skip to content

Commit

Permalink
=str akka#22448 rewrite Source.empty to GraphStage
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Mar 7, 2017
1 parent 0367812 commit e47dbda
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 10 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright (C) 2014-2017 Lightbend Inc. <http://www.lightbend.com>
*/

package akka.stream

import java.util.concurrent.TimeUnit

import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.{ Done, NotUsed }
import org.openjdk.jmh.annotations._

import scala.concurrent._
import scala.concurrent.duration._

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class EmptySourceBenchmark {
implicit val system = ActorSystem("EmptySourceBenchmark")
val materializerSettings = ActorMaterializerSettings(system).withDispatcher("akka.test.stream-dispatcher")
implicit val materializer = ActorMaterializer(materializerSettings)


@TearDown
def shutdown(): Unit = {
Await.result(system.terminate(), 5.seconds)
}

val setup = Source.empty[String].toMat(Sink.ignore)(Keep.right)

@Benchmark def empty(): Unit =
Await.result(setup.run(), Duration.Inf)


/*
(not serious benchmark, just sanity check: run on macbook 15, late 2013)
While it was a PublisherSource:
[info] EmptySourceBenchmark.empty thrpt 10 11.219 ± 6.498 ops/ms
Rewrite to GraphStage:
[info] EmptySourceBenchmark.empty thrpt 10 17.556 ± 2.865 ops/ms
*/
}
38 changes: 38 additions & 0 deletions akka-docs/rst/scala/code/docs/actor/LOLSPEC.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package docs.actor

import akka.actor.ActorLogging
import scala.language.postfixOps

import akka.Done
import akka.actor.{ ActorRef, CoordinatedShutdown }
import akka.testkit._
import akka.util._

import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

import akka.actor.ActorSystem
import akka.actor.Actor
import akka.actor.Props
import akka.testkit.{ ImplicitSender, TestKit }

class LOLSPEC extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {

"schedule a one-off task" in {
val miku = system.actorOf(Props(new Actor {
def receive = {
case x =>
println(s"sender() = ${sender()}")
}
}))

system.eventStream.subscribe(miku, classOf[Object])

system.eventStream.publish("Hello!")
}

}

Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

package akka.remote.serialization

import akka.actor.{Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.remote.{DaemonMsgCreate, RemoteScope}
import akka.routing.{FromConfig, RoundRobinPool}
import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, ExtendedActorSystem, OneForOneStrategy, Props, SupervisorStrategy }
import akka.remote.{ DaemonMsgCreate, RemoteScope }
import akka.routing.{ FromConfig, RoundRobinPool }
import akka.serialization.SerializationExtension
import akka.testkit.{AkkaSpec, TestKit}
import akka.testkit.{ AkkaSpec, TestKit }
import com.typesafe.config.ConfigFactory

import scala.concurrent.duration._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
*/
package akka.stream.impl

import org.reactivestreams.{ Subscriber, Publisher, Subscription }
import akka.annotation.InternalApi
import org.reactivestreams.{ Publisher, Subscriber, Subscription }

import scala.concurrent.{ ExecutionContext, Promise }

/**
* INTERNAL API
*/
@InternalApi
private[akka] case object EmptyPublisher extends Publisher[Nothing] {
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
Expand Down
21 changes: 21 additions & 0 deletions akka-stream/src/main/scala/akka/stream/impl/Sources.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import scala.concurrent.{ Future, Promise }
import akka.Done
import java.util.concurrent.CompletionStage

import akka.annotation.InternalApi

import scala.compat.java8.FutureConverters._
import scala.util.Try
import scala.util.control.NonFatal
Expand Down Expand Up @@ -396,3 +398,22 @@ final class LazySource[T, M](sourceFactory: () ⇒ Source[T, M]) extends GraphSt
override def toString = "LazySource"
}

/** INTERNAL API */
@InternalApi
final object EmptySource extends GraphStage[SourceShape[Nothing]] {
val out = Outlet[Nothing]("EmptySource.out")
override val shape = SourceShape(out)

override protected def initialAttributes = DefaultAttributes.lazySource

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with OutHandler {
override def preStart(): Unit = completeStage()
override def onPull(): Unit = completeStage()

setHandler(out, this)
}

override def toString = "EmptySource"
}

6 changes: 1 addition & 5 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -331,11 +331,7 @@ object Source {
*/
def empty[T]: Source[T, NotUsed] = _empty
private[this] val _empty: Source[Nothing, NotUsed] =
new Source(
new PublisherSource[Nothing](
EmptyPublisher,
DefaultAttributes.emptySource,
shape("EmptySource")))
Source.fromGraph(EmptySource)

/**
* Create a `Source` which materializes a [[scala.concurrent.Promise]] which controls what element
Expand Down

0 comments on commit e47dbda

Please sign in to comment.