[FLINK-4988] [elasticsearch] Restructure Elasticsearch connectors
This closes apache#3112.
tzulitai committed Feb 7, 2017
1 parent 8699b03 commit b5caaef
Showing 51 changed files with 2,301 additions and 1,724 deletions.
271 changes: 202 additions & 69 deletions docs/dev/connectors/elasticsearch.md
@@ -23,158 +23,291 @@ specific language governing permissions and limitations
under the License.
-->

This connector provides sinks that can send document actions to an
[Elasticsearch](https://elastic.co/) index. To use this connector, add one
of the following dependencies to your project, depending on the version
of your Elasticsearch installation:

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left">Maven Dependency</th>
<th class="text-left">Supported since</th>
<th class="text-left">Elasticsearch version</th>
</tr>
</thead>
<tbody>
<tr>
<td>flink-connector-elasticsearch{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>1.x</td>
</tr>
<tr>
<td>flink-connector-elasticsearch2{{ site.scala_version_suffix }}</td>
<td>1.0.0</td>
<td>2.x</td>
</tr>
<tr>
<td>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</td>
<td>1.2.0</td>
<td>5.x</td>
</tr>
</tbody>
</table>
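
For example, for an Elasticsearch 5.x installation the Maven dependency entry
would look roughly like the following, using the same `groupId` and templated
version placeholders as the other Flink connector dependencies:

{% highlight xml %}
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-elasticsearch5{{ site.scala_version_suffix }}</artifactId>
  <version>{{site.version }}</version>
</dependency>
{% endhighlight %}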

Note that the streaming connectors are currently not part of the binary
distribution. See [here]({{site.baseurl}}/dev/linking.html) for information
about how to package the program with the libraries for cluster execution.

#### Installing Elasticsearch

Instructions for setting up an Elasticsearch cluster can be found
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/setup.html).
Make sure to set and remember a cluster name. This must be set when
creating an `ElasticsearchSink` for requesting document actions against your cluster.

#### Elasticsearch Sink
The `ElasticsearchSink` uses a `TransportClient` to communicate with an
Elasticsearch cluster.

The example below shows how to configure and create a sink:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
<div data-lang="java, Elasticsearch 1.x" markdown="1">
{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<TransportAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300));
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
<div data-lang="java, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");

List<InetSocketAddress> transportAddresses = new ArrayList<>();
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300));

input.addSink(new ElasticsearchSink<>(config, transportAddresses, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 1.x" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[TransportAddress]
transportAddresses.add(new InetSocketTransportAddress("127.0.0.1", 9300))
transportAddresses.add(new InetSocketTransportAddress("10.2.3.1", 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
{% endhighlight %}
</div>
<div data-lang="scala, Elasticsearch 2.x / 5.x" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
config.put("cluster.name", "my-cluster-name")
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")

val transportAddresses = new java.util.ArrayList[InetSocketAddress]
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300))
transportAddresses.add(new InetSocketAddress(InetAddress.getByName("10.2.3.1"), 9300))

input.addSink(new ElasticsearchSink(config, transportAddresses, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
{% endhighlight %}
</div>
</div>

Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
The configuration keys are documented in the Elasticsearch documentation
[here](https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html).
Especially important is the `cluster.name` parameter that must correspond to
the name of your cluster.

Also note that the example only demonstrates performing a single index
request for each incoming element. In general, the `ElasticsearchSinkFunction`
can be used to perform multiple requests of different types (e.g.,
`DeleteRequest`, `UpdateRequest`, etc.).
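
As a rough sketch of that idea, the `ElasticsearchSinkFunction` below adds both a
delete and an index action per element; the `extractId` helper is hypothetical and
only stands in for whatever logic derives a document id from the element:

{% highlight java %}
new ElasticsearchSinkFunction<String>() {
    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        // hypothetical helper that derives a document id from the element
        String id = extractId(element);

        // remove the previously indexed version of the document, if any
        indexer.add(Requests.deleteRequest("my-index")
                .type("my-type")
                .id(id));

        // index the new version of the document
        Map<String, String> json = new HashMap<>();
        json.put("data", element);
        indexer.add(Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .id(id)
                .source(json));
    }
}
{% endhighlight %}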

Internally, the sink uses a `BulkProcessor` to send action requests to the cluster.
This buffers elements before sending them in bulk to the cluster. The behaviour of the
`BulkProcessor` can be configured using these keys in the provided `Map` (see the example after the list):
* **bulk.flush.max.actions**: Maximum amount of elements to buffer
* **bulk.flush.max.size.mb**: Maximum amount of data (in megabytes) to buffer
* **bulk.flush.interval.ms**: Interval (in milliseconds) at which to flush data regardless of the other two settings
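
For example, a configuration sketch that flushes whenever 500 actions or 5 MB of
data have been buffered, or at the latest every 60 seconds (the values here are
illustrative, not recommendations):

{% highlight java %}
Map<String, String> config = new HashMap<>();
config.put("cluster.name", "my-cluster-name");

// flush once 500 actions have been buffered ...
config.put("bulk.flush.max.actions", "500");
// ... or once the buffered data exceeds 5 MB ...
config.put("bulk.flush.max.size.mb", "5");
// ... or at the latest every 60 seconds
config.put("bulk.flush.interval.ms", "60000");
{% endhighlight %}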

#### Communication using Embedded Node (only for Elasticsearch 1.x)

For Elasticsearch versions 1.x, communication using an embedded node is
also supported. See [here](https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/client.html)
for information about the differences between communicating with Elasticsearch
with an embedded node and a `TransportClient`.

Below is an example of how to create an `ElasticsearchSink` that uses an
embedded node instead of a `TransportClient`:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
DataStream<String> input = ...;

Map<String, String> config = new HashMap<>();
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1");
config.put("cluster.name", "my-cluster-name");

input.addSink(new ElasticsearchSink<>(config, new ElasticsearchSinkFunction<String>() {
    public IndexRequest createIndexRequest(String element) {
        Map<String, String> json = new HashMap<>();
        json.put("data", element);

        return Requests.indexRequest()
                .index("my-index")
                .type("my-type")
                .source(json);
    }

    @Override
    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
        indexer.add(createIndexRequest(element));
    }
}));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val input: DataStream[String] = ...

val config = new java.util.HashMap[String, String]
// This instructs the sink to emit after every element, otherwise they would be buffered
config.put("bulk.flush.max.actions", "1")
config.put("cluster.name", "my-cluster-name")

input.addSink(new ElasticsearchSink(config, new ElasticsearchSinkFunction[String] {
  def createIndexRequest(element: String): IndexRequest = {
    val json = new java.util.HashMap[String, String]
    json.put("data", element)

    Requests.indexRequest()
            .index("my-index")
            .`type`("my-type")
            .source(json)
  }

  override def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer): Unit = {
    indexer.add(createIndexRequest(element))
  }
}))
{% endhighlight %}
</div>
</div>

The difference is that we no longer need to provide a list of addresses
of Elasticsearch nodes.

More information about Elasticsearch can be found [here](https://elastic.co).

#### Packaging the Elasticsearch Connector into an Uber-Jar

For the execution of your Flink program, it is recommended to build a
so-called uber-jar (executable jar) containing all your dependencies
(see [here]({{site.baseurl}}/dev/linking.html) for further information).

However, when an uber-jar containing an Elasticsearch sink is executed,
an `IllegalArgumentException` may occur, which is caused by conflicting
files of Elasticsearch and its dependencies in `META-INF/services`:

```
IllegalArgumentException[An SPI class of type org.apache.lucene.codecs.PostingsFormat with name 'Lucene50' does not exist. You need to add the corresponding JAR file supporting this SPI to your classpath. The current classpath supports the following names: [es090, completion090, XBloomFilter]]
```

If the uber-jar is built using Maven, this issue can be avoided by
adding the following to the Maven POM file in the plugins section:

~~~xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>
~~~