Skip to content

Commit

Permalink
[FLINK-31507][table] Move execution logic of DescribeTableOperation o…
Browse files Browse the repository at this point in the history
…ut from TableEnvironmentImpl

This closes apache#22215
  • Loading branch information
wuchong committed Mar 21, 2023
1 parent 96b17a6 commit 452986d
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 123 deletions.
4 changes: 2 additions & 2 deletions flink-table/flink-sql-client/src/test/resources/sql/table.q
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with identifier 'default_c

describe non_exist;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist.
!error
desc non_exist;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist.
!error

alter table non_exist rename to non_exist2;
Expand Down
4 changes: 2 additions & 2 deletions flink-table/flink-sql-gateway/src/test/resources/sql/table.q
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ org.apache.flink.table.api.ValidationException: Table with identifier 'default_c

describe non_exist;
!output
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist.
!error
desc non_exist;
!output
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist
org.apache.flink.table.api.ValidationException: Tables or views with the identifier 'default_catalog.default_database.non_exist' doesn't exist.
!error

alter table non_exist rename to non_exist2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.apache.flink.table.catalog.ResolvedCatalogTable;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
Expand All @@ -79,7 +78,6 @@
import org.apache.flink.table.operations.CompileAndExecutePlanOperation;
import org.apache.flink.table.operations.CreateTableASOperation;
import org.apache.flink.table.operations.DeleteFromFilterOperation;
import org.apache.flink.table.operations.DescribeTableOperation;
import org.apache.flink.table.operations.ExecutableOperation;
import org.apache.flink.table.operations.ExplainOperation;
import org.apache.flink.table.operations.LoadModuleOperation;
Expand All @@ -106,7 +104,6 @@
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.utils.DataTypeUtils;
import org.apache.flink.table.utils.print.PrintStyle;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -970,18 +967,6 @@ public TableResultInternal executeInternal(Operation operation) {
.schema(ResolvedSchema.of(Column.physical("result", DataTypes.STRING())))
.data(Collections.singletonList(Row.of(explanation)))
.build();
} else if (operation instanceof DescribeTableOperation) {
DescribeTableOperation describeTableOperation = (DescribeTableOperation) operation;
Optional<ContextResolvedTable> result =
catalogManager.getTable(describeTableOperation.getSqlIdentifier());
if (result.isPresent()) {
return buildDescribeResult(result.get().getResolvedSchema());
} else {
throw new ValidationException(
String.format(
"Tables or views with the identifier '%s' doesn't exist",
describeTableOperation.getSqlIdentifier().asSummaryString()));
}
} else if (operation instanceof QueryOperation) {
return executeQueryOperation((QueryOperation) operation);
} else if (operation instanceof ExecutePlanOperation) {
Expand Down Expand Up @@ -1068,109 +1053,6 @@ private TableResultInternal unloadModule(UnloadModuleOperation operation) {
}
}

private TableResultInternal buildDescribeResult(ResolvedSchema schema) {
Object[][] rows = buildTableColumns(schema);
boolean nonComments = isSchemaNonColumnComments(schema);
return buildResult(
generateTableColumnsNames(nonComments),
generateTableColumnsDataTypes(nonComments),
rows);
}

private DataType[] generateTableColumnsDataTypes(boolean nonComments) {
final ArrayList<DataType> result =
new ArrayList<>(
Arrays.asList(
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.BOOLEAN(),
DataTypes.STRING(),
DataTypes.STRING(),
DataTypes.STRING()));
if (!nonComments) {
result.add(DataTypes.STRING());
}
return result.toArray(new DataType[0]);
}

private String[] generateTableColumnsNames(boolean nonComments) {
final ArrayList<String> result =
new ArrayList<>(
Arrays.asList("name", "type", "null", "key", "extras", "watermark"));
if (!nonComments) {
result.add("comment");
}
return result.toArray(new String[0]);
}

private Object[][] buildTableColumns(ResolvedSchema schema) {
Map<String, String> fieldToWatermark =
schema.getWatermarkSpecs().stream()
.collect(
Collectors.toMap(
WatermarkSpec::getRowtimeAttribute,
spec -> spec.getWatermarkExpression().asSummaryString()));

Map<String, String> fieldToPrimaryKey = new HashMap<>();
schema.getPrimaryKey()
.ifPresent(
(p) -> {
List<String> columns = p.getColumns();
columns.forEach(
(c) ->
fieldToPrimaryKey.put(
c,
String.format(
"PRI(%s)",
String.join(", ", columns))));
});
boolean nonComments = isSchemaNonColumnComments(schema);
return schema.getColumns().stream()
.map(
(c) -> {
final LogicalType logicalType = c.getDataType().getLogicalType();
final ArrayList<Object> result =
new ArrayList<>(
Arrays.asList(
c.getName(),
logicalType.copy(true).asSummaryString(),
logicalType.isNullable(),
fieldToPrimaryKey.getOrDefault(
c.getName(), null),
c.explainExtras().orElse(null),
fieldToWatermark.getOrDefault(
c.getName(), null)));
if (!nonComments) {
result.add(c.getComment().orElse(null));
}
return result.toArray();
})
.toArray(Object[][]::new);
}

private boolean isSchemaNonColumnComments(ResolvedSchema schema) {
return schema.getColumns().stream().map(Column::getComment).noneMatch(Optional::isPresent);
}

private TableResultInternal buildResult(String[] headers, DataType[] types, Object[][] rows) {
ResolvedSchema schema = ResolvedSchema.physical(headers, types);
ResultProvider provider =
new StaticResultProvider(
Arrays.stream(rows).map(Row::of).collect(Collectors.toList()));
return TableResultImpl.builder()
.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.schema(ResolvedSchema.physical(headers, types))
.resultProvider(provider)
.setPrintStyle(
PrintStyle.tableauWithDataInferredColumnWidths(
schema,
provider.getRowDataStringConverter(),
Integer.MAX_VALUE,
true,
false))
.build();
}

/**
* extract sink identifier names from {@link ModifyOperation}s and deduplicate them with {@link
* #deduplicateSinkIdentifierNames(List)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.table.operations;

import org.apache.flink.table.api.internal.TableResultInternal;
import org.apache.flink.table.catalog.ObjectIdentifier;

import java.util.Collections;
Expand All @@ -28,7 +29,7 @@
* Operation to describe a DESCRIBE [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier
* statement.
*/
public class DescribeTableOperation implements Operation {
public class DescribeTableOperation implements Operation, ExecutableOperation {

private final ObjectIdentifier sqlIdentifier;
private final boolean isExtended;
Expand All @@ -54,4 +55,12 @@ public String asSummaryString() {
return OperationUtils.formatWithChildren(
"DESCRIBE", params, Collections.emptyList(), Operation::asSummaryString);
}

@Override
public TableResultInternal execute(Context ctx) {
// DESCRIBE <table> is a synonym for SHOW COLUMNS without LIKE pattern.
ShowColumnsOperation showColumns =
new ShowColumnsOperation(sqlIdentifier, null, false, false, "FROM");
return showColumns.execute(ctx);
}
}

0 comments on commit 452986d

Please sign in to comment.