Skip to content

Commit

Permalink
[FLINK-21679][table] Set output type for transformations from SourceP…
Browse files Browse the repository at this point in the history
…rovider and DataStreamScanProvider in CommonExecTableSourceScan

This closes apache#15120
  • Loading branch information
WeiZhong94 authored Mar 10, 2021
1 parent a3ffaa8 commit d97c68b
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,14 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
} else if (provider instanceof SourceProvider) {
Source<RowData, ?, ?> source = ((SourceProvider) provider).createSource();
// TODO: Push down watermark strategy to source scan
return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName)
return env.fromSource(
source, WatermarkStrategy.noWatermarks(), operatorName, outputTypeInfo)
.getTransformation();
} else if (provider instanceof DataStreamScanProvider) {
return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
Transformation<RowData> transformation =
((DataStreamScanProvider) provider).produceDataStream(env).getTransformation();
transformation.setOutputType(outputTypeInfo);
return transformation;
} else {
throw new UnsupportedOperationException(
provider.getClass().getSimpleName() + " is unsupported now.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkProvider;
Expand All @@ -45,7 +44,6 @@
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.filesystem.FileSystemOptions;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;

import java.io.BufferedReader;
import java.io.IOException;
Expand Down Expand Up @@ -198,7 +196,11 @@ public void close() throws IOException {

@Override
public TypeInformation<RowData> getProducedType() {
return InternalTypeInfo.ofFields(DataTypes.STRING().getLogicalType());
// For ScanTableSource, the output type is determined by the planner,
// and the result of this method will not be used.
// The purpose of returning null is to verify that the planner can
// handle the output type correctly.
return null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -725,7 +725,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
@Override
public DataStream<RowData> produceDataStream(
StreamExecutionEnvironment execEnv) {
return execEnv.addSource(function, type);
return execEnv.addSource(function);
}

@Override
Expand Down

0 comments on commit d97c68b

Please sign in to comment.