From 9fa74a8a7061af3c518853fc10bfa7d7abf5ea6d Mon Sep 17 00:00:00 2001 From: Weijie Guo Date: Thu, 21 Mar 2024 18:39:50 +0800 Subject: [PATCH] [FLINK-34548][API] Introduce stream interface and move KeySelector to flink-core-api --- .../flink/api/java/functions/KeySelector.java | 0 .../api/stream/BroadcastStream.java | 76 +++++++ .../datastream/api/stream/DataStream.java | 25 +++ .../datastream/api/stream/GlobalStream.java | 95 ++++++++ .../api/stream/KeyedPartitionStream.java | 211 ++++++++++++++++++ .../api/stream/NonKeyedPartitionStream.java | 117 ++++++++++ pom.xml | 1 + 7 files changed, 525 insertions(+) rename {flink-core => flink-core-api}/src/main/java/org/apache/flink/api/java/functions/KeySelector.java (100%) create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/BroadcastStream.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/DataStream.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java create mode 100644 flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java diff --git a/flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java b/flink-core-api/src/main/java/org/apache/flink/api/java/functions/KeySelector.java similarity index 100% rename from flink-core/src/main/java/org/apache/flink/api/java/functions/KeySelector.java rename to flink-core-api/src/main/java/org/apache/flink/api/java/functions/KeySelector.java diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/BroadcastStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/BroadcastStream.java new file mode 100644 index 0000000000000..f8856df23264d --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/BroadcastStream.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.stream; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; + +/** This interface represents a stream that each parallel task processes the same data. */ +@Experimental +public interface BroadcastStream extends DataStream { + /** + * Apply a two input operation to this and other {@link KeyedPartitionStream}. + * + *

Generally, concatenating {@link BroadcastStream} and {@link KeyedPartitionStream} will + * result in a {@link NonKeyedPartitionStream}, and you can manually generate a {@link + * KeyedPartitionStream} via keyBy partitioning. In some cases, you can guarantee that the + * partition on which the data is processed will not change, then you can use {@link + * #connectAndProcess(KeyedPartitionStream, TwoInputBroadcastStreamProcessFunction, + * KeySelector)} to avoid shuffling. + * + * @param other {@link KeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + KeyedPartitionStream other, + TwoInputBroadcastStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link NonKeyedPartitionStream}. + * + * @param other {@link NonKeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + NonKeyedPartitionStream other, + TwoInputBroadcastStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link KeyedPartitionStream}. + * + *

This method is used to avoid shuffle after applying the process function. It is required + * that for the record from non-broadcast input, the new {@link KeySelector} must extract the + * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. Otherwise, + * the partition of data will be messy. As for the record from broadcast input, the output key + * from keyed partition itself instead of the new key selector, so the data it outputs will not + * affect the partition. + * + * @param other {@link KeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @param newKeySelector to select the key after process. + * @return new {@link KeyedPartitionStream} with this operation. + */ + KeyedPartitionStream connectAndProcess( + KeyedPartitionStream other, + TwoInputBroadcastStreamProcessFunction processFunction, + KeySelector newKeySelector); +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/DataStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/DataStream.java new file mode 100644 index 0000000000000..c782b374c4041 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/DataStream.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.stream; + +import org.apache.flink.annotation.Experimental; + +/** This is the topmost base interface of all streams of DataStream V2 API. */ +@Experimental +public interface DataStream {} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java new file mode 100644 index 0000000000000..cb8e0b83dd7c4 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/GlobalStream.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.stream; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; + +/** This interface represents a stream that force single parallelism. */ +@Experimental +public interface GlobalStream extends DataStream { + /** + * Apply an operation to this {@link GlobalStream}. + * + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + GlobalStream process(OneInputStreamProcessFunction processFunction); + + /** + * Apply a two output operation to this {@link GlobalStream}. + * + * @param processFunction to perform two output operation. + * @return new stream with this operation. + */ + TwoGlobalStreams process( + TwoOutputStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link GlobalStream}. + * + * @param other {@link GlobalStream} to perform operation with two input. + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + GlobalStream connectAndProcess( + GlobalStream other, + TwoInputNonBroadcastStreamProcessFunction processFunction); + + /** + * Transform this stream to a {@link KeyedPartitionStream}. + * + * @param keySelector to decide how to map data to partition. + * @return the transformed stream partitioned by key. + */ + KeyedPartitionStream keyBy(KeySelector keySelector); + + /** + * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between + * these two streams. + * + * @return the transformed stream after shuffle. + */ + NonKeyedPartitionStream shuffle(); + + /** + * Transform this stream to a new {@link BroadcastStream}. + * + * @return the transformed {@link BroadcastStream}. + */ + BroadcastStream broadcast(); + + // TODO add toSink method. + + /** + * This class represents a combination of two {@link GlobalStream}. It will be used as the + * return value of operation with two output. + */ + @Experimental + interface TwoGlobalStreams { + /** Get the first stream. */ + GlobalStream getFirst(); + + /** Get the second stream. */ + GlobalStream getSecond(); + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java new file mode 100644 index 0000000000000..40e15f08646e8 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/KeyedPartitionStream.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.stream; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream.TwoNonKeyedPartitionStreams; + +/** + * This interface represents a kind of partitioned data stream. For this stream, each key is a + * partition, and the partition to which the data belongs is deterministic. + */ +@Experimental +public interface KeyedPartitionStream extends DataStream { + /** + * Apply an operation to this {@link KeyedPartitionStream}. + * + *

This method is used to avoid shuffle after applying the process function. It is required + * that for the same record, the new {@link KeySelector} must extract the same key as the + * original {@link KeySelector} on this {@link KeyedPartitionStream}. Otherwise, the partition + * of data will be messy. + * + * @param processFunction to perform operation. + * @param newKeySelector to select the key after process. + * @return new {@link KeyedPartitionStream} with this operation. + */ + KeyedPartitionStream process( + OneInputStreamProcessFunction processFunction, + KeySelector newKeySelector); + + /** + * Apply an operation to this {@link KeyedPartitionStream}. + * + *

Generally, apply an operation to a {@link KeyedPartitionStream} will result in a {@link + * NonKeyedPartitionStream}, and you can manually generate a {@link KeyedPartitionStream} via + * keyBy partitioning. In some cases, you can guarantee that the partition on which the data is + * processed will not change, then you can use {@link #process(OneInputStreamProcessFunction, + * KeySelector)} to avoid shuffling. + * + * @param processFunction to perform operation. + * @return new {@link NonKeyedPartitionStream} with this operation. + */ + NonKeyedPartitionStream process( + OneInputStreamProcessFunction processFunction); + + /** + * Apply a two output operation to this {@link KeyedPartitionStream}. + * + *

This method is used to avoid shuffle after applying the process function. It is required + * that for the same record, these new two {@link KeySelector}s must extract the same key as the + * original {@link KeySelector}s on this {@link KeyedPartitionStream}. Otherwise, the partition + * of data will be messy. + * + * @param processFunction to perform two output operation. + * @param keySelector1 to select the key of first output. + * @param keySelector2 to select the key of second output. + * @return new {@link TwoKeyedPartitionStreams} with this operation. + */ + TwoKeyedPartitionStreams process( + TwoOutputStreamProcessFunction processFunction, + KeySelector keySelector1, + KeySelector keySelector2); + + /** + * Apply a two output operation to this {@link KeyedPartitionStream}. + * + * @param processFunction to perform two output operation. + * @return new {@link TwoNonKeyedPartitionStreams} with this operation. + */ + TwoNonKeyedPartitionStreams process( + TwoOutputStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link KeyedPartitionStream}. The two keyed + * streams must have the same partitions, otherwise it makes no sense to connect them. + * + *

Generally, concatenating two {@link KeyedPartitionStream} will result in a {@link + * NonKeyedPartitionStream}, and you can manually generate a {@link KeyedPartitionStream} via + * keyBy partitioning. In some cases, you can guarantee that the partition on which the data is + * processed will not change, then you can use {@link #connectAndProcess(KeyedPartitionStream, + * TwoInputNonBroadcastStreamProcessFunction, KeySelector)} to avoid shuffling. + * + * @param other {@link KeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @return new {@link NonKeyedPartitionStream} with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + KeyedPartitionStream other, + TwoInputNonBroadcastStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link KeyedPartitionStream}.The two keyed + * streams must have the same partitions, otherwise it makes no sense to connect them. + * + *

This method is used to avoid shuffle after applying the process function. It is required + * that for the same record, the new {@link KeySelector} must extract the same key as the + * original {@link KeySelector}s on these two {@link KeyedPartitionStream}s. Otherwise, the + * partition of data will be messy. + * + * @param other {@link KeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @param newKeySelector to select the key after process. + * @return new {@link KeyedPartitionStream} with this operation. + */ + KeyedPartitionStream connectAndProcess( + KeyedPartitionStream other, + TwoInputNonBroadcastStreamProcessFunction processFunction, + KeySelector newKeySelector); + + /** + * Apply a two input operation to this and other {@link BroadcastStream}. + * + *

Generally, concatenating {@link KeyedPartitionStream} and {@link BroadcastStream} will + * result in a {@link NonKeyedPartitionStream}, and you can manually generate a {@link + * KeyedPartitionStream} via keyBy partitioning. In some cases, you can guarantee that the + * partition on which the data is processed will not change, then you can use {@link + * #connectAndProcess(BroadcastStream, TwoInputBroadcastStreamProcessFunction, KeySelector)} to + * avoid shuffling. + * + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + BroadcastStream other, + TwoInputBroadcastStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link BroadcastStream}. + * + *

This method is used to avoid shuffle after applying the process function. It is required + * that for the record from non-broadcast input, the new {@link KeySelector} must extract the + * same key as the original {@link KeySelector}s on the {@link KeyedPartitionStream}. Otherwise, + * the partition of data will be messy. As for the record from broadcast input, the output key + * from keyed partition itself instead of the new key selector, so the data it outputs will not + * affect the partition. + * + * @param other {@link BroadcastStream} to perform operation with two input. + * @param processFunction to perform operation. + * @param newKeySelector to select the key after process. + * @return new {@link KeyedPartitionStream} with this operation. + */ + KeyedPartitionStream connectAndProcess( + BroadcastStream other, + TwoInputBroadcastStreamProcessFunction processFunction, + KeySelector newKeySelector); + + /** + * Coalesce this stream to a {@link GlobalStream}. + * + * @return the coalesced global stream. + */ + GlobalStream global(); + + /** + * Transform this stream to a new {@link KeyedPartitionStream}. + * + * @param keySelector to decide how to map data to partition. + * @return the transformed stream partitioned by key. + */ + KeyedPartitionStream keyBy(KeySelector keySelector); + + /** + * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between + * these two streams. + * + * @return the transformed stream after shuffle. + */ + NonKeyedPartitionStream shuffle(); + + /** + * Transform this stream to a new {@link BroadcastStream}. + * + * @return the transformed {@link BroadcastStream}. + */ + BroadcastStream broadcast(); + + // TODO add toSink method. + + /** + * This class represents a combination of two {@link KeyedPartitionStream}. It will be used as + * the return value of operation with two output. + */ + @Experimental + interface TwoKeyedPartitionStreams { + /** Get the first stream. */ + KeyedPartitionStream getFirst(); + + /** Get the second stream. */ + KeyedPartitionStream getSecond(); + } +} diff --git a/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java new file mode 100644 index 0000000000000..dd4a1ee2855d6 --- /dev/null +++ b/flink-datastream-api/src/main/java/org/apache/flink/datastream/api/stream/NonKeyedPartitionStream.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.datastream.api.stream; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction; +import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction; + +/** + * This interface represents a kind of partitioned data stream. For this stream, each parallelism is + * a partition, and the partition to which the data belongs is random. + */ +@Experimental +public interface NonKeyedPartitionStream extends DataStream { + /** + * Apply an operation to this {@link NonKeyedPartitionStream}. + * + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream process( + OneInputStreamProcessFunction processFunction); + + /** + * Apply a two output operation to this {@link NonKeyedPartitionStream}. + * + * @param processFunction to perform two output operation. + * @return new stream with this operation. + */ + TwoNonKeyedPartitionStreams process( + TwoOutputStreamProcessFunction processFunction); + + /** + * Apply to a two input operation on this and other {@link NonKeyedPartitionStream}. + * + * @param other {@link NonKeyedPartitionStream} to perform operation with two input. + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + NonKeyedPartitionStream other, + TwoInputNonBroadcastStreamProcessFunction processFunction); + + /** + * Apply a two input operation to this and other {@link BroadcastStream}. + * + * @param processFunction to perform operation. + * @return new stream with this operation. + */ + NonKeyedPartitionStream connectAndProcess( + BroadcastStream other, + TwoInputBroadcastStreamProcessFunction processFunction); + + /** + * Coalesce this stream to a {@link GlobalStream}. + * + * @return the coalesced global stream. + */ + GlobalStream global(); + + /** + * Transform this stream to a {@link KeyedPartitionStream}. + * + * @param keySelector to decide how to map data to partition. + * @return the transformed stream partitioned by key. + */ + KeyedPartitionStream keyBy(KeySelector keySelector); + + /** + * Transform this stream to a new {@link NonKeyedPartitionStream}, data will be shuffled between + * these two streams. + * + * @return the transformed stream after shuffle. + */ + NonKeyedPartitionStream shuffle(); + + /** + * Transform this stream to a new {@link BroadcastStream}. + * + * @return the transformed {@link BroadcastStream}. + */ + BroadcastStream broadcast(); + + // TODO add toSink method. + + /** + * This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be + * used as the return value of operation with two output. + */ + @Experimental + interface TwoNonKeyedPartitionStreams { + /** Get the first stream. */ + NonKeyedPartitionStream getFirst(); + + /** Get the second stream. */ + NonKeyedPartitionStream getSecond(); + } +} diff --git a/pom.xml b/pom.xml index aa8eaa4bbc61c..e6d167129e475 100644 --- a/pom.xml +++ b/pom.xml @@ -2363,6 +2363,7 @@ under the License. @org.apache.flink.annotation.Internal org.apache.flink.api.common.functions.Function + org.apache.flink.api.java.functions.KeySelector public