Skip to content

Commit

Permalink
[FLINK-32517][table] Support to execute [CREATE OR] REPLACE TABLE AS …
Browse files Browse the repository at this point in the history
…statement (apache#22949)
  • Loading branch information
Tartarus0zm authored Jul 7, 2023
1 parent 1354d2f commit 08ef36e
Show file tree
Hide file tree
Showing 11 changed files with 681 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -270,4 +270,8 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.newlineAndIndent();
this.asQuery.unparse(writer, leftPrec, rightPrec);
}

public String[] fullTableName() {
return tableName.names.toArray(new String[0]);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@
import org.apache.flink.table.operations.NopOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.SinkModifyOperation;
import org.apache.flink.table.operations.SourceQueryOperation;
import org.apache.flink.table.operations.StatementSetOperation;
import org.apache.flink.table.operations.TableSourceQueryOperation;
import org.apache.flink.table.operations.command.ExecutePlanOperation;
import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
import org.apache.flink.table.operations.ddl.CompilePlanOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.table.resource.ResourceType;
Expand Down Expand Up @@ -784,11 +786,14 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
public TableResultInternal executeInternal(List<ModifyOperation> operations) {
List<ModifyOperation> mapOperations = new ArrayList<>();
for (ModifyOperation modify : operations) {
// execute CREATE TABLE first for CTAS statements
if (modify instanceof CreateTableASOperation) {
// execute CREATE TABLE first for CTAS statements
CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
executeInternal(ctasOperation.getCreateTableOperation());
mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
} else if (modify instanceof ReplaceTableAsOperation) {
ReplaceTableAsOperation rtasOperation = (ReplaceTableAsOperation) modify;
mapOperations.add(getOperation(rtasOperation));
} else {
boolean isRowLevelModification = isRowLevelModification(modify);
if (isRowLevelModification) {
Expand Down Expand Up @@ -819,6 +824,31 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
return executeInternal(transformations, sinkIdentifierNames);
}

private ModifyOperation getOperation(ReplaceTableAsOperation rtasOperation) {
// rtas drop table first, then create
CreateTableOperation createTableOperation = rtasOperation.getCreateTableOperation();
ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
try {
catalogManager.dropTable(tableIdentifier, rtasOperation.isCreateOrReplace());
} catch (ValidationException e) {
if (String.format(
"Table with identifier '%s' does not exist.",
tableIdentifier.asSummaryString())
.equals(e.getMessage())) {
throw new TableException(
String.format(
"The table %s to be replaced doesn't exist. "
+ "You can try to use CREATE TABLE AS statement or "
+ "CREATE OR REPLACE TABLE AS statement.",
tableIdentifier));
} else {
throw e;
}
}
executeInternal(createTableOperation);
return rtasOperation.toSinkModifyOperation(catalogManager);
}

private TableResultInternal executeInternal(
DeleteFromFilterOperation deleteFromFilterOperation) {
Optional<Long> rows =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,6 @@ public interface ModifyOperationVisitor<T> {
T visit(CollectModifyOperation selectOperation);

T visit(CreateTableASOperation ctas);

T visit(ReplaceTableAsOperation rtas);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.operations;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.operations.ddl.CreateTableOperation;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Operation to describe a [CREATE OR] REPLACE TABLE AS statement. */
@Internal
public class ReplaceTableAsOperation implements ModifyOperation {

private final CreateTableOperation createTableOperation;
private final QueryOperation sinkModifyQuery;
private final boolean isCreateOrReplace;

public ReplaceTableAsOperation(
CreateTableOperation createTableOperation,
QueryOperation sinkModifyQuery,
boolean isCreateOrReplace) {
this.createTableOperation = createTableOperation;
this.sinkModifyQuery = sinkModifyQuery;
this.isCreateOrReplace = isCreateOrReplace;
}

@Override
public QueryOperation getChild() {
return sinkModifyQuery;
}

@Override
public <T> T accept(ModifyOperationVisitor<T> visitor) {
return visitor.visit(this);
}

public CreateTableOperation getCreateTableOperation() {
return createTableOperation;
}

public boolean isCreateOrReplace() {
return isCreateOrReplace;
}

public SinkModifyOperation toSinkModifyOperation(CatalogManager catalogManager) {
return new SinkModifyOperation(
catalogManager.getTableOrError(createTableOperation.getTableIdentifier()),
sinkModifyQuery,
Collections.emptyMap(),
null, // targetColumns
false,
Collections.emptyMap());
}

@Override
public String asSummaryString() {
Map<String, Object> params = new LinkedHashMap<>();
params.put("catalogTable", getCreateTableOperation().getCatalogTable());
params.put("identifier", getCreateTableOperation().getTableIdentifier());

return OperationUtils.formatWithChildren(
isCreateOrReplace ? "CREATE OR REPLACE TABLE AS" : "REPLACE TABLE AS",
params,
Collections.singletonList(sinkModifyQuery),
Operation::asSummaryString);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.flink.table.planner.utils.OperationConverterUtils;

import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.util.NlsString;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -171,11 +171,7 @@ private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
mergingStrategies);
verifyPartitioningColumnsExist(mergedSchema, partitionKeys);

String tableComment =
sqlCreateTable
.getComment()
.map(comment -> comment.getValueAs(NlsString.class).getValue())
.orElse(null);
String tableComment = OperationConverterUtils.getTableComment(sqlCreateTable.getComment());

return catalogManager.resolveCatalogTable(
CatalogTable.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class SqlNodeConverters {
register(new SqlTruncateTableConverter());
register(new SqlShowFunctionsConverter());
register(new SqlShowProcedureConverter());
register(new SqlReplaceTableAsConverter());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.operations.converters;

import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.ReplaceTableAsOperation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.flink.table.planner.operations.PlannerQueryOperation;
import org.apache.flink.table.planner.utils.OperationConverterUtils;

import org.apache.calcite.sql.SqlNode;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** A converter for {@link SqlReplaceTableAs}. */
public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTableAs> {

@Override
public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertContext context) {
CatalogManager catalogManager = context.getCatalogManager();
UnresolvedIdentifier unresolvedIdentifier =
UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
context.getSqlValidator().validate(asQuerySqlNode);
QueryOperation query =
new PlannerQueryOperation(context.toRelRoot(asQuerySqlNode).project());

// get table comment
String tableComment =
OperationConverterUtils.getTableComment(sqlReplaceTableAs.getComment());

// get table properties
Map<String, String> properties = new HashMap<>();
sqlReplaceTableAs
.getPropertyList()
.getList()
.forEach(
p ->
properties.put(
((SqlTableOption) p).getKeyString(),
((SqlTableOption) p).getValueString()));

// get table
CatalogTable catalogTable =
CatalogTable.of(
Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
tableComment,
Collections.emptyList(),
properties);

CreateTableOperation createTableOperation =
new CreateTableOperation(
identifier,
catalogTable,
sqlReplaceTableAs.isIfNotExists(),
sqlReplaceTableAs.isTemporary());

return new ReplaceTableAsOperation(
createTableOperation, query, sqlReplaceTableAs.isCreateOrReplace());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

/** Utils methods for converting sql to operations. */
public class OperationConverterUtils {
Expand Down Expand Up @@ -85,6 +86,10 @@ public static List<TableChange> buildModifyColumnChange(
.orElse(null);
}

public static @Nullable String getTableComment(Optional<SqlCharStringLiteral> tableComment) {
return tableComment.map(comment -> comment.getValueAs(String.class)).orElse(null);
}

public static Map<String, String> extractProperties(SqlNodeList propList) {
Map<String, String> properties = new HashMap<>();
if (propList != null) {
Expand Down
Loading

0 comments on commit 08ef36e

Please sign in to comment.