From a1f2cf02b4c1d822a4b1b2c240d01f4e477fd9c2 Mon Sep 17 00:00:00 2001 From: Marc Rooding Date: Fri, 20 Nov 2020 12:34:04 +0100 Subject: [PATCH] Elasticsearch: Adapt to Akka 2.6 (#2529) --- .../elasticsearch/impl/ElasticsearchSourceStage.scala | 2 +- .../impl/backport/RetryFlowCoordinator.scala | 4 ++-- .../elasticsearch/javadsl/ElasticsearchSource.scala | 10 +++++----- .../elasticsearch/scaladsl/ElasticsearchFlow.scala | 2 +- .../elasticsearch/scaladsl/ElasticsearchSource.scala | 4 ++-- .../test/java/docs/javadsl/ElasticsearchTestBase.java | 6 +++--- .../impl/ElasticsearchSimpleFlowStageTest.scala | 4 ++-- .../test/scala/docs/scaladsl/ElasticsearchSpec.scala | 4 ++-- .../scala/docs/scaladsl/ElasticsearchSpecBase.scala | 4 ++-- .../scala/docs/scaladsl/ElasticsearchSpecUtils.scala | 2 +- 10 files changed, 21 insertions(+), 21 deletions(-) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala index 75de03ceca..0f5d57836a 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala @@ -260,7 +260,7 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( * then explicitly keeps the stage alive. [[clearScrollAsync()]] is responsible * for completing the stage. */ - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { clearScrollAsync() setKeepGoing(true) } diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/backport/RetryFlowCoordinator.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/backport/RetryFlowCoordinator.scala index cb18e58731..07473b26ff 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/backport/RetryFlowCoordinator.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/backport/RetryFlowCoordinator.scala @@ -86,9 +86,9 @@ import scala.concurrent.duration._ } } - override def onDownstreamFinish(): Unit = { + override def onDownstreamFinish(cause: Throwable): Unit = { if (elementInProgress.isEmpty) { - super.onDownstreamFinish() + super.onDownstreamFinish(cause) } else { // emit elements before finishing setKeepGoing(true) diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala index 9c038ca5fe..3423e3bf14 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSource.scala @@ -9,7 +9,7 @@ import akka.actor.ActorSystem import akka.http.scaladsl.{Http, HttpExt} import akka.stream.alpakka.elasticsearch.{impl, _} import akka.stream.javadsl.Source -import akka.stream.{ActorMaterializer, Attributes} +import akka.stream.{Attributes, Materializer} import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.databind.node.{ArrayNode, NumericNode} @@ -39,7 +39,7 @@ object ElasticsearchSource { settings: ElasticsearchSourceSettings, objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = Source - .setup { (mat: ActorMaterializer, _: Attributes) => + .fromMaterializer { (mat: Materializer, _: Attributes) => { implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() @@ -73,7 +73,7 @@ object ElasticsearchSource { settings: ElasticsearchSourceSettings, objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] = Source - .setup { (mat: ActorMaterializer, _: Attributes) => + .fromMaterializer { (mat: Materializer, _: Attributes) => { implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() @@ -111,7 +111,7 @@ object ElasticsearchSource { clazz: Class[T], objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] = Source - .setup { (mat: ActorMaterializer, _: Attributes) => + .fromMaterializer { (mat: Materializer, _: Attributes) => { implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() @@ -145,7 +145,7 @@ object ElasticsearchSource { clazz: Class[T], objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] = Source - .setup { (mat: ActorMaterializer, _: Attributes) => + .fromMaterializer { (mat: Materializer, _: Attributes) => { implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala index 0c3b97f25b..1830524315 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala @@ -208,7 +208,7 @@ object ElasticsearchFlow { settings: ElasticsearchWriteSettings, writer: MessageWriter[T]) = { Flow - .setup { (mat, _) => + .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() implicit val ec: ExecutionContextExecutor = mat.executionContext diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala index 3c7544f09b..9ede59bfd4 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSource.scala @@ -64,7 +64,7 @@ object ElasticsearchSource { searchParams: Map[String, String], settings: ElasticsearchSourceSettings): Source[ReadResult[JsObject], NotUsed] = Source - .setup { (mat, _) => + .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() implicit val ec: ExecutionContextExecutor = mat.executionContext @@ -103,7 +103,7 @@ object ElasticsearchSource { implicit sprayJsonReader: JsonReader[T] ): Source[ReadResult[T], NotUsed] = Source - .setup { (mat, _) => + .fromMaterializer { (mat, _) => implicit val system: ActorSystem = mat.system implicit val http: HttpExt = Http() implicit val ec: ExecutionContextExecutor = mat.executionContext diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java index 9628f7005e..72e40dfbcf 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTestBase.java @@ -8,7 +8,7 @@ import akka.http.javadsl.Http; import akka.http.javadsl.model.ContentTypes; import akka.http.javadsl.model.HttpRequest; -import akka.stream.ActorMaterializer; +import akka.stream.Materializer; import akka.stream.alpakka.elasticsearch.ApiVersion; import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings; import akka.stream.alpakka.elasticsearch.ElasticsearchParams; @@ -30,7 +30,7 @@ public class ElasticsearchTestBase { protected static ApiVersion apiVersion; protected static ElasticsearchConnectionSettings connectionSettings; protected static ActorSystem system; - protected static ActorMaterializer materializer; + protected static Materializer materializer; protected static Http http; // #define-class @@ -49,7 +49,7 @@ public Book(String title) { public static void setupBase() throws IOException { // #init-mat system = ActorSystem.create(); - materializer = ActorMaterializer.create(system); + materializer = Materializer.matFromSystem(system); // #init-mat http = Http.get(system); } diff --git a/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala index 6cdcd99520..a1a9b6c4ab 100644 --- a/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala +++ b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSimpleFlowStageTest.scala @@ -7,7 +7,7 @@ package akka.stream.alpakka.elasticsearch.impl import akka.NotUsed import akka.actor.ActorSystem import akka.http.scaladsl.{Http, HttpExt} -import akka.stream.ActorMaterializer +import akka.stream.Materializer import akka.stream.alpakka.elasticsearch.{StringMessageWriter, _} import akka.stream.alpakka.testkit.scaladsl.LogCapturing import akka.stream.scaladsl.Keep @@ -25,7 +25,7 @@ class ElasticsearchSimpleFlowStageTest with BeforeAndAfterAll with LogCapturing { - implicit val mat: ActorMaterializer = ActorMaterializer() + implicit val mat: Materializer = Materializer(system) implicit val http: HttpExt = Http() val writer: StringMessageWriter = StringMessageWriter.getInstance diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala index e7623a2eee..5c63b1fc42 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala @@ -10,7 +10,7 @@ import akka.http.scaladsl.model.Uri.Path import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri} import akka.stream.alpakka.elasticsearch._ import akka.stream.alpakka.testkit.scaladsl.LogCapturing -import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.Materializer import akka.testkit.TestKit import org.scalatest.{BeforeAndAfterAll, Inspectors} import org.scalatest.concurrent.ScalaFutures @@ -29,7 +29,7 @@ class ElasticsearchSpec //#init-mat implicit val system: ActorSystem = ActorSystem() - implicit val materializer: Materializer = ActorMaterializer() + implicit val materializer: Materializer = Materializer(system) //#init-mat implicit val http: HttpExt = Http() diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecBase.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecBase.scala index a7bfa4f22a..67b19b3f29 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecBase.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecBase.scala @@ -7,7 +7,7 @@ package docs.scaladsl import akka.actor.ActorSystem import akka.http.scaladsl.{Http, HttpExt} import akka.stream.alpakka.testkit.scaladsl.LogCapturing -import akka.stream.{ActorMaterializer, Materializer} +import akka.stream.Materializer import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -24,7 +24,7 @@ trait ElasticsearchSpecBase //#init-mat implicit val system: ActorSystem = ActorSystem() - implicit val materializer: Materializer = ActorMaterializer() + implicit val materializer: Materializer = Materializer(system) //#init-mat implicit val http: HttpExt = Http() } diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala index f00c750e79..5b5377a170 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpecUtils.scala @@ -16,7 +16,7 @@ import akka.stream.alpakka.elasticsearch.{ ElasticsearchSourceSettings } import akka.stream.scaladsl.Sink -import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures} +import org.scalatest.concurrent.ScalaFutures import org.scalatest.wordspec.AnyWordSpec import scala.collection.immutable