diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 472dc28535b59..487aeeed80b5e 100644
--- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -270,4 +270,8 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
         writer.newlineAndIndent();
         this.asQuery.unparse(writer, leftPrec, rightPrec);
     }
+
+    public String[] fullTableName() {
+        return tableName.names.toArray(new String[0]);
+    }
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 4c7fda752fcf5..275cf587223dd 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -83,6 +83,7 @@
 import org.apache.flink.table.operations.NopOperation;
 import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.ReplaceTableAsOperation;
 import org.apache.flink.table.operations.SinkModifyOperation;
 import org.apache.flink.table.operations.SourceQueryOperation;
 import org.apache.flink.table.operations.StatementSetOperation;
@@ -90,6 +91,7 @@
 import org.apache.flink.table.operations.command.ExecutePlanOperation;
 import org.apache.flink.table.operations.ddl.AnalyzeTableOperation;
 import org.apache.flink.table.operations.ddl.CompilePlanOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.operations.utils.OperationTreeBuilder;
 import org.apache.flink.table.resource.ResourceManager;
 import org.apache.flink.table.resource.ResourceType;
@@ -784,11 +786,14 @@ public CompiledPlan compilePlan(List<ModifyOperation> operations) {
     public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         List<ModifyOperation> mapOperations = new ArrayList<>();
         for (ModifyOperation modify : operations) {
-            // execute CREATE TABLE first for CTAS statements
             if (modify instanceof CreateTableASOperation) {
+                // execute CREATE TABLE first for CTAS statements
                 CreateTableASOperation ctasOperation = (CreateTableASOperation) modify;
                 executeInternal(ctasOperation.getCreateTableOperation());
                 mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
+            } else if (modify instanceof ReplaceTableAsOperation) {
+                ReplaceTableAsOperation rtasOperation = (ReplaceTableAsOperation) modify;
+                mapOperations.add(getModifyOperation(rtasOperation));
             } else {
                 boolean isRowLevelModification = isRowLevelModification(modify);
                 if (isRowLevelModification) {
@@ -819,6 +824,31 @@ public TableResultInternal executeInternal(List<ModifyOperation> operations) {
         return executeInternal(transformations, sinkIdentifierNames);
     }
 
+    private ModifyOperation getModifyOperation(ReplaceTableAsOperation rtasOperation) {
+        // RTAS first drops the existing table, then creates the new one
+        CreateTableOperation createTableOperation = rtasOperation.getCreateTableOperation();
+        ObjectIdentifier tableIdentifier = createTableOperation.getTableIdentifier();
+        try {
+            catalogManager.dropTable(tableIdentifier, rtasOperation.isCreateOrReplace());
+        } catch (ValidationException e) {
+            if (String.format(
+                            "Table with identifier '%s' does not exist.",
+                            tableIdentifier.asSummaryString())
+                    .equals(e.getMessage())) {
+                throw new TableException(
+                        String.format(
+                                "The table %s to be replaced doesn't exist. "
+                                        + "You can try to use CREATE TABLE AS statement or "
+                                        + "CREATE OR REPLACE TABLE AS statement.",
+                                tableIdentifier));
+            } else {
+                throw e;
+            }
+        }
+        executeInternal(createTableOperation);
+        return rtasOperation.toSinkModifyOperation(catalogManager);
+    }
+
     private TableResultInternal executeInternal(
             DeleteFromFilterOperation deleteFromFilterOperation) {
         Optional<Long> rows =
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
index 98ab9d60f276e..006f042b89308 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ModifyOperationVisitor.java
@@ -37,4 +37,6 @@ public interface ModifyOperationVisitor<T> {
     T visit(CollectModifyOperation selectOperation);
 
     T visit(CreateTableASOperation ctas);
+
+    T visit(ReplaceTableAsOperation rtas);
 }
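For orientation, here is a minimal usage sketch of the execution path added above. It is not part of the patch: the connector choices ('datagen', 'blackhole') and table names are illustrative assumptions; only public Table API calls are used.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RtasUsageSketch {
    public static void main(String[] args) throws Exception {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.executeSql(
                "CREATE TABLE src (id INT, name STRING) WITH"
                        + " ('connector' = 'datagen', 'number-of-rows' = '3')");
        // REPLACE TABLE requires the target to exist; the new branch in
        // executeInternal() throws a TableException otherwise
        tEnv.executeSql(
                "CREATE TABLE target (id INT, name STRING) WITH ('connector' = 'blackhole')");
        tEnv.executeSql(
                        "REPLACE TABLE target WITH ('connector' = 'blackhole')"
                                + " AS SELECT id, name FROM src")
                .await();
        // CREATE OR REPLACE tolerates a missing target, because dropTable() is
        // invoked with ignoreIfNotExists = isCreateOrReplace() = true
        tEnv.executeSql(
                        "CREATE OR REPLACE TABLE fresh_target WITH ('connector' = 'blackhole')"
                                + " AS SELECT id FROM src")
                .await();
    }
}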
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
new file mode 100644
index 0000000000000..45f00b0f5dea0
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/ReplaceTableAsOperation.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Operation to describe a [CREATE OR] REPLACE TABLE AS statement. */
+@Internal
+public class ReplaceTableAsOperation implements ModifyOperation {
+
+    private final CreateTableOperation createTableOperation;
+    private final QueryOperation sinkModifyQuery;
+    private final boolean isCreateOrReplace;
+
+    public ReplaceTableAsOperation(
+            CreateTableOperation createTableOperation,
+            QueryOperation sinkModifyQuery,
+            boolean isCreateOrReplace) {
+        this.createTableOperation = createTableOperation;
+        this.sinkModifyQuery = sinkModifyQuery;
+        this.isCreateOrReplace = isCreateOrReplace;
+    }
+
+    @Override
+    public QueryOperation getChild() {
+        return sinkModifyQuery;
+    }
+
+    @Override
+    public <T> T accept(ModifyOperationVisitor<T> visitor) {
+        return visitor.visit(this);
+    }
+
+    public CreateTableOperation getCreateTableOperation() {
+        return createTableOperation;
+    }
+
+    public boolean isCreateOrReplace() {
+        return isCreateOrReplace;
+    }
+
+    public SinkModifyOperation toSinkModifyOperation(CatalogManager catalogManager) {
+        return new SinkModifyOperation(
+                catalogManager.getTableOrError(createTableOperation.getTableIdentifier()),
+                sinkModifyQuery,
+                Collections.emptyMap(),
+                null, // targetColumns
+                false,
+                Collections.emptyMap());
+    }
+
+    @Override
+    public String asSummaryString() {
+        Map<String, Object> params = new LinkedHashMap<>();
+        params.put("catalogTable", getCreateTableOperation().getCatalogTable());
+        params.put("identifier", getCreateTableOperation().getTableIdentifier());
+
+        return OperationUtils.formatWithChildren(
+                isCreateOrReplace ? "CREATE OR REPLACE TABLE AS" : "REPLACE TABLE AS",
+                params,
+                Collections.singletonList(sinkModifyQuery),
+                Operation::asSummaryString);
+    }
+}
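Note that adding visit(ReplaceTableAsOperation) to ModifyOperationVisitor is source-incompatible for downstream implementations of that interface. A hypothetical adapter (its name and default behavior are illustrative, not part of this change) shows the shape of the update existing visitors need:

import org.apache.flink.table.operations.ModifyOperationVisitor;
import org.apache.flink.table.operations.ReplaceTableAsOperation;

// hypothetical base class: existing visitors can extend it to compile against the new method
abstract class RtasAwareVisitor<T> implements ModifyOperationVisitor<T> {
    @Override
    public T visit(ReplaceTableAsOperation rtas) {
        // conservative default; concrete visitors override this once they support RTAS
        throw new UnsupportedOperationException(rtas.asSummaryString() + " is not supported");
    }
}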
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
index 7983a12caa1e5..172048e6cc00b 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlCreateTableConverter.java
@@ -37,11 +37,11 @@
 import org.apache.flink.table.operations.ddl.CreateTableOperation;
 import org.apache.flink.table.planner.calcite.FlinkCalciteSqlValidator;
 import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
 
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
-import org.apache.calcite.util.NlsString;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -171,11 +171,7 @@ private CatalogTable createCatalogTable(SqlCreateTable sqlCreateTable) {
                 mergingStrategies);
         verifyPartitioningColumnsExist(mergedSchema, partitionKeys);
 
-        String tableComment =
-                sqlCreateTable
-                        .getComment()
-                        .map(comment -> comment.getValueAs(NlsString.class).getValue())
-                        .orElse(null);
+        String tableComment = OperationConverterUtils.getTableComment(sqlCreateTable.getComment());
 
         return catalogManager.resolveCatalogTable(
                 CatalogTable.of(
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
index ec90f42da3f26..3972b55452306 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlNodeConverters.java
@@ -50,6 +50,7 @@ public class SqlNodeConverters {
         register(new SqlTruncateTableConverter());
         register(new SqlShowFunctionsConverter());
         register(new SqlShowProcedureConverter());
+        register(new SqlReplaceTableAsConverter());
     }
 
     /**
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
new file mode 100644
index 0000000000000..1ace0be66c61a
--- /dev/null
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlReplaceTableAsConverter.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations.converters;
+
+import org.apache.flink.sql.parser.ddl.SqlReplaceTableAs;
+import org.apache.flink.sql.parser.ddl.SqlTableOption;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogManager;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.operations.ReplaceTableAsOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.operations.PlannerQueryOperation;
+import org.apache.flink.table.planner.utils.OperationConverterUtils;
+
+import org.apache.calcite.sql.SqlNode;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/** A converter for {@link SqlReplaceTableAs}. */
+public class SqlReplaceTableAsConverter implements SqlNodeConverter<SqlReplaceTableAs> {
+
+    @Override
+    public Operation convertSqlNode(SqlReplaceTableAs sqlReplaceTableAs, ConvertContext context) {
+        CatalogManager catalogManager = context.getCatalogManager();
+        UnresolvedIdentifier unresolvedIdentifier =
+                UnresolvedIdentifier.of(sqlReplaceTableAs.fullTableName());
+        ObjectIdentifier identifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);
+
+        SqlNode asQuerySqlNode = sqlReplaceTableAs.getAsQuery();
+        context.getSqlValidator().validate(asQuerySqlNode);
+        QueryOperation query =
+                new PlannerQueryOperation(context.toRelRoot(asQuerySqlNode).project());
+
+        // get table comment
+        String tableComment =
+                OperationConverterUtils.getTableComment(sqlReplaceTableAs.getComment());
+
+        // get table properties
+        Map<String, String> properties = new HashMap<>();
+        sqlReplaceTableAs
+                .getPropertyList()
+                .getList()
+                .forEach(
+                        p ->
+                                properties.put(
+                                        ((SqlTableOption) p).getKeyString(),
+                                        ((SqlTableOption) p).getValueString()));
+
+        // get table
+        CatalogTable catalogTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(query.getResolvedSchema()).build(),
+                        tableComment,
+                        Collections.emptyList(),
+                        properties);
+
+        CreateTableOperation createTableOperation =
+                new CreateTableOperation(
+                        identifier,
+                        catalogTable,
+                        sqlReplaceTableAs.isIfNotExists(),
+                        sqlReplaceTableAs.isTemporary());
+
+        return new ReplaceTableAsOperation(
+                createTableOperation, query, sqlReplaceTableAs.isCreateOrReplace());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index 6ab9b5c67bca3..83dd4f202f1f9 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -34,6 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 
 /** Utils methods for converting sql to operations. */
 public class OperationConverterUtils {
@@ -85,6 +86,10 @@ public static List<TableChange> buildModifyColumnChange(
                 .orElse(null);
     }
 
+    public static @Nullable String getTableComment(Optional<SqlCharStringLiteral> tableComment) {
+        return tableComment.map(comment -> comment.getValueAs(String.class)).orElse(null);
+    }
+
     public static Map<String, String> extractProperties(SqlNodeList propList) {
         Map<String, String> properties = new HashMap<>();
         if (propList != null) {
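As a quick sanity check of the extracted helper, a hypothetical snippet using Calcite's public literal factory (the comment text is arbitrary; run with -ea so the asserts take effect):

import java.util.Optional;

import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;

import org.apache.flink.table.planner.utils.OperationConverterUtils;

public class GetTableCommentSketch {
    public static void main(String[] args) {
        SqlCharStringLiteral comment =
                SqlLiteral.createCharString("user table", SqlParserPos.ZERO);
        // getValueAs(String.class) unwraps the underlying NlsString, matching the
        // behavior previously inlined in SqlCreateTableConverter
        assert "user table".equals(OperationConverterUtils.getTableComment(Optional.of(comment)));
        assert OperationConverterUtils.getTableComment(Optional.empty()) == null;
    }
}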
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
new file mode 100644
index 0000000000000..5c31f230cd4f8
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlRTASNodeToOperationConverterTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.operations;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.SqlDialect;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectIdentifier;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ReplaceTableAsOperation;
+import org.apache.flink.table.operations.ddl.CreateTableOperation;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.parse.CalciteParser;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.apache.calcite.sql.SqlNode;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for converting a [CREATE OR] REPLACE TABLE AS statement to an operation. */
+public class SqlRTASNodeToOperationConverterTest extends SqlNodeToOperationConversionTestBase {
+
+    @Test
+    public void testReplaceTableAS() {
+        String tableName = "replace_table";
+        String tableComment = "test table comment 表描述";
+        String sql =
+                "REPLACE TABLE "
+                        + tableName
+                        + " COMMENT '"
+                        + tableComment
+                        + "' WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        testCommonReplaceTableAs(sql, tableName, tableComment);
+    }
+
+    @Test
+    public void testCreateOrReplaceTableAS() {
+        String tableName = "create_or_replace_table";
+        String sql =
+                "CREATE OR REPLACE TABLE "
+                        + tableName
+                        + " WITH ('k1' = 'v1', 'k2' = 'v2') as SELECT * FROM t1";
+        testCommonReplaceTableAs(sql, tableName, null);
+    }
+
+    private void testCommonReplaceTableAs(
+            String sql, String tableName, @Nullable String tableComment) {
+        ObjectIdentifier expectedIdentifier = ObjectIdentifier.of("builtin", "default", tableName);
+        Operation operation = parseAndConvert(sql);
+        CatalogTable expectedCatalogTable =
+                CatalogTable.of(
+                        getDefaultTableSchema(),
+                        tableComment,
+                        Collections.emptyList(),
+                        getDefaultTableOptions());
+        verifyReplaceTableAsOperation(operation, expectedIdentifier, expectedCatalogTable);
+    }
+
+    private Operation parseAndConvert(String sql) {
+        final FlinkPlannerImpl planner = getPlannerBySqlDialect(SqlDialect.DEFAULT);
+        final CalciteParser parser = getParserBySqlDialect(SqlDialect.DEFAULT);
+
+        SqlNode node = parser.parse(sql);
+        return SqlNodeToOperationConversion.convert(planner, catalogManager, node).get();
+    }
+
+    private void verifyReplaceTableAsOperation(
+            Operation operation,
+            ObjectIdentifier expectedTableIdentifier,
+            CatalogTable expectedCatalogTable) {
+        assertThat(operation).isInstanceOf(ReplaceTableAsOperation.class);
+        ReplaceTableAsOperation replaceTableAsOperation = (ReplaceTableAsOperation) operation;
+        CreateTableOperation createTableOperation =
+                replaceTableAsOperation.getCreateTableOperation();
+        // verify the createTableOperation
+        assertThat(createTableOperation.isTemporary()).isFalse();
+        assertThat(createTableOperation.isIgnoreIfExists()).isFalse();
+        assertThat(createTableOperation.getTableIdentifier()).isEqualTo(expectedTableIdentifier);
+        // verify the catalog table to be created
+        verifyCatalogTable(expectedCatalogTable, createTableOperation.getCatalogTable());
+    }
+
+    private void verifyCatalogTable(
+            CatalogTable expectedCatalogTable, CatalogTable actualCatalogTable) {
+        assertThat(actualCatalogTable.getUnresolvedSchema())
+                .isEqualTo(expectedCatalogTable.getUnresolvedSchema());
+        assertThat(actualCatalogTable.getComment()).isEqualTo(expectedCatalogTable.getComment());
+        assertThat(actualCatalogTable.getPartitionKeys())
+                .isEqualTo(expectedCatalogTable.getPartitionKeys());
+        assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedCatalogTable.getOptions());
+    }
+
+    private Map<String, String> getDefaultTableOptions() {
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("k1", "v1");
+        expectedOptions.put("k2", "v2");
+        return expectedOptions;
+    }
+
+    private Schema getDefaultTableSchema() {
+        return Schema.newBuilder()
+                .fromFields(
+                        new String[] {"a", "b", "c", "d"},
+                        new AbstractDataType[] {
+                            DataTypes.BIGINT(),
+                            DataTypes.STRING(),
+                            DataTypes.INT(),
+                            DataTypes.STRING()
+                        })
+                .build();
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
new file mode 100644
index 0000000000000..3f35086e44394
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/RTASITCase.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.batch.sql;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for the [CREATE OR] REPLACE TABLE AS SELECT statement. */
+class RTASITCase extends BatchTestBase {
+
+    @BeforeEach
+    @Override
+    public void before() {
+        String dataId1 = TestValuesTableFactory.registerData(TestData.smallData3());
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE source(a int, b bigint, c string)"
+                                        + " WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')",
+                                dataId1));
+        tEnv().executeSql(
+                "CREATE TABLE target(a int, b bigint, c string)"
+                        + " WITH ('connector' = 'values')");
+    }
+
+    @Test
+    void testReplaceTableAS() throws Exception {
+        tEnv().executeSql(
+                        "REPLACE TABLE target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT * FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("target").toString())
+                .isEqualTo("[+I[1, 1, Hi], +I[2, 2, Hello], +I[3, 2, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "b", "c"},
+                        new AbstractDataType[] {
+                            DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        });
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testReplaceTableASWithTableNotExist() {
+        assertThatThrownBy(() -> tEnv().executeSql("REPLACE TABLE t AS SELECT * FROM source"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "The table `default_catalog`.`default_database`.`t` to be replaced doesn't exist."
+                                + " You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.");
+    }
+
+    @Test
+    void testCreateOrReplaceTableAS() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("target").toString())
+                .isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()});
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE not_exist_target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("not_exist_target").toString())
+                .isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()});
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("not_exist_target"));
+    }
+
+    private CatalogTable getExpectCatalogTable(
+            String[] cols, AbstractDataType<?>[] fieldDataTypes) {
+        return CatalogTable.of(
+                Schema.newBuilder().fromFields(cols, fieldDataTypes).build(),
+                null,
+                Collections.emptyList(),
+                getDefaultTargetTableOptions());
+    }
+
+    private Map<String, String> getDefaultTargetTableOptions() {
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("connector", "values");
+        expectedOptions.put("bounded", "true");
+        return expectedOptions;
+    }
+
+    private CatalogTable getCatalogTable(String tableName) throws TableNotExistException {
+        return (CatalogTable)
+                tEnv().getCatalog("default_catalog")
+                        .get()
+                        .getTable(ObjectPath.fromString("default_database." + tableName));
+    }
+
+    private void verifyCatalogTable(
+            CatalogTable expectedCatalogTable, CatalogTable actualCatalogTable) {
+        assertThat(actualCatalogTable.getUnresolvedSchema())
+                .isEqualTo(expectedCatalogTable.getUnresolvedSchema());
+        assertThat(actualCatalogTable.getComment()).isEqualTo(expectedCatalogTable.getComment());
+        assertThat(actualCatalogTable.getPartitionKeys())
+                .isEqualTo(expectedCatalogTable.getPartitionKeys());
+        assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedCatalogTable.getOptions());
+    }
+}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
new file mode 100644
index 0000000000000..ba1b2b6513523
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/RTASITCase.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.runtime.stream.sql;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.StreamingTestBase;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.types.AbstractDataType;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** IT case for the [CREATE OR] REPLACE TABLE AS SELECT statement. */
+public class RTASITCase extends StreamingTestBase {
+
+    @BeforeEach
+    @Override
+    public void before() throws Exception {
+        super.before();
+        String dataId1 = TestValuesTableFactory.registerData(TestData.smallData3());
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE source(a int, b bigint, c string)"
+                                        + " WITH ('connector' = 'values', 'bounded' = 'true', 'data-id' = '%s')",
+                                dataId1));
+        tEnv().executeSql(
+                "CREATE TABLE target(a int, b bigint, c string)"
+                        + " WITH ('connector' = 'values')");
+    }
+
+    @Test
+    void testReplaceTableAS() throws Exception {
+        tEnv().executeSql(
+                        "REPLACE TABLE target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT * FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("target").toString())
+                .isEqualTo("[+I[1, 1, Hi], +I[2, 2, Hello], +I[3, 2, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "b", "c"},
+                        new AbstractDataType[] {
+                            DataTypes.INT(), DataTypes.BIGINT(), DataTypes.STRING()
+                        });
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testReplaceTableASWithTableNotExist() {
+        assertThatThrownBy(() -> tEnv().executeSql("REPLACE TABLE t AS SELECT * FROM source"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "The table `default_catalog`.`default_database`.`t` to be replaced doesn't exist."
+                                + " You can try to use CREATE TABLE AS statement or CREATE OR REPLACE TABLE AS statement.");
+    }
+
+    @Test
+    void testCreateOrReplaceTableAS() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("target").toString())
+                .isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()});
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("target"));
+    }
+
+    @Test
+    void testCreateOrReplaceTableASWithTableNotExist() throws Exception {
+        tEnv().executeSql(
+                        "CREATE OR REPLACE TABLE not_exist_target WITH ('connector' = 'values',"
+                                + " 'bounded' = 'true')"
+                                + " AS SELECT a, c FROM source")
+                .await();
+
+        // verify written rows
+        assertThat(TestValuesTableFactory.getResults("not_exist_target").toString())
+                .isEqualTo("[+I[1, Hi], +I[2, Hello], +I[3, Hello world]]");
+
+        // verify the table after replacing
+        CatalogTable expectCatalogTable =
+                getExpectCatalogTable(
+                        new String[] {"a", "c"},
+                        new AbstractDataType[] {DataTypes.INT(), DataTypes.STRING()});
+        verifyCatalogTable(expectCatalogTable, getCatalogTable("not_exist_target"));
+    }
+
+    private CatalogTable getExpectCatalogTable(
+            String[] cols, AbstractDataType<?>[] fieldDataTypes) {
+        return CatalogTable.of(
+                Schema.newBuilder().fromFields(cols, fieldDataTypes).build(),
+                null,
+                Collections.emptyList(),
+                getDefaultTargetTableOptions());
+    }
+
+    private Map<String, String> getDefaultTargetTableOptions() {
+        Map<String, String> expectedOptions = new HashMap<>();
+        expectedOptions.put("connector", "values");
+        expectedOptions.put("bounded", "true");
+        return expectedOptions;
+    }
+
+    private CatalogTable getCatalogTable(String tableName) throws TableNotExistException {
+        return (CatalogTable)
+                tEnv().getCatalog("default_catalog")
+                        .get()
+                        .getTable(ObjectPath.fromString("default_database." + tableName));
+    }
+
+    private void verifyCatalogTable(
+            CatalogTable expectedCatalogTable, CatalogTable actualCatalogTable) {
+        assertThat(actualCatalogTable.getUnresolvedSchema())
+                .isEqualTo(expectedCatalogTable.getUnresolvedSchema());
+        assertThat(actualCatalogTable.getComment()).isEqualTo(expectedCatalogTable.getComment());
+        assertThat(actualCatalogTable.getPartitionKeys())
+                .isEqualTo(expectedCatalogTable.getPartitionKeys());
+        assertThat(actualCatalogTable.getOptions()).isEqualTo(expectedCatalogTable.getOptions());
+    }
+}
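Finally, a hypothetical end-to-end sketch of the guarded failure path that both ITCases assert, outside the patch itself (table and connector names are illustrative):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableException;

public class RtasMissingTargetSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql("CREATE TABLE src (a INT) WITH ('connector' = 'datagen')");
        try {
            // `missing` was never created, so the RTAS branch rejects the plain REPLACE form
            tEnv.executeSql("REPLACE TABLE missing AS SELECT a FROM src");
        } catch (TableException e) {
            // the message points the user at CREATE TABLE AS or CREATE OR REPLACE TABLE AS
        }
    }
}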