Skip to content

Commit

Permalink
SSE: Akka 2.6 (akka#2543)
Browse files Browse the repository at this point in the history
  • Loading branch information
ennru authored Dec 14, 2020
1 parent 99b10b5 commit 342484e
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ lazy val solr = alpakkaProject("solr", "solr", Dependencies.Solr)

lazy val sqs = alpakkaProject("sqs", "aws.sqs", Dependencies.Sqs)

lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse)
lazy val sse = alpakkaProject("sse", "sse", Dependencies.Sse, fatalWarnings := true)

lazy val text = alpakkaProject("text", "text")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# The new signature takes `ClassicActorSystemProvider` instead of `Materializer`
# this is not bin-compatible, but resolved by an implicit conversion in sources
# Accepted as this targets Alpakka 3.0.0
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.alpakka.sse.scaladsl.EventSource.apply")
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import akka.http.javadsl.model.sse.ServerSentEvent
import java.util.Optional
import java.util.concurrent.CompletionStage
import java.util.function.{Function => JFunction}

import akka.actor.ClassicActorSystemProvider

import scala.compat.java8.FutureConverters
import scala.compat.java8.OptionConverters

Expand Down Expand Up @@ -64,13 +67,37 @@ object EventSource {
import FutureConverters._
import OptionConverters._

/**
* @param uri URI with absolute path, e.g. "http://myserver/events
* @param send function to send a HTTP request
* @param lastEventId initial value for Last-Evend-ID header, optional
* @param system actor system (classic or new API)
* @return continuous source of server-sent events
*/
def create(uri: Uri,
send: JFunction[HttpRequest, CompletionStage[HttpResponse]],
lastEventId: Optional[String],
system: ClassicActorSystemProvider): Source[ServerSentEvent, NotUsed] = {
val eventSource =
scaladsl
.EventSource(
uri.asScala,
send(_).toScala.map(_.asInstanceOf[SHttpResponse])(system.classicSystem.dispatcher),
lastEventId.asScala
)(system)
.map(v => v: ServerSentEvent)
eventSource.asJava
}

/**
* @param uri URI with absolute path, e.g. "http://myserver/events
* @param send function to send a HTTP request
* @param lastEventId initial value for Last-Evend-ID header, optional
* @param mat `Materializer`
* @return continuous source of server-sent events
* @deprecated pass in the actor system instead of the materializer, since 3.0.0
*/
@deprecated("pass in the actor system instead of the materializer", "3.0.0")
def create(uri: Uri,
send: JFunction[HttpRequest, CompletionStage[HttpResponse]],
lastEventId: Optional[String],
Expand All @@ -81,7 +108,7 @@ object EventSource {
uri.asScala,
send(_).toScala.map(_.asInstanceOf[SHttpResponse])(mat.executionContext),
lastEventId.asScala
)(mat)
)(mat.system)
.map(v => v: ServerSentEvent)
eventSource.asJava
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ package akka.stream.alpakka.sse
package scaladsl

import akka.NotUsed
import akka.actor.ActorSystem
import akka.actor.{ActorSystem, ClassicActorSystemProvider}
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.coding.Gzip
import akka.http.scaladsl.model.headers.Accept
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, Uri}
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, Source}
import akka.stream.{ActorMaterializer, Materializer, SourceShape}
import akka.stream.SourceShape
import akka.http.scaladsl.model.sse.ServerSentEvent
import akka.http.scaladsl.model.sse.ServerSentEvent.heartbeat
import akka.http.scaladsl.model.MediaTypes.`text/event-stream`
Expand Down Expand Up @@ -79,18 +79,18 @@ object EventSource {
* @param send function to send a HTTP request
* @param initialLastEventId initial value for Last-Event-ID header, `None` by default
* @param retryDelay delay for retrying after completion, `0` by default
* @param mat implicit `Materializer`, needed to obtain server-sent events
* @param system implicit actor system (classic or new API)
* @return continuous source of server-sent events
*/
def apply(uri: Uri,
send: HttpRequest => Future[HttpResponse],
initialLastEventId: Option[String] = None,
retryDelay: FiniteDuration = Duration.Zero)(
implicit mat: Materializer
implicit system: ClassicActorSystemProvider
): EventSource = {
import EventStreamUnmarshalling.fromEventsStream
import mat.executionContext
implicit val system: ActorSystem = actorMaterializer(mat).system
implicit val actorSystem: ActorSystem = system.classicSystem
import actorSystem.dispatcher

val continuousEvents = {
def getEventSource(lastEventId: Option[String]) = {
Expand Down Expand Up @@ -132,10 +132,4 @@ object EventSource {
SourceShape(events.out)
})
}

private def actorMaterializer(mat: Materializer): ActorMaterializer = mat match {
case am: ActorMaterializer => am
case _ => throw new Error("ActorMaterializer required")
}

}
6 changes: 2 additions & 4 deletions sse/src/test/java/docs/javadsl/EventSourceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import akka.NotUsed;
import akka.actor.ActorSystem;

import akka.stream.ActorMaterializer;
import akka.stream.ThrottleMode;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
Expand All @@ -33,7 +32,6 @@ public static void compileTest() {
String host = "localhost";
int port = 8080;
ActorSystem system = null;
ActorMaterializer materializer = null;

int nrOfSamples = 10;

Expand All @@ -46,7 +44,7 @@ public static void compileTest() {
final Uri targetUri = Uri.create(String.format("http://%s:%d", host, port));
final Optional<String> lastEventId = Optional.of("2");
Source<ServerSentEvent, NotUsed> eventSource =
EventSource.create(targetUri, send, lastEventId, materializer);
EventSource.create(targetUri, send, lastEventId, system);
// #event-source

// #consume-events
Expand All @@ -57,7 +55,7 @@ public static void compileTest() {
eventSource
.throttle(elements, per, maximumBurst, ThrottleMode.shaping())
.take(nrOfSamples)
.runWith(Sink.seq(), materializer);
.runWith(Sink.seq(), system);
// #consume-events
}
}
4 changes: 1 addition & 3 deletions sse/src/test/scala/docs/scaladsl/EventSourceSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import akka.http.scaladsl.model.headers.`Last-Event-ID`
import akka.http.scaladsl.server.{Directives, Route}
import akka.pattern.pipe
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.ThrottleMode
import akka.testkit.SocketUtil
import akka.{Done, NotUsed}
import org.scalatest.BeforeAndAfterAll
Expand Down Expand Up @@ -85,7 +85,6 @@ object EventSourceSpec {
import context.dispatcher

private implicit val sys = context.system
private implicit val mat = ActorMaterializer()

context.system.scheduler.scheduleOnce(1.second, self, Bind)

Expand Down Expand Up @@ -143,7 +142,6 @@ final class EventSourceSpec extends AsyncWordSpec with Matchers with BeforeAndAf

private implicit val system = ActorSystem()
private implicit val ec = system.dispatcher
private implicit val mat = ActorMaterializer()

"EventSource" should {
"communicate correctly with an instable HTTP server" in {
Expand Down

0 comments on commit 342484e

Please sign in to comment.