Skip to content

Commit

Permalink
Elasticsearch connector (akka#221)
Browse files Browse the repository at this point in the history
  • Loading branch information
takezoe authored and raboof committed Sep 19, 2017
1 parent c4350b9 commit 618ebf0
Show file tree
Hide file tree
Showing 18 changed files with 1,342 additions and 0 deletions.
8 changes: 8 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ lazy val alpakka = project
cassandra,
csv,
dynamodb,
elasticsearch,
files,
ftp,
geode,
Expand Down Expand Up @@ -88,6 +89,13 @@ lazy val dynamodb = project
Dependencies.DynamoDB
)

lazy val elasticsearch = project
.enablePlugins(AutomateHeaderPlugin)
.settings(
name := "akka-stream-alpakka-elasticsearch",
Dependencies.Elasticsearch
)

lazy val files = project // The name file is taken by `sbt.file`!
.in(file("file"))
.enablePlugins(AutomateHeaderPlugin)
Expand Down
1 change: 1 addition & 0 deletions docs/src/main/paradox/connectors.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
* [AWS SQS Connector](sqs.md)
* [Azure Storage Queue](azure-storage-queue.md)
* [Cassandra Connector](cassandra.md)
* [Elasticsearch Connectors](elasticsearch.md)
* [File Connectors](file.md)
* [FTP Connector](ftp.md)
* [Google Cloud Pub/Sub Connector](google-cloud-pub-sub.md)
Expand Down
130 changes: 130 additions & 0 deletions docs/src/main/paradox/elasticsearch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# Elasticsearch Connector

The Elasticsearch connector provides Akka Stream sources and sinks for Elasticsearch.

For more information about Elasticsearch please visit the [official documentation](https://www.elastic.co/guide/index.html).

## Artifacts

sbt
: @@@vars
```scala
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "$version$"
```
@@@

Maven
: @@@vars
```xml
<dependency>
<groupId>com.lightbend.akka</groupId>
<artifactId>akka-stream-alpakka-elasticsearch_$scala.binaryVersion$</artifactId>
<version>$version$</version>
</dependency>
```
@@@

Gradle
: @@@vars
```gradle
dependencies {
compile group: "com.lightbend.akka", name: "akka-stream-alpakka-elasticsearch_$scala.binaryVersion$", version: "$version$"
}
```
@@@

## Usage

Sources, Flows and Sinks provided by this connector need a prepared `RestClient` to access to Elasticsearch.

Scala
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-client }

Java
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-client }

We will also need an @scaladoc[ActorSystem](akka.actor.ActorSystem) and an @scaladoc[ActorMaterializer](akka.stream.ActorMaterializer).

Scala
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #init-mat }

Java
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #init-mat }

This is all preparation that we are going to need.

### JsObject message

Now we can stream messages which contains spray-json's `JsObject` (in Scala) or `java.util.Map<String, Object>` (in Java)
from or to Elasticsearch where we have access to by providing the `RestClient` to the
@scaladoc[ElasticsearchSource](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSource$) or the
@scaladoc[ElasticsearchSink](akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSink$).

Scala
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-jsobject }

Java
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-jsobject }


### Typed messages

Also, it's possible to stream messages which contains any classes. In Scala, spray-json is used for JSON conversion,
so defining the mapped class and `JsonFormat` for it is necessary. In Java, Jackson is used, so just define the mapped class.

Scala
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #define-class }

Java
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #define-class }


Use `ElasticsearchSource.typed` and `ElasticsearchSink.typed` to create source and sink instead.

Scala
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-typed }

Java
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-typed }


### Configuration

We can specify the buffer size for the source.

Scala (source)
: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSourceSettings.scala) { #source-settings }

Also, we can specify the buffer size, the max retry count and the retry interval for the sink.

Scala (sink)
: @@snip (../../../../elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/scaladsl/ElasticsearchSinkSettings.scala) { #sink-settings }

`ElasticsearchSource` retrieves messages from Elasticsearch by scroll scan. This buffer size is used as the scroll size.
`ElasticsearchSink` puts messages by one bulk request per messages of this buffer size.

### Using Elasticsearch as a Flow

You can also build flow stages. The API is similar to creating Sinks.

Scala (flow)
: @@snip (../../../../elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/ElasticsearchSpec.scala) { #run-flow }

Java (flow)
: @@snip (../../../../elasticsearch/src/test/java/akka/stream/alpakka/elasticsearch/ElasticsearchTest.java) { #run-flow }

### Running the example code

The code in this guide is part of runnable tests of this project. You are welcome to edit the code and run it in sbt.

Scala
: ```
sbt
> elasticsearch/testOnly *.ElasticsearchSpec
```

Java
: ```
sbt
> elasticsearch/testOnly *.ElasticsearchTest
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright (C) 2016-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.alpakka.elasticsearch

import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage._
import org.apache.http.entity.StringEntity
import org.elasticsearch.client.{Response, ResponseListener, RestClient}

import scala.collection.mutable
import scala.collection.JavaConverters._
import spray.json._

import scala.concurrent.Future
import scala.concurrent.duration._
import ElasticsearchFlowStage._
import akka.NotUsed
import akka.stream.alpakka.elasticsearch.scaladsl.ElasticsearchSinkSettings
import org.apache.http.message.BasicHeader
import org.apache.http.util.EntityUtils

final case class IncomingMessage[T](id: Option[String], source: T)

trait MessageWriter[T] {
def convert(message: T): String
}

class ElasticsearchFlowStage[T, R](
indexName: String,
typeName: String,
client: RestClient,
settings: ElasticsearchSinkSettings,
pusher: Seq[IncomingMessage[T]] => R,
writer: MessageWriter[T]
) extends GraphStage[FlowShape[IncomingMessage[T], Future[R]]] {

private val in = Inlet[IncomingMessage[T]]("messages")
private val out = Outlet[Future[R]]("failed")
override val shape = FlowShape(in, out)

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {

private var state: State = Idle
private val queue = new mutable.Queue[IncomingMessage[T]]()
private val failureHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Throwable)](handleFailure)
private val responseHandler = getAsyncCallback[(Seq[IncomingMessage[T]], Response)](handleResponse)
private var failedMessages: Seq[IncomingMessage[T]] = Nil
private var retryCount: Int = 0

override def preStart(): Unit =
pull(in)

private def tryPull(): Unit =
if (queue.size < settings.bufferSize && !isClosed(in) && !hasBeenPulled(in)) {
pull(in)
}

override def onTimer(timerKey: Any): Unit = {
sendBulkUpdateRequest(failedMessages)
failedMessages = Nil
}

private def handleFailure(args: (Seq[IncomingMessage[T]], Throwable)): Unit = {
val (messages, exception) = args
if (retryCount >= settings.maxRetry) {
failStage(exception)
} else {
retryCount = retryCount + 1
failedMessages = messages
scheduleOnce(NotUsed, settings.retryInterval.millis)
}
}

private def handleSuccess(): Unit =
completeStage()

private def handleResponse(args: (Seq[IncomingMessage[T]], Response)): Unit = {
retryCount = 0
val (messages, response) = args
val responseJson = EntityUtils.toString(response.getEntity).parseJson

// If some commands in bulk request failed, pass failed messages to follows.
val items = responseJson.asJsObject.fields("items").asInstanceOf[JsArray]
val failedMessages = items.elements.zip(messages).flatMap {
case (item, message) =>
val result = item.asJsObject.fields("index").asJsObject.fields("result").asInstanceOf[JsString].value
if (result == "created" || result == "updated") {
None
} else {
Some(message)
}
}

// Fetch next messages from queue and send them
val nextMessages = (1 to settings.bufferSize).flatMap { _ =>
queue.dequeueFirst(_ => true)
}

if (nextMessages.isEmpty) {
state match {
case Finished => handleSuccess()
case _ => state = Idle
}
} else {
sendBulkUpdateRequest(nextMessages)
}

push(out, Future.successful(pusher(failedMessages)))
}

private def sendBulkUpdateRequest(messages: Seq[IncomingMessage[T]]): Unit = {
val json = messages
.map { message =>
JsObject(
"index" -> JsObject(
Seq(
Option("_index" -> JsString(indexName)),
Option("_type" -> JsString(typeName)),
message.id.map { id =>
"_id" -> JsString(id)
}
).flatten: _*
)
).toString + "\n" + writer.convert(message.source)
}
.mkString("", "\n", "\n")

client.performRequestAsync(
"POST",
"/_bulk",
Map[String, String]().asJava,
new StringEntity(json),
new ResponseListener() {
override def onFailure(exception: Exception): Unit =
failureHandler.invoke((messages, exception))
override def onSuccess(response: Response): Unit =
responseHandler.invoke((messages, response))
},
new BasicHeader("Content-Type", "application/x-ndjson")
)
}

setHandlers(in, out, this)

override def onPull(): Unit = tryPull()

override def onPush(): Unit = {
val message = grab(in)
queue.enqueue(message)

state match {
case Idle => {
state = Sending
val messages = (1 to settings.bufferSize).flatMap { _ =>
queue.dequeueFirst(_ => true)
}
sendBulkUpdateRequest(messages)
}
case _ => ()
}

tryPull()
}

override def onUpstreamFailure(exception: Throwable): Unit =
failStage(exception)

override def onUpstreamFinish(): Unit =
state match {
case Idle => handleSuccess()
case Sending => state = Finished
case Finished => ()
}
}

}

object ElasticsearchFlowStage {

private sealed trait State
private case object Idle extends State
private case object Sending extends State
private case object Finished extends State

}
Loading

0 comments on commit 618ebf0

Please sign in to comment.