Skip to content

Commit

Permalink
Elasticsearch: Adapt to Akka 2.6 (akka#2529)
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Rooding authored Nov 20, 2020
1 parent 583b123 commit a1f2cf0
Show file tree
Hide file tree
Showing 10 changed files with 21 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T](
* then explicitly keeps the stage alive. [[clearScrollAsync()]] is responsible
* for completing the stage.
*/
override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
clearScrollAsync()
setKeepGoing(true)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ import scala.concurrent.duration._
}
}

override def onDownstreamFinish(): Unit = {
override def onDownstreamFinish(cause: Throwable): Unit = {
if (elementInProgress.isEmpty) {
super.onDownstreamFinish()
super.onDownstreamFinish(cause)
} else {
// emit elements before finishing
setKeepGoing(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.alpakka.elasticsearch.{impl, _}
import akka.stream.javadsl.Source
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.{Attributes, Materializer}
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.node.{ArrayNode, NumericNode}

Expand Down Expand Up @@ -39,7 +39,7 @@ object ElasticsearchSource {
settings: ElasticsearchSourceSettings,
objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
Source
.setup { (mat: ActorMaterializer, _: Attributes) =>
.fromMaterializer { (mat: Materializer, _: Attributes) =>
{
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
Expand Down Expand Up @@ -73,7 +73,7 @@ object ElasticsearchSource {
settings: ElasticsearchSourceSettings,
objectMapper: ObjectMapper): Source[ReadResult[java.util.Map[String, Object]], NotUsed] =
Source
.setup { (mat: ActorMaterializer, _: Attributes) =>
.fromMaterializer { (mat: Materializer, _: Attributes) =>
{
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
Expand Down Expand Up @@ -111,7 +111,7 @@ object ElasticsearchSource {
clazz: Class[T],
objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
Source
.setup { (mat: ActorMaterializer, _: Attributes) =>
.fromMaterializer { (mat: Materializer, _: Attributes) =>
{
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
Expand Down Expand Up @@ -145,7 +145,7 @@ object ElasticsearchSource {
clazz: Class[T],
objectMapper: ObjectMapper): Source[ReadResult[T], NotUsed] =
Source
.setup { (mat: ActorMaterializer, _: Attributes) =>
.fromMaterializer { (mat: Materializer, _: Attributes) =>
{
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ object ElasticsearchFlow {
settings: ElasticsearchWriteSettings,
writer: MessageWriter[T]) = {
Flow
.setup { (mat, _) =>
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object ElasticsearchSource {
searchParams: Map[String, String],
settings: ElasticsearchSourceSettings): Source[ReadResult[JsObject], NotUsed] =
Source
.setup { (mat, _) =>
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
Expand Down Expand Up @@ -103,7 +103,7 @@ object ElasticsearchSource {
implicit sprayJsonReader: JsonReader[T]
): Source[ReadResult[T], NotUsed] =
Source
.setup { (mat, _) =>
.fromMaterializer { (mat, _) =>
implicit val system: ActorSystem = mat.system
implicit val http: HttpExt = Http()
implicit val ec: ExecutionContextExecutor = mat.executionContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import akka.http.javadsl.Http;
import akka.http.javadsl.model.ContentTypes;
import akka.http.javadsl.model.HttpRequest;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.alpakka.elasticsearch.ApiVersion;
import akka.stream.alpakka.elasticsearch.ElasticsearchConnectionSettings;
import akka.stream.alpakka.elasticsearch.ElasticsearchParams;
Expand All @@ -30,7 +30,7 @@ public class ElasticsearchTestBase {
protected static ApiVersion apiVersion;
protected static ElasticsearchConnectionSettings connectionSettings;
protected static ActorSystem system;
protected static ActorMaterializer materializer;
protected static Materializer materializer;
protected static Http http;

// #define-class
Expand All @@ -49,7 +49,7 @@ public Book(String title) {
public static void setupBase() throws IOException {
// #init-mat
system = ActorSystem.create();
materializer = ActorMaterializer.create(system);
materializer = Materializer.matFromSystem(system);
// #init-mat
http = Http.get(system);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package akka.stream.alpakka.elasticsearch.impl
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import akka.stream.alpakka.elasticsearch.{StringMessageWriter, _}
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.scaladsl.Keep
Expand All @@ -25,7 +25,7 @@ class ElasticsearchSimpleFlowStageTest
with BeforeAndAfterAll
with LogCapturing {

implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val mat: Materializer = Materializer(system)
implicit val http: HttpExt = Http()

val writer: StringMessageWriter = StringMessageWriter.getInstance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import akka.http.scaladsl.model.Uri.Path
import akka.http.scaladsl.model.{HttpMethods, HttpRequest, Uri}
import akka.stream.alpakka.elasticsearch._
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.Materializer
import akka.testkit.TestKit
import org.scalatest.{BeforeAndAfterAll, Inspectors}
import org.scalatest.concurrent.ScalaFutures
Expand All @@ -29,7 +29,7 @@ class ElasticsearchSpec

//#init-mat
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val materializer: Materializer = Materializer(system)
//#init-mat
implicit val http: HttpExt = Http()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package docs.scaladsl
import akka.actor.ActorSystem
import akka.http.scaladsl.{Http, HttpExt}
import akka.stream.alpakka.testkit.scaladsl.LogCapturing
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.Materializer
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
Expand All @@ -24,7 +24,7 @@ trait ElasticsearchSpecBase

//#init-mat
implicit val system: ActorSystem = ActorSystem()
implicit val materializer: Materializer = ActorMaterializer()
implicit val materializer: Materializer = Materializer(system)
//#init-mat
implicit val http: HttpExt = Http()
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import akka.stream.alpakka.elasticsearch.{
ElasticsearchSourceSettings
}
import akka.stream.scaladsl.Sink
import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.wordspec.AnyWordSpec

import scala.collection.immutable
Expand Down

0 comments on commit a1f2cf0

Please sign in to comment.