From 8d9c95d85cf6c34aa3df8ab4f6f14179df630d66 Mon Sep 17 00:00:00 2001 From: Enno <458526+ennru@users.noreply.github.com> Date: Wed, 20 Feb 2019 09:47:13 +0100 Subject: [PATCH] Elasticsearch: Make flows type signature simpler to prepare for `withContext` (#1515) * Sort out logging and simpler test structure * Reduce logic in ES flow stage (drop batching, futures) * Extract error reason; simpler JSON building * Emit WriteResults, not sequences --- docs/src/main/paradox/elasticsearch.md | 4 +- .../alpakka/elasticsearch/WriteMessage.scala | 16 +- .../impl/ElasticsearchFlowStage.scala | 400 +++++++--------- .../javadsl/ElasticsearchFlow.scala | 40 +- .../javadsl/ElasticsearchSink.scala | 4 +- .../scaladsl/ElasticsearchFlow.scala | 49 +- .../scaladsl/ElasticsearchSink.scala | 2 +- .../java/docs/javadsl/ElasticsearchTest.java | 69 ++- .../src/test/resources/application.conf | 2 +- .../src/test/resources/log4j2-test.xml | 2 +- .../src/test/resources/logback-test.xml | 8 +- .../docs/scaladsl/ElasticsearchSpec.scala | 435 +++++++----------- 12 files changed, 453 insertions(+), 578 deletions(-) diff --git a/docs/src/main/paradox/elasticsearch.md b/docs/src/main/paradox/elasticsearch.md index f2ab0a7199..714856b543 100644 --- a/docs/src/main/paradox/elasticsearch.md +++ b/docs/src/main/paradox/elasticsearch.md @@ -118,7 +118,7 @@ Java | Parameter | Default | Description | | ------------------- | ------- | ------------------------------------------------------------------------------------------------------ | -| bufferSize | 10 | `ElasticsearchSink` puts messages by one bulk request per messages of this buffer size. | +| bufferSize | 10 | Flow and Sink batch messages to bulk requests when back-pressure applies. | | versionType | None | If set, `ElasticsearchSink` uses the chosen versionType to index documents. See [Version types](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types) for accepted settings. | | retryLogic | No retries | See below | @@ -126,7 +126,7 @@ Java A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a `RetryLogic` can be specified. The provided implementation is `RetryAtFixedRate`. @@@ warning -If using retries, you will receive messages out of order downstream in cases where elastic returns an error one some of the documents in a bulk request. +If using retries, you will receive messages **out of order downstream** in cases when Elasticsearch returns an error on some of the documents in a bulk request. @@@ diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteMessage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteMessage.scala index 723e269c41..ef731ec9fa 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteMessage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/WriteMessage.scala @@ -14,7 +14,9 @@ import scala.compat.java8.OptionConverters._ * INTERNAL API */ @InternalApi -private[elasticsearch] sealed abstract class Operation(val command: String) +private[elasticsearch] sealed abstract class Operation(val command: String) { + override def toString: String = command +} /** * INTERNAL API @@ -132,12 +134,22 @@ object WriteMessage { * [[akka.stream.alpakka.elasticsearch.testkit.MessageFactory]]. */ final class WriteResult[T2, C2] @InternalApi private[elasticsearch] (val message: WriteMessage[T2, C2], + /** JSON structure of the Elasticsearch error. 
*/ val error: Option[String]) { val success: Boolean = error.isEmpty - /** Java API */ + /** Java API: JSON structure of the Elasticsearch error. */ def getError: java.util.Optional[String] = error.asJava + /** `reason` field value of the Elasticsearch error. */ + def errorReason: Option[String] = { + import spray.json._ + error.flatMap(_.parseJson.asJsObject.fields.get("reason").map(_.asInstanceOf[JsString].value)) + } + + /** Java API: `reason` field value from the Elasticsearch error */ + def getErrorReason: java.util.Optional[String] = errorReason.asJava + override def toString = s"""WriteResult(message=$message,error=$error)""" diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala index 642b4a0366..a9910dee82 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchFlowStage.scala @@ -18,8 +18,7 @@ import org.apache.http.util.EntityUtils import org.elasticsearch.client.{Response, ResponseListener, RestClient} import spray.json._ -import scala.collection.mutable -import scala.concurrent.Future +import scala.collection.immutable /** * INTERNAL API @@ -31,252 +30,215 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C]( client: RestClient, settings: ElasticsearchWriteSettings, writer: MessageWriter[T] -) extends GraphStage[FlowShape[WriteMessage[T, C], Future[Seq[WriteResult[T, C]]]]] { +) extends GraphStage[FlowShape[immutable.Seq[WriteMessage[T, C]], immutable.Seq[WriteResult[T, C]]]] { require(indexName != null, "You must define an index name") require(typeName != null, "You must define a type name") - private val in = Inlet[WriteMessage[T, C]]("messages") - private val out = Outlet[Future[Seq[WriteResult[T, C]]]]("result") + private val in = Inlet[immutable.Seq[WriteMessage[T, C]]]("messages") + private val out = Outlet[immutable.Seq[WriteResult[T, C]]]("result") override val shape = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new StageLogic() - private var state: State = Idle - private val queue = new mutable.Queue[WriteMessage[T, C]]() - private val failureHandler = getAsyncCallback[(Seq[WriteMessage[T, C]], Throwable)](handleFailure) - private val responseHandler = getAsyncCallback[(Seq[WriteMessage[T, C]], Response)](handleResponse) - private var failedMessages: Seq[WriteMessage[T, C]] = Nil - private var retryCount: Int = 0 + private class StageLogic extends TimerGraphStageLogic(shape) with InHandler with OutHandler with StageLogging { - override def preStart(): Unit = - pull(in) + private val typeNameTuple = "_type" -> JsString(typeName) + private val versionTypeTuple: Option[(String, JsString)] = settings.versionType.map { versionType => + "version_type" -> JsString(versionType) + } - private def tryPull(): Unit = - if (queue.size < settings.bufferSize && !isClosed(in) && !hasBeenPulled(in)) { - pull(in) - } + private var upstreamFinished = false + private var inflight = 0 - override def onTimer(timerKey: Any): Unit = { - sendBulkUpdateRequest(failedMessages) - failedMessages = Nil + private val failureHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, 
C]], Throwable)](handleFailure) + private val responseHandler = getAsyncCallback[(immutable.Seq[WriteMessage[T, C]], Response)](handleResponse) + private var failedMessages: immutable.Seq[WriteMessage[T, C]] = Nil + private var retryCount: Int = 0 + + private def tryPull(): Unit = + if (!isClosed(in) && !hasBeenPulled(in)) { + pull(in) } - private def handleFailure(args: (Seq[WriteMessage[T, C]], Throwable)): Unit = { - val (messages, exception) = args - if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) { - log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}", + override def onTimer(timerKey: Any): Unit = { + if (log.isDebugEnabled) log.debug("retrying inflight={} {}", inflight, failedMessages) + sendBulkUpdateRequest(failedMessages) + failedMessages = Nil + } + + private def handleFailure(args: (immutable.Seq[WriteMessage[T, C]], Throwable)): Unit = { + val (messages, exception) = args + if (!settings.retryLogic.shouldRetry(retryCount, List(exception.toString))) { + log.error("Received error from elastic. Giving up after {} tries. {}, Error: {}", + retryCount, + settings.retryLogic, + exception) + failStage(exception) + } else { + log.warning("Received error from elastic. Try number {}. {}, Error: {}", retryCount, settings.retryLogic, exception) - failStage(exception) - } else { - log.warning("Received error from elastic. Try number {}. {}, Error: {}", - retryCount, - settings.retryLogic, - exception) - retryCount = retryCount + 1 - failedMessages = messages - scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount)) - } - } - - private def handleSuccess(): Unit = - completeStage() - - private def handleResponse(args: (Seq[WriteMessage[T, C]], Response)): Unit = { - val (messages, response) = args - val responseJson = EntityUtils.toString(response.getEntity).parseJson - - // If some commands in bulk request failed, pass failed messages to follows. 
- val items = responseJson.asJsObject.fields("items").asInstanceOf[JsArray] - val messageResults: Seq[WriteResult[T, C]] = items.elements.zip(messages).map { - case (item, message) => - val command = message.operation.command - val res = item.asJsObject.fields(command).asJsObject - val error: Option[String] = res.fields.get("error").map(_.toString()) - new WriteResult(message, error) - } - - val failedMsgs = messageResults.filterNot(_.error.isEmpty) - - if (failedMsgs.nonEmpty && settings.retryLogic.shouldRetry(retryCount, failedMsgs.map(_.error.get).toList)) { - retryPartialFailedMessages(messageResults, failedMsgs) - } else { - forwardAllResults(messageResults) - } - } - - private def retryPartialFailedMessages( - messageResults: Seq[WriteResult[T, C]], - failedMsgs: Seq[WriteResult[T, C]] - ): Unit = { - // Retry partial failed messages - // NOTE: When we partially return message like this, message will arrive out of order downstream - // and it can break commit-logic when using Kafka retryCount = retryCount + 1 - failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry + failedMessages = messages scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount)) - - val successMsgs = messageResults.filter(_.error.isEmpty) - if (successMsgs.nonEmpty) { - // push the messages that DID succeed - emit(out, Future.successful(successMsgs)) - } } + } - private def forwardAllResults(messageResults: Seq[WriteResult[T, C]]): Unit = { - retryCount = 0 // Clear retryCount - - // Push result - emit(out, Future.successful(messageResults)) - - // Fetch next messages from queue and send them - val nextMessages = (1 to settings.bufferSize).flatMap { _ => - queue.dequeueFirst(_ => true) - } - - if (nextMessages.isEmpty) { - state match { - case Finished => handleSuccess() - case _ => state = Idle - } - } else { - sendBulkUpdateRequest(nextMessages) - } + private def handleResponse(args: (immutable.Seq[WriteMessage[T, C]], Response)): Unit = { + val (messages, response) = args + val responseJson = EntityUtils.toString(response.getEntity).parseJson + if (log.isDebugEnabled) log.debug("response {}", responseJson.prettyPrint) + + // If some commands in bulk request failed, pass failed messages to follows. 
+ val items = responseJson.asJsObject.fields("items").asInstanceOf[JsArray] + val messageResults: immutable.Seq[WriteResult[T, C]] = items.elements.zip(messages).map { + case (item, message) => + val command = message.operation.command + val res = item.asJsObject.fields(command).asJsObject + val error: Option[String] = res.fields.get("error").map(_.toString()) + new WriteResult(message, error) } - private def sendBulkUpdateRequest(messages: Seq[WriteMessage[T, C]]): Unit = { - val json = messages - .map { message => - val indexNameToUse: String = message.indexName.getOrElse(indexName) - val additionalMetadata = message.customMetadata.map { case (field, value) => field -> JsString(value) } + val failedMsgs = messageResults.filterNot(_.error.isEmpty) - JsObject(message.operation match { - case Index => - "index" -> JsObject( - (Seq( - Option("_index" -> JsString(indexNameToUse)), - Option("_type" -> JsString(typeName)), - message.version.map { version => - "_version" -> JsNumber(version) - }, - settings.versionType.map { versionType => - "version_type" -> JsString(versionType) - }, - message.id.map { id => - "_id" -> JsString(id) - } - ).flatten ++ additionalMetadata): _* - ) - case Create => - "create" -> JsObject( - (Seq( - Option("_index" -> JsString(indexNameToUse)), - Option("_type" -> JsString(typeName)), - message.id.map { id => - "_id" -> JsString(id) - } - ).flatten ++ additionalMetadata): _* - ) - case Update | Upsert => - "update" -> JsObject( - (Seq( - Option("_index" -> JsString(indexNameToUse)), - Option("_type" -> JsString(typeName)), - message.version.map { version => - "_version" -> JsNumber(version) - }, - settings.versionType.map { versionType => - "version_type" -> JsString(versionType) - }, - Option("_id" -> JsString(message.id.get)) - ).flatten ++ additionalMetadata): _* - ) - case Delete => - "delete" -> JsObject( - (Seq( - Option("_index" -> JsString(indexNameToUse)), - Option("_type" -> JsString(typeName)), - message.version.map { version => - "_version" -> JsNumber(version) - }, - settings.versionType.map { versionType => - "version_type" -> JsString(versionType) - }, - Option("_id" -> JsString(message.id.get)) - ).flatten ++ additionalMetadata): _* - ) - }).toString + messageToJsonString(message) - } - .mkString("", "\n", "\n") - - log.debug("Posting data to Elasticsearch: {}", json) - - client.performRequestAsync( - "POST", - "/_bulk", - java.util.Collections.emptyMap[String, String](), - new StringEntity(json, StandardCharsets.UTF_8), - new ResponseListener() { - override def onFailure(exception: Exception): Unit = - failureHandler.invoke((messages, exception)) - override def onSuccess(response: Response): Unit = - responseHandler.invoke((messages, response)) - }, - new BasicHeader("Content-Type", "application/x-ndjson") - ) + if (failedMsgs.nonEmpty && settings.retryLogic.shouldRetry(retryCount, failedMsgs.map(_.error.get).toList)) { + retryPartialFailedMessages(messageResults, failedMsgs) + } else { + retryCount = 0 + emitResults(messageResults) } + } - private def messageToJsonString(message: WriteMessage[T, C]): String = - message.operation match { - case Index | Create => - "\n" + writer.convert(message.source.get) - case Upsert => - "\n" + JsObject( - "doc" -> writer.convert(message.source.get).parseJson, - "doc_as_upsert" -> JsTrue - ).toString - case Update => - "\n" + JsObject( - "doc" -> writer.convert(message.source.get).parseJson - ).toString - case Delete => - "" - } - - setHandlers(in, out, this) - - override def onPull(): Unit = tryPull() + private 
def retryPartialFailedMessages( + messageResults: immutable.Seq[WriteResult[T, C]], + failedMsgs: immutable.Seq[WriteResult[T, C]] + ): Unit = { + if (log.isDebugEnabled) log.debug("retryPartialFailedMessages inflight={} {}", inflight, failedMsgs) + // Retry partial failed messages + // NOTE: When we partially return message like this, message will arrive out of order downstream + // and it can break commit-logic when using Kafka + retryCount = retryCount + 1 + failedMessages = failedMsgs.map(_.message) // These are the messages we're going to retry + scheduleOnce(RetrySend, settings.retryLogic.nextRetry(retryCount)) + + val successMsgs = messageResults.filter(_.error.isEmpty) + if (successMsgs.nonEmpty) { + // push the messages that DID succeed + emitResults(successMsgs) + } + } - override def onPush(): Unit = { - val message = grab(in) - queue.enqueue(message) + private def emitResults(successMsgs: immutable.Seq[WriteResult[T, C]]): Unit = { + emit(out, successMsgs) + tryPull() + inflight -= successMsgs.size + if (upstreamFinished && inflight == 0) completeStage() + } - state match { - case Idle => { - state = Sending - val messages = (1 to settings.bufferSize).flatMap { _ => - queue.dequeueFirst(_ => true) - } - sendBulkUpdateRequest(messages) + private def sendBulkUpdateRequest(messages: immutable.Seq[WriteMessage[T, C]]): Unit = { + val json = messages + .map { message => + val sharedFields: Seq[(String, JsString)] = Seq( + "_index" -> JsString(message.indexName.getOrElse(indexName)), + typeNameTuple + ) ++ message.customMetadata.map { case (field, value) => field -> JsString(value) } + val tuple: (String, JsObject) = message.operation match { + case Index => + val fields = Seq( + message.version.map { version => + "_version" -> JsNumber(version) + }, + versionTypeTuple, + message.id.map { id => + "_id" -> JsString(id) + } + ).flatten + "index" -> JsObject( + (sharedFields ++ fields): _* + ) + case Create => + val fields = Seq( + message.id.map { id => + "_id" -> JsString(id) + } + ).flatten + "create" -> JsObject( + (sharedFields ++ fields): _* + ) + case Update | Upsert => + val fields = Seq( + message.version.map { version => + "_version" -> JsNumber(version) + }, + versionTypeTuple, + Option("_id" -> JsString(message.id.get)) + ).flatten + "update" -> JsObject( + (sharedFields ++ fields): _* + ) + case Delete => + val fields = Seq( + message.version.map { version => + "_version" -> JsNumber(version) + }, + versionTypeTuple, + Option("_id" -> JsString(message.id.get)) + ).flatten + "delete" -> JsObject( + (sharedFields ++ fields): _* + ) } - case _ => () + JsObject(tuple).compactPrint + messageToJsonString(message) } + .mkString("", "\n", "\n") + + log.debug("Posting data to Elasticsearch: {}", json) + + client.performRequestAsync( + "POST", + "/_bulk", + java.util.Collections.emptyMap[String, String](), + new StringEntity(json, StandardCharsets.UTF_8), + new ResponseListener() { + override def onFailure(exception: Exception): Unit = failureHandler.invoke((messages, exception)) + override def onSuccess(response: Response): Unit = responseHandler.invoke((messages, response)) + }, + new BasicHeader("Content-Type", "application/x-ndjson") + ) + } - tryPull() + private def messageToJsonString(message: WriteMessage[T, C]): String = + message.operation match { + case Index | Create => + "\n" + writer.convert(message.source.get) + case Upsert => + "\n" + JsObject( + "doc" -> writer.convert(message.source.get).parseJson, + "doc_as_upsert" -> JsTrue + ).toString + case Update => + "\n" + 
JsObject( + "doc" -> writer.convert(message.source.get).parseJson + ).toString + case Delete => + "" } - override def onUpstreamFailure(exception: Throwable): Unit = - failStage(exception) + setHandlers(in, out, this) - override def onUpstreamFinish(): Unit = - state match { - case Idle => handleSuccess() - case Sending => state = Finished - case Finished => () - } + override def onPull(): Unit = tryPull() + + override def onPush(): Unit = { + val messages = grab(in) + inflight += messages.size + sendBulkUpdateRequest(messages) } + + override def onUpstreamFinish(): Unit = + if (inflight == 0) completeStage() + else upstreamFinished = true + } } /** @@ -286,10 +248,4 @@ private[elasticsearch] final class ElasticsearchFlowStage[T, C]( private[elasticsearch] object ElasticsearchFlowStage { private object RetrySend - - private sealed trait State - private case object Idle extends State - private case object Sending extends State - private case object Finished extends State - } diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala index 8518617854..4dd1132a37 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchFlow.scala @@ -6,21 +6,25 @@ package akka.stream.alpakka.elasticsearch.javadsl import akka.NotUsed import akka.stream.alpakka.elasticsearch._ -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl import com.fasterxml.jackson.databind.ObjectMapper import org.elasticsearch.client.RestClient -import scala.collection.JavaConverters._ - import akka.stream.alpakka.elasticsearch.impl +import scala.collection.immutable + /** * Java API to create Elasticsearch flows. */ object ElasticsearchFlow { /** - * Creates a [[akka.stream.javadsl.Flow]] for type `T` from [[WriteMessage]] to lists of [[WriteResult]]. + * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`. + * The result status is port of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for + * successful execution. + * + * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected. */ def create[T]( indexName: String, @@ -28,22 +32,27 @@ object ElasticsearchFlow { settings: ElasticsearchWriteSettings, client: RestClient, objectMapper: ObjectMapper - ): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], java.util.List[WriteResult[T, NotUsed]], NotUsed] = - Flow - .fromGraph( + ): akka.stream.javadsl.Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] = + scaladsl + .Flow[WriteMessage[T, NotUsed]] + .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm } + .via( new impl.ElasticsearchFlowStage[T, NotUsed](indexName, typeName, client, settings, new JacksonWriter[T](objectMapper)) ) - .mapAsync(1)(identity) - .map(x => x.asJava) + .mapConcat(identity) .asJava /** - * Creates a [[akka.stream.javadsl.Flow]] for type `T` from [[WriteMessage]] to lists of [[WriteResult]] + * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T` * with `passThrough` of type `C`. 
+   * The result status is part of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for
+   * successful execution.
+   *
+   * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
    */
   def createWithPassThrough[T, C](
       indexName: String,
@@ -51,13 +60,14 @@ object ElasticsearchFlow {
       settings: ElasticsearchWriteSettings,
       client: RestClient,
       objectMapper: ObjectMapper
-  ): akka.stream.javadsl.Flow[WriteMessage[T, C], java.util.List[WriteResult[T, C]], NotUsed] =
-    Flow
-      .fromGraph(
+  ): akka.stream.javadsl.Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
+    scaladsl
+      .Flow[WriteMessage[T, C]]
+      .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
+      .via(
         new impl.ElasticsearchFlowStage[T, C](indexName, typeName, client, settings, new JacksonWriter[T](objectMapper))
       )
-      .mapAsync(1)(identity)
-      .map(x => x.asJava)
+      .mapConcat(identity)
       .asJava
 
   private final class JacksonWriter[T](mapper: ObjectMapper) extends MessageWriter[T] {
diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala
index 9cec8045ca..7809410156 100644
--- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala
+++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/javadsl/ElasticsearchSink.scala
@@ -18,7 +18,7 @@ import org.elasticsearch.client.RestClient
 object ElasticsearchSink {
 
   /**
-   * Creates a [[akka.stream.javadsl.Sink]] to Elasticsearch for [[WriteMessage]] containing type `T`.
+   * Create a sink to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`.
    */
   def create[T](
       indexName: String,
@@ -29,6 +29,6 @@ object ElasticsearchSink {
   ): akka.stream.javadsl.Sink[WriteMessage[T, NotUsed], CompletionStage[Done]] =
     ElasticsearchFlow
       .create(indexName, typeName, settings, client, objectMapper)
-      .toMat(Sink.ignore[java.util.List[WriteResult[T, NotUsed]]], Keep.right[NotUsed, CompletionStage[Done]])
+      .toMat(Sink.ignore[WriteResult[T, NotUsed]], Keep.right[NotUsed, CompletionStage[Done]])
 
 }
diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala
index 24ea03dd2e..5d75cbdf76 100644
--- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala
+++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchFlow.scala
@@ -10,6 +10,7 @@ import akka.stream.alpakka.elasticsearch.impl
 import akka.stream.scaladsl.Flow
 import org.elasticsearch.client.RestClient
 import spray.json._
+import scala.collection.immutable
 
 /**
  * Scala API to create Elasticsearch flows.
@@ -17,26 +18,33 @@ import spray.json._
 object ElasticsearchFlow {
 
   /**
-   * Creates a [[akka.stream.scaladsl.Flow]] for type `T` from [[WriteMessage]] to sequences
-   * of [[WriteResult]].
+   * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`.
+   * The result status is part of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for
+   * successful execution.
+   *
+   * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*/
   def create[T](indexName: String,
                 typeName: String,
                 settings: ElasticsearchWriteSettings = ElasticsearchWriteSettings.Default)(
       implicit client: RestClient,
       writer: JsonWriter[T]
-  ): Flow[WriteMessage[T, NotUsed], Seq[WriteResult[T, NotUsed]], NotUsed] =
+  ): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
     create[T](indexName, typeName, settings, new SprayJsonWriter[T]()(writer))
 
   /**
-   * Creates a [[akka.stream.scaladsl.Flow]] for type `T` from [[WriteMessage]] to sequences
-   * of [[WriteResult]].
+   * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`.
+   * The result status is part of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for
+   * successful execution.
+   *
+   * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
    */
   def create[T](indexName: String, typeName: String, settings: ElasticsearchWriteSettings, writer: MessageWriter[T])(
       implicit client: RestClient
-  ): Flow[WriteMessage[T, NotUsed], Seq[WriteResult[T, NotUsed]], NotUsed] =
-    Flow
-      .fromGraph(
+  ): Flow[WriteMessage[T, NotUsed], WriteResult[T, NotUsed], NotUsed] =
+    Flow[WriteMessage[T, NotUsed]]
+      .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm }
+      .via(
         new impl.ElasticsearchFlowStage[T, NotUsed](
           indexName,
           typeName,
@@ -45,32 +53,41 @@ object ElasticsearchFlow {
           writer
         )
       )
-      .mapAsync(1)(identity)
+      .mapConcat(identity)
 
   /**
-   * Creates a [[akka.stream.scaladsl.Flow]] for type `T` from [[WriteMessage]] to lists of [[WriteResult]]
+   * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
    * with `passThrough` of type `C`.
+   * The result status is part of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for
+   * successful execution.
+   *
+   * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
    */
   def createWithPassThrough[T, C](indexName: String,
                                   typeName: String,
                                   settings: ElasticsearchWriteSettings = ElasticsearchWriteSettings.Default)(
       implicit client: RestClient,
       writer: JsonWriter[T]
-  ): Flow[WriteMessage[T, C], Seq[WriteResult[T, C]], NotUsed] =
+  ): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] =
     createWithPassThrough[T, C](indexName, typeName, settings, new SprayJsonWriter[T]()(writer))
 
   /**
-   * Creates a [[akka.stream.scaladsl.Flow]] for type `T` from [[WriteMessage]] to lists of [[WriteResult]]
+   * Create a flow to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`
    * with `passThrough` of type `C`.
+   * The result status is part of the [[akka.stream.alpakka.elasticsearch.WriteResult WriteResult]] and must be checked for
+   * successful execution.
+   *
+   * Warning: When settings configure retrying, messages are emitted out-of-order when errors are detected.
*/ def createWithPassThrough[T, C](indexName: String, typeName: String, settings: ElasticsearchWriteSettings, writer: MessageWriter[T])( implicit client: RestClient - ): Flow[WriteMessage[T, C], Seq[WriteResult[T, C]], NotUsed] = - Flow - .fromGraph( + ): Flow[WriteMessage[T, C], WriteResult[T, C], NotUsed] = + Flow[WriteMessage[T, C]] + .batch(settings.bufferSize, immutable.Seq(_)) { case (seq, wm) => seq :+ wm } + .via( new impl.ElasticsearchFlowStage[T, C]( indexName, typeName, @@ -79,7 +96,7 @@ object ElasticsearchFlow { writer ) ) - .mapAsync(1)(identity) + .mapConcat(identity) private final class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] { override def convert(message: T): String = message.toJson.toString() diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala index 10e7458b7e..130fb031ca 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSink.scala @@ -18,7 +18,7 @@ import scala.concurrent.Future object ElasticsearchSink { /** - * Creates a [[akka.stream.scaladsl.Sink]] to Elasticsearch for [[WriteMessage]] containing type `T`. + * Create a sink to update Elasticsearch with [[akka.stream.alpakka.elasticsearch.WriteMessage WriteMessage]]s containing type `T`. */ def create[T](indexName: String, typeName: String, diff --git a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java index 7a1832ad5e..530bf814df 100644 --- a/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java +++ b/elasticsearch/src/test/java/docs/javadsl/ElasticsearchTest.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.junit.Assert.assertEquals; @@ -230,7 +231,7 @@ public void typedStream() throws Exception { public void flow() throws Exception { // Copy source/book to sink3/book through JsObject stream // #run-flow - CompletionStage>>> f1 = + CompletionStage>> f1 = ElasticsearchSource.typed( "source", "_doc", @@ -249,11 +250,11 @@ public void flow() throws Exception { .runWith(Sink.seq(), materializer); // #run-flow - List>> result1 = f1.toCompletableFuture().get(); + List> result1 = f1.toCompletableFuture().get(); flush("sink3"); - for (List> aResult1 : result1) { - assertEquals(true, aResult1.get(0).success()); + for (WriteResult aResult1 : result1) { + assertEquals(true, aResult1.success()); } // Assert docs in sink3/book @@ -341,40 +342,35 @@ public void testKafkaExample() throws Exception { final KafkaCommitter kafkaCommitter = new KafkaCommitter(); - Source.from(messagesFromKafka) // Assume we get this from Kafka - .map( - kafkaMessage -> { - Book book = kafkaMessage.book; - String id = book.title; - - // Transform message so that we can write to elastic - return WriteMessage.createIndexMessage(id, book).withPassThrough(kafkaMessage.offset); - }) - .via( // write to elastic - ElasticsearchFlow.createWithPassThrough( - "sink6", - "_doc", - ElasticsearchWriteSettings.create().withBufferSize(5), - client, - new ObjectMapper())) - .map( - messageResults -> { - messageResults - .stream() - .forEach( - result -> { - if (!result.success()) - throw 
new RuntimeException("Failed to write message to elastic"); - // Commit to kafka - kafkaCommitter.commit(result.message().passThrough()); - }); - return NotUsed.getInstance(); - }) - .runWith(Sink.seq(), materializer) // Run it - .toCompletableFuture() - .get(); // Wait for it to complete + CompletionStage kafkaToEs = + Source.from(messagesFromKafka) // Assume we get this from Kafka + .map( + kafkaMessage -> { + Book book = kafkaMessage.book; + String id = book.title; + // Transform message so that we can write to elastic + return WriteMessage.createIndexMessage(id, book) + .withPassThrough(kafkaMessage.offset); + }) + .via( // write to elastic + ElasticsearchFlow.createWithPassThrough( + "sink6", + "_doc", + ElasticsearchWriteSettings.create().withBufferSize(5), + client, + new ObjectMapper())) + .map( + result -> { + if (!result.success()) + throw new RuntimeException("Failed to write message to elastic"); + // Commit to kafka + kafkaCommitter.commit(result.message().passThrough()); + return NotUsed.getInstance(); + }) + .runWith(Sink.ignore(), materializer); // #kafka-example + kafkaToEs.toCompletableFuture().get(5, TimeUnit.SECONDS); // Wait for it to complete flush("sink6"); // Make sure all messages was committed to kafka @@ -470,7 +466,6 @@ public void testUsingVersions() throws Exception { .toCompletableFuture() .get() .get(0) - .get(0) .success(); assertEquals(false, success); diff --git a/elasticsearch/src/test/resources/application.conf b/elasticsearch/src/test/resources/application.conf index b623cf2f76..7bdde8d7f2 100644 --- a/elasticsearch/src/test/resources/application.conf +++ b/elasticsearch/src/test/resources/application.conf @@ -1,4 +1,4 @@ -akka.stream.akka { +akka { loggers = ["akka.event.slf4j.Slf4jLogger"] logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" loglevel = "DEBUG" diff --git a/elasticsearch/src/test/resources/log4j2-test.xml b/elasticsearch/src/test/resources/log4j2-test.xml index 982e01ccc3..8642e20fc5 100644 --- a/elasticsearch/src/test/resources/log4j2-test.xml +++ b/elasticsearch/src/test/resources/log4j2-test.xml @@ -3,7 +3,7 @@ - + diff --git a/elasticsearch/src/test/resources/logback-test.xml b/elasticsearch/src/test/resources/logback-test.xml index 75e8ef8734..2df68abc50 100644 --- a/elasticsearch/src/test/resources/logback-test.xml +++ b/elasticsearch/src/test/resources/logback-test.xml @@ -7,7 +7,13 @@ + + + + + + - \ No newline at end of file + diff --git a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala index a11c861378..9b0ff2a1da 100644 --- a/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala +++ b/elasticsearch/src/test/scala/docs/scaladsl/ElasticsearchSpec.scala @@ -4,9 +4,10 @@ package docs.scaladsl +import java.util.Collections import java.util.concurrent.TimeUnit -import akka.NotUsed +import akka.{Done, NotUsed} import akka.actor.ActorSystem import akka.stream.{ActorMaterializer, Materializer} import akka.stream.alpakka.elasticsearch.scaladsl._ @@ -18,13 +19,17 @@ import akka.testkit.TestKit import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.codelibs.elasticsearch.runner.ElasticsearchClusterRunner +import org.scalatest.concurrent.ScalaFutures import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} import scala.collection.JavaConverters._ -import scala.concurrent.Await -import scala.concurrent.duration.{Duration, _} +import scala.collection.immutable +import scala.concurrent.Future 
+import scala.concurrent.duration._ -class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { +class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll with ScalaFutures { + + private implicit val patience = PatienceConfig(10.seconds) private val runner = new ElasticsearchClusterRunner() @@ -59,6 +64,13 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) runner.ensureYellow() + def register(indexName: String, title: String): Unit = + client.performRequest("POST", + s"$indexName/_doc", + Map[String, String]().asJava, + new StringEntity(s"""{"title": "$title"}"""), + new BasicHeader("Content-Type", "application/json")) + register("source", "Akka in Action") register("source", "Programming in Scala") register("source", "Learning Scala") @@ -82,8 +94,8 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { private def createStrictMapping(indexName: String): Unit = client.performRequest( "PUT", - s"$indexName", - Map[String, String]().asJava, + indexName, + Collections.emptyMap[String, String], new StringEntity(s"""{ | "mappings": { | "_doc": { @@ -98,18 +110,11 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { new BasicHeader("Content-Type", "application/json") ) - private def register(indexName: String, title: String): Unit = - client.performRequest("POST", - s"$indexName/_doc", - Map[String, String]().asJava, - new StringEntity(s"""{"title": "$title"}"""), - new BasicHeader("Content-Type", "application/json")) - private def documentation: Unit = { //#source-settings val sourceSettings = ElasticsearchSourceSettings() .withBufferSize(10) - .withScrollDuration(FiniteDuration(5, TimeUnit.MINUTES)) + .withScrollDuration(5.minutes) //#source-settings //#sink-settings val sinkSettings = @@ -120,7 +125,19 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { //#sink-settings } - "ElasticsearchSourceSettings" should { + private def readTitlesFrom(indexName: String): Future[immutable.Seq[String]] = + ElasticsearchSource + .typed[Book]( + indexName, + "_doc", + """{"match_all": {}}""" + ) + .map { message => + message.source.title + } + .runWith(Sink.seq) + + "Source Settings" should { "convert scrollDuration value to correct scroll string value (Days)" in { val sourceSettings = ElasticsearchSourceSettings() .withScrollDuration(FiniteDuration(5, TimeUnit.DAYS)) @@ -167,9 +184,9 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { "Un-typed Elasticsearch connector" should { "consume and publish Json documents" in assertAllStagesStopped { - // Copy source/_doc to sink2/_doc through typed stream + val indexName = "sink2" //#run-jsobject - val f1 = ElasticsearchSource + val copy = ElasticsearchSource .create( indexName = "source", typeName = "_doc", @@ -181,31 +198,16 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .runWith( ElasticsearchSink.create[Book]( - indexName = "sink2", + indexName, typeName = "_doc" ) ) //#run-jsobject - Await.result(f1, 10.seconds) - - flush("sink2") - - // Assert docs in sink2/_doc - val f2 = ElasticsearchSource - .typed[Book]( - "sink2", - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result = Await.result(f2, 10.seconds) + copy.futureValue shouldBe Done + flush(indexName) - result.sorted shouldEqual Seq( + readTitlesFrom(indexName).futureValue should contain allElementsOf Seq( 
"Akka Concurrency", "Akka in Action", "Effective Akka", @@ -219,9 +221,9 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { "Typed Elasticsearch connector" should { "consume and publish documents as specific type" in assertAllStagesStopped { - // Copy source/_doc to sink2/_doc through typed stream + val indexName = "sink2" //#run-typed - val f1 = ElasticsearchSource + val copy = ElasticsearchSource .typed[Book]( indexName = "source", typeName = "_doc", @@ -232,31 +234,16 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .runWith( ElasticsearchSink.create[Book]( - indexName = "sink2", + indexName, typeName = "_doc" ) ) //#run-typed - Await.result(f1, 10.seconds) - - flush("sink2") - - // Assert docs in sink2/_doc - val f2 = ElasticsearchSource - .typed[Book]( - "sink2", - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result = Await.result(f2, 10.seconds) + copy.futureValue shouldBe Done + flush(indexName) - result.sorted shouldEqual Seq( + readTitlesFrom(indexName).futureValue should contain allElementsOf Seq( "Akka Concurrency", "Akka in Action", "Effective Akka", @@ -270,9 +257,9 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { "ElasticsearchFlow" should { "store documents and pass failed documents to downstream" in assertAllStagesStopped { - // Copy source/_doc to sink3/_doc through typed stream + val indexName = "sink3" //#run-flow - val f1 = ElasticsearchSource + val copy = ElasticsearchSource .typed[Book]( indexName = "source", typeName = "_doc", @@ -283,34 +270,18 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .via( ElasticsearchFlow.create[Book]( - indexName = "sink3", + indexName = indexName, typeName = "_doc" ) ) .runWith(Sink.seq) //#run-flow - val result1 = Await.result(f1, 10.seconds) - flush("sink3") - // Assert no errors - assert(result1.forall(!_.exists(_.success == false))) - - // Assert docs in sink3/_doc - val f2 = ElasticsearchSource - .typed[Book]( - "sink3", - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result2 = Await.result(f2, 10.seconds) + copy.futureValue.filter(!_.success) shouldBe empty + flush(indexName) - result2.sorted shouldEqual Seq( + readTitlesFrom(indexName).futureValue.sorted shouldEqual Seq( "Akka Concurrency", "Akka in Action", "Effective Akka", @@ -322,60 +293,47 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "not post invalid encoded JSON" in assertAllStagesStopped { - - val books = Seq( + val books = immutable.Seq( "Akka in Action", "Akka \u00DF Concurrency" ) - val f1 = Source(books.zipWithIndex.toVector) + val indexName = "sink4" + val createBooks = Source(books.zipWithIndex) .map { case (book: String, index: Int) => WriteMessage.createIndexMessage(index.toString, Book(book)) } .via( ElasticsearchFlow.create[Book]( - "sink4", + indexName, "_doc" ) ) .runWith(Sink.seq) - val result1 = Await.result(f1, 10.seconds) - flush("sink4") - // Assert no error - assert(result1.forall(!_.exists(_.success == false))) - - // Assert docs in sink4/_doc - val f2 = ElasticsearchSource - .typed[Book]( - "sink4", - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result2 = Await.result(f2, 10.seconds) - - result2.sorted shouldEqual Seq( + createBooks.futureValue.filter(!_.success) shouldBe 
empty + flush(indexName) + readTitlesFrom(indexName).futureValue should contain allElementsOf Seq( "Akka in Action", "Akka \u00DF Concurrency" ) } - "retry a failed documents and pass retired documents to downstream" in assertAllStagesStopped { + "retry a failed document and pass retried documents to downstream" in assertAllStagesStopped { + val indexName = "sink5" + // Create strict mapping to prevent invalid documents - createStrictMapping("sink5") + createStrictMapping(indexName) - val f1 = Source( - Seq( - Map("title" -> "Akka in Action").toJson, - Map("subject" -> "Akka Concurrency").toJson - ).zipWithIndex.toVector + val createBooks = Source( + immutable + .Seq( + Map("title" -> "Akka in Action").toJson, + Map("subject" -> "Akka Concurrency").toJson + ) + .zipWithIndex ).map { case (book: JsObject, index: Int) => WriteMessage.createIndexMessage(index.toString, book) @@ -383,50 +341,32 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .via( ElasticsearchFlow.create( - "sink5", + indexName, "_doc", ElasticsearchWriteSettings() - .withRetryLogic(RetryAtFixedRate(5, 1.second)) + .withRetryLogic(RetryAtFixedRate(5, 100.millis)) ) ) .runWith(Sink.seq) val start = System.currentTimeMillis() - val result1 = Await.result(f1, 10.seconds) + val writeResults = createBooks.futureValue val end = System.currentTimeMillis() + writeResults should have size 2 + // Assert retired documents - assert( - result1.flatten.filter(!_.success).toList == Seq( - MessageFactory.createWriteResult[JsValue, NotUsed]( - WriteMessage.createIndexMessage("1", Map("subject" -> "Akka Concurrency").toJson), - Some( - """{"reason":"mapping set to strict, dynamic introduction of [subject] within [_doc] is not allowed","type":"strict_dynamic_mapping_exception"}""" - ) - ) - ) + val failed = writeResults.filter(!_.success).head + failed.message shouldBe WriteMessage.createIndexMessage("1", Map("subject" -> "Akka Concurrency").toJson) + failed.errorReason shouldBe Some( + "mapping set to strict, dynamic introduction of [subject] within [_doc] is not allowed" ) // Assert retried 5 times by looking duration assert(end - start > 5 * 100) - flush("sink5") - - // Assert docs in sink5/_doc - val f2 = ElasticsearchSource - .typed[Book]( - "sink5", - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result2 = Await.result(f2, 10.seconds) - - result2.sorted shouldEqual Seq( + flush(indexName) + readTitlesFrom(indexName).futureValue shouldEqual Seq( "Akka in Action" ) } @@ -447,12 +387,13 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { KafkaMessage(Book("Book 3"), KafkaOffset(2)) ) - var committedOffsets = List[KafkaOffset]() + var committedOffsets = Vector[KafkaOffset]() def commitToKafka(offset: KafkaOffset): Unit = committedOffsets = committedOffsets :+ offset - val f1 = Source(messagesFromKafka) // Assume we get this from Kafka + val indexName = "sink6" + val kafkaToEs = Source(messagesFromKafka) // Assume we get this from Kafka .map { kafkaMessage: KafkaMessage => val book = kafkaMessage.book val id = book.title @@ -463,41 +404,24 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .via( // write to elastic ElasticsearchFlow.createWithPassThrough[Book, KafkaOffset]( - indexName = "sink6", + indexName = indexName, typeName = "_doc" ) ) - .map { messageResults => - messageResults.foreach { result => - if (!result.success) throw new Exception("Failed to write message to 
elastic") - // Commit to kafka - commitToKafka(result.message.passThrough) - } + .map { result => + if (!result.success) throw new Exception("Failed to write message to elastic") + // Commit to kafka + commitToKafka(result.message.passThrough) } - .runWith(Sink.seq) + .runWith(Sink.ignore) - Await.ready(f1, 10.seconds) + kafkaToEs.futureValue shouldBe Done //#kafka-example - flush("sink6") + flush(indexName) // Make sure all messages was committed to kafka - assert(List(0, 1, 2) == committedOffsets.map(_.offset)) - - // Assert that all docs were written to elastic - val f2 = ElasticsearchSource - .typed[Book]( - indexName = "sink6", - typeName = "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result2 = Await.result(f2, 10.seconds).toList - - result2.sorted shouldEqual messagesFromKafka.map(_.book.title).sorted + committedOffsets.map(_.offset) should contain theSameElementsAs Seq(0, 1, 2) + readTitlesFrom(indexName).futureValue.toList should contain allElementsOf messagesFromKafka.map(_.book.title) } "store new documents using upsert method and partially update existing ones" in assertAllStagesStopped { @@ -507,24 +431,22 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ("00003", Book("Book 3")) ) - // Create new documents in sink7/_doc using the upsert method - val f1 = Source(books) + val indexName = "sink7" + val createBooks = Source(books) .map { book: (String, Book) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( ElasticsearchFlow.create[Book]( - "sink7", + indexName, "_doc" ) ) .runWith(Sink.seq) - val result1 = Await.result(f1, 10.seconds) - flush("sink7") - // Assert no errors - assert(result1.forall(!_.exists(_.success == false))) + createBooks.futureValue.filter(!_.success) shouldBe 'empty + flush(indexName) // Create a second dataset with matching indexes to test partial update val updatedBooks = List( @@ -543,39 +465,35 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) // Update sink7/_doc with the second dataset - val f2 = Source(updatedBooks) + val upserts = Source(updatedBooks) .map { book: (String, JsObject) => WriteMessage.createUpsertMessage(id = book._1, source = book._2) } .via( ElasticsearchFlow.create[JsObject]( - "sink7", + indexName, "_doc" ) ) .runWith(Sink.seq) - val result2 = Await.result(f2, 10.seconds) - flush("sink7") - // Assert no errors - assert(result2.forall(!_.exists(_.success == false))) + upserts.futureValue.filter(!_.success) shouldBe 'empty + flush(indexName) // Assert docs in sink7/_doc - val f3 = ElasticsearchSource( - "sink7", + val readBooks = ElasticsearchSource( + indexName, "_doc", """{"match_all": {}}""", - ElasticsearchSourceSettings.Default + ElasticsearchSourceSettings() ).map { message => message.source } .runWith(Sink.seq) - val result3 = Await.result(f3, 10.seconds) - // Docs should contain both columns - result3.sortBy(_.fields("title").compactPrint) shouldEqual Seq( + readBooks.futureValue.sortBy(_.fields("title").compactPrint) shouldEqual Seq( JsObject( "title" -> JsString("Book 1"), "rating" -> JsNumber(4) @@ -592,8 +510,8 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "handle multiple types of operations correctly" in assertAllStagesStopped { + val indexName = "sink8" //#multiple-operations - // Index, create, update, upsert and delete documents in sink8/_doc val requests = List[WriteMessage[Book, NotUsed]]( 
WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")), WriteMessage.createUpsertMessage(id = "00002", source = Book("Book 2")), @@ -603,75 +521,67 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { WriteMessage.createDeleteMessage(id = "00002") ) - val f1 = Source(requests) + val writeResults = Source(requests) .via( ElasticsearchFlow.create[Book]( - "sink8", + indexName, "_doc" ) ) .runWith(Sink.seq) //#multiple-operations - val result1 = Await.result(f1, 10.seconds) - flush("sink8") - + val results = writeResults.futureValue + results should have size requests.size // Assert no errors except a missing document for a update request - val error = result1.flatMap(_.flatMap(_.error)) - error.length shouldEqual 1 - error(0).parseJson.asJsObject.fields("reason").asInstanceOf[JsString].value shouldEqual - "[_doc][00004]: document missing" + val errorMessages = results.flatMap(_.errorReason) + errorMessages should have size 1 + errorMessages(0) shouldEqual "[_doc][00004]: document missing" + flush(indexName) // Assert docs in sink8/_doc - val f3 = ElasticsearchSource( - "sink8", + val readBooks = ElasticsearchSource( + indexName, "_doc", """{"match_all": {}}""", - ElasticsearchSourceSettings.Default + ElasticsearchSourceSettings() ).map { message => - message.source - } + message.source + } .runWith(Sink.seq) - val result3 = Await.result(f3, 10.seconds) - // Docs should contain both columns - result3.sortBy(_.fields("title").compactPrint) shouldEqual Seq( + readBooks.futureValue.sortBy(_.fields("title").compactPrint) shouldEqual Seq( JsObject("title" -> JsString("Book 1")), JsObject("title" -> JsString("Book 3")), JsObject("title" -> JsString("Book 5")) ) } - } - "ElasticsearchFlow" should { "Create existing document should fail" in { + val indexName = "sink9" val requests = List[WriteMessage[Book, NotUsed]]( WriteMessage.createIndexMessage(id = "00001", source = Book("Book 1")), WriteMessage.createCreateMessage(id = "00001", source = Book("Book 1")) ) - val f1 = Source(requests) + val writeResults = Source(requests) .via( ElasticsearchFlow.create[Book]( - "sink9", + indexName, "_doc" ) ) .runWith(Sink.seq) - val result1 = Await.result(f1, Duration.Inf) - flush("sink9") - + val results = writeResults.futureValue + results should have size requests.size // Assert error - val error = result1.flatMap(_.flatMap(_.error)) - error.length shouldEqual 1 - error(0).parseJson.asJsObject.fields("reason").asInstanceOf[JsString].value shouldEqual - "[_doc][00001]: version conflict, document already exists (current version [1])" + val errorMessages = results.flatMap(_.errorReason) + errorMessages should have size 1 + errorMessages(0) shouldEqual "[_doc][00001]: version conflict, document already exists (current version [1])" } - } - "ElasticsearchSource" should { "read and write document-version if configured to do so" in assertAllStagesStopped { case class VersionTestDoc(id: String, name: String, value: Int) @@ -687,7 +597,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) // insert new documents - val f1 = Source(docs) + val indexResults = Source(docs) .map { doc => WriteMessage.createIndexMessage(doc.id, doc) } @@ -700,16 +610,14 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.seq) - val result1 = Await.result(f1, 10.seconds) - flush(indexName) - // Assert no errors - assert(result1.forall(!_.exists(_.success == false))) + indexResults.futureValue.filter(!_.success) 
shouldBe 'empty + flush(indexName) // search for the documents and assert them being at version 1, // then update while specifying that for which version - val f3 = ElasticsearchSource + val updatedVersions = ElasticsearchSource .typed[VersionTestDoc]( indexName, typeName, @@ -736,12 +644,11 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.seq) - val result3 = Await.result(f3, 10.seconds) - assert(result3.forall(!_.exists(_.success == false))) + updatedVersions.futureValue.filter(!_.success) shouldBe 'empty flush(indexName) // Search again to assert that all documents are now on version 2 - val f4 = ElasticsearchSource + val assertVersions = ElasticsearchSource .typed[VersionTestDoc]( indexName, typeName, @@ -757,11 +664,10 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } .runWith(Sink.ignore) - val result4 = Await.result(f4, 10.seconds) + assertVersions.futureValue shouldBe Done // Try to write document with old version - it should fail - - val f5 = Source + val illegalIndexWrites = Source .single(VersionTestDoc("1", "a", 2)) .map { doc => val oldVersion = 1 @@ -776,9 +682,8 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.seq) - val result5 = Await.result(f5, 10.seconds) - assert(result5(0)(0).success == false) - + val result5 = illegalIndexWrites.futureValue + result5.head.success shouldBe false } "allow read and write using configured version type" in assertAllStagesStopped { @@ -791,7 +696,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { val externalVersion = 5L // Insert new document using external version - val f1 = Source + val insertWrite = Source .single(book) .map { doc => WriteMessage.createIndexMessage(docId, doc).withVersion(externalVersion) @@ -805,13 +710,13 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.seq) - val insertResult = Await.result(f1, 10.seconds).head.head + val insertResult = insertWrite.futureValue.head assert(insertResult.success) flush(indexName) // Assert that the document's external version is saved - val f2 = ElasticsearchSource + val readFirst = ElasticsearchSource .typed[Book]( indexName, typeName, @@ -820,19 +725,16 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.head) - val message = Await.result(f2, 10.seconds) - assert(message.version.contains(externalVersion)) + assert(readFirst.futureValue.version.contains(externalVersion)) } - } - "Elasticsearch connector" should { - "Should use indexName supplied in message if present" in assertAllStagesStopped { + "use indexName supplied in message if present" in assertAllStagesStopped { // Copy source/_doc to sink2/_doc through typed stream //#custom-index-name-example val customIndexName = "custom-index" - val f1 = ElasticsearchSource + val writeCustomIndex = ElasticsearchSource .typed[Book]( indexName = "source", typeName = "_doc", @@ -851,25 +753,9 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) //#custom-index-name-example - Await.result(f1, 10.seconds) - + writeCustomIndex.futureValue shouldBe Done flush(customIndexName) - - // Assert docs in custom-index/_doc - val f2 = ElasticsearchSource - .typed[Book]( - customIndexName, - "_doc", - """{"match_all": {}}""" - ) - .map { message => - message.source.title - } - .runWith(Sink.seq) - - val result = Await.result(f2, 10.seconds) - - result.sorted 
shouldEqual Seq( + readTitlesFrom(customIndexName).futureValue.sorted shouldEqual Seq( "Akka Concurrency", "Akka in Action", "Effective Akka", @@ -880,7 +766,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) } - "should throw an IllegalArgumentException if indexName is null when passed to ElasticsearchSource" in { + "throw an IllegalArgumentException if indexName is null when passed to ElasticsearchSource" in { assertThrows[IllegalArgumentException] { ElasticsearchSource .create( @@ -891,7 +777,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } - "should throw an IllegalArgumentException if indexName is null when passed to ElasticsearchFlow" in { + "throw an IllegalArgumentException if indexName is null when passed to ElasticsearchFlow" in { assertThrows[IllegalArgumentException] { ElasticsearchFlow .create[Book]( @@ -901,7 +787,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } } - "should throw an IllegalArgumentException if typeName is null when passed to ElasticsearchFlow" in { + "throw an IllegalArgumentException if typeName is null when passed to ElasticsearchFlow" in { assertThrows[IllegalArgumentException] { ElasticsearchFlow .create[Book]( @@ -913,8 +799,8 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { } "ElasticsearchSource" should { - "should let you search without specifying typeName" in assertAllStagesStopped { - val f1 = ElasticsearchSource + "allow search without specifying typeName" in assertAllStagesStopped { + val readWithoutTypeName = ElasticsearchSource .typed[Book]( indexName = "source", typeName = None, @@ -924,8 +810,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { .map(_.source.title) .runWith(Sink.seq) - val result = Await.result(f1, 10.seconds).toList - + val result = readWithoutTypeName.futureValue.toList result.sorted shouldEqual Seq( "Akka Concurrency", "Akka in Action", @@ -936,9 +821,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { "Scala for Spark in Production" ) } - } - "ElasticsearchSource" should { "be able to use custom searchParams" in assertAllStagesStopped { //#custom-search-params @@ -957,7 +840,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) // insert new documents - val f1 = Source(docs) + val writes = Source(docs) .map { doc => WriteMessage.createIndexMessage(doc.id, doc) } @@ -970,23 +853,20 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { ) .runWith(Sink.seq) - val result1 = Await.result(f1, 10.seconds) + writes.futureValue.filter(!_.success) shouldBe 'empty flush(indexName) - // Assert no errors - assert(result1.forall(!_.exists(_.success == false))) - //#custom-search-params // Search for docs and ask elastic to only return some fields - val f3 = ElasticsearchSource + val readWithSearchParameters = ElasticsearchSource .typed[TestDoc](indexName, Some(typeName), searchParams = Map( "query" -> """ {"match_all": {}} """, "_source" -> """ ["id", "a", "c"] """ ), - ElasticsearchSourceSettings.Default) + ElasticsearchSourceSettings()) .map { message => message.source } @@ -994,8 +874,7 @@ class ElasticsearchSpec extends WordSpec with Matchers with BeforeAndAfterAll { //#custom-search-params - val result3 = Await.result(f3, 10.seconds) - assert(result3.toList.sortBy(_.id) == docs.map(_.copy(b = None))) + 
assert(readWithSearchParameters.futureValue.toList.sortBy(_.id) == docs.map(_.copy(b = None))) } }
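
A minimal usage sketch of the simplified scaladsl flow signature introduced above: each element emitted downstream is now a single WriteResult instead of a sequence of results. This sketch is illustrative only and not part of the patch; the wrapper object, the index name "books", and the localhost:9200 cluster address are assumptions.

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.alpakka.elasticsearch.WriteMessage
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchFlow
import akka.stream.scaladsl.{Sink, Source}
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient
import spray.json._
import spray.json.DefaultJsonProtocol._

object FlowUsageSketch extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // assumed local cluster; adjust host and port for your environment
  implicit val esClient: RestClient = RestClient.builder(new HttpHost("localhost", 9200)).build()

  case class Book(title: String)
  implicit val bookFormat: RootJsonFormat[Book] = jsonFormat1(Book)

  val done = Source(List(WriteMessage.createIndexMessage(id = "1", source = Book("Akka in Action"))))
    .via(ElasticsearchFlow.create[Book]("books", "_doc"))
    // every element is a single WriteResult; check its status instead of unwrapping a list
    .runWith(Sink.foreach { result =>
      if (!result.success) println(s"write failed: ${result.errorReason.getOrElse("unknown")}")
    })

  done.onComplete(_ => system.terminate())(system.dispatcher)
}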