diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java index e20fce7fdd8a6..1a427ac758a11 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecTableSourceScan.java @@ -85,10 +85,14 @@ protected Transformation translateToPlanInternal(PlannerBase planner) { } else if (provider instanceof SourceProvider) { Source source = ((SourceProvider) provider).createSource(); // TODO: Push down watermark strategy to source scan - return env.fromSource(source, WatermarkStrategy.noWatermarks(), operatorName) + return env.fromSource( + source, WatermarkStrategy.noWatermarks(), operatorName, outputTypeInfo) .getTransformation(); } else if (provider instanceof DataStreamScanProvider) { - return ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation(); + Transformation transformation = + ((DataStreamScanProvider) provider).produceDataStream(env).getTransformation(); + transformation.setOutputType(outputTypeInfo); + return transformation; } else { throw new UnsupportedOperationException( provider.getClass().getSimpleName() + " is unsupported now."); diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java index 7bc7727ec262b..33484f2321e6a 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestFileFactory.java @@ -31,7 +31,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.connector.sink.DynamicTableSink; import org.apache.flink.table.connector.sink.SinkProvider; @@ -45,7 +44,6 @@ import org.apache.flink.table.factories.DynamicTableSinkFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; import org.apache.flink.table.filesystem.FileSystemOptions; -import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; import java.io.BufferedReader; import java.io.IOException; @@ -198,7 +196,11 @@ public void close() throws IOException { @Override public TypeInformation getProducedType() { - return InternalTypeInfo.ofFields(DataTypes.STRING().getLogicalType()); + // For ScanTableSource, the output type is determined by the planner, + // and the result of this method will not be used. + // The purpose of returning null is to verify that the planner can + // handle the output type correctly. + return null; } } diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index 7f5237c63f002..979c32ebbdb10 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -725,7 +725,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon @Override public DataStream produceDataStream( StreamExecutionEnvironment execEnv) { - return execEnv.addSource(function, type); + return execEnv.addSource(function); } @Override