Skip to content

Commit

Permalink
[FLINK-25575][streaming] Switch the implementation of Sink operator t…
Browse files Browse the repository at this point in the history
…o V2.

Sink will now consist of two operators: WriterOperator and CommitterOperator. They are chained by default in streaming mode and behave like Sink V1 with respect to sharing committable instances.
State handling of the writer is similar to V1; however, more of the logic is now placed inside the SinkWriterStateHandler.
Committer state is entirely delegated to the previously introduced CommittableCollector.

The SinkTransformationTranslator is now responsible for dynamically
expanding the single sink transformation into multiple transformations to
ensure that all topologies are properly instantiated.

Co-authored-by: Arvid Heise <[email protected]>
  • Loading branch information
fapaul and Arvid Heise committed Feb 2, 2022
1 parent 053088d commit bb92bc0
Show file tree
Hide file tree
Showing 52 changed files with 1,429 additions and 3,065 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ org.apache.flink.streaming.api.graph.StreamConfig$SourceInputConfig does not sat
org.apache.flink.streaming.api.graph.StreamGraphHasher does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.graph.StreamGraphHasherV2 does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.graph.StreamGraphUserHashHasher does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.graph.TransformationTranslator$Context does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.operators.BackendRestorerProcedure does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.operators.CountingOutput does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
org.apache.flink.streaming.api.operators.InputSelection$Builder does not satisfy: annotated with @Internal or annotated with @Experimental or annotated with @PublicEvolving or annotated with @Public or annotated with @Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.runtime.operators.sink.SinkOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Column;
Expand Down Expand Up @@ -241,16 +242,19 @@ public void testBufferedTableSink() {
sinkProvider.consumeDataStream(env.fromElements(new BinaryRowData(1)));
final StreamOperatorFactory<?> sinkOperatorFactory =
env.getStreamGraph().getStreamNodes().stream()
.filter(n -> n.getOperatorName().contains("Sink"))
.filter(n -> n.getOperatorName().contains("Writer"))
.findFirst()
.orElseThrow(
() ->
new RuntimeException(
"Expected operator with name Sink in stream graph."))
.getOperatorFactory();
assertThat(sinkOperatorFactory, instanceOf(SinkOperatorFactory.class));
assertThat(sinkOperatorFactory, instanceOf(SinkWriterOperatorFactory.class));
org.apache.flink.api.connector.sink2.Sink sink =
((SinkWriterOperatorFactory) sinkOperatorFactory).getSink();
assertThat(sink, instanceOf(SinkV1Adapter.PlainSinkAdapter.class));
assertThat(
((SinkOperatorFactory) sinkOperatorFactory).getSink(),
((SinkV1Adapter.PlainSinkAdapter) sink).getSink(),
instanceOf(ReducingUpsertSink.class));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.Utils;
import org.apache.flink.api.java.functions.KeySelector;
Expand Down Expand Up @@ -71,7 +71,6 @@
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.operators.collect.ClientAndIterator;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
Expand Down Expand Up @@ -1240,12 +1239,22 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}

StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));
return DataStreamSink.forSinkFunction(this, clean(sinkFunction));
}

DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
/**
* Adds the given {@link Sink} to this DataStream. Only streams with sinks added will be
* executed once the {@link StreamExecutionEnvironment#execute()} method is called.
*
* @param sink The user defined sink.
* @return The closed DataStream.
*/
@Experimental
public DataStreamSink<T> sinkTo(org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();

getExecutionEnvironment().addOperator(sink.getLegacyTransformation());
return sink;
return DataStreamSink.forSinkV1(this, sink);
}

/**
Expand All @@ -1255,12 +1264,12 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
* @param sink The user defined sink.
* @return The closed DataStream.
*/
@Experimental
public DataStreamSink<T> sinkTo(Sink<T, ?, ?, ?> sink) {
@PublicEvolving
public DataStreamSink<T> sinkTo(Sink<T> sink) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();

return new DataStreamSink<>(this, sink);
return DataStreamSink.forSink(this, sink);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,18 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.operators.SlotSharingGroup;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;
import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A Stream Sink. This is used for emitting elements from a streaming topology.
Expand All @@ -40,27 +45,43 @@ public class DataStreamSink<T> {

private final PhysicalTransformation<T> transformation;

@SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
this.transformation =
(PhysicalTransformation<T>)
new LegacySinkTransformation<>(
inputStream.getTransformation(),
"Unnamed",
operator,
inputStream.getExecutionEnvironment().getParallelism());
}

@SuppressWarnings("unchecked")
protected DataStreamSink(DataStream<T> inputStream, Sink<T, ?, ?, ?> sink) {
transformation =
(PhysicalTransformation<T>)
new SinkTransformation<>(
inputStream.getTransformation(),
sink,
"Unnamed",
inputStream.getExecutionEnvironment().getParallelism());
inputStream.getExecutionEnvironment().addOperator(transformation);
/**
 * Creates a sink handle backed by an already constructed transformation.
 *
 * @param transformation the transformation this sink exposes and configures; it is passed
 *     through {@code checkNotNull} before being stored
 */
protected DataStreamSink(PhysicalTransformation<T> transformation) {
    // Validate first, then store the very same reference.
    checkNotNull(transformation);
    this.transformation = transformation;
}

/**
 * Builds a {@link DataStreamSink} for a {@link SinkFunction}.
 *
 * <p>The function is wrapped in a {@link StreamSink} operator, placed into a {@link
 * LegacySinkTransformation} named {@code "Unnamed"} that uses the parallelism of the input
 * stream's execution environment, and that transformation is registered with the environment.
 *
 * @param inputStream the stream whose elements are consumed by the sink
 * @param sinkFunction the function to wrap in a {@link StreamSink}
 * @return a sink handle around the newly registered transformation
 */
static <T> DataStreamSink<T> forSinkFunction(
        DataStream<T> inputStream, SinkFunction<T> sinkFunction) {
    final StreamSink<T> operator = new StreamSink<>(sinkFunction);
    final StreamExecutionEnvironment env = inputStream.getExecutionEnvironment();
    final int parallelism = env.getParallelism();

    final PhysicalTransformation<T> legacySink =
            new LegacySinkTransformation<>(
                    inputStream.getTransformation(), "Unnamed", operator, parallelism);

    // Register with the environment before handing the sink back to the caller.
    env.addOperator(legacySink);
    return new DataStreamSink<>(legacySink);
}

/**
 * Builds a {@link DataStreamSink} for a {@link Sink}.
 *
 * <p>Creates a {@link SinkTransformation} named {@code "Sink"} from the input stream, the sink
 * and the stream's type, using the parallelism of the stream's execution environment, and
 * registers that transformation with the environment.
 *
 * @param inputStream the stream whose elements are consumed by the sink
 * @param sink the sink to attach
 * @return a sink handle around the newly registered transformation
 */
static <T> DataStreamSink<T> forSink(DataStream<T> inputStream, Sink<T> sink) {
    final StreamExecutionEnvironment env = inputStream.getExecutionEnvironment();
    final int parallelism = env.getParallelism();

    final SinkTransformation<T, T> sinkTransformation =
            new SinkTransformation<>(
                    inputStream, sink, inputStream.getType(), "Sink", parallelism);

    // Register with the environment before handing the sink back to the caller.
    env.addOperator(sinkTransformation);
    return new DataStreamSink<>(sinkTransformation);
}

/**
 * Builds a {@link DataStreamSink} for a sink written against the
 * {@code org.apache.flink.api.connector.sink} interfaces.
 *
 * <p>The sink is wrapped with {@link SinkV1Adapter#wrap} and then handled exactly like any other
 * sink passed to {@code forSink}.
 *
 * @param inputStream the stream whose elements are consumed by the sink
 * @param sink the sink to adapt and attach
 * @return a sink handle around the newly registered transformation
 */
@Internal
public static <T> DataStreamSink<T> forSinkV1(
        DataStream<T> inputStream, org.apache.flink.api.connector.sink.Sink<T, ?, ?, ?> sink) {
    final Sink<T> adaptedSink = SinkV1Adapter.wrap(sink);
    return forSink(inputStream, adaptedSink);
}

/** Returns the transformation that contains the actual sink operator of this sink. */
Expand Down Expand Up @@ -131,6 +152,10 @@ public DataStreamSink<T> uid(String uid) {
*/
@PublicEvolving
public DataStreamSink<T> setUidHash(String uidHash) {
    // A user-provided hash is only accepted when this sink is backed by a
    // LegacySinkTransformation; every other transformation type is rejected below.
    final boolean legacySink = transformation instanceof LegacySinkTransformation;
    if (legacySink) {
        transformation.setUidHash(uidHash);
        return this;
    }
    throw new UnsupportedOperationException(
            "Cannot set a custom UID hash on a non-legacy sink");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2159,7 +2159,10 @@ private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> tra
"No operators defined in streaming topology. Cannot execute.");
}

return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
// We copy the transformations so that transformations added later cannot interfere with the
// stream graph generation.
return new StreamGraphGenerator(
new ArrayList<>(transformations), config, checkpointCfg, configuration)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
.setSavepointDir(defaultSavepointDirectory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -925,5 +925,10 @@ public long getDefaultBufferTimeout() {
public ReadableConfig getGraphGeneratorConfig() {
return config;
}

@Override
public Collection<Integer> transform(Transformation<?> transformation) {
    // Delegate to the generator this context holds, so the translation of the given
    // transformation goes through the generator's own transform(...) entry point.
    final Collection<Integer> result = streamGraphGenerator.transform(transformation);
    return result;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public interface TransformationTranslator<OUT, T extends Transformation<OUT>> {
Collection<Integer> translateForStreaming(final T transformation, final Context context);

/** A context giving the necessary information for the translation of a given transformation. */
@Internal
interface Context {

/**
Expand All @@ -84,5 +85,8 @@ interface Context {

/** Retrieves additional configuration for the graph generation process. */
ReadableConfig getGraphGeneratorConfig();

/** Transforms the transformation and updates the current stream graph. */
Collection<Integer> transform(Transformation<?> transformation);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,67 +18,19 @@
package org.apache.flink.streaming.api.operators.collect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
import org.apache.flink.streaming.api.transformations.PhysicalTransformation;

/**
* A {@link DataStreamSink} which is used to collect results of a data stream. It completely
* overwrites {@link DataStreamSink} so that its own transformation is manipulated.
*/
@Internal
public class CollectStreamSink<T> extends DataStreamSink<T> {

private final PhysicalTransformation<T> transformation;

public CollectStreamSink(DataStream<T> inputStream, CollectSinkOperatorFactory<T> factory) {
super(inputStream, (CollectSinkOperator<T>) factory.getOperator());
this.transformation =
super(
new LegacySinkTransformation<T>(
inputStream.getTransformation(), "Collect Stream Sink", factory, 1);
}

@Override
public Transformation<T> getTransformation() {
return transformation;
}

@Override
public DataStreamSink<T> name(String name) {
transformation.setName(name);
return this;
}

@Override
public DataStreamSink<T> uid(String uid) {
transformation.setUid(uid);
return this;
}

@Override
public DataStreamSink<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}

@Override
public DataStreamSink<T> setParallelism(int parallelism) {
transformation.setParallelism(parallelism);
return this;
}

@Override
public DataStreamSink<T> disableChaining() {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}

@Override
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
inputStream.getTransformation(), "Collect Stream Sink", factory, 1));
}
}
Loading

0 comments on commit bb92bc0

Please sign in to comment.