[FLINK-23306][table] Reduce usages of legacy TableSchema
This closes apache#16425.
twalthr committed Jul 9, 2021
1 parent 31fcb6c commit f37cf25
Showing 7 changed files with 9 additions and 7 deletions.
docs/content.zh/docs/dev/table/sourcesSinks.md (3 changes: 2 additions & 1 deletion)
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

 // derive the produced data type (excluding computed columns) from the catalog table
-final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+final DataType producedDataType =
+        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

 // create and return dynamic table source
 return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
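Both documentation pages (and the matching example factory below) now derive the produced data type from `ResolvedSchema` instead of the deprecated `TableSchema`. As a minimal sketch, assuming the `DynamicTableFactory.Context` API shown in the hunk (the helper class itself is hypothetical and not part of this commit), the derivation boils down to:

```java
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;

final class SchemaDerivation {

    private SchemaDerivation() {}

    // derive the physical row type of the table handed to a factory;
    // computed columns are not part of the physical row and are excluded
    static DataType derivePhysicalDataType(DynamicTableFactory.Context context) {
        final ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        return schema.toPhysicalRowDataType();
    }
}
```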
docs/content/docs/dev/table/sourcesSinks.md (3 changes: 2 additions & 1 deletion)
@@ -452,7 +452,8 @@ public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
 final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

 // derive the produced data type (excluding computed columns) from the catalog table
-final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
+final DataType producedDataType =
+        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

 // create and return dynamic table source
 return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
@@ -95,7 +95,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {

 // derive the produced data type (excluding computed columns) from the catalog table
 final DataType producedDataType =
-        context.getCatalogTable().getSchema().toPhysicalRowDataType();
+        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

 // create and return dynamic table source
 return new SocketDynamicTableSource(
@@ -95,7 +95,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
 helper.validate();
 ReadableConfig options = helper.getOptions();
 return new PrintSink(
-        context.getCatalogTable().getSchema().toPhysicalRowDataType(),
+        context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
         options.get(PRINT_IDENTIFIER),
         options.get(STANDARD_ERROR),
         options.getOptional(FactoryUtil.SINK_PARALLELISM).orElse(null));
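The sink factory above makes the same call on the consumed side. `ResolvedSchema` also carries the column and constraint information that the planner rules below read; a small illustrative sketch (hypothetical helper, accessor names as I understand the 1.13 API):

```java
import java.util.List;
import java.util.Optional;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;

final class SchemaInspection {

    private SchemaInspection() {}

    static void describe(ResolvedSchema schema) {
        // names of all columns, physical or not
        final List<String> columnNames = schema.getColumnNames();

        // the primary key, if one was declared on the table
        final Optional<UniqueConstraint> primaryKey = schema.getPrimaryKey();

        System.out.println("columns: " + columnNames);
        primaryKey.ifPresent(pk -> System.out.println("primary key: " + pk.getColumns()));
    }
}
```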
@@ -68,7 +68,7 @@ class FlinkRelMdUniqueKeys private extends MetadataHandler[BuiltInMetadata.Uniqu
 case act: CatalogTable =>
   val builder = ImmutableSet.builder[ImmutableBitSet]()

-  val schema = act.getSchema
+  val schema = act.getResolvedSchema
   if (schema.getPrimaryKey.isPresent) {
     // use relOptTable's type which may be projected based on original schema
     val columns = relOptTable.getRowType.getFieldNames
@@ -59,7 +59,7 @@ class StreamPhysicalSinkRule extends ConverterRule(
 val dynamicPartFields = sink.catalogTable.getPartitionKeys
   .filter(!sink.staticPartitions.contains(_))
 val fieldNames = sink.catalogTable
-  .getSchema
+  .getResolvedSchema
   .toPhysicalRowDataType
   .getLogicalType.asInstanceOf[RowType]
   .getFieldNames
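This rule reads the physical field names by going through the physical row data type and casting its logical type to `RowType`. An equivalent Java sketch (hypothetical helper; the cast mirrors the `asInstanceOf[RowType]` above):

```java
import java.util.List;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.types.logical.RowType;

final class PhysicalFieldNames {

    private PhysicalFieldNames() {}

    static List<String> of(ResolvedSchema schema) {
        // the physical row data type always describes a row, so this cast is safe
        final RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();
        return rowType.getFieldNames();
    }
}
```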
@@ -76,7 +76,7 @@ class StreamPhysicalTableSourceScanRule
 isSourceChangeEventsDuplicate(table.catalogTable, table.tableSource, config)) {
   // generate changelog normalize node
   // primary key has been validated in CatalogSourceTable
-  val primaryKey = table.catalogTable.getSchema.getPrimaryKey.get()
+  val primaryKey = table.catalogTable.getResolvedSchema.getPrimaryKey.get()
   val keyFields = primaryKey.getColumns
   val inputFieldNames = newScan.getRowType.getFieldNames
   val primaryKeyIndices = ScanUtil.getPrimaryKeyIndices(inputFieldNames, keyFields)
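Here the primary key columns are mapped to positions in the scan's row type via `ScanUtil.getPrimaryKeyIndices`. A rough stand-in for illustration only (an assumed equivalent, not the actual `ScanUtil` implementation):

```java
import java.util.List;

final class PrimaryKeyIndices {

    private PrimaryKeyIndices() {}

    // map each primary key column to its position in the scan's field name list
    static int[] of(List<String> inputFieldNames, List<String> keyFields) {
        return keyFields.stream()
                .mapToInt(inputFieldNames::indexOf)
                .toArray();
    }
}
```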
