diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml deleted file mode 100644 index 0d6da4e6dcb83..0000000000000 --- a/flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-end-to-end-tests - 1.16-SNAPSHOT - - - flink-elasticsearch6-test - Flink : E2E Tests : Elasticsearch 6 - jar - - - - org.apache.flink - flink-streaming-java - ${project.version} - provided - - - org.apache.flink - flink-connector-elasticsearch6 - ${project.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - Elasticsearch6SinkExample - package - - shade - - - Elasticsearch6SinkExample - - - com.google.code.findbugs:jsr305 - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - org.apache.flink.streaming.tests.Elasticsearch6SinkExample - - - - - - - - - - diff --git a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java deleted file mode 100644 index 7243351431243..0000000000000 --- a/flink-end-to-end-tests/flink-elasticsearch6-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch6SinkExample.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.tests; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch6SinkBuilder; -import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** End to end test for Elasticsearch6Sink. */ -public class Elasticsearch6SinkExample { - - public static void main(String[] args) throws Exception { - - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 3) { - System.out.println( - "Missing parameters!\n" - + "Usage: --numRecords --index --type "); - return; - } - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - - DataStream> source = - env.fromSequence(0, parameterTool.getInt("numRecords") - 1) - .flatMap( - new FlatMapFunction>() { - @Override - public void flatMap( - Long value, Collector> out) { - final String key = String.valueOf(value); - final String message = "message #" + value; - out.collect(Tuple2.of(key, message + "update #1")); - out.collect(Tuple2.of(key, message + "update #2")); - } - }); - - ElasticsearchSink> sink = - new Elasticsearch6SinkBuilder>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, ctx, indexer) -> { - indexer.add(createIndexRequest(element.f1, parameterTool)); - indexer.add(createUpdateRequest(element, parameterTool)); - }) - .setBulkFlushMaxActions(1) // emit after every element, don't buffer - .build(); - - source.sinkTo(sink); - env.execute("Elasticsearch 6.x end to end sink test example"); - } - - private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { - Map json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index(parameterTool.getRequired("index")) - .type(parameterTool.getRequired("type")) - .id(element) - .source(json); - } - - private static UpdateRequest createUpdateRequest( - Tuple2 element, ParameterTool parameterTool) { - Map json = new HashMap<>(); - json.put("data", element.f1); - - return new UpdateRequest( - parameterTool.getRequired("index"), - parameterTool.getRequired("type"), - element.f0) - .doc(json) - .upsert(json); - } -} diff --git a/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml b/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml deleted file mode 100644 index cebe36bdf14cc..0000000000000 --- a/flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml +++ /dev/null @@ -1,91 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-end-to-end-tests - 1.16-SNAPSHOT - - - flink-elasticsearch7-test - Flink : E2E Tests : Elasticsearch 7 - jar - - - - org.apache.flink - flink-streaming-java - ${project.version} - provided - - - org.apache.flink - flink-connector-elasticsearch7 - ${project.version} - - - - - - - org.apache.maven.plugins - maven-shade-plugin - - - Elasticsearch7SinkExample - package - - shade - - - Elasticsearch7SinkExample - - - com.google.code.findbugs:jsr305 - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - org.apache.flink.streaming.tests.Elasticsearch7SinkExample - - - - - - - - - - diff --git a/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java b/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java deleted file mode 100644 index 10a4bc785acb7..0000000000000 --- a/flink-end-to-end-tests/flink-elasticsearch7-test/src/main/java/org/apache/flink/streaming/tests/Elasticsearch7SinkExample.java +++ /dev/null @@ -1,101 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.tests; - -import org.apache.flink.api.common.functions.FlatMapFunction; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder; -import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Collector; - -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Requests; - -import java.util.HashMap; -import java.util.Map; - -/** End to end test for Elasticsearch7Sink. */ -public class Elasticsearch7SinkExample { - - public static void main(String[] args) throws Exception { - - final ParameterTool parameterTool = ParameterTool.fromArgs(args); - - if (parameterTool.getNumberOfParameters() < 2) { - System.out.println( - "Missing parameters!\n" + "Usage: --numRecords --index "); - return; - } - - final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.enableCheckpointing(5000); - - DataStream> source = - env.fromSequence(0, parameterTool.getInt("numRecords") - 1) - .flatMap( - new FlatMapFunction>() { - @Override - public void flatMap( - Long value, Collector> out) { - final String key = String.valueOf(value); - final String message = "message #" + value; - out.collect(Tuple2.of(key, message + "update #1")); - out.collect(Tuple2.of(key, message + "update #2")); - } - }); - - ElasticsearchSink> sink = - new Elasticsearch7SinkBuilder>() - .setHosts(new HttpHost("127.0.0.1", 9200, "http")) - .setEmitter( - (element, ctx, indexer) -> { - indexer.add(createIndexRequest(element.f1, parameterTool)); - indexer.add(createUpdateRequest(element, parameterTool)); - }) - .setBulkFlushMaxActions(1) // emit after every element, don't buffer - .build(); - - source.sinkTo(sink); - env.execute("Elasticsearch 7.x end to end sink test example"); - } - - private static IndexRequest createIndexRequest(String element, ParameterTool parameterTool) { - Map json = new HashMap<>(); - json.put("data", element); - - return Requests.indexRequest() - .index(parameterTool.getRequired("index")) - .id(element) - .source(json); - } - - private static UpdateRequest createUpdateRequest( - Tuple2 element, ParameterTool parameterTool) { - Map json = new HashMap<>(); - json.put("data", element.f1); - - return new UpdateRequest(parameterTool.getRequired("index"), element.f0) - .doc(json) - .upsert(json); - } -} diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml index 7a1b3ba121f01..d2ca9322d5a4d 100644 --- a/flink-end-to-end-tests/pom.xml +++ b/flink-end-to-end-tests/pom.xml @@ -51,7 +51,6 @@ under the License. flink-stream-stateful-job-upgrade-test flink-queryable-state-test flink-local-recovery-and-allocation-test - flink-elasticsearch6-test flink-quickstart-test flink-confluent-schema-registry flink-stream-state-ttl-test @@ -69,7 +68,6 @@ under the License. flink-plugins-test flink-tpch-test flink-streaming-kinesis-test - flink-elasticsearch7-test flink-end-to-end-tests-common-kafka flink-tpcds-test flink-netty-shuffle-memory-control-test diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 0a8b044800d57..a9a78c92aef59 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -199,9 +199,6 @@ function run_group_2 { run_test "Netty shuffle direct memory consumption end-to-end test" "$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh" - run_test "Elasticsearch (v6.8.20) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.20.tar.gz" - run_test "Elasticsearch (v7.15.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 7 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.15.2-linux-x86_64.tar.gz" - run_test "Quickstarts Java nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh java" run_test "Quickstarts Scala nightly end-to-end test" "$END_TO_END_DIR/test-scripts/test_quickstarts.sh scala" diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh b/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh deleted file mode 100755 index e2ee273d43a32..0000000000000 --- a/flink-end-to-end-tests/test-scripts/test_streaming_elasticsearch.sh +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env bash -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -source "$(dirname "$0")"/common.sh -source "$(dirname "$0")"/elasticsearch-common.sh - -ELASTICSEARCH_VERSION=$1 -DOWNLOAD_URL=$2 - -mkdir -p $TEST_DATA_DIR - -setup_elasticsearch $DOWNLOAD_URL $ELASTICSEARCH_VERSION -wait_elasticsearch_working - -start_cluster - -function test_cleanup { - shutdown_elasticsearch_cluster index -} - -on_exit test_cleanup - -TEST_ES_JAR=${END_TO_END_DIR}/flink-elasticsearch${ELASTICSEARCH_VERSION}-test/target/Elasticsearch${ELASTICSEARCH_VERSION}SinkExample.jar - -# run the Flink job -$FLINK_DIR/bin/flink run -p 1 $TEST_ES_JAR \ - --numRecords 20 \ - --index index \ - --type type - -# 40 index requests and 20 final update requests -verify_result_line_number 60 index