Skip to content

Commit

Permalink
[FLINK-34548][API] Introduce stream interface and move KeySelector to…
Browse files Browse the repository at this point in the history
… flink-core-api
  • Loading branch information
reswqa committed Apr 1, 2024
1 parent cedbcce commit 9fa74a8
Show file tree
Hide file tree
Showing 7 changed files with 525 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;

/**
 * A stream in which every parallel task processes the same data.
 *
 * @param <T> type of the records carried by this broadcast stream.
 */
@Experimental
public interface BroadcastStream<T> extends DataStream {

    /**
     * Connects this stream with a {@link KeyedPartitionStream} and applies a two-input process
     * function to both of them.
     *
     * <p>Combining a {@link BroadcastStream} with a {@link KeyedPartitionStream} generally yields
     * a {@link NonKeyedPartitionStream}; a {@link KeyedPartitionStream} can then be produced
     * manually through keyBy partitioning. If it can be guaranteed that the partition on which
     * the data is processed does not change, use {@link #connectAndProcess(KeyedPartitionStream,
     * TwoInputBroadcastStreamProcessFunction, KeySelector)} instead to avoid the shuffle.
     *
     * @param other the {@link KeyedPartitionStream} acting as the second input of the operation.
     * @param processFunction the function that performs the two-input operation.
     * @param <K> type of the key of the other stream.
     * @param <T_OTHER> type of the records of the other stream.
     * @param <OUT> type of the records emitted by the operation.
     * @return the new stream produced by this operation.
     */
    <K, T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction);

    /**
     * Connects this stream with a {@link NonKeyedPartitionStream} and applies a two-input
     * process function to both of them.
     *
     * @param other the {@link NonKeyedPartitionStream} acting as the second input of the
     *     operation.
     * @param processFunction the function that performs the two-input operation.
     * @param <T_OTHER> type of the records of the other stream.
     * @param <OUT> type of the records emitted by the operation.
     * @return the new stream produced by this operation.
     */
    <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
            NonKeyedPartitionStream<T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction);

    /**
     * Connects this stream with a {@link KeyedPartitionStream} and applies a two-input process
     * function to both of them, keeping the result keyed.
     *
     * <p>Use this variant to avoid a shuffle after the process function has been applied. For
     * records coming from the non-broadcast input, the new {@link KeySelector} is required to
     * extract the same key as the original {@link KeySelector}s of the {@link
     * KeyedPartitionStream}; otherwise the partitioning of the data will be messy. For records
     * coming from the broadcast input, the output key comes from the keyed partition itself
     * rather than from the new key selector, so the data emitted for them does not affect the
     * partitioning.
     *
     * @param other the {@link KeyedPartitionStream} acting as the second input of the operation.
     * @param processFunction the function that performs the two-input operation.
     * @param newKeySelector selects the key of the records after processing.
     * @param <K> type of the key, shared by the other stream and the resulting stream.
     * @param <T_OTHER> type of the records of the other stream.
     * @param <OUT> type of the records emitted by the operation.
     * @return the new {@link KeyedPartitionStream} produced by this operation.
     */
    <K, T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
            KeyedPartitionStream<K, T_OTHER> other,
            TwoInputBroadcastStreamProcessFunction<T_OTHER, T, OUT> processFunction,
            KeySelector<OUT, K> newKeySelector);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;

/**
 * The topmost base interface shared by all streams of the DataStream V2 API. It declares no
 * methods of its own.
 */
@Experimental
public interface DataStream {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;

/**
 * A stream that is forced to run with a parallelism of one.
 *
 * @param <T> type of the records carried by this stream.
 */
@Experimental
public interface GlobalStream<T> extends DataStream {

    /**
     * Applies a one-input operation to this {@link GlobalStream}.
     *
     * @param processFunction the function that performs the operation.
     * @param <OUT> type of the records emitted by the operation.
     * @return the new stream produced by this operation.
     */
    <OUT> GlobalStream<OUT> process(OneInputStreamProcessFunction<T, OUT> processFunction);

    /**
     * Applies an operation with two outputs to this {@link GlobalStream}.
     *
     * @param processFunction the function that performs the two-output operation.
     * @param <OUT1> type of the records emitted to the first output.
     * @param <OUT2> type of the records emitted to the second output.
     * @return the pair of new streams produced by this operation.
     */
    <OUT1, OUT2> TwoGlobalStreams<OUT1, OUT2> process(
            TwoOutputStreamProcessFunction<T, OUT1, OUT2> processFunction);

    /**
     * Connects this stream with another {@link GlobalStream} and applies a two-input process
     * function to both of them.
     *
     * @param other the {@link GlobalStream} acting as the second input of the operation.
     * @param processFunction the function that performs the two-input operation.
     * @param <T_OTHER> type of the records of the other stream.
     * @param <OUT> type of the records emitted by the operation.
     * @return the new stream produced by this operation.
     */
    <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
            GlobalStream<T_OTHER> other,
            TwoInputNonBroadcastStreamProcessFunction<T, T_OTHER, OUT> processFunction);

    /**
     * Converts this stream into a {@link KeyedPartitionStream}.
     *
     * @param keySelector decides how records are mapped to partitions.
     * @param <K> type of the key extracted from each record.
     * @return the transformed stream, partitioned by key.
     */
    <K> KeyedPartitionStream<K, T> keyBy(KeySelector<T, K> keySelector);

    /**
     * Converts this stream into a new {@link NonKeyedPartitionStream}; the data is shuffled
     * between the two streams.
     *
     * @return the transformed stream after the shuffle.
     */
    NonKeyedPartitionStream<T> shuffle();

    /**
     * Converts this stream into a new {@link BroadcastStream}.
     *
     * @return the transformed {@link BroadcastStream}.
     */
    BroadcastStream<T> broadcast();

    // TODO add toSink method.

    /**
     * A combination of two {@link GlobalStream}s, used as the return value of an operation with
     * two outputs.
     *
     * @param <T1> type of the records of the first stream.
     * @param <T2> type of the records of the second stream.
     */
    @Experimental
    interface TwoGlobalStreams<T1, T2> {

        /**
         * Returns the first stream.
         *
         * @return the first {@link GlobalStream} of the pair.
         */
        GlobalStream<T1> getFirst();

        /**
         * Returns the second stream.
         *
         * @return the second {@link GlobalStream} of the pair.
         */
        GlobalStream<T2> getSecond();
    }
}
Loading

0 comments on commit 9fa74a8

Please sign in to comment.