Commit
[FLINK-28322][table-api] DataStreamScan(Sink)Provider's new method is not compatible

This closes apache#20119
JingsongLi committed Jul 6, 2022
1 parent ef9ce85 commit 03a1a42
Showing 8 changed files with 127 additions and 82 deletions.
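The substantive change is in DataStreamSinkProvider and DataStreamScanProvider: the newer ProviderContext-taking methods become default methods that delegate to the deprecated single-argument variants, so connectors written against the old API stay source compatible. Because the two-argument method is now a default method rather than the interface's single abstract method, the two-argument lambdas previously cast to these provider interfaces no longer compile, which is why every call site below is rewritten as an anonymous class. A minimal sketch of that call-site pattern, not taken from this commit (DiscardingSink and the "example-sink" uid key are placeholders chosen for illustration):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.table.connector.ProviderContext;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    public class AnonymousProviderSketch {

        // DataStreamSinkProvider can no longer be written as a two-argument lambda,
        // because consumeDataStream(ProviderContext, DataStream) is now a default method.
        public static DataStreamSinkProvider sinkProvider() {
            return new DataStreamSinkProvider() {
                @Override
                public DataStreamSink<?> consumeDataStream(
                        ProviderContext providerContext, DataStream<RowData> dataStream) {
                    // DiscardingSink stands in for a real sink function.
                    final DataStreamSink<RowData> sink =
                            dataStream.addSink(new DiscardingSink<>());
                    // "example-sink" is a placeholder key for a stable transformation uid.
                    providerContext.generateUid("example-sink").ifPresent(sink::uid);
                    return sink;
                }
            };
        }
    }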
@@ -136,8 +136,13 @@ public class FileSystemTableSink extends AbstractFileSystemTable

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context sinkContext) {
return (DataStreamSinkProvider)
(providerContext, dataStream) -> consume(providerContext, dataStream, sinkContext);
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return consume(providerContext, dataStream, sinkContext);
}
};
}

private DataStreamSink<?> consume(
@@ -163,9 +163,13 @@ private HiveTableSink(
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
DataStructureConverter converter =
context.createDataStructureConverter(tableSchema.toRowDataType());
return (DataStreamSinkProvider)
(providerContext, dataStream) ->
consume(providerContext, dataStream, context.isBounded(), converter);
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return consume(providerContext, dataStream, context.isBounded(), converter);
}
};
}

private DataStreamSink<?> consume(
@@ -26,11 +26,13 @@
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -215,34 +217,32 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
upsertMode))
.build();
if (flushMode.isEnabled() && upsertMode) {
return (DataStreamSinkProvider)
(providerContext, dataStream) -> {
final boolean objectReuse =
dataStream
.getExecutionEnvironment()
.getConfig()
.isObjectReuseEnabled();
final ReducingUpsertSink<?> sink =
new ReducingUpsertSink<>(
kafkaSink,
physicalDataType,
keyProjection,
flushMode,
objectReuse
? createRowDataTypeSerializer(
context,
dataStream.getExecutionConfig())
::copy
: rowData -> rowData);
final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
providerContext
.generateUid(UPSERT_KAFKA_TRANSFORMATION)
.ifPresent(end::uid);
if (parallelism != null) {
end.setParallelism(parallelism);
}
return end;
};
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
final boolean objectReuse =
dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled();
final ReducingUpsertSink<?> sink =
new ReducingUpsertSink<>(
kafkaSink,
physicalDataType,
keyProjection,
flushMode,
objectReuse
? createRowDataTypeSerializer(
context,
dataStream.getExecutionConfig())
::copy
: rowData -> rowData);
final DataStreamSink<RowData> end = dataStream.sinkTo(sink);
providerContext.generateUid(UPSERT_KAFKA_TRANSFORMATION).ifPresent(end::uid);
if (parallelism != null) {
end.setParallelism(parallelism);
}
return end;
}
};
}
return SinkV2Provider.of(kafkaSink, parallelism);
}
@@ -19,8 +19,11 @@
package org.apache.flink.table.utils;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
@@ -53,8 +56,14 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final DataStructureConverter converter =
context.createDataStructureConverter(rowDataType);
return (DataStreamSinkProvider)
(providerContext, dataStream) -> dataStream.addSink(new RowSink(converter));
return new DataStreamSinkProvider() {

@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return dataStream.addSink(new RowSink(converter));
}
};
}

@Override
@@ -54,8 +54,10 @@ public interface DataStreamSinkProvider
*
* @see SingleOutputStreamOperator#uid(String)
*/
DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream);
default DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
return consumeDataStream(dataStream);
}

/**
* Consumes the given Java {@link DataStream} and returns the sink transformation {@link
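With the two-argument method now defaulting to consumeDataStream(dataStream), an implementation that predates ProviderContext and overrides only the deprecated single-argument method keeps compiling and is still reached by the planner. A minimal sketch under that assumption (LegacySinkProvider and its DiscardingSink body are hypothetical):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSink;
    import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
    import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
    import org.apache.flink.table.data.RowData;

    /** Pre-existing provider that only knows the deprecated single-argument method. */
    public class LegacySinkProvider implements DataStreamSinkProvider {

        @Override
        public DataStreamSink<?> consumeDataStream(DataStream<RowData> dataStream) {
            // The planner calls the two-argument variant; its new default implementation
            // forwards to this method, so this class behaves as before.
            return dataStream.addSink(new DiscardingSink<>());
        }
    }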
@@ -48,8 +48,10 @@ public interface DataStreamScanProvider extends ScanTableSource.ScanRuntimeProvider
*
* @see SingleOutputStreamOperator#uid(String)
*/
DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv);
default DataStream<RowData> produceDataStream(
ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
return produceDataStream(execEnv);
}

/** Creates a scan Java {@link DataStream} from a {@link StreamExecutionEnvironment}. */
@Deprecated
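The scan side mirrors the sink side: a source provider that overrides only the deprecated produceDataStream(StreamExecutionEnvironment) is still invoked through the new two-argument default. A minimal sketch (LegacyScanProvider and its single GenericRowData element are hypothetical stand-ins for a real source):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.connector.source.DataStreamScanProvider;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.RowData;

    /** Pre-existing provider that only knows the deprecated single-argument method. */
    public class LegacyScanProvider implements DataStreamScanProvider {

        @Override
        public DataStream<RowData> produceDataStream(StreamExecutionEnvironment execEnv) {
            // Called through the new two-argument default method; a real connector
            // would build its DataStream from an actual source here.
            return execEnv.fromElements((RowData) GenericRowData.of(42L));
        }

        @Override
        public boolean isBounded() {
            return true;
        }
    }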
@@ -23,6 +23,8 @@
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
@@ -32,6 +34,7 @@
import org.apache.flink.table.api.internal.ResultProvider;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
@@ -95,40 +98,40 @@ public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {

@Override
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
return (DataStreamSinkProvider)
(providerContext, inputStream) -> {
final CheckpointConfig checkpointConfig =
inputStream.getExecutionEnvironment().getCheckpointConfig();
final ExecutionConfig config = inputStream.getExecutionConfig();

final TypeSerializer<RowData> externalSerializer =
InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
.createSerializer(config);
final String accumulatorName = tableIdentifier.getObjectName();

final CollectSinkOperatorFactory<RowData> factory =
new CollectSinkOperatorFactory<>(
externalSerializer,
accumulatorName,
maxBatchSize,
socketTimeout);
final CollectSinkOperator<RowData> operator =
(CollectSinkOperator<RowData>) factory.getOperator();

iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
externalSerializer,
accumulatorName,
checkpointConfig);
converter = context.createDataStructureConverter(consumedDataType);
converter.open(RuntimeConverter.Context.create(classLoader));

final CollectStreamSink<RowData> sink =
new CollectStreamSink<>(inputStream, factory);
providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
return sink.name("Collect table sink");
};
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> inputStream) {
final CheckpointConfig checkpointConfig =
inputStream.getExecutionEnvironment().getCheckpointConfig();
final ExecutionConfig config = inputStream.getExecutionConfig();

final TypeSerializer<RowData> externalSerializer =
InternalTypeInfo.<RowData>of(consumedDataType.getLogicalType())
.createSerializer(config);
final String accumulatorName = tableIdentifier.getObjectName();

final CollectSinkOperatorFactory<RowData> factory =
new CollectSinkOperatorFactory<>(
externalSerializer, accumulatorName, maxBatchSize, socketTimeout);
final CollectSinkOperator<RowData> operator =
(CollectSinkOperator<RowData>) factory.getOperator();

iterator =
new CollectResultIterator<>(
operator.getOperatorIdFuture(),
externalSerializer,
accumulatorName,
checkpointConfig);
converter = context.createDataStructureConverter(consumedDataType);
converter.open(RuntimeConverter.Context.create(classLoader));

final CollectStreamSink<RowData> sink =
new CollectStreamSink<>(inputStream, factory);
providerContext.generateUid(COLLECT_TRANSFORMATION).ifPresent(sink::uid);
return sink.name("Collect table sink");
}
};
}

@Override
@@ -18,6 +18,8 @@

package org.apache.flink.table.planner.plan.nodes.exec.common;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -33,6 +35,7 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
@@ -149,8 +152,14 @@ public void invoke(RowData value, Context context) {
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(
DynamicTableSink.Context context) {
return (providerContext, dataStream) ->
dataStream.addSink(sinkFunction);
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext,
DataStream<RowData> dataStream) {
return dataStream.addSink(sinkFunction);
}
};
}
})
.build();
@@ -443,8 +452,14 @@ public void writeWatermark(
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(
DynamicTableSink.Context context) {
return (providerContext, dataStream) ->
dataStream.addSink(sinkFunction);
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext,
DataStream<RowData> dataStream) {
return dataStream.addSink(sinkFunction);
}
};
}
})
.build();
@@ -494,12 +509,17 @@ private TableFactoryHarness.SinkBase buildDataStreamSinkProvider(
return new TableFactoryHarness.SinkBase() {
@Override
public DataStreamSinkProvider getSinkRuntimeProvider(Context context) {
return (providerContext, dataStream) -> {
TestSink<RowData> sink = buildRecordWriterTestSink(new RecordWriter(fetched));
if (useSinkV2) {
return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
}
return dataStream.sinkTo(sink);
};
return new DataStreamSinkProvider() {
@Override
public DataStreamSink<?> consumeDataStream(
ProviderContext providerContext, DataStream<RowData> dataStream) {
TestSink<RowData> sink =
buildRecordWriterTestSink(new RecordWriter(fetched));
if (useSinkV2) {
return dataStream.sinkTo(SinkV1Adapter.wrap(sink));
}
return dataStream.sinkTo(sink);
}
};
}
};
