Skip to content

Commit

Permalink
Elasticsearch: Make flows type signature simpler to prepare for `with…
Browse files Browse the repository at this point in the history
…Context` (akka#1515)

* Sort out logging and simpler test structure
* Reduce logic in ES flow stage (drop batching, futures)
* Extract error reason; simpler JSON building
* Emit WriteResults, not sequences
  • Loading branch information
ennru authored Feb 20, 2019
1 parent ccc8c7d commit 8d9c95d
Show file tree
Hide file tree
Showing 12 changed files with 453 additions and 578 deletions.
4 changes: 2 additions & 2 deletions docs/src/main/paradox/elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ Java

| Parameter | Default | Description |
| ------------------- | ------- | ------------------------------------------------------------------------------------------------------ |
| bufferSize | 10 | `ElasticsearchSink` puts messages by one bulk request per messages of this buffer size. |
| bufferSize | 10 | Flow and Sink batch messages to bulk requests when back-pressure applies. |
| versionType | None | If set, `ElasticsearchSink` uses the chosen versionType to index documents. See [Version types](https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#_version_types) for accepted settings. |
| retryLogic | No retries | See below |


A bulk request might fail partially for some reason. To retry failed writes to Elasticsearch, a `RetryLogic` can be specified. The provided implementation is `RetryAtFixedRate`.

@@@ warning
If using retries, you will receive messages out of order downstream in cases where elastic returns an error one some of the documents in a bulk request.
If using retries, you will receive messages **out of order downstream** in cases when Elasticsearch returns an error on some of the documents in a bulk request.
@@@


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import scala.compat.java8.OptionConverters._
* INTERNAL API
*/
@InternalApi
private[elasticsearch] sealed abstract class Operation(val command: String)
private[elasticsearch] sealed abstract class Operation(val command: String) {
override def toString: String = command
}

/**
* INTERNAL API
Expand Down Expand Up @@ -132,12 +134,22 @@ object WriteMessage {
* [[akka.stream.alpakka.elasticsearch.testkit.MessageFactory]].
*/
final class WriteResult[T2, C2] @InternalApi private[elasticsearch] (val message: WriteMessage[T2, C2],
/** JSON structure of the Elasticsearch error. */
val error: Option[String]) {
val success: Boolean = error.isEmpty

/** Java API */
/** Java API: JSON structure of the Elasticsearch error. */
def getError: java.util.Optional[String] = error.asJava

/** `reason` field value of the Elasticsearch error. */
def errorReason: Option[String] = {
import spray.json._
error.flatMap(_.parseJson.asJsObject.fields.get("reason").map(_.asInstanceOf[JsString].value))
}

/** Java API: `reason` field value from the Elasticsearch error */
def getErrorReason: java.util.Optional[String] = errorReason.asJava

override def toString =
s"""WriteResult(message=$message,error=$error)"""

Expand Down
Loading

0 comments on commit 8d9c95d

Please sign in to comment.