Skip to content

Commit

Permalink
[FLINK-28107][python][connector/elasticsearch] Support id of document…
Browse files Browse the repository at this point in the history
… is null

This closes apache#20002.
  • Loading branch information
a49a authored and dianfu committed Jun 20, 2022
1 parent a9f2e20 commit 11910d5
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 103 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.api.connector.sink2.SinkWriter;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;

import javax.annotation.Nullable;

import java.util.Map;
import java.util.function.Function;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * A simple ElasticsearchEmitter which is currently used in PyFlink ES connector.
 *
 * <p>Each incoming record is a {@code Map<String, Object>} that is used as the document body. The
 * target index is either the fixed {@code index} string or, when {@code isDynamicIndex} is set,
 * the string form of the record's value stored under the key {@code index}.
 *
 * <p>When {@code idFieldName} is configured, an {@link UpdateRequest} with the record as both
 * partial document and upsert document is emitted; otherwise an {@link IndexRequest} is emitted
 * without passing an explicit document id.
 */
public class MapElasticsearchEmitter implements ElasticsearchEmitter<Map<String, Object>> {

    private static final long serialVersionUID = 1L;

    /** Static index name, or the name of the record field holding the index when dynamic. */
    private final String index;

    private @Nullable final String documentType;

    /** Name of the record field holding the document id; {@code null} means no explicit id. */
    private @Nullable final String idFieldName;

    private final boolean isDynamicIndex;

    /** Resolves the target index for a record; lambdas are rebuilt in {@link #open()}. */
    private transient Function<Map<String, Object>, String> indexProvider;

    /**
     * Creates the emitter.
     *
     * @param index the index name, or the name of the record field that contains the index name
     *     when {@code isDynamicIndex} is {@code true}; must not be {@code null}
     * @param documentType the document type passed to the created requests, may be {@code null}
     * @param idFieldName the name of the record field that contains the document id, or {@code
     *     null} to emit index requests without an explicit id
     * @param isDynamicIndex whether {@code index} names a record field instead of an index
     * @throws NullPointerException if {@code index} is {@code null}
     */
    public MapElasticsearchEmitter(
            String index,
            @Nullable String documentType,
            @Nullable String idFieldName,
            boolean isDynamicIndex) {
        this.index = checkNotNull(index, "The index (or index field name) must not be null.");
        this.documentType = documentType;
        this.idFieldName = idFieldName;
        this.isDynamicIndex = isDynamicIndex;
    }

    /** Builds the index resolver matching the configured (static or dynamic) index mode. */
    @Override
    public void open() throws Exception {
        if (isDynamicIndex) {
            // Fail with the offending field name instead of an anonymous NullPointerException
            // when a record does not carry the index field.
            indexProvider =
                    doc ->
                            checkNotNull(
                                            doc.get(index),
                                            "Record contains no value for the dynamic index field '%s'.",
                                            index)
                                    .toString();
        } else {
            indexProvider = doc -> index;
        }
    }

    /**
     * Converts the record into a single Elasticsearch action and hands it to the indexer.
     *
     * @param doc the record, used as the document body
     * @param context the sink writer context (not used by this emitter)
     * @param indexer the indexer receiving the created request
     * @throws NullPointerException if the record has no value for the dynamic index field or for
     *     the configured id field
     */
    @Override
    public void emit(Map<String, Object> doc, SinkWriter.Context context, RequestIndexer indexer) {
        // Resolve the index first so that a missing index field is reported before a missing id.
        final String targetIndex = indexProvider.apply(doc);

        if (idFieldName != null) {
            final String documentId =
                    checkNotNull(
                                    doc.get(idFieldName),
                                    "Record contains no value for the id field '%s'.",
                                    idFieldName)
                            .toString();
            final UpdateRequest updateRequest =
                    new UpdateRequest(targetIndex, documentType, documentId).doc(doc).upsert(doc);
            indexer.add(updateRequest);
        } else {
            final IndexRequest indexRequest =
                    new IndexRequest(targetIndex, documentType).source(doc);
            indexer.add(indexRequest);
        }
    }
}

This file was deleted.

12 changes: 6 additions & 6 deletions flink-python/pyflink/datastream/connectors/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ def static_index(index: str, key_field: str = None, doc_type: str = None) \
Creates an emitter with static index which is invoked on every record to convert it to
Elasticsearch actions.
"""
JSimpleElasticsearchEmitter = get_gateway().jvm \
.org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter
j_emitter = JSimpleElasticsearchEmitter(index, doc_type, key_field, False)
JMapElasticsearchEmitter = get_gateway().jvm \
.org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter
j_emitter = JMapElasticsearchEmitter(index, doc_type, key_field, False)
return ElasticsearchEmitter(j_emitter)

@staticmethod
Expand All @@ -87,9 +87,9 @@ def dynamic_index(index_field: str, key_field: str = None, doc_type: str = None)
Creates an emitter with dynamic index which is invoked on every record to convert it to
Elasticsearch actions.
"""
JSimpleElasticsearchEmitter = get_gateway().jvm \
.org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter
j_emitter = JSimpleElasticsearchEmitter(index_field, doc_type, key_field, True)
JMapElasticsearchEmitter = get_gateway().jvm \
.org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter
j_emitter = JMapElasticsearchEmitter(index_field, doc_type, key_field, True)
return ElasticsearchEmitter(j_emitter)


Expand Down
40 changes: 38 additions & 2 deletions flink-python/pyflink/datastream/tests/test_connectors.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def test_es_sink(self):
self.assertTrue(
is_instance_of(
j_emitter,
'org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter'))
'org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter'))
self.assertEqual(
get_field_value(
es_sink.get_java_function(), 'hosts')[0].toString(), 'http://localhost:9200')
Expand Down Expand Up @@ -138,7 +138,43 @@ def test_es_sink_dynamic(self):
self.assertTrue(
is_instance_of(
j_emitter,
'org.apache.flink.connector.elasticsearch.sink.SimpleElasticsearchEmitter'))
'org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter'))

ds.sink_to(es_dynamic_index_sink).name('es dynamic index sink')

def test_es_sink_key_none(self):
ds = self.env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))

es_sink = Elasticsearch7SinkBuilder() \
.set_emitter(ElasticsearchEmitter.static_index('foo')) \
.set_hosts(['localhost:9200']) \
.build()

j_emitter = get_field_value(es_sink.get_java_function(), 'emitter')
self.assertTrue(
is_instance_of(
j_emitter,
'org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter'))

ds.sink_to(es_sink).name('es sink')

def test_es_sink_dynamic_key_none(self):
ds = self.env.from_collection(
[{'name': 'ada', 'id': '1'}, {'name': 'luna', 'id': '2'}],
type_info=Types.MAP(Types.STRING(), Types.STRING()))

es_dynamic_index_sink = Elasticsearch7SinkBuilder() \
.set_emitter(ElasticsearchEmitter.dynamic_index('name')) \
.set_hosts(['localhost:9200']) \
.build()

j_emitter = get_field_value(es_dynamic_index_sink.get_java_function(), 'emitter')
self.assertTrue(
is_instance_of(
j_emitter,
'org.apache.flink.connector.elasticsearch.sink.MapElasticsearchEmitter'))

ds.sink_to(es_dynamic_index_sink).name('es dynamic index sink')

Expand Down

0 comments on commit 11910d5

Please sign in to comment.