Skip to content

Commit

Permalink
[FLINK-23920][table-common] Keep primary key when inferring schema wi…
Browse files Browse the repository at this point in the history
…th SchemaTranslator

This closes apache#16944.
  • Loading branch information
twalthr committed Aug 25, 2021
1 parent 3510e26 commit cd52056
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public final class SchemaTranslator {
*
* <ul>
* <li>1. Derive physical columns from the input schema.
* <li>2. Derive physical columns from the input schema but enrich with metadata column.
* <li>2. Derive physical columns from the input schema but enrich with metadata column and
* primary key.
* <li>3. Entirely use declared schema.
* </ul>
*/
Expand All @@ -85,15 +86,15 @@ public static ProducingResult createProducingResult(
final List<UnresolvedColumn> declaredColumns = declaredSchema.getColumns();

// the declared schema does not contain physical information,
// thus, it only replaces physical columns with metadata rowtime
// thus, it only replaces physical columns with metadata rowtime or adds a primary key
if (declaredColumns.stream().noneMatch(SchemaTranslator::isPhysical)) {
// go through data type to erase time attributes
final DataType sourceDataType = inputSchema.toSourceRowDataType();
final DataType physicalDataType =
patchDataTypeWithoutMetadataRowtime(sourceDataType, declaredColumns);
final Schema.Builder builder = Schema.newBuilder();
builder.fromRowDataType(physicalDataType);
builder.fromColumns(declaredColumns);
builder.fromSchema(declaredSchema);
return new ProducingResult(null, builder.build(), null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ public void testOutputToEmptySchema() {
}

@Test
public void testOutputToMetadataSchema() {
public void testOutputToPartialSchema() {
final ResolvedSchema tableSchema =
ResolvedSchema.of(
Column.physical("id", DataTypes.BIGINT()),
Column.physical("id", DataTypes.BIGINT().notNull()),
Column.physical("name", DataTypes.STRING()),
Column.metadata("rowtime", DataTypes.TIMESTAMP_LTZ(3), null, false));

Expand All @@ -349,14 +349,16 @@ public void testOutputToMetadataSchema() {
Schema.newBuilder()
.columnByExpression("computed", "f1 + 42")
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3))
.primaryKey("id")
.build());

assertEquals(
Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.column("id", DataTypes.BIGINT().notNull())
.column("name", DataTypes.STRING())
.columnByExpression("computed", "f1 + 42")
.columnByMetadata("rowtime", DataTypes.TIMESTAMP_LTZ(3)) // becomes metadata
.primaryKey("id")
.build(),
result.getSchema());
}
Expand Down

0 comments on commit cd52056

Please sign in to comment.