From 328b7d195cb1a4d5ab86a1b891723e77c0e63140 Mon Sep 17 00:00:00 2001 From: MartijnVisser Date: Thu, 17 Mar 2022 09:45:27 +0100 Subject: [PATCH] [FLINK-26687][Connectors][NiFi] Remove Apache NiFi connector --- .../docs/connectors/datastream/nifi.md | 132 --------------- .../docs/connectors/datastream/overview.md | 1 - .../docs/connectors/datastream/nifi.md | 141 ---------------- .../docs/connectors/datastream/overview.md | 1 - .../pom.xml | 5 - flink-architecture-tests/pom.xml | 7 - flink-connectors/flink-connector-nifi/pom.xml | 82 ---------- .../connectors/nifi/NiFiDataPacket.java | 33 ---- .../nifi/NiFiDataPacketBuilder.java | 34 ---- .../streaming/connectors/nifi/NiFiSink.java | 79 --------- .../streaming/connectors/nifi/NiFiSource.java | 154 ------------------ .../nifi/StandardNiFiDataPacket.java | 44 ----- .../examples/NiFiSinkTopologyExample.java | 63 ------- .../examples/NiFiSourceTopologyExample.java | 60 ------- .../src/test/resources/NiFi_Flink.xml | 16 -- flink-connectors/pom.xml | 1 - tools/ci/stage.sh | 1 - 17 files changed, 854 deletions(-) delete mode 100644 docs/content.zh/docs/connectors/datastream/nifi.md delete mode 100644 docs/content/docs/connectors/datastream/nifi.md delete mode 100644 flink-connectors/flink-connector-nifi/pom.xml delete mode 100644 flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java delete mode 100644 flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java delete mode 100644 flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java delete mode 100644 flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java delete mode 100644 flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java delete mode 100644 flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java delete mode 100644 flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java delete mode 100644 flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml diff --git a/docs/content.zh/docs/connectors/datastream/nifi.md b/docs/content.zh/docs/connectors/datastream/nifi.md deleted file mode 100644 index d8e53e6ec59a4..0000000000000 --- a/docs/content.zh/docs/connectors/datastream/nifi.md +++ /dev/null @@ -1,132 +0,0 @@ ---- -title: NiFi -weight: 8 -type: docs -aliases: - - /zh/dev/connectors/nifi.html ---- - - -# Apache NiFi 连接器 - -{{< hint warning >}} -The NiFi connector is deprecated and will be removed with Flink 1.16. -{{< /hint >}} - -[Apache NiFi](https://nifi.apache.org/) 连接器提供了可以读取和写入的 Source 和 Sink。 -使用这个连接器,需要在工程中添加下面的依赖: - -{{< artifact flink-connector-nifi >}} - -注意这些连接器目前还没有包含在二进制发行版中。添加依赖、打包配置以及集群运行的相关信息请参考 [这里]({{< ref "docs/dev/datastream/project-configuration" >}})。 - -#### 安装 Apache NiFi - -安装 Apache NiFi 集群请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi)。 - -#### Apache NiFi Source - -该连接器提供了一个 Source 可以用来从 Apache NiFi 读取数据到 Apache Flink。 - -`NiFiSource(…)` 类有两个构造方法。 - -- `NiFiSource(SiteToSiteConfig config)` - 构造一个 `NiFiSource(…)` ,需要指定参数 SiteToSiteConfig ,采用默认的等待时间 1000 ms。 - -- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - 构造一个 `NiFiSource(…)`,需要指定参数 SiteToSiteConfig 和等待时间(单位为毫秒)。 - -示例: - -{{< tabs "44ccc35b-83c3-464f-9464-995d4981f4d9" >}} -{{< tab "Java" >}} -```java -StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - -SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig(); - -SourceFunction nifiSource = new NiFiSource(clientConfig); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() - -val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig() - -val nifiSource = new NiFiSource(clientConfig) -``` -{{< /tab >}} -{{< /tabs >}} - -数据从 Apache NiFi Output Port 读取,Apache NiFi Output Port 也被称为 "Data for Flink",是 Apache NiFi Site-to-site 协议配置的一部分。 - -#### Apache NiFi Sink - -该连接器提供了一个 Sink 可以用来把 Apache Flink 的数据写入到 Apache NiFi。 - -`NiFiSink(…)` 类只有一个构造方法。 - -- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` 构造一个 `NiFiSink(…)`,需要指定 `SiteToSiteConfig` 和 `NiFiDataPacketBuilder` 参数 ,`NiFiDataPacketBuilder` 可以将Flink数据转化成可以被NiFi识别的 `NiFiDataPacket`. - -示例: - -{{< tabs "599dbd31-e2a4-4203-a428-0a4c95c8fd07" >}} -{{< tab "Java" >}} -```java -StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - -SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .requestBatchCount(5) - .buildConfig(); - -SinkFunction nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder() {...}); - -streamExecEnv.addSink(nifiSink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() - -val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .requestBatchCount(5) - .buildConfig() - -val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder() {...}) - -streamExecEnv.addSink(nifiSink) -``` -{{< /tab >}} -{{< /tabs >}} - -更多关于 [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol 的信息请参考 [这里](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site)。 - -{{< top >}} diff --git a/docs/content.zh/docs/connectors/datastream/overview.md b/docs/content.zh/docs/connectors/datastream/overview.md index 6d08762541b60..911559f3aa2e7 100644 --- a/docs/content.zh/docs/connectors/datastream/overview.md +++ b/docs/content.zh/docs/connectors/datastream/overview.md @@ -45,7 +45,6 @@ under the License. * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink) * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink) * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source) - * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink) * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source) * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink) diff --git a/docs/content/docs/connectors/datastream/nifi.md b/docs/content/docs/connectors/datastream/nifi.md deleted file mode 100644 index 1c68ba2c24368..0000000000000 --- a/docs/content/docs/connectors/datastream/nifi.md +++ /dev/null @@ -1,141 +0,0 @@ ---- -title: NiFi -weight: 8 -type: docs -aliases: - - /dev/connectors/nifi.html ---- - - -# Apache NiFi Connector - -{{< hint warning >}} -The NiFi connector is deprecated and will be removed with Flink 1.16. -{{< /hint >}} - -This connector provides a Source and Sink that can read from and write to -[Apache NiFi](https://nifi.apache.org/). To use this connector, add the -following dependency to your project: - -{{< artifact flink-connector-nifi >}} - -Note that the streaming connectors are currently not part of the binary -distribution. See -[here]({{< ref "docs/dev/configuration/overview" >}}) -for information about how to package the program with the libraries for -cluster execution. - -#### Installing Apache NiFi - -Instructions for setting up a Apache NiFi cluster can be found -[here](https://nifi.apache.org/docs/nifi-docs/html/administration-guide.html#how-to-install-and-start-nifi). - -#### Apache NiFi Source - -The connector provides a Source for reading data from Apache NiFi to Apache Flink. - -The class `NiFiSource(…)` provides 2 constructors for reading data from NiFi. - -- `NiFiSource(SiteToSiteConfig config)` - Constructs a `NiFiSource(…)` given the client's SiteToSiteConfig and a - default wait time of 1000 ms. - -- `NiFiSource(SiteToSiteConfig config, long waitTimeMs)` - Constructs a `NiFiSource(…)` given the client's - SiteToSiteConfig and the specified wait time (in milliseconds). - -Example: - -{{< tabs "14bce3dd-fcc4-4c98-bdd8-ed7819b7f0c4" >}} -{{< tab "Java" >}} -```java -StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - -SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig(); - -SourceFunction nifiSource = new NiFiSource(clientConfig); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() - -val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig() - -val nifiSource = new NiFiSource(clientConfig) -``` -{{< /tab >}} -{{< /tabs >}} - -Here data is read from the Apache NiFi Output Port called "Data for Flink" which is part of Apache NiFi -Site-to-site protocol configuration. - -#### Apache NiFi Sink - -The connector provides a Sink for writing data from Apache Flink to Apache NiFi. - -The class `NiFiSink(…)` provides a constructor for instantiating a `NiFiSink`. - -- `NiFiSink(SiteToSiteClientConfig, NiFiDataPacketBuilder)` constructs a `NiFiSink(…)` given the client's `SiteToSiteConfig` and a `NiFiDataPacketBuilder` that converts data from Flink to `NiFiDataPacket` to be ingested by NiFi. - -Example: - -{{< tabs "bcf46513-edfb-4b41-b588-51009eb9f59a" >}} -{{< tab "Java" >}} -```java -StreamExecutionEnvironment streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment(); - -SiteToSiteClientConfig clientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .requestBatchCount(5) - .buildConfig(); - -SinkFunction nifiSink = new NiFiSink<>(clientConfig, new NiFiDataPacketBuilder() {...}); - -streamExecEnv.addSink(nifiSink); -``` -{{< /tab >}} -{{< tab "Scala" >}} -```scala -val streamExecEnv = StreamExecutionEnvironment.getExecutionEnvironment() - -val clientConfig: SiteToSiteClientConfig = new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .requestBatchCount(5) - .buildConfig() - -val nifiSink: NiFiSink[NiFiDataPacket] = new NiFiSink[NiFiDataPacket](clientConfig, new NiFiDataPacketBuilder() {...}) - -streamExecEnv.addSink(nifiSink) -``` -{{< /tab >}} -{{< /tabs >}} - -More information about [Apache NiFi](https://nifi.apache.org) Site-to-Site Protocol can be found [here](https://nifi.apache.org/docs/nifi-docs/html/user-guide.html#site-to-site) - -{{< top >}} diff --git a/docs/content/docs/connectors/datastream/overview.md b/docs/content/docs/connectors/datastream/overview.md index 42cf647f7744e..591cd894dde7b 100644 --- a/docs/content/docs/connectors/datastream/overview.md +++ b/docs/content/docs/connectors/datastream/overview.md @@ -46,7 +46,6 @@ Connectors provide code for interfacing with various third-party systems. Curren * [RabbitMQ]({{< ref "docs/connectors/datastream/rabbitmq" >}}) (source/sink) * [Google PubSub]({{< ref "docs/connectors/datastream/pubsub" >}}) (source/sink) * [Hybrid Source]({{< ref "docs/connectors/datastream/hybridsource" >}}) (source) - * [Apache NiFi]({{< ref "docs/connectors/datastream/nifi" >}}) (source/sink) * [Apache Pulsar]({{< ref "docs/connectors/datastream/pulsar" >}}) (source) * [JDBC]({{< ref "docs/connectors/datastream/jdbc" >}}) (sink) diff --git a/flink-architecture-tests/flink-architecture-tests-production/pom.xml b/flink-architecture-tests/flink-architecture-tests-production/pom.xml index ce5d53c87db64..2b858b0255812 100644 --- a/flink-architecture-tests/flink-architecture-tests-production/pom.xml +++ b/flink-architecture-tests/flink-architecture-tests-production/pom.xml @@ -182,11 +182,6 @@ under the License. flink-connector-kinesis - - org.apache.flink - flink-connector-nifi - - org.apache.flink flink-connector-pulsar diff --git a/flink-architecture-tests/pom.xml b/flink-architecture-tests/pom.xml index 243afe70afbbf..6fb1e7f075a56 100644 --- a/flink-architecture-tests/pom.xml +++ b/flink-architecture-tests/pom.xml @@ -231,13 +231,6 @@ under the License. test - - org.apache.flink - flink-connector-nifi - ${project.version} - test - - org.apache.flink flink-connector-pulsar diff --git a/flink-connectors/flink-connector-nifi/pom.xml b/flink-connectors/flink-connector-nifi/pom.xml deleted file mode 100644 index 0a284d9af27a9..0000000000000 --- a/flink-connectors/flink-connector-nifi/pom.xml +++ /dev/null @@ -1,82 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-connectors - 1.16-SNAPSHOT - .. - - - flink-connector-nifi - Flink : Connectors : Nifi - - jar - - - - 1.14.0 - - - - - org.apache.nifi - nifi-site-to-site-client - ${nifi.version} - - - org.apache.flink - flink-streaming-java - ${project.version} - provided - - - org.apache.flink - flink-streaming-java - ${project.version} - test - test-jar - - - org.apache.flink - flink-tests - ${project.version} - test - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - - 3 - - - - - - diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java deleted file mode 100644 index 01e77d33c022a..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacket.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi; - -import java.util.Map; - -/** - * The NiFiDataPacket provides a packaging around a NiFi FlowFile. It wraps both a FlowFile's - * content and its attributes so that they can be processed by Flink. - */ -public interface NiFiDataPacket { - - /** @return the contents of a NiFi FlowFile */ - byte[] getContent(); - - /** @return a Map of attributes that are associated with the NiFi FlowFile */ - Map getAttributes(); -} diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java deleted file mode 100644 index 492b0ec7a5ab5..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiDataPacketBuilder.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.RuntimeContext; - -import java.io.Serializable; - -/** - * A function that can create a NiFiDataPacket from an incoming instance of the given type. - * - * @param - */ -public interface NiFiDataPacketBuilder extends Function, Serializable { - - NiFiDataPacket createNiFiDataPacket(T t, RuntimeContext ctx); -} diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java deleted file mode 100644 index 0921c2808a74a..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSink.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; - -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; - -/** - * A sink that delivers data to Apache NiFi using the NiFi Site-to-Site client. The sink requires a - * NiFiDataPacketBuilder which can create instances of NiFiDataPacket from the incoming data. - * - * @deprecated The NiFi Sink has been deprecated and will be removed in a future Flink release. - */ -@Deprecated -public class NiFiSink extends RichSinkFunction { - - private SiteToSiteClient client; - private SiteToSiteClientConfig clientConfig; - private NiFiDataPacketBuilder builder; - - /** - * Construct a new NiFiSink with the given client config and NiFiDataPacketBuilder. - * - * @param clientConfig the configuration for building a NiFi SiteToSiteClient - * @param builder a builder to produce NiFiDataPackets from incoming data - */ - public NiFiSink(SiteToSiteClientConfig clientConfig, NiFiDataPacketBuilder builder) { - this.clientConfig = clientConfig; - this.builder = builder; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - this.client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - } - - @Override - public void invoke(T value) throws Exception { - final NiFiDataPacket niFiDataPacket = - builder.createNiFiDataPacket(value, getRuntimeContext()); - - final Transaction transaction = client.createTransaction(TransferDirection.SEND); - if (transaction == null) { - throw new IllegalStateException("Unable to create a NiFi Transaction to send data"); - } - - transaction.send(niFiDataPacket.getContent(), niFiDataPacket.getAttributes()); - transaction.confirm(); - transaction.complete(); - } - - @Override - public void close() throws Exception { - super.close(); - client.close(); - } -} diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java deleted file mode 100644 index a2ec6c73ff7cc..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/NiFiSource.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; - -import org.apache.nifi.remote.Transaction; -import org.apache.nifi.remote.TransferDirection; -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; -import org.apache.nifi.remote.protocol.DataPacket; -import org.apache.nifi.stream.io.StreamUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -/** - * A source that pulls data from Apache NiFi using the NiFi Site-to-Site client. This source - * produces NiFiDataPackets which encapsulate the content and attributes of a NiFi FlowFile. - * - * @deprecated The NiFi Source has been deprecated and will be removed in a future Flink release. - */ -@Deprecated -public class NiFiSource extends RichParallelSourceFunction { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = LoggerFactory.getLogger(NiFiSource.class); - - private static final long DEFAULT_WAIT_TIME_MS = 1000; - - // ------------------------------------------------------------------------ - - private final SiteToSiteClientConfig clientConfig; - - private final long waitTimeMs; - - private transient SiteToSiteClient client; - - private volatile boolean isRunning = true; - - /** - * Constructs a new NiFiSource using the given client config and the default wait time of 1000 - * ms. - * - * @param clientConfig the configuration for building a NiFi SiteToSiteClient - */ - public NiFiSource(SiteToSiteClientConfig clientConfig) { - this(clientConfig, DEFAULT_WAIT_TIME_MS); - } - - /** - * Constructs a new NiFiSource using the given client config and wait time. - * - * @param clientConfig the configuration for building a NiFi SiteToSiteClient - * @param waitTimeMs the amount of time to wait (in milliseconds) if no data is available to - * pull from NiFi - */ - public NiFiSource(SiteToSiteClientConfig clientConfig, long waitTimeMs) { - this.clientConfig = clientConfig; - this.waitTimeMs = waitTimeMs; - } - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - client = new SiteToSiteClient.Builder().fromConfig(clientConfig).build(); - } - - @Override - public void run(SourceContext ctx) throws Exception { - while (isRunning) { - final Transaction transaction = client.createTransaction(TransferDirection.RECEIVE); - if (transaction == null) { - LOG.warn("A transaction could not be created, waiting and will try again..."); - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ignored) { - - } - continue; - } - - DataPacket dataPacket = transaction.receive(); - if (dataPacket == null) { - transaction.confirm(); - transaction.complete(); - - LOG.debug("No data available to pull, waiting and will try again..."); - try { - Thread.sleep(waitTimeMs); - } catch (InterruptedException ignored) { - - } - continue; - } - - final List niFiDataPackets = new ArrayList<>(); - do { - // Read the data into a byte array and wrap it along with the attributes - // into a NiFiDataPacket. - final InputStream inStream = dataPacket.getData(); - final byte[] data = new byte[(int) dataPacket.getSize()]; - StreamUtils.fillBuffer(inStream, data); - - final Map attributes = dataPacket.getAttributes(); - - niFiDataPackets.add(new StandardNiFiDataPacket(data, attributes)); - dataPacket = transaction.receive(); - } while (dataPacket != null); - - // Confirm transaction to verify the data - transaction.confirm(); - - for (NiFiDataPacket dp : niFiDataPackets) { - ctx.collect(dp); - } - - transaction.complete(); - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public void close() throws Exception { - super.close(); - client.close(); - } -} diff --git a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java b/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java deleted file mode 100644 index c460f128bf174..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/main/java/org/apache/flink/streaming/connectors/nifi/StandardNiFiDataPacket.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi; - -import java.io.Serializable; -import java.util.Map; - -/** An implementation of NiFiDataPacket. */ -public class StandardNiFiDataPacket implements NiFiDataPacket, Serializable { - private static final long serialVersionUID = 6364005260220243322L; - - private final byte[] content; - private final Map attributes; - - public StandardNiFiDataPacket(final byte[] content, final Map attributes) { - this.content = content; - this.attributes = attributes; - } - - @Override - public byte[] getContent() { - return content; - } - - @Override - public Map getAttributes() { - return attributes; - } -} diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java deleted file mode 100644 index 9927c69a149f8..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSinkTopologyExample.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi.examples; - -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.streaming.api.datastream.DataStreamSink; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacketBuilder; -import org.apache.flink.streaming.connectors.nifi.NiFiSink; -import org.apache.flink.streaming.connectors.nifi.StandardNiFiDataPacket; - -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; - -import java.util.HashMap; - -/** An example topology that sends data to a NiFi input port named "Data from Flink". */ -public class NiFiSinkTopologyExample { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SiteToSiteClientConfig clientConfig = - new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data from Flink") - .buildConfig(); - - DataStreamSink dataStream = - env.fromElements("one", "two", "three", "four", "five", "q") - .addSink( - new NiFiSink<>( - clientConfig, - new NiFiDataPacketBuilder() { - @Override - public NiFiDataPacket createNiFiDataPacket( - String s, RuntimeContext ctx) { - return new StandardNiFiDataPacket( - s.getBytes(ConfigConstants.DEFAULT_CHARSET), - new HashMap()); - } - })); - - env.execute(); - } -} diff --git a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java b/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java deleted file mode 100644 index 4d8dbfb979ae8..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/test/java/org/apache/flink/streaming/connectors/nifi/examples/NiFiSourceTopologyExample.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.connectors.nifi.examples; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.source.SourceFunction; -import org.apache.flink.streaming.connectors.nifi.NiFiDataPacket; -import org.apache.flink.streaming.connectors.nifi.NiFiSource; - -import org.apache.nifi.remote.client.SiteToSiteClient; -import org.apache.nifi.remote.client.SiteToSiteClientConfig; - -import java.nio.charset.Charset; - -/** An example topology that pulls data from a NiFi output port named "Data for Flink". */ -public class NiFiSourceTopologyExample { - - public static void main(String[] args) throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - SiteToSiteClientConfig clientConfig = - new SiteToSiteClient.Builder() - .url("http://localhost:8080/nifi") - .portName("Data for Flink") - .requestBatchCount(5) - .buildConfig(); - - SourceFunction nifiSource = new NiFiSource(clientConfig); - DataStream streamSource = env.addSource(nifiSource).setParallelism(2); - - DataStream dataStream = - streamSource.map( - new MapFunction() { - @Override - public String map(NiFiDataPacket value) throws Exception { - return new String(value.getContent(), Charset.defaultCharset()); - } - }); - - dataStream.print(); - env.execute(); - } -} diff --git a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml b/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml deleted file mode 100644 index d373d6369e4d1..0000000000000 --- a/flink-connectors/flink-connector-nifi/src/test/resources/NiFi_Flink.xml +++ /dev/null @@ -1,16 +0,0 @@ - - -