Skip to content

Commit

Permalink
[FLINK-35195][table] Convert CatalogMaterializedTable to CatalogTable…
Browse files Browse the repository at this point in the history
… to generate execution plan for planner
  • Loading branch information
lsyldliu committed May 7, 2024
1 parent b037f56 commit d8491c0
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,20 @@ public <T extends CatalogBaseTable> T getTable() {
return (T) resolvedTable.getOrigin();
}

/**
* Convert the {@link ResolvedCatalogMaterializedTable} in {@link ContextResolvedTable} to
* {@link ResolvedCatalogTable }.
*/
public ContextResolvedTable toCatalogTable() {
if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
return ContextResolvedTable.permanent(
objectIdentifier,
catalog,
((ResolvedCatalogMaterializedTable) resolvedTable).toResolvedCatalogTable());
}
return this;
}

/**
* Copy the {@link ContextResolvedTable}, replacing the underlying {@link CatalogTable} options.
*/
Expand All @@ -150,6 +164,12 @@ public ContextResolvedTable copy(Map<String, String> newOptions) {
throw new ValidationException(
String.format("View '%s' cannot be enriched with new options.", this));
}
if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
return ContextResolvedTable.permanent(
objectIdentifier,
catalog,
((ResolvedCatalogMaterializedTable) resolvedTable).copy(newOptions));
}
return new ContextResolvedTable(
objectIdentifier,
catalog,
Expand All @@ -159,6 +179,12 @@ public ContextResolvedTable copy(Map<String, String> newOptions) {

/** Copy the {@link ContextResolvedTable}, replacing the underlying {@link ResolvedSchema}. */
public ContextResolvedTable copy(ResolvedSchema newSchema) {
if (resolvedTable.getTableKind() == CatalogBaseTable.TableKind.MATERIALIZED_TABLE) {
throw new ValidationException(
String.format(
"Materialized table '%s' cannot be copied with new schema %s.",
this, newSchema));
}
return new ContextResolvedTable(
objectIdentifier,
catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,4 +182,17 @@ public String toString() {
+ resolvedSchema
+ '}';
}

/** Convert this object to a {@link ResolvedCatalogTable} object for planner optimize query. */
public ResolvedCatalogTable toResolvedCatalogTable() {
return new ResolvedCatalogTable(
CatalogTable.newBuilder()
.schema(getUnresolvedSchema())
.comment(getComment())
.partitionKeys(getPartitionKeys())
.options(getOptions())
.snapshot(getSnapshot().orElse(null))
.build(),
getResolvedSchema());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Table getTable(String tableName) {
return table.map(
lookupResult ->
new CatalogSchemaTable(
lookupResult,
lookupResult.toCatalogTable(),
getStatistic(lookupResult, identifier),
isStreamingMode))
.orElse(null);
Expand All @@ -102,6 +102,7 @@ private FlinkStatistic getStatistic(
contextResolvedTable.getResolvedTable();
switch (resolvedBaseTable.getTableKind()) {
case TABLE:
case MATERIALIZED_TABLE:
return FlinkStatistic.unknown(resolvedBaseTable.getResolvedSchema())
.tableStats(extractTableStats(contextResolvedTable, identifier))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,9 @@ private Operation convertSqlInsert(RichSqlInsert insert) {

UnresolvedIdentifier unresolvedIdentifier = UnresolvedIdentifier.of(targetTablePath);
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
ContextResolvedTable contextResolvedTable = catalogManager.getTableOrError(identifier);
// If it is materialized table, convert it to catalog table for query optimize
ContextResolvedTable contextResolvedTable =
catalogManager.getTableOrError(identifier).toCatalogTable();

PlannerQueryOperation query =
(PlannerQueryOperation)
Expand Down

0 comments on commit d8491c0

Please sign in to comment.