[FLINK-34548][API] Supports sink-v2 Sink
reswqa committed Apr 1, 2024
1 parent 056660e commit 2876249
Showing 16 changed files with 711 additions and 6 deletions.
Sink.java
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.dsv2;

import org.apache.flink.annotation.Experimental;

/**
* Sink interface for the DataStream API V2.
*
* <p>Note that this interface is just a placeholder because we haven't decided whether to use a
* sink-v2 based sink or to design a new sink connector API for DataStream V2.
*/
@Experimental
public interface Sink<T> {}
DataStreamV2SinkUtils.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.dsv2;

import org.apache.flink.annotation.Experimental;

/** Utils to create DataStream V2 supported {@link Sink}s. */
@Experimental
public class DataStreamV2SinkUtils {
/**
* Wraps a sink-v2 based sink into a DataStream V2 supported sink.
*
* @param sink The sink-v2 based sink to wrap.
* @return The DataStream V2 supported sink.
*/
public static <T> Sink<T> wrapSink(org.apache.flink.api.connector.sink2.Sink<T> sink) {
return new WrappedSink<>(sink);
}
}
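A minimal usage sketch (not part of this commit) showing how the pieces fit together: a sink-v2 based sink is wrapped via DataStreamV2SinkUtils.wrapSink and then attached with the toSink(...) method that this commit adds to the DataStream V2 stream interfaces. The helper class below is hypothetical, and both the stream and the sink-v2 sink are assumed to be provided by the surrounding job.

import org.apache.flink.api.connector.dsv2.DataStreamV2SinkUtils;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

/** Hypothetical helper that wires a sink-v2 sink into a DataStream V2 stream. */
public final class SinkWiringSketch {

    public static <T> void attach(
            NonKeyedPartitionStream<T> stream,
            org.apache.flink.api.connector.sink2.Sink<T> sinkV2) {
        // Wrap the sink-v2 based sink into the DataStream V2 placeholder Sink interface.
        Sink<T> dsv2Sink = DataStreamV2SinkUtils.wrapSink(sinkV2);
        // toSink(...) is introduced on the stream interfaces later in this diff.
        stream.toSink(dsv2Sink);
    }

    private SinkWiringSketch() {}
}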
WrappedSink.java
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.connector.dsv2;

import org.apache.flink.annotation.Internal;

/** A simple {@link Sink} implementation that wraps a sink-v2 based sink. */
@Internal
public class WrappedSink<T> implements Sink<T> {
private final org.apache.flink.api.connector.sink2.Sink<T> wrappedSink;

public WrappedSink(org.apache.flink.api.connector.sink2.Sink<T> wrappedSink) {
this.wrappedSink = wrappedSink;
}

public org.apache.flink.api.connector.sink2.Sink<T> getWrappedSink() {
return wrappedSink;
}
}
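For context, a sink-v2 based sink that such a WrappedSink could wrap might look like the sketch below. It is not part of the commit, and it assumes the sink-v2 Sink#createWriter(InitContext) and SinkWriter signatures of the Flink release this commit targets; later releases may differ.

import java.io.IOException;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;

/** A throwaway sink-v2 sink that prints every record; only meant to show what wrapSink expects. */
public class PrintingSinkV2 implements Sink<String> {

    @Override
    public SinkWriter<String> createWriter(InitContext context) throws IOException {
        return new SinkWriter<String>() {
            @Override
            public void write(String element, Context context) {
                // Emit the record; a real sink would hand it to an external system.
                System.out.println(element);
            }

            @Override
            public void flush(boolean endOfInput) {
                // Nothing is buffered, so there is nothing to flush.
            }

            @Override
            public void close() {
                // No resources to release.
            }
        };
    }
}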
GlobalStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputNonBroadcastStreamProcessFunction;
@@ -78,7 +79,7 @@ <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
*/
BroadcastStream<T> broadcast();

// TODO add toSink method.
void toSink(Sink<T> sink);

/**
* This class represents a combination of two {@link GlobalStream}. It will be used as the
KeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -194,7 +195,7 @@ <T_OTHER, OUT> KeyedPartitionStream<K, OUT> connectAndProcess(
*/
BroadcastStream<T> broadcast();

// TODO add toSink method.
void toSink(Sink<T> sink);

/**
* This class represents a combination of two {@link KeyedPartitionStream}. It will be used as
NonKeyedPartitionStream.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.api.stream;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction;
import org.apache.flink.datastream.api.function.TwoInputBroadcastStreamProcessFunction;
@@ -100,7 +101,7 @@ <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
*/
BroadcastStream<T> broadcast();

// TODO add toSink method.
void toSink(Sink<T> sink);

/**
* This interface represents a combination of two {@link NonKeyedPartitionStream}. It will be
ExecutionEnvironmentImpl.java
@@ -48,6 +48,7 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.runtime.translators.DataStreamV2SinkTransformationTranslator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
@@ -87,6 +88,18 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment {
*/
private static ExecutionEnvironmentFactory contextEnvironmentFactory = null;

static {
try {
// All transformation translators must be registered in a map inside StreamGraphGenerator,
// but streaming-java does not depend on the process-function module, so reflection is used
// to handle this.
DataStreamV2SinkTransformationTranslator.registerSinkTransformationTranslator();
} catch (Exception e) {
throw new RuntimeException(
"Cannot register the process function transformation translator.", e);
}
}

/**
* Create and return an instance of {@link ExecutionEnvironment}.
*
@@ -202,6 +215,10 @@ public void setParallelism(int parallelism) {
executionConfig.setParallelism(parallelism);
}

public CheckpointConfig getCheckpointCfg() {
return checkpointCfg;
}

// -----------------------------------------------
// Internal Methods
// -----------------------------------------------
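The registration call in the static block above lives in the process-function module; the comment only hints at the reflection involved. A generic sketch of that pattern is below. Every name here (the generator class, the translator map field, the translator object) is a hypothetical placeholder rather than the actual Flink internals, and the field is assumed to hold a mutable map.

import java.lang.reflect.Field;
import java.util.Map;

/** Hypothetical sketch: register a translator via reflection to avoid a compile-time dependency. */
public final class ReflectiveTranslatorRegistration {

    @SuppressWarnings("unchecked")
    public static void register(
            Class<?> generatorClass,      // class assumed to hold a static translator map
            String mapFieldName,          // assumed name of that static map field
            Class<?> transformationClass, // transformation type to translate
            Object translator) {          // translator instance to register
        try {
            Field field = generatorClass.getDeclaredField(mapFieldName);
            field.setAccessible(true);
            // Mutate the map in place so no static field needs to be reassigned.
            Map<Class<?>, Object> translators = (Map<Class<?>, Object>) field.get(null);
            translators.put(transformationClass, translator);
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Cannot register transformation translator.", e);
        }
    }

    private ReflectiveTranslatorRegistration() {}
}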
GlobalStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +37,7 @@
import org.apache.flink.datastream.impl.utils.StreamUtils;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
@@ -98,7 +100,14 @@ public <T_OTHER, OUT> GlobalStream<OUT> connectAndProcess(
return new GlobalStreamImpl<>(environment, outTransformation);
}

// TODO add toSink method.
@Override
public void toSink(Sink<T> sink) {
DataStreamV2SinkTransformation<T, T> sinkTransformation =
StreamUtils.addSinkOperator(this, sink, getType());
// Operator parallelism should always be 1 for the global stream.
// parallelismConfigured should be true to avoid being overwritten by the AdaptiveBatchScheduler.
sinkTransformation.setParallelism(1, true);
}

// ---------------------
// Partitioning
KeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -325,7 +326,10 @@ public KeySelector<V, K> getKeySelector() {
return keySelector;
}

// TODO add toSink method.
@Override
public void toSink(Sink<V> sink) {
StreamUtils.addSinkOperator(this, sink, getType());
}

// ---------------------
// Partitioning
NonKeyedPartitionStreamImpl.java
@@ -19,6 +19,7 @@
package org.apache.flink.datastream.impl.stream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -132,7 +133,10 @@ public <T_OTHER, OUT> NonKeyedPartitionStream<OUT> connectAndProcess(
return new NonKeyedPartitionStreamImpl<>(environment, outTransformation);
}

// TODO add toSink method.
@Override
public void toSink(Sink<T> sink) {
StreamUtils.addSinkOperator(this, sink, getType());
}

// ---------------------
// Partitioning
StreamUtils.java
@@ -19,6 +19,8 @@
package org.apache.flink.datastream.impl.utils;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,10 +31,13 @@
import org.apache.flink.datastream.api.function.TwoOutputStreamProcessFunction;
import org.apache.flink.datastream.impl.stream.AbstractDataStream;
import org.apache.flink.datastream.impl.stream.KeyedPartitionStreamImpl;
import org.apache.flink.datastream.impl.stream.NonKeyedPartitionStreamImpl;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.DataStreamV2SinkTransformation;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;

@@ -228,4 +233,56 @@ public static <IN1, IN2, OUT> TwoInputTransformation<IN1, IN2, OUT> getTwoInputT

return transform;
}

/** Constructs and returns a new DataStream with a one-input operator. */
public static <T, R> AbstractDataStream<R> transformOneInputOperator(
String operatorName,
AbstractDataStream<T> inputStream,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
inputStream.getTransformation().getOutputType();

OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
inputStream.getTransformation(),
operatorName,
operatorFactory,
outTypeInfo,
inputStream.getEnvironment().getParallelism(),
false);

NonKeyedPartitionStreamImpl<R> returnStream =
new NonKeyedPartitionStreamImpl<>(inputStream.getEnvironment(), resultTransform);

inputStream.getEnvironment().addOperator(resultTransform);

return returnStream;
}

/** Adds a sink operator to the input stream. */
public static <T> DataStreamV2SinkTransformation<T, T> addSinkOperator(
AbstractDataStream<T> inputStream, Sink<T> sink, TypeInformation<T> typeInformation) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
inputStream.getTransformation().getOutputType();

if (!(sink instanceof WrappedSink)) {
throw new UnsupportedOperationException(
"Unsupported type of sink, please use DataStreamV2SinkUtils to wrap a sink-v2 sink first.");
}

org.apache.flink.api.connector.sink2.Sink<T> innerSink =
((WrappedSink<T>) sink).getWrappedSink();

DataStreamV2SinkTransformation<T, T> sinkTransformation =
new DataStreamV2SinkTransformation<>(
inputStream,
innerSink,
typeInformation,
"Sink",
inputStream.getEnvironment().getParallelism(),
false);
inputStream.getEnvironment().addOperator(sinkTransformation);
return sinkTransformation;
}
}
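A short sketch (not part of the commit) of the guard in addSinkOperator above: any Sink that was not produced by DataStreamV2SinkUtils.wrapSink is rejected as soon as the sink is attached. The stream parameter is assumed to come from the surrounding job.

import org.apache.flink.api.connector.dsv2.Sink;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;

/** Hypothetical illustration of the WrappedSink-only guard. */
public final class SinkGuardSketch {

    public static void demo(NonKeyedPartitionStream<String> stream) {
        // A placeholder Sink that is not a WrappedSink ...
        Sink<String> notWrapped = new Sink<String>() {};
        // ... makes addSinkOperator throw UnsupportedOperationException at graph construction
        // time, so sink-v2 sinks must be wrapped via DataStreamV2SinkUtils.wrapSink first.
        stream.toSink(notWrapped);
    }

    private SinkGuardSketch() {}
}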